fix async calls on project updated entries to ensure that all files are updating appropriately

This commit is contained in:
KCaverly 2023-08-23 22:28:30 +02:00
parent 09fd99b1e3
commit e42b9e910e
2 changed files with 108 additions and 54 deletions

View file

@ -156,7 +156,7 @@ impl ProjectState {
fn update_queue(queue: &mut Vec<IndexOperation>, operation: IndexOperation) {
match operation {
IndexOperation::FlushQueue => {
for op in queue.pop() {
while let Some(op) = queue.pop() {
match op {
IndexOperation::IndexFile { payload, tx } => {
tx.try_send(payload);
@ -652,51 +652,103 @@ impl SemanticIndex {
})
}
// pub fn project_entries_changed(
// &self,
// project: ModelHandle<Project>,
// changes: &Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
// cx: &ModelContext<SemanticIndex>,
// worktree_id: &WorktreeId,
// ) -> Result<()> {
// let parsing_files_tx = self.parsing_files_tx.clone();
// let db_update_tx = self.db_update_tx.clone();
// let (job_queue_tx, outstanding_job_tx, worktree_db_id) = {
// let state = self.projects.get(&project.downgrade());
// if state.is_none() {
// return anyhow::Error(anyhow!("Project not yet initialized"));
// }
// let state = state.unwrap();
// (
// state.job_queue_tx.clone(),
// state._outstanding_job_count_tx,
// state.db_id_for_worktree_id(worktree_id),
// )
// };
fn project_entries_changed(
&self,
project: ModelHandle<Project>,
changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
cx: &mut ModelContext<'_, SemanticIndex>,
worktree_id: &WorktreeId,
) -> Result<()> {
let parsing_files_tx = self.parsing_files_tx.clone();
let db_update_tx = self.db_update_tx.clone();
let (job_queue_tx, outstanding_job_tx, worktree_db_id) = {
let state = self
.projects
.get(&project.downgrade())
.ok_or(anyhow!("Project not yet initialized"))?;
let worktree_db_id = state
.db_id_for_worktree_id(*worktree_id)
.ok_or(anyhow!("Worktree ID in Database Not Available"))?;
(
state.job_queue_tx.clone(),
state._outstanding_job_count_tx.clone(),
worktree_db_id,
)
};
// for (path, entry_id, path_change) in changes.iter() {
// match path_change {
// PathChange::AddedOrUpdated => {
// let job_handle = JobHandle::new(&outstanding_job_tx);
// job_queue_tx.try_send(IndexOperation::IndexFile {
// payload: PendingFile {
// worktree_db_id,
// relative_path: path,
// absolute_path,
// language,
// modified_time,
// job_handle,
// },
// tx: parsing_files_tx,
// })
// }
// PathChange::Removed => {}
// _ => {}
// }
// }
let language_registry = self.language_registry.clone();
let parsing_files_tx = parsing_files_tx.clone();
let db_update_tx = db_update_tx.clone();
// Ok(())
// }
let worktree = project
.read(cx)
.worktree_for_id(worktree_id.clone(), cx)
.ok_or(anyhow!("Worktree not available"))?
.read(cx)
.snapshot();
cx.spawn(|this, mut cx| async move {
let worktree = worktree.clone();
for (path, entry_id, path_change) in changes.iter() {
let relative_path = path.to_path_buf();
let absolute_path = worktree.absolutize(path);
let Some(entry) = worktree.entry_for_id(*entry_id) else {
continue;
};
if entry.is_ignored || entry.is_symlink || entry.is_external {
continue;
}
match path_change {
PathChange::AddedOrUpdated | PathChange::Updated => {
log::trace!("File Updated: {:?}", path);
if let Ok(language) = language_registry
.language_for_file(&relative_path, None)
.await
{
if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
&& &language.name().as_ref() != &"Markdown"
&& language
.grammar()
.and_then(|grammar| grammar.embedding_config.as_ref())
.is_none()
{
continue;
}
let job_handle = JobHandle::new(&outstanding_job_tx);
let new_operation = IndexOperation::IndexFile {
payload: PendingFile {
worktree_db_id,
relative_path,
absolute_path,
language,
modified_time: entry.mtime,
job_handle,
},
tx: parsing_files_tx.clone(),
};
job_queue_tx.try_send(new_operation);
}
}
PathChange::Removed => {
let new_operation = IndexOperation::DeleteFile {
payload: DbOperation::Delete {
worktree_id: worktree_db_id,
path: relative_path,
},
tx: db_update_tx.clone(),
};
job_queue_tx.try_send(new_operation);
}
_ => {}
}
}
})
.detach();
Ok(())
}
pub fn initialize_project(
&mut self,
@ -724,9 +776,8 @@ impl SemanticIndex {
let _subscription = cx.subscribe(&project, |this, project, event, cx| {
if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event {
todo!();
// this.project_entries_changed(project, changes, cx, worktree_id);
}
this.project_entries_changed(project, changes.clone(), cx, worktree_id);
};
});
let language_registry = self.language_registry.clone();
@ -775,6 +826,10 @@ impl SemanticIndex {
for file in worktree.files(false, 0) {
let absolute_path = worktree.absolutize(&file.path);
if file.is_external || file.is_ignored || file.is_symlink {
continue;
}
if let Ok(language) = language_registry
.language_for_file(&absolute_path, None)
.await
@ -827,10 +882,8 @@ impl SemanticIndex {
job_count_tx_longlived,
);
if let Some(project_state) = this.projects.get_mut(&project.downgrade()) {
for op in worktree_files {
project_state.job_queue_tx.try_send(op);
}
for op in worktree_files {
project_state.job_queue_tx.try_send(op);
}
this.projects.insert(project.downgrade(), project_state);
@ -864,10 +917,10 @@ impl SemanticIndex {
return;
};
state.job_queue_tx.try_send(IndexOperation::FlushQueue);
})
});
});
Task::Ready(Some(Ok((count, job_count_rx))))
Ok((count, job_count_rx))
})
}
pub fn outstanding_job_count_rx(