accomodate for duplicate entries in indexing queue
Co-authored-by: Piotr <piotr@zed.dev>
This commit is contained in:
parent
3f9f742530
commit
aa07872a24
1 changed files with 28 additions and 9 deletions
|
@ -12,6 +12,7 @@ use db::VectorDatabase;
|
||||||
use embedding::{EmbeddingProvider, OpenAIEmbeddings};
|
use embedding::{EmbeddingProvider, OpenAIEmbeddings};
|
||||||
use futures::{channel::oneshot, Future};
|
use futures::{channel::oneshot, Future};
|
||||||
use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
|
use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
|
||||||
|
use isahc::http::header::OccupiedEntry;
|
||||||
use language::{Anchor, Buffer, Language, LanguageRegistry};
|
use language::{Anchor, Buffer, Language, LanguageRegistry};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
|
use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
|
||||||
|
@ -130,10 +131,11 @@ impl ProjectState {
|
||||||
|
|
||||||
let (job_queue_tx, job_queue_rx) = channel::unbounded();
|
let (job_queue_tx, job_queue_rx) = channel::unbounded();
|
||||||
let _queue_update_task = cx.background().spawn({
|
let _queue_update_task = cx.background().spawn({
|
||||||
let mut worktree_queue = Vec::new();
|
let mut worktree_queue = HashMap::new();
|
||||||
async move {
|
async move {
|
||||||
while let Ok(operation) = job_queue_rx.recv().await {
|
while let Ok(operation) = job_queue_rx.recv().await {
|
||||||
Self::update_queue(&mut worktree_queue, operation);
|
Self::update_queue(&mut worktree_queue, operation);
|
||||||
|
dbg!(worktree_queue.len());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -153,24 +155,37 @@ impl ProjectState {
|
||||||
self.outstanding_job_count_rx.borrow().clone()
|
self.outstanding_job_count_rx.borrow().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_queue(queue: &mut Vec<IndexOperation>, operation: IndexOperation) {
|
fn update_queue(queue: &mut HashMap<PathBuf, IndexOperation>, operation: IndexOperation) {
|
||||||
match operation {
|
match operation {
|
||||||
IndexOperation::FlushQueue => {
|
IndexOperation::FlushQueue => {
|
||||||
while let Some(op) = queue.pop() {
|
let queue = std::mem::take(queue);
|
||||||
|
for (_, op) in queue {
|
||||||
match op {
|
match op {
|
||||||
IndexOperation::IndexFile { payload, tx } => {
|
IndexOperation::IndexFile {
|
||||||
|
absolute_path,
|
||||||
|
payload,
|
||||||
|
tx,
|
||||||
|
} => {
|
||||||
tx.try_send(payload);
|
tx.try_send(payload);
|
||||||
}
|
}
|
||||||
IndexOperation::DeleteFile { payload, tx } => {
|
IndexOperation::DeleteFile {
|
||||||
|
absolute_path,
|
||||||
|
payload,
|
||||||
|
tx,
|
||||||
|
} => {
|
||||||
tx.try_send(payload);
|
tx.try_send(payload);
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
IndexOperation::IndexFile {
|
||||||
// TODO: This has to accomodate for duplicate files to index.
|
ref absolute_path, ..
|
||||||
queue.push(operation);
|
}
|
||||||
|
| IndexOperation::DeleteFile {
|
||||||
|
ref absolute_path, ..
|
||||||
|
} => {
|
||||||
|
queue.insert(absolute_path.clone(), operation);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -209,13 +224,14 @@ pub struct PendingFile {
|
||||||
modified_time: SystemTime,
|
modified_time: SystemTime,
|
||||||
job_handle: JobHandle,
|
job_handle: JobHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum IndexOperation {
|
enum IndexOperation {
|
||||||
IndexFile {
|
IndexFile {
|
||||||
|
absolute_path: PathBuf,
|
||||||
payload: PendingFile,
|
payload: PendingFile,
|
||||||
tx: channel::Sender<PendingFile>,
|
tx: channel::Sender<PendingFile>,
|
||||||
},
|
},
|
||||||
DeleteFile {
|
DeleteFile {
|
||||||
|
absolute_path: PathBuf,
|
||||||
payload: DbOperation,
|
payload: DbOperation,
|
||||||
tx: channel::Sender<DbOperation>,
|
tx: channel::Sender<DbOperation>,
|
||||||
},
|
},
|
||||||
|
@ -718,6 +734,7 @@ impl SemanticIndex {
|
||||||
|
|
||||||
let job_handle = JobHandle::new(&outstanding_job_tx);
|
let job_handle = JobHandle::new(&outstanding_job_tx);
|
||||||
let new_operation = IndexOperation::IndexFile {
|
let new_operation = IndexOperation::IndexFile {
|
||||||
|
absolute_path: absolute_path.clone(),
|
||||||
payload: PendingFile {
|
payload: PendingFile {
|
||||||
worktree_db_id,
|
worktree_db_id,
|
||||||
relative_path,
|
relative_path,
|
||||||
|
@ -733,6 +750,7 @@ impl SemanticIndex {
|
||||||
}
|
}
|
||||||
PathChange::Removed => {
|
PathChange::Removed => {
|
||||||
let new_operation = IndexOperation::DeleteFile {
|
let new_operation = IndexOperation::DeleteFile {
|
||||||
|
absolute_path,
|
||||||
payload: DbOperation::Delete {
|
payload: DbOperation::Delete {
|
||||||
worktree_id: worktree_db_id,
|
worktree_id: worktree_db_id,
|
||||||
path: relative_path,
|
path: relative_path,
|
||||||
|
@ -853,6 +871,7 @@ impl SemanticIndex {
|
||||||
if !already_stored {
|
if !already_stored {
|
||||||
let job_handle = JobHandle::new(&job_count_tx);
|
let job_handle = JobHandle::new(&job_count_tx);
|
||||||
worktree_files.push(IndexOperation::IndexFile {
|
worktree_files.push(IndexOperation::IndexFile {
|
||||||
|
absolute_path: absolute_path.clone(),
|
||||||
payload: PendingFile {
|
payload: PendingFile {
|
||||||
worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
|
worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
|
||||||
relative_path: path_buf,
|
relative_path: path_buf,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue