diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs
index 7e3585656a..ca317c0ded 100644
--- a/crates/search/src/project_search.rs
+++ b/crates/search/src/project_search.rs
@@ -849,7 +849,7 @@ impl ProjectSearchView {
             let model = model.read(cx);
             project = model.project.clone();
             SemanticIndex::global(cx).map(|semantic| {
-                semantic.update(cx, |this, cx| this.initialize_project(project, cx))
+                semantic.update(cx, |this, cx| this.initialize_project(project.clone(), cx));
             });
         }
 
diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs
index 2b803e36ac..8849b643c5 100644
--- a/crates/semantic_index/src/semantic_index.rs
+++ b/crates/semantic_index/src/semantic_index.rs
@@ -96,6 +96,7 @@ struct ProjectState {
     subscription: gpui::Subscription,
     outstanding_job_count_rx: watch::Receiver<usize>,
     _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
+    queue: HashMap<WorktreeId, Vec<IndexOperation>>,
 }
 
 #[derive(Clone)]
@@ -130,9 +131,25 @@ impl ProjectState {
             outstanding_job_count_rx,
             _outstanding_job_count_tx,
             subscription,
+            queue: HashMap::new(),
         }
     }
 
+    fn add_to_queue(&mut self, worktree_id: WorktreeId, operation: IndexOperation) {
+        if let Some(worktree_queue) = self.queue.get_mut(&worktree_id) {
+            worktree_queue.push(operation);
+        } else {
+            self.queue.insert(worktree_id, vec![operation]);
+        }
+    }
+
+    fn pop(&mut self) -> Option<IndexOperation> {
+        self.queue
+            .iter_mut()
+            .next()
+            .and_then(|(_, mut entry)| entry.pop())
+    }
+
     fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
         self.worktree_db_ids
             .iter()
@@ -158,6 +175,7 @@ impl ProjectState {
     }
 }
 
+#[derive(Clone)]
 pub struct PendingFile {
     worktree_db_id: i64,
     relative_path: PathBuf,
@@ -167,6 +185,12 @@ pub struct PendingFile {
     job_handle: JobHandle,
 }
 
+#[derive(Clone)]
+enum IndexOperation {
+    IndexFile { file: PendingFile },
+    DeleteFile { file: PendingFile },
+}
+
 pub struct SearchResult {
     pub buffer: ModelHandle<Buffer>,
     pub range: Range<Anchor>,
@@ -628,102 +652,12 @@ impl SemanticIndex {
             }
         });
 
