Semantic index progress (#11071)
Release Notes: - N/A --------- Co-authored-by: Antonio Scandurra <me@as-cii.com> Co-authored-by: Kyle <kylek@zed.dev> Co-authored-by: Marshall <marshall@zed.dev> Co-authored-by: Marshall Bowers <elliott.codes@gmail.com>
This commit is contained in:
parent
1aa9c868d4
commit
b7d9aeb29d
10 changed files with 298 additions and 407 deletions
|
@ -30,6 +30,7 @@ language.workspace = true
|
|||
log.workspace = true
|
||||
heed.workspace = true
|
||||
open_ai.workspace = true
|
||||
parking_lot.workspace = true
|
||||
project.workspace = true
|
||||
settings.workspace = true
|
||||
serde.workspace = true
|
||||
|
|
|
@ -3,7 +3,7 @@ mod embedding;
|
|||
|
||||
use anyhow::{anyhow, Context as _, Result};
|
||||
use chunking::{chunk_text, Chunk};
|
||||
use collections::{Bound, HashMap};
|
||||
use collections::{Bound, HashMap, HashSet};
|
||||
pub use embedding::*;
|
||||
use fs::Fs;
|
||||
use futures::stream::StreamExt;
|
||||
|
@ -14,15 +14,17 @@ use gpui::{
|
|||
};
|
||||
use heed::types::{SerdeBincode, Str};
|
||||
use language::LanguageRegistry;
|
||||
use project::{Entry, Project, UpdatedEntriesSet, Worktree};
|
||||
use parking_lot::Mutex;
|
||||
use project::{Entry, Project, ProjectEntryId, UpdatedEntriesSet, Worktree};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use smol::channel;
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
future::Future,
|
||||
num::NonZeroUsize,
|
||||
ops::Range,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
sync::{Arc, Weak},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
use util::ResultExt;
|
||||
|
@ -102,19 +104,16 @@ pub struct ProjectIndex {
|
|||
worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
|
||||
language_registry: Arc<LanguageRegistry>,
|
||||
fs: Arc<dyn Fs>,
|
||||
pub last_status: Status,
|
||||
last_status: Status,
|
||||
status_tx: channel::Sender<()>,
|
||||
embedding_provider: Arc<dyn EmbeddingProvider>,
|
||||
_maintain_status: Task<()>,
|
||||
_subscription: Subscription,
|
||||
}
|
||||
|
||||
enum WorktreeIndexHandle {
|
||||
Loading {
|
||||
_task: Task<Result<()>>,
|
||||
},
|
||||
Loaded {
|
||||
index: Model<WorktreeIndex>,
|
||||
_subscription: Subscription,
|
||||
},
|
||||
Loading { _task: Task<Result<()>> },
|
||||
Loaded { index: Model<WorktreeIndex> },
|
||||
}
|
||||
|
||||
impl ProjectIndex {
|
||||
|
@ -126,20 +125,36 @@ impl ProjectIndex {
|
|||
) -> Self {
|
||||
let language_registry = project.read(cx).languages().clone();
|
||||
let fs = project.read(cx).fs().clone();
|
||||
let (status_tx, mut status_rx) = channel::unbounded();
|
||||
let mut this = ProjectIndex {
|
||||
db_connection,
|
||||
project: project.downgrade(),
|
||||
worktree_indices: HashMap::default(),
|
||||
language_registry,
|
||||
fs,
|
||||
status_tx,
|
||||
last_status: Status::Idle,
|
||||
embedding_provider,
|
||||
_subscription: cx.subscribe(&project, Self::handle_project_event),
|
||||
_maintain_status: cx.spawn(|this, mut cx| async move {
|
||||
while status_rx.next().await.is_some() {
|
||||
if this
|
||||
.update(&mut cx, |this, cx| this.update_status(cx))
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}),
|
||||
};
|
||||
this.update_worktree_indices(cx);
|
||||
this
|
||||
}
|
||||
|
||||
pub fn status(&self) -> Status {
|
||||
self.last_status
|
||||
}
|
||||
|
||||
fn handle_project_event(
|
||||
&mut self,
|
||||
_: Model<Project>,
|
||||
|
@ -180,19 +195,18 @@ impl ProjectIndex {
|
|||
self.db_connection.clone(),
|
||||
self.language_registry.clone(),
|
||||
self.fs.clone(),
|
||||
self.status_tx.clone(),
|
||||
self.embedding_provider.clone(),
|
||||
cx,
|
||||
);
|
||||
|
||||
let load_worktree = cx.spawn(|this, mut cx| async move {
|
||||
if let Some(index) = worktree_index.await.log_err() {
|
||||
this.update(&mut cx, |this, cx| {
|
||||
if let Some(worktree_index) = worktree_index.await.log_err() {
|
||||
this.update(&mut cx, |this, _| {
|
||||
this.worktree_indices.insert(
|
||||
worktree_id,
|
||||
WorktreeIndexHandle::Loaded {
|
||||
_subscription: cx
|
||||
.observe(&index, |this, _, cx| this.update_status(cx)),
|
||||
index,
|
||||
index: worktree_index,
|
||||
},
|
||||
);
|
||||
})?;
|
||||
|
@ -215,22 +229,29 @@ impl ProjectIndex {
|
|||
}
|
||||
|
||||
fn update_status(&mut self, cx: &mut ModelContext<Self>) {
|
||||
let mut status = Status::Idle;
|
||||
for index in self.worktree_indices.values() {
|
||||
let mut indexing_count = 0;
|
||||
let mut any_loading = false;
|
||||
|
||||
for index in self.worktree_indices.values_mut() {
|
||||
match index {
|
||||
WorktreeIndexHandle::Loading { .. } => {
|
||||
status = Status::Scanning;
|
||||
any_loading = true;
|
||||
break;
|
||||
}
|
||||
WorktreeIndexHandle::Loaded { index, .. } => {
|
||||
if index.read(cx).status == Status::Scanning {
|
||||
status = Status::Scanning;
|
||||
break;
|
||||
}
|
||||
indexing_count += index.read(cx).entry_ids_being_indexed.len();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let status = if any_loading {
|
||||
Status::Loading
|
||||
} else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
|
||||
Status::Scanning { remaining_count }
|
||||
} else {
|
||||
Status::Idle
|
||||
};
|
||||
|
||||
if status != self.last_status {
|
||||
self.last_status = status;
|
||||
cx.emit(status);
|
||||
|
@ -263,6 +284,17 @@ impl ProjectIndex {
|
|||
results
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn path_count(&self, cx: &AppContext) -> Result<u64> {
|
||||
let mut result = 0;
|
||||
for worktree_index in self.worktree_indices.values() {
|
||||
if let WorktreeIndexHandle::Loaded { index, .. } = worktree_index {
|
||||
result += index.read(cx).path_count()?;
|
||||
}
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SearchResult {
|
||||
|
@ -275,7 +307,8 @@ pub struct SearchResult {
|
|||
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
||||
pub enum Status {
|
||||
Idle,
|
||||
Scanning,
|
||||
Loading,
|
||||
Scanning { remaining_count: NonZeroUsize },
|
||||
}
|
||||
|
||||
impl EventEmitter<Status> for ProjectIndex {}
|
||||
|
@ -287,7 +320,7 @@ struct WorktreeIndex {
|
|||
language_registry: Arc<LanguageRegistry>,
|
||||
fs: Arc<dyn Fs>,
|
||||
embedding_provider: Arc<dyn EmbeddingProvider>,
|
||||
status: Status,
|
||||
entry_ids_being_indexed: Arc<IndexingEntrySet>,
|
||||
_index_entries: Task<Result<()>>,
|
||||
_subscription: Subscription,
|
||||
}
|
||||
|
@ -298,6 +331,7 @@ impl WorktreeIndex {
|
|||
db_connection: heed::Env,
|
||||
language_registry: Arc<LanguageRegistry>,
|
||||
fs: Arc<dyn Fs>,
|
||||
status_tx: channel::Sender<()>,
|
||||
embedding_provider: Arc<dyn EmbeddingProvider>,
|
||||
cx: &mut AppContext,
|
||||
) -> Task<Result<Model<Self>>> {
|
||||
|
@ -321,6 +355,7 @@ impl WorktreeIndex {
|
|||
worktree,
|
||||
db_connection,
|
||||
db,
|
||||
status_tx,
|
||||
language_registry,
|
||||
fs,
|
||||
embedding_provider,
|
||||
|
@ -330,10 +365,12 @@ impl WorktreeIndex {
|
|||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn new(
|
||||
worktree: Model<Worktree>,
|
||||
db_connection: heed::Env,
|
||||
db: heed::Database<Str, SerdeBincode<EmbeddedFile>>,
|
||||
status: channel::Sender<()>,
|
||||
language_registry: Arc<LanguageRegistry>,
|
||||
fs: Arc<dyn Fs>,
|
||||
embedding_provider: Arc<dyn EmbeddingProvider>,
|
||||
|
@ -353,7 +390,7 @@ impl WorktreeIndex {
|
|||
language_registry,
|
||||
fs,
|
||||
embedding_provider,
|
||||
status: Status::Idle,
|
||||
entry_ids_being_indexed: Arc::new(IndexingEntrySet::new(status)),
|
||||
_index_entries: cx.spawn(|this, cx| Self::index_entries(this, updated_entries_rx, cx)),
|
||||
_subscription,
|
||||
}
|
||||
|
@ -364,28 +401,14 @@ impl WorktreeIndex {
|
|||
updated_entries: channel::Receiver<UpdatedEntriesSet>,
|
||||
mut cx: AsyncAppContext,
|
||||
) -> Result<()> {
|
||||
let index = this.update(&mut cx, |this, cx| {
|
||||
cx.notify();
|
||||
this.status = Status::Scanning;
|
||||
this.index_entries_changed_on_disk(cx)
|
||||
})?;
|
||||
let index = this.update(&mut cx, |this, cx| this.index_entries_changed_on_disk(cx))?;
|
||||
index.await.log_err();
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.status = Status::Idle;
|
||||
cx.notify();
|
||||
})?;
|
||||
|
||||
while let Ok(updated_entries) = updated_entries.recv().await {
|
||||
let index = this.update(&mut cx, |this, cx| {
|
||||
cx.notify();
|
||||
this.status = Status::Scanning;
|
||||
this.index_updated_entries(updated_entries, cx)
|
||||
})?;
|
||||
index.await.log_err();
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.status = Status::Idle;
|
||||
cx.notify();
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -426,6 +449,7 @@ impl WorktreeIndex {
|
|||
let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
|
||||
let db_connection = self.db_connection.clone();
|
||||
let db = self.db;
|
||||
let entries_being_indexed = self.entry_ids_being_indexed.clone();
|
||||
let task = cx.background_executor().spawn(async move {
|
||||
let txn = db_connection
|
||||
.read_txn()
|
||||
|
@ -476,7 +500,8 @@ impl WorktreeIndex {
|
|||
}
|
||||
|
||||
if entry.mtime != saved_mtime {
|
||||
updated_entries_tx.send(entry.clone()).await?;
|
||||
let handle = entries_being_indexed.insert(&entry);
|
||||
updated_entries_tx.send((entry.clone(), handle)).await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -505,6 +530,7 @@ impl WorktreeIndex {
|
|||
) -> ScanEntries {
|
||||
let (updated_entries_tx, updated_entries_rx) = channel::bounded(512);
|
||||
let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
|
||||
let entries_being_indexed = self.entry_ids_being_indexed.clone();
|
||||
let task = cx.background_executor().spawn(async move {
|
||||
for (path, entry_id, status) in updated_entries.iter() {
|
||||
match status {
|
||||
|
@ -513,7 +539,8 @@ impl WorktreeIndex {
|
|||
| project::PathChange::AddedOrUpdated => {
|
||||
if let Some(entry) = worktree.entry_for_id(*entry_id) {
|
||||
if entry.is_file() {
|
||||
updated_entries_tx.send(entry.clone()).await?;
|
||||
let handle = entries_being_indexed.insert(&entry);
|
||||
updated_entries_tx.send((entry.clone(), handle)).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -542,7 +569,7 @@ impl WorktreeIndex {
|
|||
fn chunk_files(
|
||||
&self,
|
||||
worktree_abs_path: Arc<Path>,
|
||||
entries: channel::Receiver<Entry>,
|
||||
entries: channel::Receiver<(Entry, IndexingEntryHandle)>,
|
||||
cx: &AppContext,
|
||||
) -> ChunkFiles {
|
||||
let language_registry = self.language_registry.clone();
|
||||
|
@ -553,7 +580,7 @@ impl WorktreeIndex {
|
|||
.scoped(|cx| {
|
||||
for _ in 0..cx.num_cpus() {
|
||||
cx.spawn(async {
|
||||
while let Ok(entry) = entries.recv().await {
|
||||
while let Ok((entry, handle)) = entries.recv().await {
|
||||
let entry_abs_path = worktree_abs_path.join(&entry.path);
|
||||
let Some(text) = fs
|
||||
.load(&entry_abs_path)
|
||||
|
@ -572,8 +599,8 @@ impl WorktreeIndex {
|
|||
let grammar =
|
||||
language.as_ref().and_then(|language| language.grammar());
|
||||
let chunked_file = ChunkedFile {
|
||||
worktree_root: worktree_abs_path.clone(),
|
||||
chunks: chunk_text(&text, grammar),
|
||||
handle,
|
||||
entry,
|
||||
text,
|
||||
};
|
||||
|
@ -622,7 +649,11 @@ impl WorktreeIndex {
|
|||
|
||||
let mut embeddings = Vec::new();
|
||||
for embedding_batch in chunks.chunks(embedding_provider.batch_size()) {
|
||||
embeddings.extend(embedding_provider.embed(embedding_batch).await?);
|
||||
if let Some(batch_embeddings) =
|
||||
embedding_provider.embed(embedding_batch).await.log_err()
|
||||
{
|
||||
embeddings.extend_from_slice(&batch_embeddings);
|
||||
}
|
||||
}
|
||||
|
||||
let mut embeddings = embeddings.into_iter();
|
||||
|
@ -643,7 +674,9 @@ impl WorktreeIndex {
|
|||
chunks: embedded_chunks,
|
||||
};
|
||||
|
||||
embedded_files_tx.send(embedded_file).await?;
|
||||
embedded_files_tx
|
||||
.send((embedded_file, chunked_file.handle))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
@ -658,7 +691,7 @@ impl WorktreeIndex {
|
|||
fn persist_embeddings(
|
||||
&self,
|
||||
mut deleted_entry_ranges: channel::Receiver<(Bound<String>, Bound<String>)>,
|
||||
embedded_files: channel::Receiver<EmbeddedFile>,
|
||||
embedded_files: channel::Receiver<(EmbeddedFile, IndexingEntryHandle)>,
|
||||
cx: &AppContext,
|
||||
) -> Task<Result<()>> {
|
||||
let db_connection = self.db_connection.clone();
|
||||
|
@ -676,12 +709,15 @@ impl WorktreeIndex {
|
|||
let mut embedded_files = embedded_files.chunks_timeout(4096, Duration::from_secs(2));
|
||||
while let Some(embedded_files) = embedded_files.next().await {
|
||||
let mut txn = db_connection.write_txn()?;
|
||||
for file in embedded_files {
|
||||
for (file, _) in &embedded_files {
|
||||
log::debug!("saving embedding for file {:?}", file.path);
|
||||
let key = db_key_for_path(&file.path);
|
||||
db.put(&mut txn, &key, &file)?;
|
||||
db.put(&mut txn, &key, file)?;
|
||||
}
|
||||
txn.commit()?;
|
||||
eprintln!("committed {:?}", embedded_files.len());
|
||||
|
||||
drop(embedded_files);
|
||||
log::debug!("committed");
|
||||
}
|
||||
|
||||
|
@ -789,10 +825,19 @@ impl WorktreeIndex {
|
|||
Ok(search_results)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn path_count(&self) -> Result<u64> {
|
||||
let txn = self
|
||||
.db_connection
|
||||
.read_txn()
|
||||
.context("failed to create read transaction")?;
|
||||
Ok(self.db.len(&txn)?)
|
||||
}
|
||||
}
|
||||
|
||||
struct ScanEntries {
|
||||
updated_entries: channel::Receiver<Entry>,
|
||||
updated_entries: channel::Receiver<(Entry, IndexingEntryHandle)>,
|
||||
deleted_entry_ranges: channel::Receiver<(Bound<String>, Bound<String>)>,
|
||||
task: Task<Result<()>>,
|
||||
}
|
||||
|
@ -803,15 +848,14 @@ struct ChunkFiles {
|
|||
}
|
||||
|
||||
struct ChunkedFile {
|
||||
#[allow(dead_code)]
|
||||
pub worktree_root: Arc<Path>,
|
||||
pub entry: Entry,
|
||||
pub handle: IndexingEntryHandle,
|
||||
pub text: String,
|
||||
pub chunks: Vec<Chunk>,
|
||||
}
|
||||
|
||||
struct EmbedFiles {
|
||||
files: channel::Receiver<EmbeddedFile>,
|
||||
files: channel::Receiver<(EmbeddedFile, IndexingEntryHandle)>,
|
||||
task: Task<Result<()>>,
|
||||
}
|
||||
|
||||
|
@ -828,6 +872,47 @@ struct EmbeddedChunk {
|
|||
embedding: Embedding,
|
||||
}
|
||||
|
||||
struct IndexingEntrySet {
|
||||
entry_ids: Mutex<HashSet<ProjectEntryId>>,
|
||||
tx: channel::Sender<()>,
|
||||
}
|
||||
|
||||
struct IndexingEntryHandle {
|
||||
entry_id: ProjectEntryId,
|
||||
set: Weak<IndexingEntrySet>,
|
||||
}
|
||||
|
||||
impl IndexingEntrySet {
|
||||
fn new(tx: channel::Sender<()>) -> Self {
|
||||
Self {
|
||||
entry_ids: Default::default(),
|
||||
tx,
|
||||
}
|
||||
}
|
||||
|
||||
fn insert(self: &Arc<Self>, entry: &project::Entry) -> IndexingEntryHandle {
|
||||
self.entry_ids.lock().insert(entry.id);
|
||||
self.tx.send_blocking(()).ok();
|
||||
IndexingEntryHandle {
|
||||
entry_id: entry.id,
|
||||
set: Arc::downgrade(self),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.entry_ids.lock().len()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for IndexingEntryHandle {
|
||||
fn drop(&mut self) {
|
||||
if let Some(set) = self.set.upgrade() {
|
||||
set.tx.send_blocking(()).ok();
|
||||
set.entry_ids.lock().remove(&self.entry_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn db_key_for_path(path: &Arc<Path>) -> String {
|
||||
path.to_string_lossy().replace('/', "\0")
|
||||
}
|
||||
|
@ -835,10 +920,7 @@ fn db_key_for_path(path: &Arc<Path>) -> String {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use futures::channel::oneshot;
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
|
||||
use gpui::{Global, TestAppContext};
|
||||
use language::language_settings::AllLanguageSettings;
|
||||
use project::Project;
|
||||
|
@ -922,18 +1004,13 @@ mod tests {
|
|||
|
||||
let project_index = cx.update(|cx| semantic_index.project_index(project.clone(), cx));
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let mut tx = Some(tx);
|
||||
let subscription = cx.update(|cx| {
|
||||
cx.subscribe(&project_index, move |_, event, _| {
|
||||
if let Some(tx) = tx.take() {
|
||||
_ = tx.send(*event);
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
rx.await.expect("no event emitted");
|
||||
drop(subscription);
|
||||
while project_index
|
||||
.read_with(cx, |index, cx| index.path_count(cx))
|
||||
.unwrap()
|
||||
== 0
|
||||
{
|
||||
project_index.next_event(cx).await;
|
||||
}
|
||||
|
||||
let results = cx
|
||||
.update(|cx| {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue