diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 736f2c98a8..2da0d84baf 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -16,16 +16,18 @@ use language::{Anchor, Buffer, Language, LanguageRegistry}; use parking_lot::Mutex; use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES}; use postage::watch; -use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, WorktreeId}; +use project::{ + search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, ProjectPath, Worktree, WorktreeId, +}; use smol::channel; use std::{ cmp::Ordering, - collections::HashMap, + collections::{BTreeMap, HashMap}, mem, ops::Range, path::{Path, PathBuf}, sync::{Arc, Weak}, - time::{Instant, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use util::{ channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME}, @@ -37,6 +39,7 @@ use workspace::WorkspaceCreated; const SEMANTIC_INDEX_VERSION: usize = 7; const EMBEDDINGS_BATCH_SIZE: usize = 80; +const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(600); pub fn init( fs: Arc, @@ -77,6 +80,7 @@ pub fn init( let semantic_index = SemanticIndex::new( fs, db_file_path, + // Arc::new(embedding::DummyEmbeddings {}), Arc::new(OpenAIEmbeddings { client: http_client, executor: cx.background(), @@ -113,9 +117,14 @@ struct ProjectState { worktree_db_ids: Vec<(WorktreeId, i64)>, _subscription: gpui::Subscription, outstanding_job_count_rx: watch::Receiver, - _outstanding_job_count_tx: Arc>>, - job_queue_tx: channel::Sender, - _queue_update_task: Task<()>, + outstanding_job_count_tx: Arc>>, + changed_paths: BTreeMap, +} + +struct ChangedPathInfo { + changed_at: Instant, + mtime: SystemTime, + is_deleted: bool, } #[derive(Clone)] @@ -133,31 +142,21 @@ impl JobHandle { } } } + impl ProjectState { fn new( - cx: &mut AppContext, subscription: gpui::Subscription, worktree_db_ids: Vec<(WorktreeId, i64)>, - outstanding_job_count_rx: watch::Receiver, - _outstanding_job_count_tx: Arc>>, + changed_paths: BTreeMap, ) -> Self { - let (job_queue_tx, job_queue_rx) = channel::unbounded(); - let _queue_update_task = cx.background().spawn({ - let mut worktree_queue = HashMap::new(); - async move { - while let Ok(operation) = job_queue_rx.recv().await { - Self::update_queue(&mut worktree_queue, operation); - } - } - }); - + let (outstanding_job_count_tx, outstanding_job_count_rx) = watch::channel_with(0); + let outstanding_job_count_tx = Arc::new(Mutex::new(outstanding_job_count_tx)); Self { worktree_db_ids, outstanding_job_count_rx, - _outstanding_job_count_tx, + outstanding_job_count_tx, + changed_paths, _subscription: subscription, - _queue_update_task, - job_queue_tx, } } @@ -165,41 +164,6 @@ impl ProjectState { self.outstanding_job_count_rx.borrow().clone() } - fn update_queue(queue: &mut HashMap, operation: IndexOperation) { - match operation { - IndexOperation::FlushQueue => { - let queue = std::mem::take(queue); - for (_, op) in queue { - match op { - IndexOperation::IndexFile { - absolute_path: _, - payload, - tx, - } => { - let _ = tx.try_send(payload); - } - IndexOperation::DeleteFile { - absolute_path: _, - payload, - tx, - } => { - let _ = tx.try_send(payload); - } - _ => {} - } - } - } - IndexOperation::IndexFile { - ref absolute_path, .. - } - | IndexOperation::DeleteFile { - ref absolute_path, .. - } => { - queue.insert(absolute_path.clone(), operation); - } - } - } - fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option { self.worktree_db_ids .iter() @@ -230,23 +194,10 @@ pub struct PendingFile { worktree_db_id: i64, relative_path: PathBuf, absolute_path: PathBuf, - language: Arc, + language: Option>, modified_time: SystemTime, job_handle: JobHandle, } -enum IndexOperation { - IndexFile { - absolute_path: PathBuf, - payload: PendingFile, - tx: channel::Sender, - }, - DeleteFile { - absolute_path: PathBuf, - payload: DbOperation, - tx: channel::Sender, - }, - FlushQueue, -} pub struct SearchResult { pub buffer: ModelHandle, @@ -582,13 +533,13 @@ impl SemanticIndex { parsing_files_rx: &channel::Receiver, db_update_tx: &channel::Sender, ) { + let Some(language) = pending_file.language else { + return; + }; + if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() { if let Some(documents) = retriever - .parse_file_with_template( - &pending_file.relative_path, - &content, - pending_file.language, - ) + .parse_file_with_template(&pending_file.relative_path, &content, language) .log_err() { log::trace!( @@ -679,103 +630,50 @@ impl SemanticIndex { } fn project_entries_changed( - &self, + &mut self, project: ModelHandle, changes: Arc<[(Arc, ProjectEntryId, PathChange)]>, cx: &mut ModelContext<'_, SemanticIndex>, worktree_id: &WorktreeId, - ) -> Result<()> { - let parsing_files_tx = self.parsing_files_tx.clone(); - let db_update_tx = self.db_update_tx.clone(); - let (job_queue_tx, outstanding_job_tx, worktree_db_id) = { - let state = self - .projects - .get(&project.downgrade()) - .ok_or(anyhow!("Project not yet initialized"))?; - let worktree_db_id = state - .db_id_for_worktree_id(*worktree_id) - .ok_or(anyhow!("Worktree ID in Database Not Available"))?; - ( - state.job_queue_tx.clone(), - state._outstanding_job_count_tx.clone(), - worktree_db_id, - ) + ) { + let Some(worktree) = project.read(cx).worktree_for_id(worktree_id.clone(), cx) else { + return; + }; + let project = project.downgrade(); + let Some(project_state) = self.projects.get_mut(&project) else { + return; }; - let language_registry = self.language_registry.clone(); - let parsing_files_tx = parsing_files_tx.clone(); - let db_update_tx = db_update_tx.clone(); + let worktree = worktree.read(cx); + let change_time = Instant::now(); + for (path, entry_id, change) in changes.iter() { + let Some(entry) = worktree.entry_for_id(*entry_id) else { + continue; + }; + if entry.is_ignored || entry.is_symlink || entry.is_external { + continue; + } + let project_path = ProjectPath { + worktree_id: *worktree_id, + path: path.clone(), + }; + project_state.changed_paths.insert( + project_path, + ChangedPathInfo { + changed_at: change_time, + mtime: entry.mtime, + is_deleted: *change == PathChange::Removed, + }, + ); + } - let worktree = project - .read(cx) - .worktree_for_id(worktree_id.clone(), cx) - .ok_or(anyhow!("Worktree not available"))? - .read(cx) - .snapshot(); - cx.spawn(|_, _| async move { - let worktree = worktree.clone(); - for (path, entry_id, path_change) in changes.iter() { - let relative_path = path.to_path_buf(); - let absolute_path = worktree.absolutize(path); - - let Some(entry) = worktree.entry_for_id(*entry_id) else { - continue; - }; - if entry.is_ignored || entry.is_symlink || entry.is_external { - continue; - } - - log::trace!("File Event: {:?}, Path: {:?}", &path_change, &path); - match path_change { - PathChange::AddedOrUpdated | PathChange::Updated | PathChange::Added => { - if let Ok(language) = language_registry - .language_for_file(&relative_path, None) - .await - { - if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref()) - && &language.name().as_ref() != &"Markdown" - && language - .grammar() - .and_then(|grammar| grammar.embedding_config.as_ref()) - .is_none() - { - continue; - } - - let job_handle = JobHandle::new(&outstanding_job_tx); - let new_operation = IndexOperation::IndexFile { - absolute_path: absolute_path.clone(), - payload: PendingFile { - worktree_db_id, - relative_path, - absolute_path, - language, - modified_time: entry.mtime, - job_handle, - }, - tx: parsing_files_tx.clone(), - }; - let _ = job_queue_tx.try_send(new_operation); - } - } - PathChange::Removed => { - let new_operation = IndexOperation::DeleteFile { - absolute_path, - payload: DbOperation::Delete { - worktree_id: worktree_db_id, - path: relative_path, - }, - tx: db_update_tx.clone(), - }; - let _ = job_queue_tx.try_send(new_operation); - } - _ => {} - } + cx.spawn_weak(|this, mut cx| async move { + cx.background().timer(BACKGROUND_INDEXING_DELAY).await; + if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) { + Self::reindex_changed_paths(this, project, Some(change_time), &mut cx).await; } }) .detach(); - - Ok(()) } pub fn initialize_project( @@ -805,14 +703,11 @@ impl SemanticIndex { let _subscription = cx.subscribe(&project, |this, project, event, cx| { if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event { - let _ = - this.project_entries_changed(project.clone(), changes.clone(), cx, worktree_id); + this.project_entries_changed(project.clone(), changes.clone(), cx, worktree_id); }; }); let language_registry = self.language_registry.clone(); - let parsing_files_tx = self.parsing_files_tx.clone(); - let db_update_tx = self.db_update_tx.clone(); cx.spawn(|this, mut cx| async move { futures::future::join_all(worktree_scans_complete).await; @@ -843,17 +738,13 @@ impl SemanticIndex { .map(|(a, b)| (*a, *b)) .collect(); - let (job_count_tx, job_count_rx) = watch::channel_with(0); - let job_count_tx = Arc::new(Mutex::new(job_count_tx)); - let job_count_tx_longlived = job_count_tx.clone(); - - let worktree_files = cx + let changed_paths = cx .background() .spawn(async move { - let mut worktree_files = Vec::new(); + let mut changed_paths = BTreeMap::new(); + let now = Instant::now(); for worktree in worktrees.into_iter() { let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap(); - let worktree_db_id = db_ids_by_worktree_id[&worktree.id()]; for file in worktree.files(false, 0) { let absolute_path = worktree.absolutize(&file.path); @@ -876,59 +767,51 @@ impl SemanticIndex { continue; } - let path_buf = file.path.to_path_buf(); let stored_mtime = file_mtimes.remove(&file.path.to_path_buf()); let already_stored = stored_mtime .map_or(false, |existing_mtime| existing_mtime == file.mtime); if !already_stored { - let job_handle = JobHandle::new(&job_count_tx); - worktree_files.push(IndexOperation::IndexFile { - absolute_path: absolute_path.clone(), - payload: PendingFile { - worktree_db_id, - relative_path: path_buf, - absolute_path, - language, - job_handle, - modified_time: file.mtime, + changed_paths.insert( + ProjectPath { + worktree_id: worktree.id(), + path: file.path.clone(), }, - tx: parsing_files_tx.clone(), - }); + ChangedPathInfo { + changed_at: now, + mtime: file.mtime, + is_deleted: false, + }, + ); } } } + // Clean up entries from database that are no longer in the worktree. - for (path, _) in file_mtimes { - worktree_files.push(IndexOperation::DeleteFile { - absolute_path: worktree.absolutize(path.as_path()), - payload: DbOperation::Delete { - worktree_id: worktree_db_id, - path, + for (path, mtime) in file_mtimes { + changed_paths.insert( + ProjectPath { + worktree_id: worktree.id(), + path: path.into(), }, - tx: db_update_tx.clone(), - }); + ChangedPathInfo { + changed_at: now, + mtime, + is_deleted: true, + }, + ); } } - anyhow::Ok(worktree_files) + anyhow::Ok(changed_paths) }) .await?; - this.update(&mut cx, |this, cx| { - let project_state = ProjectState::new( - cx, - _subscription, - worktree_db_ids, - job_count_rx, - job_count_tx_longlived, + this.update(&mut cx, |this, _| { + this.projects.insert( + project.downgrade(), + ProjectState::new(_subscription, worktree_db_ids, changed_paths), ); - - for op in worktree_files { - let _ = project_state.job_queue_tx.try_send(op); - } - - this.projects.insert(project.downgrade(), project_state); }); Result::<(), _>::Ok(()) }) @@ -939,27 +822,17 @@ impl SemanticIndex { project: ModelHandle, cx: &mut ModelContext, ) -> Task)>> { - let state = self.projects.get_mut(&project.downgrade()); - let state = if state.is_none() { - return Task::Ready(Some(Err(anyhow!("Project not yet initialized")))); - } else { - state.unwrap() - }; - - // let parsing_files_tx = self.parsing_files_tx.clone(); - // let db_update_tx = self.db_update_tx.clone(); - let job_count_rx = state.outstanding_job_count_rx.clone(); - let count = state.get_outstanding_count(); - cx.spawn(|this, mut cx| async move { - this.update(&mut cx, |this, _| { - let Some(state) = this.projects.get_mut(&project.downgrade()) else { - return; - }; - let _ = state.job_queue_tx.try_send(IndexOperation::FlushQueue); - }); + Self::reindex_changed_paths(this.clone(), project.clone(), None, &mut cx).await; - Ok((count, job_count_rx)) + this.update(&mut cx, |this, _cx| { + let Some(state) = this.projects.get(&project.downgrade()) else { + return Err(anyhow!("Project not yet initialized")); + }; + let job_count_rx = state.outstanding_job_count_rx.clone(); + let count = state.get_outstanding_count(); + Ok((count, job_count_rx)) + }) }) } @@ -1110,6 +983,93 @@ impl SemanticIndex { .collect::>()) }) } + + async fn reindex_changed_paths( + this: ModelHandle, + project: ModelHandle, + last_changed_before: Option, + cx: &mut AsyncAppContext, + ) { + let mut pending_files = Vec::new(); + let (language_registry, parsing_files_tx) = this.update(cx, |this, cx| { + if let Some(project_state) = this.projects.get_mut(&project.downgrade()) { + let outstanding_job_count_tx = &project_state.outstanding_job_count_tx; + let db_ids = &project_state.worktree_db_ids; + let mut worktree: Option> = None; + + project_state.changed_paths.retain(|path, info| { + if let Some(last_changed_before) = last_changed_before { + if info.changed_at > last_changed_before { + return true; + } + } + + if worktree + .as_ref() + .map_or(true, |tree| tree.read(cx).id() != path.worktree_id) + { + worktree = project.read(cx).worktree_for_id(path.worktree_id, cx); + } + let Some(worktree) = &worktree else { + return false; + }; + + let Some(worktree_db_id) = db_ids + .iter() + .find_map(|entry| (entry.0 == path.worktree_id).then_some(entry.1)) + else { + return false; + }; + + if info.is_deleted { + this.db_update_tx + .try_send(DbOperation::Delete { + worktree_id: worktree_db_id, + path: path.path.to_path_buf(), + }) + .ok(); + } else { + let absolute_path = worktree.read(cx).absolutize(&path.path); + let job_handle = JobHandle::new(&outstanding_job_count_tx); + pending_files.push(PendingFile { + absolute_path, + relative_path: path.path.to_path_buf(), + language: None, + job_handle, + modified_time: info.mtime, + worktree_db_id, + }); + } + + false + }); + } + + ( + this.language_registry.clone(), + this.parsing_files_tx.clone(), + ) + }); + + for mut pending_file in pending_files { + if let Ok(language) = language_registry + .language_for_file(&pending_file.relative_path, None) + .await + { + if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref()) + && &language.name().as_ref() != &"Markdown" + && language + .grammar() + .and_then(|grammar| grammar.embedding_config.as_ref()) + .is_none() + { + continue; + } + pending_file.language = Some(language); + } + parsing_files_tx.try_send(pending_file).ok(); + } + } } impl Entity for SemanticIndex {