-        cx.spawn(|this, mut cx| async move {
-            futures::future::join_all(worktree_scans_complete).await;
-
-            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
-            let worktrees = project.read_with(&cx, |project, cx| {
-                project
-                    .worktrees(cx)
-                    .map(|worktree| worktree.read(cx).snapshot())
-                    .collect::<Vec<_>>()
-            });
-
-            let mut worktree_file_mtimes = HashMap::new();
-            let mut db_ids_by_worktree_id = HashMap::new();
-
-            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
-                let db_id = db_id?;
-                db_ids_by_worktree_id.insert(worktree.id(), db_id);
-                worktree_file_mtimes.insert(
-                    worktree.id(),
-                    this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
-                        .await?,
-                );
-            }
-
-            let worktree_db_ids = db_ids_by_worktree_id
-                .iter()
-                .map(|(a, b)| (*a, *b))
-                .collect();
-
-            let (job_count_tx, job_count_rx) = watch::channel_with(0);
-            let job_count_tx = Arc::new(Mutex::new(job_count_tx));
-            this.update(&mut cx, |this, _| {
-                let project_state = ProjectState::new(
-                    _subscription,
-                    worktree_db_ids,
-                    worktree_file_mtimes.clone(),
-                    job_count_rx,
-                    job_count_tx,
-                );
-                this.projects.insert(project.downgrade(), project_state);
-            });
-
-            anyhow::Ok(())
-        })
-        .detach_and_log_err(cx)
-    }
-
-    pub fn index_project(
-        &mut self,
-        project: ModelHandle<Project>,
-        cx: &mut ModelContext<Self>,
-    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
-        let t0 = Instant::now();
-        let worktree_scans_complete = project
-            .read(cx)
-            .worktrees(cx)
-            .map(|worktree| {
-                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
-                async move {
-                    scan_complete.await;
-                }
-            })
-            .collect::<Vec<_>>();
-        let worktree_db_ids = project
-            .read(cx)
-            .worktrees(cx)
-            .map(|worktree| {
-                self.find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf())
-            })
-            .collect::<Vec<_>>();
-        let language_registry = self.language_registry.clone();
-        let db_update_tx = self.db_update_tx.clone();
-        let parsing_files_tx = self.parsing_files_tx.clone();
-
-        let state = self.projects.get(&project.downgrade());
-        let state = if state.is_none() {
-            return Task::Ready(Some(Err(anyhow!("Project not yet initialized"))));
-        } else {
-            state.unwrap()
-        };
-
-        let state = state.clone().to_owned();
-
-        let _subscription = cx.subscribe(&project, |this, project, event, _cx| {
-            if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event {
-                todo!();
-                // this.project_entries_changed(project, changes, cx, worktree_id);
-            }
-        });
 
         cx.spawn(|this, mut cx| async move {
             futures::future::join_all(worktree_scans_complete).await;
 
             let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
-
             let worktrees = project.read_with(&cx, |project, cx| {
                 project
                     .worktrees(cx)
@@ -733,6 +667,7 @@ impl SemanticIndex {
 
             let mut worktree_file_mtimes = HashMap::new();
             let mut db_ids_by_worktree_id = HashMap::new();
+
            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
                 let db_id = db_id?;
                 db_ids_by_worktree_id.insert(worktree.id(), db_id);
@@ -761,10 +696,12 @@ impl SemanticIndex {
                 this.projects.insert(project.downgrade(), project_state);
             });
 
-            cx.background()
+            let worktree_files = cx
+                .background()
                 .spawn(async move {
-                    let mut count = 0;
+                    let mut worktree_files = HashMap::new();
                     for worktree in worktrees.into_iter() {
+                        let mut candidate_files = Vec::new();
                         let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
                         for file in worktree.files(false, 0) {
                             let absolute_path = worktree.absolutize(&file.path);
@@ -773,6 +710,7 @@ impl SemanticIndex {
                                 .language_for_file(&absolute_path, None)
                                 .await
                             {
+                                // Test if file is valid parseable file
                                 if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                     && &language.name().as_ref() != &"Markdown"
                                     && language
@@ -789,40 +727,186 @@ impl SemanticIndex {
                                     .map_or(false, |existing_mtime| existing_mtime == file.mtime);
 
                                 if !already_stored {
-                                    count += 1;
-
                                     let job_handle = JobHandle::new(&job_count_tx);
-                                    parsing_files_tx
-                                        .try_send(PendingFile {
+                                    candidate_files.push(IndexOperation::IndexFile {
+                                        file: PendingFile {
                                             worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
                                             relative_path: path_buf,
                                             absolute_path,
                                             language,
                                             job_handle,
                                             modified_time: file.mtime,
-                                        })
-                                        .unwrap();
+                                        },
+                                    });
                                 }
                             }
                         }
-                        for file in file_mtimes.keys() {
-                            db_update_tx
-                                .try_send(DbOperation::Delete {
-                                    worktree_id: db_ids_by_worktree_id[&worktree.id()],
-                                    path: file.to_owned(),
-                                })
-                                .unwrap();
-                        }
+
+                        worktree_files.insert(worktree.id(), candidate_files);
                     }
-                    log::trace!(
-                        "walking worktree took {:?} milliseconds",
-                        t0.elapsed().as_millis()
-                    );
-                    anyhow::Ok((count, job_count_rx))
+
+                    anyhow::Ok(worktree_files)
                 })
-                .await
+                .await?;
+
+            this.update(&mut cx, |this, cx| {
+                if let Some(project_state) = this.projects.get_mut(&project.downgrade()) {
+                    for (worktree_id, index_operations) in &worktree_files {
+                        for op in index_operations {
+                            project_state.add_to_queue(*worktree_id, op.clone());
+                        }
+                    }
+                }
+            });
+
+            cx.background().spawn(async move { anyhow::Ok(()) }).await
         })
+        .detach_and_log_err(cx)
+    }
+
+    pub fn index_project(
+        &mut self,
+        project: ModelHandle<Project>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
+        let state = self.projects.get_mut(&project.downgrade());
+        let state = if state.is_none() {
+            return Task::Ready(Some(Err(anyhow!("Project not yet initialized"))));
+        } else {
+            state.unwrap()
+        };
+
+        let parsing_files_tx = self.parsing_files_tx.clone();
+        let db_update_tx = self.db_update_tx.clone();
+        let job_count_rx = state.outstanding_job_count_rx.clone();
+        let count = state.queue.values().map(Vec::len).sum();
+        cx.spawn(|this, mut cx| async move {
+            this.update(&mut cx, |this, cx| {
+                let Some(mut state) = this.projects.get_mut(&project.downgrade()) else {
+                    return;
+                };
+                let Some(mut index_operation) = state.pop() else { return; };
+                let _ = match index_operation {
+                    IndexOperation::IndexFile { file } => {
+                        parsing_files_tx.try_send(file);
+                    }
+                    IndexOperation::DeleteFile { file } => {
+                        db_update_tx.try_send(DbOperation::Delete {
+                            worktree_id: file.worktree_db_id,
+                            path: file.relative_path,
+                        });
+                    }
+                };
+            });
+        })
+        .detach();
+
+        Task::Ready(Some(Ok((count, job_count_rx))))
+
+        // cx.spawn(|this, mut cx| async move {
+        //     futures::future::join_all(worktree_scans_complete).await;
+
+        //     let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
+
+        //     let worktrees = project.read_with(&cx, |project, cx| {
+        //         project
+        //             .worktrees(cx)
+        //             .map(|worktree| worktree.read(cx).snapshot())
+        //             .collect::<Vec<_>>()
+        //     });
+
+        //     let mut worktree_file_mtimes = HashMap::new();
+        //     let mut db_ids_by_worktree_id = HashMap::new();
+        //     for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
+        //         let db_id = db_id?;
+        //         db_ids_by_worktree_id.insert(worktree.id(), db_id);
+        //         worktree_file_mtimes.insert(
+        //             worktree.id(),
+        //             this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
+        //                 .await?,
+        //         );
+        //     }
+
+        //     let worktree_db_ids = db_ids_by_worktree_id
+        //         .iter()
+        //         .map(|(a, b)| (*a, *b))
+        //         .collect();
+
+        //     let (job_count_tx, job_count_rx) = watch::channel_with(0);
+        //     let job_count_tx = Arc::new(Mutex::new(job_count_tx));
+        //     this.update(&mut cx, |this, _| {
+        //         let project_state = ProjectState::new(
+        //             _subscription,
+        //             worktree_db_ids,
+        //             worktree_file_mtimes.clone(),
+        //             job_count_rx.clone(),
+        //             job_count_tx.clone(),
+        //         );
+        //         this.projects.insert(project.downgrade(), project_state);
+        //     });
+
+        //     cx.background()
+        //         .spawn(async move {
+        //             let mut count = 0;
+        //             for worktree in worktrees.into_iter() {
+        //                 let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
+        //                 for file in worktree.files(false, 0) {
+        //                     let absolute_path = worktree.absolutize(&file.path);
+
+        //                     if let Ok(language) = language_registry
+        //                         .language_for_file(&absolute_path, None)
+        //                         .await
+        //                     {
+        //                         if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
+        //                             && &language.name().as_ref() != &"Markdown"
+        //                             && language
+        //                                 .grammar()
+        //                                 .and_then(|grammar| grammar.embedding_config.as_ref())
+        //                                 .is_none()
+        //                         {
+        //                             continue;
+        //                         }
+
+        //                         let path_buf = file.path.to_path_buf();
+        //                         let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
+        //                         let already_stored = stored_mtime
+        //                             .map_or(false, |existing_mtime| existing_mtime == file.mtime);
+
+        //                         if !already_stored {
+        //                             count += 1;
+
+        //                             let job_handle = JobHandle::new(&job_count_tx);
+        //                             parsing_files_tx
+        //                                 .try_send(PendingFile {
+        //                                     worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
+        //                                     relative_path: path_buf,
+        //                                     absolute_path,
+        //                                     language,
+        //                                     job_handle,
+        //                                     modified_time: file.mtime,
+        //                                 })
+        //                                 .unwrap();
+        //                         }
+        //                     }
+        //                 }
+        //                 for file in file_mtimes.keys() {
+        //                     db_update_tx
+        //                         .try_send(DbOperation::Delete {
+        //                             worktree_id: db_ids_by_worktree_id[&worktree.id()],
+        //                             path: file.to_owned(),
+        //                         })
+        //                         .unwrap();
+        //                 }
+        //             }
+
+        //             log::trace!(
+        //                 "walking worktree took {:?} milliseconds",
+        //                 t0.elapsed().as_millis()
+        //             );
+        //             anyhow::Ok((count, job_count_rx))
+        //         })
+        //         .await
+        // })
     }
 
     pub fn outstanding_job_count_rx(