Eager Semantic Indexing Queue (#2886)
Optimization to the Semantic Indexing Engine. Previously, the entire project tree was walked on every index command. We've transitioned to an eager queuing method: an initial queue of outstanding indexing work is built when the workspace is created, and subscriptions to file change events then keep that view of outstanding work continually up to date. This yields quicker user feedback when initializing or using Semantic Search, and opens the door to better transparency across the system about outstanding indexing work.

Release Notes:

- Refactored the index operation queue to an eager queuing framework.
- Moved semantic search initialization to workspace creation.
- Adjusted the rate-limiting strategy on API delays to reduce time spent waiting for rate limits.
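Below is a minimal, self-contained sketch of the eager queuing idea described above. The names are illustrative only (they are not the actual Zed types); it shows how file events can be folded into a per-path map as they arrive, so a later search only needs to flush whatever is still outstanding.

use std::collections::HashMap;
use std::path::PathBuf;

// Illustrative stand-ins for the real index operations; not the actual Zed types.
#[derive(Debug)]
enum Op {
    Index { absolute_path: PathBuf },
    Delete { absolute_path: PathBuf },
}

/// Eager queue: file events are keyed by path as they arrive, so only the
/// latest outstanding operation per file is kept. A search request then
/// flushes whatever is still pending.
#[derive(Default)]
struct EagerQueue {
    pending: HashMap<PathBuf, Op>,
}

impl EagerQueue {
    fn enqueue(&mut self, op: Op) {
        let path = match &op {
            Op::Index { absolute_path } | Op::Delete { absolute_path } => absolute_path.clone(),
        };
        // A newer event for the same path supersedes the older one.
        self.pending.insert(path, op);
    }

    fn outstanding(&self) -> usize {
        self.pending.len()
    }

    fn flush(&mut self) -> Vec<Op> {
        // Drain everything that is still outstanding for processing.
        self.pending.drain().map(|(_, op)| op).collect()
    }
}

fn main() {
    let mut queue = EagerQueue::default();
    queue.enqueue(Op::Index { absolute_path: "/repo/a.rs".into() });
    queue.enqueue(Op::Index { absolute_path: "/repo/a.rs".into() }); // deduplicated by path
    queue.enqueue(Op::Delete { absolute_path: "/repo/b.rs".into() });
    assert_eq!(queue.outstanding(), 2);
    assert_eq!(queue.flush().len(), 2);
}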
Commit bc7e9088fe: 8 changed files with 588 additions and 284 deletions.
Cargo.lock (generated): 481 changes. Diff suppressed because it is too large.
@@ -640,6 +640,7 @@ impl ProjectSearchView
        self.search_options = SearchOptions::none();

        let project = self.model.read(cx).project.clone();

        let index_task = semantic_index.update(cx, |semantic_index, cx| {
            semantic_index.index_project(project, cx)
        });

@@ -1635,6 +1636,12 @@ impl ToolbarItemView for ProjectSearchBar
        self.subscription = None;
        self.active_project_search = None;
        if let Some(search) = active_pane_item.and_then(|i| i.downcast::<ProjectSearchView>()) {
            search.update(cx, |search, cx| {
                if search.current_mode == SearchMode::Semantic {
                    search.index_project(cx);
                }
            });

            self.subscription = Some(cx.observe(&search, |_, _, cx| cx.notify()));
            self.active_project_search = Some(search);
            ToolbarItemLocation::PrimaryLeft {
@@ -38,6 +38,7 @@ parking_lot.workspace = true
rand.workspace = true
schemars.workspace = true
globset.workspace = true
+sha1 = "0.10.5"

[dev-dependencies]
gpui = { path = "../gpui", features = ["test-support"] }
@@ -26,6 +26,9 @@ pub struct FileRecord
#[derive(Debug)]
struct Embedding(pub Vec<f32>);

#[derive(Debug)]
struct Sha1(pub Vec<u8>);

impl FromSql for Embedding {
    fn column_result(value: ValueRef) -> FromSqlResult<Self> {
        let bytes = value.as_blob()?;

@@ -37,6 +40,17 @@ impl FromSql for Embedding {
    }
}

impl FromSql for Sha1 {
    fn column_result(value: ValueRef) -> FromSqlResult<Self> {
        let bytes = value.as_blob()?;
        let sha1: Result<Vec<u8>, Box<bincode::ErrorKind>> = bincode::deserialize(bytes);
        if sha1.is_err() {
            return Err(rusqlite::types::FromSqlError::Other(sha1.unwrap_err()));
        }
        return Ok(Sha1(sha1.unwrap()));
    }
}

pub struct VectorDatabase {
    db: rusqlite::Connection,
}

@@ -132,6 +146,7 @@ impl VectorDatabase {
            end_byte INTEGER NOT NULL,
            name VARCHAR NOT NULL,
            embedding BLOB NOT NULL,
+           sha1 BLOB NOT NULL,
            FOREIGN KEY(file_id) REFERENCES files(id) ON DELETE CASCADE
            )",
            [],

@@ -182,15 +197,17 @@ impl VectorDatabase {
        // I imagine we can speed this up with a bulk insert of some kind.
        for document in documents {
            let embedding_blob = bincode::serialize(&document.embedding)?;
+           let sha_blob = bincode::serialize(&document.sha1)?;

            self.db.execute(
-               "INSERT INTO documents (file_id, start_byte, end_byte, name, embedding) VALUES (?1, ?2, ?3, ?4, $5)",
+               "INSERT INTO documents (file_id, start_byte, end_byte, name, embedding, sha1) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
                params![
                    file_id,
                    document.range.start.to_string(),
                    document.range.end.to_string(),
                    document.name,
-                   embedding_blob
+                   embedding_blob,
+                   sha_blob
                ],
            )?;
        }
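As an aside on the "bulk insert" comment in the hunk above: one common way to batch these inserts with rusqlite is to reuse a prepared statement inside a single transaction. The sketch below is only an illustration of that pattern, not code from this change; DocumentRow and insert_documents are hypothetical names, and the column layout is taken from the schema above.

use anyhow::Result;
use rusqlite::{params, Connection};

// Simplified stand-in for the real `Document`; field names are illustrative.
struct DocumentRow {
    file_id: i64,
    start_byte: usize,
    end_byte: usize,
    name: String,
    embedding: Vec<f32>,
    sha1: [u8; 20],
}

// Reuse one prepared statement inside a single transaction so SQLite only
// commits once for the whole batch instead of once per row.
fn insert_documents(db: &mut Connection, rows: &[DocumentRow]) -> Result<()> {
    let tx = db.transaction()?;
    {
        let mut stmt = tx.prepare(
            "INSERT INTO documents (file_id, start_byte, end_byte, name, embedding, sha1)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
        )?;
        for row in rows {
            stmt.execute(params![
                row.file_id,
                row.start_byte.to_string(),
                row.end_byte.to_string(),
                row.name,
                bincode::serialize(&row.embedding)?,
                bincode::serialize(&row.sha1)?,
            ])?;
        }
    } // the statement borrows the transaction, so drop it before committing
    tx.commit()?;
    Ok(())
}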
@@ -106,8 +106,8 @@ impl OpenAIEmbeddings
#[async_trait]
impl EmbeddingProvider for OpenAIEmbeddings {
    async fn embed_batch(&self, spans: Vec<&str>) -> Result<Vec<Vec<f32>>> {
-       const BACKOFF_SECONDS: [usize; 3] = [45, 75, 125];
-       const MAX_RETRIES: usize = 3;
+       const BACKOFF_SECONDS: [usize; 4] = [3, 5, 15, 45];
+       const MAX_RETRIES: usize = 4;

        let api_key = OPENAI_API_KEY
            .as_ref()
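The hunk above shortens the backoff schedule from [45, 75, 125] seconds to [3, 5, 15, 45] and raises the retry count to 4, which is the adjusted rate-limiting strategy mentioned in the release notes. The following is a simplified, synchronous sketch of a retry loop over such a schedule; it is not the actual OpenAIEmbeddings implementation (which is async), and send_request is a hypothetical stand-in for the embedding API call.

use std::thread::sleep;
use std::time::Duration;

const BACKOFF_SECONDS: [usize; 4] = [3, 5, 15, 45];
const MAX_RETRIES: usize = 4;

// Hypothetical request function standing in for the real embedding call.
fn send_request() -> Result<Vec<Vec<f32>>, String> {
    Err("rate limited".to_string())
}

fn embed_with_backoff() -> Result<Vec<Vec<f32>>, String> {
    let mut last_err = String::new();
    for attempt in 0..MAX_RETRIES {
        match send_request() {
            Ok(embeddings) => return Ok(embeddings),
            Err(err) => {
                last_err = err;
                // Wait out the (now much shorter) initial delays before retrying.
                let delay = BACKOFF_SECONDS[attempt.min(BACKOFF_SECONDS.len() - 1)];
                sleep(Duration::from_secs(delay as u64));
            }
        }
    }
    Err(last_err)
}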
@@ -1,5 +1,6 @@
use anyhow::{anyhow, Ok, Result};
use language::{Grammar, Language};
+use sha1::{Digest, Sha1};
use std::{
    cmp::{self, Reverse},
    collections::HashSet,

@@ -15,6 +16,7 @@ pub struct Document
    pub range: Range<usize>,
    pub content: String,
    pub embedding: Vec<f32>,
+   pub sha1: [u8; 20],
}

const CODE_CONTEXT_TEMPLATE: &str =

@@ -63,11 +65,15 @@ impl CodeContextRetriever
            .replace("<language>", language_name.as_ref())
            .replace("<item>", &content);

+       let mut sha1 = Sha1::new();
+       sha1.update(&document_span);

        Ok(vec![Document {
            range: 0..content.len(),
            content: document_span,
            embedding: Vec::new(),
            name: language_name.to_string(),
+           sha1: sha1.finalize().into(),
        }])
    }

@@ -76,11 +82,15 @@ impl CodeContextRetriever
            .replace("<path>", relative_path.to_string_lossy().as_ref())
            .replace("<item>", &content);

+       let mut sha1 = Sha1::new();
+       sha1.update(&document_span);

        Ok(vec![Document {
            range: 0..content.len(),
            content: document_span,
            embedding: Vec::new(),
            name: "Markdown".to_string(),
+           sha1: sha1.finalize().into(),
        }])
    }

@@ -253,11 +263,15 @@ impl CodeContextRetriever
            );
        }

+       let mut sha1 = Sha1::new();
+       sha1.update(&document_content);

        documents.push(Document {
            name,
            content: document_content,
            range: item_range.clone(),
            embedding: vec![],
+           sha1: sha1.finalize().into(),
        })
    }
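The hunks above hash each rendered document span with the sha1 crate and store the 20-byte digest on the Document. A standalone sketch of just that hashing step follows; span_digest is an illustrative helper, not part of the actual code.

use sha1::{Digest, Sha1};

// Hash a rendered document span into a fixed 20-byte SHA-1 digest.
fn span_digest(document_span: &str) -> [u8; 20] {
    let mut sha1 = Sha1::new();
    sha1.update(document_span);
    sha1.finalize().into()
}

fn main() {
    let a = span_digest("fn main() {}");
    let b = span_digest("fn main() {}");
    // Identical content hashes to the same digest, so unchanged spans can be recognized.
    assert_eq!(a, b);
}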
@@ -16,7 +16,7 @@ use language::{Anchor, Buffer, Language, LanguageRegistry};
use parking_lot::Mutex;
use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
use postage::watch;
-use project::{search::PathMatcher, Fs, Project, WorktreeId};
+use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, WorktreeId};
use smol::channel;
use std::{
    cmp::Ordering,

@@ -33,8 +33,9 @@ use util::{
    paths::EMBEDDINGS_DIR,
    ResultExt,
};
+use workspace::WorkspaceCreated;

-const SEMANTIC_INDEX_VERSION: usize = 6;
+const SEMANTIC_INDEX_VERSION: usize = 7;
const EMBEDDINGS_BATCH_SIZE: usize = 80;

pub fn init(

@@ -54,6 +55,22 @@ pub fn init(
        return;
    }

    cx.subscribe_global::<WorkspaceCreated, _>({
        move |event, cx| {
            let Some(semantic_index) = SemanticIndex::global(cx) else { return; };
            let workspace = &event.0;
            if let Some(workspace) = workspace.upgrade(cx) {
                let project = workspace.read(cx).project().clone();
                if project.read(cx).is_local() {
                    semantic_index.update(cx, |index, cx| {
                        index.initialize_project(project, cx).detach_and_log_err(cx)
                    });
                }
            }
        }
    })
    .detach();

    cx.spawn(move |mut cx| async move {
        let semantic_index = SemanticIndex::new(
            fs,

@@ -92,8 +109,11 @@ pub struct SemanticIndex {

struct ProjectState {
    worktree_db_ids: Vec<(WorktreeId, i64)>,
    _subscription: gpui::Subscription,
    outstanding_job_count_rx: watch::Receiver<usize>,
    _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
    job_queue_tx: channel::Sender<IndexOperation>,
    _queue_update_task: Task<()>,
}

#[derive(Clone)]
@@ -112,6 +132,72 @@ impl JobHandle {
    }
}
impl ProjectState {
    fn new(
        cx: &mut AppContext,
        subscription: gpui::Subscription,
        worktree_db_ids: Vec<(WorktreeId, i64)>,
        outstanding_job_count_rx: watch::Receiver<usize>,
        _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
    ) -> Self {
        let (job_queue_tx, job_queue_rx) = channel::unbounded();
        let _queue_update_task = cx.background().spawn({
            let mut worktree_queue = HashMap::new();
            async move {
                while let Ok(operation) = job_queue_rx.recv().await {
                    Self::update_queue(&mut worktree_queue, operation);
                }
            }
        });

        Self {
            worktree_db_ids,
            outstanding_job_count_rx,
            _outstanding_job_count_tx,
            _subscription: subscription,
            _queue_update_task,
            job_queue_tx,
        }
    }

    pub fn get_outstanding_count(&self) -> usize {
        self.outstanding_job_count_rx.borrow().clone()
    }

    fn update_queue(queue: &mut HashMap<PathBuf, IndexOperation>, operation: IndexOperation) {
        match operation {
            IndexOperation::FlushQueue => {
                let queue = std::mem::take(queue);
                for (_, op) in queue {
                    match op {
                        IndexOperation::IndexFile {
                            absolute_path: _,
                            payload,
                            tx,
                        } => {
                            let _ = tx.try_send(payload);
                        }
                        IndexOperation::DeleteFile {
                            absolute_path: _,
                            payload,
                            tx,
                        } => {
                            let _ = tx.try_send(payload);
                        }
                        _ => {}
                    }
                }
            }
            IndexOperation::IndexFile {
                ref absolute_path, ..
            }
            | IndexOperation::DeleteFile {
                ref absolute_path, ..
            } => {
                queue.insert(absolute_path.clone(), operation);
            }
        }
    }

    fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
        self.worktree_db_ids
            .iter()

@@ -137,6 +223,7 @@ impl ProjectState {
    }
}

#[derive(Clone)]
pub struct PendingFile {
    worktree_db_id: i64,
    relative_path: PathBuf,

@@ -145,6 +232,19 @@ pub struct PendingFile {
    modified_time: SystemTime,
    job_handle: JobHandle,
}
enum IndexOperation {
    IndexFile {
        absolute_path: PathBuf,
        payload: PendingFile,
        tx: channel::Sender<PendingFile>,
    },
    DeleteFile {
        absolute_path: PathBuf,
        payload: DbOperation,
        tx: channel::Sender<DbOperation>,
    },
    FlushQueue,
}

pub struct SearchResult {
    pub buffer: ModelHandle<Buffer>,
@@ -576,12 +676,112 @@ impl SemanticIndex {
        })
    }

    pub fn index_project(
    fn project_entries_changed(
        &self,
        project: ModelHandle<Project>,
        changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
        cx: &mut ModelContext<'_, SemanticIndex>,
        worktree_id: &WorktreeId,
    ) -> Result<()> {
        let parsing_files_tx = self.parsing_files_tx.clone();
        let db_update_tx = self.db_update_tx.clone();
        let (job_queue_tx, outstanding_job_tx, worktree_db_id) = {
            let state = self
                .projects
                .get(&project.downgrade())
                .ok_or(anyhow!("Project not yet initialized"))?;
            let worktree_db_id = state
                .db_id_for_worktree_id(*worktree_id)
                .ok_or(anyhow!("Worktree ID in Database Not Available"))?;
            (
                state.job_queue_tx.clone(),
                state._outstanding_job_count_tx.clone(),
                worktree_db_id,
            )
        };

        let language_registry = self.language_registry.clone();
        let parsing_files_tx = parsing_files_tx.clone();
        let db_update_tx = db_update_tx.clone();

        let worktree = project
            .read(cx)
            .worktree_for_id(worktree_id.clone(), cx)
            .ok_or(anyhow!("Worktree not available"))?
            .read(cx)
            .snapshot();
        cx.spawn(|_, _| async move {
            let worktree = worktree.clone();
            for (path, entry_id, path_change) in changes.iter() {
                let relative_path = path.to_path_buf();
                let absolute_path = worktree.absolutize(path);

                let Some(entry) = worktree.entry_for_id(*entry_id) else {
                    continue;
                };
                if entry.is_ignored || entry.is_symlink || entry.is_external {
                    continue;
                }

                log::trace!("File Event: {:?}, Path: {:?}", &path_change, &path);
                match path_change {
                    PathChange::AddedOrUpdated | PathChange::Updated | PathChange::Added => {
                        if let Ok(language) = language_registry
                            .language_for_file(&relative_path, None)
                            .await
                        {
                            if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                && &language.name().as_ref() != &"Markdown"
                                && language
                                    .grammar()
                                    .and_then(|grammar| grammar.embedding_config.as_ref())
                                    .is_none()
                            {
                                continue;
                            }

                            let job_handle = JobHandle::new(&outstanding_job_tx);
                            let new_operation = IndexOperation::IndexFile {
                                absolute_path: absolute_path.clone(),
                                payload: PendingFile {
                                    worktree_db_id,
                                    relative_path,
                                    absolute_path,
                                    language,
                                    modified_time: entry.mtime,
                                    job_handle,
                                },
                                tx: parsing_files_tx.clone(),
                            };
                            let _ = job_queue_tx.try_send(new_operation);
                        }
                    }
                    PathChange::Removed => {
                        let new_operation = IndexOperation::DeleteFile {
                            absolute_path,
                            payload: DbOperation::Delete {
                                worktree_id: worktree_db_id,
                                path: relative_path,
                            },
                            tx: db_update_tx.clone(),
                        };
                        let _ = job_queue_tx.try_send(new_operation);
                    }
                    _ => {}
                }
            }
        })
        .detach();

        Ok(())
    }

    pub fn initialize_project(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
        let t0 = Instant::now();
    ) -> Task<Result<()>> {
        log::trace!("Initializing Project for Semantic Index");
        let worktree_scans_complete = project
            .read(cx)
            .worktrees(cx)
@@ -592,6 +792,7 @@ impl SemanticIndex {
                }
            })
            .collect::<Vec<_>>();

        let worktree_db_ids = project
            .read(cx)
            .worktrees(cx)

@@ -600,15 +801,21 @@ impl SemanticIndex {
            })
            .collect::<Vec<_>>();

        let _subscription = cx.subscribe(&project, |this, project, event, cx| {
            if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event {
                let _ =
                    this.project_entries_changed(project.clone(), changes.clone(), cx, worktree_id);
            };
        });

        let language_registry = self.language_registry.clone();
        let db_update_tx = self.db_update_tx.clone();
        let parsing_files_tx = self.parsing_files_tx.clone();
        let db_update_tx = self.db_update_tx.clone();

        cx.spawn(|this, mut cx| async move {
            futures::future::join_all(worktree_scans_complete).await;

            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;

            let worktrees = project.read_with(&cx, |project, cx| {
                project
                    .worktrees(cx)

@@ -618,6 +825,7 @@ impl SemanticIndex {

            let mut worktree_file_mtimes = HashMap::new();
            let mut db_ids_by_worktree_id = HashMap::new();

            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
                let db_id = db_id?;
                db_ids_by_worktree_id.insert(worktree.id(), db_id);

@@ -628,34 +836,34 @@ impl SemanticIndex {
                );
            }

            let worktree_db_ids = db_ids_by_worktree_id
                .iter()
                .map(|(a, b)| (*a, *b))
                .collect();

            let (job_count_tx, job_count_rx) = watch::channel_with(0);
            let job_count_tx = Arc::new(Mutex::new(job_count_tx));
            this.update(&mut cx, |this, _| {
                this.projects.insert(
                    project.downgrade(),
                    ProjectState {
                        worktree_db_ids: db_ids_by_worktree_id
                            .iter()
                            .map(|(a, b)| (*a, *b))
                            .collect(),
                        outstanding_job_count_rx: job_count_rx.clone(),
                        _outstanding_job_count_tx: job_count_tx.clone(),
                    },
                );
            });
            let job_count_tx_longlived = job_count_tx.clone();

            cx.background()
            let worktree_files = cx
                .background()
                .spawn(async move {
                    let mut count = 0;
                    let mut worktree_files = Vec::new();
                    for worktree in worktrees.into_iter() {
                        let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
                        let worktree_db_id = db_ids_by_worktree_id[&worktree.id()];
                        for file in worktree.files(false, 0) {
                            let absolute_path = worktree.absolutize(&file.path);

                            if file.is_external || file.is_ignored || file.is_symlink {
                                continue;
                            }

                            if let Ok(language) = language_registry
                                .language_for_file(&absolute_path, None)
                                .await
                            {
                                // Test if file is valid parseable file
                                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                    && &language.name().as_ref() != &"Markdown"
                                    && language

@@ -672,39 +880,84 @@ impl SemanticIndex {
                                    .map_or(false, |existing_mtime| existing_mtime == file.mtime);

                                if !already_stored {
                                    count += 1;

                                    let job_handle = JobHandle::new(&job_count_tx);
                                    parsing_files_tx
                                        .try_send(PendingFile {
                                            worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
                                    worktree_files.push(IndexOperation::IndexFile {
                                        absolute_path: absolute_path.clone(),
                                        payload: PendingFile {
                                            worktree_db_id,
                                            relative_path: path_buf,
                                            absolute_path,
                                            language,
                                            job_handle,
                                            modified_time: file.mtime,
                                        })
                                        .unwrap();
                                        },
                                        tx: parsing_files_tx.clone(),
                                    });
                                }
                            }
                        }
                        for file in file_mtimes.keys() {
                            db_update_tx
                                .try_send(DbOperation::Delete {
                                    worktree_id: db_ids_by_worktree_id[&worktree.id()],
                                    path: file.to_owned(),
                                })
                                .unwrap();
                        // Clean up entries from database that are no longer in the worktree.
                        for (path, _) in file_mtimes {
                            worktree_files.push(IndexOperation::DeleteFile {
                                absolute_path: worktree.absolutize(path.as_path()),
                                payload: DbOperation::Delete {
                                    worktree_id: worktree_db_id,
                                    path,
                                },
                                tx: db_update_tx.clone(),
                            });
                        }
                    }

                    log::trace!(
                        "walking worktree took {:?} milliseconds",
                        t0.elapsed().as_millis()
                    );
                    anyhow::Ok((count, job_count_rx))
                    anyhow::Ok(worktree_files)
                })
                .await
                .await?;

            this.update(&mut cx, |this, cx| {
                let project_state = ProjectState::new(
                    cx,
                    _subscription,
                    worktree_db_ids,
                    job_count_rx,
                    job_count_tx_longlived,
                );

                for op in worktree_files {
                    let _ = project_state.job_queue_tx.try_send(op);
                }

                this.projects.insert(project.downgrade(), project_state);
            });
            Result::<(), _>::Ok(())
        })
    }

    pub fn index_project(
        &mut self,
        project: ModelHandle<Project>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
        let state = self.projects.get_mut(&project.downgrade());
        let state = if state.is_none() {
            return Task::Ready(Some(Err(anyhow!("Project not yet initialized"))));
        } else {
            state.unwrap()
        };

        // let parsing_files_tx = self.parsing_files_tx.clone();
        // let db_update_tx = self.db_update_tx.clone();
        let job_count_rx = state.outstanding_job_count_rx.clone();
        let count = state.get_outstanding_count();

        cx.spawn(|this, mut cx| async move {
            this.update(&mut cx, |this, _| {
                let Some(state) = this.projects.get_mut(&project.downgrade()) else {
                    return;
                };
                let _ = state.job_queue_tx.try_send(IndexOperation::FlushQueue);
            });

            Ok((count, job_count_rx))
        })
    }
@@ -86,6 +86,13 @@ async fn test_semantic_index(cx: &mut TestAppContext) {
        .unwrap();

    let project = Project::test(fs.clone(), ["/the-root".as_ref()], cx).await;

    let _ = store
        .update(cx, |store, cx| {
            store.initialize_project(project.clone(), cx)
        })
        .await;

    let (file_count, outstanding_file_count) = store
        .update(cx, |store, cx| store.index_project(project.clone(), cx))
        .await