diff --git a/crates/vector_store/src/vector_store.rs b/crates/vector_store/src/vector_store.rs index 1bdc0127b7..5189993eee 100644 --- a/crates/vector_store/src/vector_store.rs +++ b/crates/vector_store/src/vector_store.rs @@ -33,6 +33,8 @@ use util::{ }; use workspace::{Workspace, WorkspaceCreated}; +const REINDEXING_DELAY: u64 = 30; + #[derive(Debug)] pub struct Document { pub offset: usize, @@ -58,10 +60,10 @@ pub fn init( let vector_store = VectorStore::new( fs, db_file_path, - Arc::new(embedding::DummyEmbeddings {}), - // Arc::new(OpenAIEmbeddings { - // client: http_client, - // }), + // Arc::new(embedding::DummyEmbeddings {}), + Arc::new(OpenAIEmbeddings { + client: http_client, + }), language_registry, cx.clone(), ) @@ -121,7 +123,9 @@ pub struct VectorStore { embedding_provider: Arc, language_registry: Arc, db_update_tx: channel::Sender, + paths_tx: channel::Sender<(i64, PathBuf, Arc, SystemTime)>, _db_update_task: Task<()>, + _paths_update_task: Task<()>, projects: HashMap, ProjectState>, } @@ -203,14 +207,50 @@ impl VectorStore { } }); + let (paths_tx, paths_rx) = + channel::unbounded::<(i64, PathBuf, Arc, SystemTime)>(); + + let fs_clone = fs.clone(); + let db_update_tx_clone = db_update_tx.clone(); + let embedding_provider_clone = embedding_provider.clone(); + + let _paths_update_task = cx.background().spawn(async move { + let mut parser = Parser::new(); + let mut cursor = QueryCursor::new(); + while let Ok((worktree_id, file_path, language, mtime)) = paths_rx.recv().await { + log::info!("Parsing File: {:?}", &file_path); + if let Some(indexed_file) = Self::index_file( + &mut cursor, + &mut parser, + embedding_provider_clone.as_ref(), + &fs_clone, + language, + file_path, + mtime, + ) + .await + .log_err() + { + db_update_tx_clone + .try_send(DbWrite::InsertFile { + worktree_id, + indexed_file, + }) + .unwrap(); + } + } + }); + Self { fs, database_url, db_update_tx, + paths_tx, embedding_provider, language_registry, projects: HashMap::new(), _db_update_task, + _paths_update_task, } })) } @@ -315,9 +355,9 @@ impl VectorStore { let fs = self.fs.clone(); let language_registry = self.language_registry.clone(); - let embedding_provider = self.embedding_provider.clone(); let database_url = self.database_url.clone(); let db_update_tx = self.db_update_tx.clone(); + let paths_tx = self.paths_tx.clone(); cx.spawn(|this, mut cx| async move { futures::future::join_all(worktree_scans_complete).await; @@ -356,8 +396,6 @@ impl VectorStore { }) .await?; - let (paths_tx, paths_rx) = - channel::unbounded::<(i64, PathBuf, Arc, SystemTime)>(); cx.background() .spawn({ let db_ids_by_worktree_id = db_ids_by_worktree_id.clone(); @@ -415,42 +453,8 @@ impl VectorStore { }) .detach(); - cx.background() - .scoped(|scope| { - for _ in 0..cx.background().num_cpus() { - scope.spawn(async { - let mut parser = Parser::new(); - let mut cursor = QueryCursor::new(); - while let Ok((worktree_id, file_path, language, mtime)) = - paths_rx.recv().await - { - if let Some(indexed_file) = Self::index_file( - &mut cursor, - &mut parser, - embedding_provider.as_ref(), - &fs, - language, - file_path, - mtime, - ) - .await - .log_err() - { - db_update_tx - .try_send(DbWrite::InsertFile { - worktree_id, - indexed_file, - }) - .unwrap(); - } - } - }); - } - }) - .await; - this.update(&mut cx, |this, cx| { - let _subscription = cx.subscribe(&project, |this, project, event, cx| { + let _subscription = cx.subscribe(&project, |this, project, event, _cx| { if let Some(project_state) = this.projects.get(&project.downgrade()) { let worktree_db_ids = project_state.worktree_db_ids.clone(); @@ -488,6 +492,7 @@ impl VectorStore { } let file_mtimes = file_mtimes.unwrap(); + let paths_tx = this.paths_tx.clone(); smol::block_on(async move { for change in changes.into_iter() { @@ -504,7 +509,6 @@ impl VectorStore { { continue; } - log::info!("Language found: {:?}: ", language.name()); // TODO: Make this a bit more defensive let modified_time = @@ -515,7 +519,7 @@ impl VectorStore { existing_time.map_or(false, |existing_time| { if &modified_time != existing_time && existing_time.elapsed().unwrap().as_secs() - > 30 + > REINDEXING_DELAY { false } else { @@ -525,14 +529,14 @@ impl VectorStore { if !already_stored { log::info!("Need to reindex: {:?}", &change_path); - // paths_tx - // .try_send(( - // worktree_db_id, - // change_path.to_path_buf(), - // language, - // modified_time, - // )) - // .unwrap(); + paths_tx + .try_send(( + worktree_db_id, + change_path.to_path_buf(), + language, + modified_time, + )) + .unwrap(); } } } diff --git a/crates/vector_store/src/vector_store_tests.rs b/crates/vector_store/src/vector_store_tests.rs index 51065c0ee4..e25b737b06 100644 --- a/crates/vector_store/src/vector_store_tests.rs +++ b/crates/vector_store/src/vector_store_tests.rs @@ -5,7 +5,7 @@ use anyhow::Result; use async_trait::async_trait; use gpui::{Task, TestAppContext}; use language::{Language, LanguageConfig, LanguageRegistry}; -use project::{FakeFs, Fs, Project}; +use project::{FakeFs, Project}; use rand::Rng; use serde_json::json; use unindent::Unindent;