use anyhow::{Context as _, Result, anyhow}; use arrayvec::ArrayString; use fs::{Fs, MTime}; use futures::{TryFutureExt, stream::StreamExt}; use futures_batch::ChunksTimeoutStreamExt; use gpui::{App, AppContext as _, Entity, Task}; use heed::{ RoTxn, types::{SerdeBincode, Str}, }; use language_model::{ LanguageModelCompletionEvent, LanguageModelId, LanguageModelRegistry, LanguageModelRequest, LanguageModelRequestMessage, Role, }; use log; use parking_lot::Mutex; use project::{Entry, UpdatedEntriesSet, Worktree}; use serde::{Deserialize, Serialize}; use smol::channel; use std::{ future::Future, path::Path, pin::pin, sync::Arc, time::{Duration, Instant}, }; use util::ResultExt; use worktree::Snapshot; use crate::{indexing::IndexingEntrySet, summary_backlog::SummaryBacklog}; #[derive(Serialize, Deserialize, Debug)] pub struct FileSummary { pub filename: String, pub summary: String, } #[derive(Debug, Serialize, Deserialize)] struct UnsummarizedFile { // Path to the file on disk path: Arc, // The mtime of the file on disk mtime: Option, // BLAKE3 hash of the source file's contents digest: Blake3Digest, // The source file's contents contents: String, } #[derive(Debug, Serialize, Deserialize)] struct SummarizedFile { // Path to the file on disk path: String, // The mtime of the file on disk mtime: Option, // BLAKE3 hash of the source file's contents digest: Blake3Digest, // The LLM's summary of the file's contents summary: String, } /// This is what blake3's to_hex() method returns - see https://docs.rs/blake3/1.5.3/src/blake3/lib.rs.html#246 pub type Blake3Digest = ArrayString<{ blake3::OUT_LEN * 2 }>; #[derive(Debug, Serialize, Deserialize)] pub struct FileDigest { pub mtime: Option, pub digest: Blake3Digest, } struct NeedsSummary { files: channel::Receiver, task: Task>, } struct SummarizeFiles { files: channel::Receiver, task: Task>, } pub struct SummaryIndex { worktree: Entity, fs: Arc, db_connection: heed::Env, file_digest_db: heed::Database>, // Key: file path. Val: BLAKE3 digest of its contents. summary_db: heed::Database, Str>, // Key: BLAKE3 digest of a file's contents. Val: LLM summary of those contents. backlog: Arc>, _entry_ids_being_indexed: Arc, // TODO can this be removed? } struct Backlogged { paths_to_digest: channel::Receiver, Option)>>, task: Task>, } struct MightNeedSummaryFiles { files: channel::Receiver, task: Task>, } impl SummaryIndex { pub fn new( worktree: Entity, fs: Arc, db_connection: heed::Env, file_digest_db: heed::Database>, summary_db: heed::Database, Str>, _entry_ids_being_indexed: Arc, ) -> Self { Self { worktree, fs, db_connection, file_digest_db, summary_db, _entry_ids_being_indexed, backlog: Default::default(), } } pub fn file_digest_db(&self) -> heed::Database> { self.file_digest_db } pub fn summary_db(&self) -> heed::Database, Str> { self.summary_db } pub fn index_entries_changed_on_disk( &self, is_auto_available: bool, cx: &App, ) -> impl Future> + use<> { let start = Instant::now(); let backlogged; let digest; let needs_summary; let summaries; let persist; if is_auto_available { let worktree = self.worktree.read(cx).snapshot(); let worktree_abs_path = worktree.abs_path().clone(); backlogged = self.scan_entries(worktree, cx); digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx); needs_summary = self.check_summary_cache(digest.files, cx); summaries = self.summarize_files(needs_summary.files, cx); persist = self.persist_summaries(summaries.files, cx); } else { // This feature is only staff-shipped, so make the rest of these no-ops. backlogged = Backlogged { paths_to_digest: channel::unbounded().1, task: Task::ready(Ok(())), }; digest = MightNeedSummaryFiles { files: channel::unbounded().1, task: Task::ready(Ok(())), }; needs_summary = NeedsSummary { files: channel::unbounded().1, task: Task::ready(Ok(())), }; summaries = SummarizeFiles { files: channel::unbounded().1, task: Task::ready(Ok(())), }; persist = Task::ready(Ok(())); } async move { futures::try_join!( backlogged.task, digest.task, needs_summary.task, summaries.task, persist )?; if is_auto_available { log::info!( "Summarizing everything that changed on disk took {:?}", start.elapsed() ); } Ok(()) } } pub fn index_updated_entries( &mut self, updated_entries: UpdatedEntriesSet, is_auto_available: bool, cx: &App, ) -> impl Future> + use<> { let start = Instant::now(); let backlogged; let digest; let needs_summary; let summaries; let persist; if is_auto_available { let worktree = self.worktree.read(cx).snapshot(); let worktree_abs_path = worktree.abs_path().clone(); backlogged = self.scan_updated_entries(worktree, updated_entries.clone(), cx); digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx); needs_summary = self.check_summary_cache(digest.files, cx); summaries = self.summarize_files(needs_summary.files, cx); persist = self.persist_summaries(summaries.files, cx); } else { // This feature is only staff-shipped, so make the rest of these no-ops. backlogged = Backlogged { paths_to_digest: channel::unbounded().1, task: Task::ready(Ok(())), }; digest = MightNeedSummaryFiles { files: channel::unbounded().1, task: Task::ready(Ok(())), }; needs_summary = NeedsSummary { files: channel::unbounded().1, task: Task::ready(Ok(())), }; summaries = SummarizeFiles { files: channel::unbounded().1, task: Task::ready(Ok(())), }; persist = Task::ready(Ok(())); } async move { futures::try_join!( backlogged.task, digest.task, needs_summary.task, summaries.task, persist )?; log::debug!("Summarizing updated entries took {:?}", start.elapsed()); Ok(()) } } fn check_summary_cache( &self, might_need_summary: channel::Receiver, cx: &App, ) -> NeedsSummary { let db_connection = self.db_connection.clone(); let db = self.summary_db; let (needs_summary_tx, needs_summary_rx) = channel::bounded(512); let task = cx.background_spawn(async move { let mut might_need_summary = pin!(might_need_summary); while let Some(file) = might_need_summary.next().await { let tx = db_connection .read_txn() .context("Failed to create read transaction for checking which hashes are in summary cache")?; match db.get(&tx, &file.digest) { Ok(opt_answer) => { if opt_answer.is_none() { // It's not in the summary cache db, so we need to summarize it. log::debug!("File {:?} (digest {:?}) was NOT in the db cache and needs to be resummarized.", file.path.display(), &file.digest); needs_summary_tx.send(file).await?; } else { log::debug!("File {:?} (digest {:?}) was in the db cache and does not need to be resummarized.", file.path.display(), &file.digest); } } Err(err) => { log::error!("Reading from the summaries database failed: {:?}", err); } } } Ok(()) }); NeedsSummary { files: needs_summary_rx, task, } } fn scan_entries(&self, worktree: Snapshot, cx: &App) -> Backlogged { let (tx, rx) = channel::bounded(512); let db_connection = self.db_connection.clone(); let digest_db = self.file_digest_db; let backlog = Arc::clone(&self.backlog); let task = cx.background_spawn(async move { let txn = db_connection .read_txn() .context("failed to create read transaction")?; for entry in worktree.files(false, 0) { let needs_summary = Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry); if !needs_summary.is_empty() { tx.send(needs_summary).await?; } } // TODO delete db entries for deleted files Ok(()) }); Backlogged { paths_to_digest: rx, task, } } fn add_to_backlog( backlog: Arc>, digest_db: heed::Database>, txn: &RoTxn<'_>, entry: &Entry, ) -> Vec<(Arc, Option)> { let entry_db_key = db_key_for_path(&entry.path); match digest_db.get(&txn, &entry_db_key) { Ok(opt_saved_digest) => { // The file path is the same, but the mtime is different. (Or there was no mtime.) // It needs updating, so add it to the backlog! Then, if the backlog is full, drain it and summarize its contents. if entry.mtime != opt_saved_digest.and_then(|digest| digest.mtime) { let mut backlog = backlog.lock(); log::info!( "Inserting {:?} ({:?} bytes) into backlog", &entry.path, entry.size, ); backlog.insert(Arc::clone(&entry.path), entry.size, entry.mtime); if backlog.needs_drain() { log::info!("Draining summary backlog..."); return backlog.drain().collect(); } } } Err(err) => { log::error!( "Error trying to get file digest db entry {:?}: {:?}", &entry_db_key, err ); } } Vec::new() } fn scan_updated_entries( &self, worktree: Snapshot, updated_entries: UpdatedEntriesSet, cx: &App, ) -> Backlogged { log::info!("Scanning for updated entries that might need summarization..."); let (tx, rx) = channel::bounded(512); // let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128); let db_connection = self.db_connection.clone(); let digest_db = self.file_digest_db; let backlog = Arc::clone(&self.backlog); let task = cx.background_spawn(async move { let txn = db_connection .read_txn() .context("failed to create read transaction")?; for (path, entry_id, status) in updated_entries.iter() { match status { project::PathChange::Loaded | project::PathChange::Added | project::PathChange::Updated | project::PathChange::AddedOrUpdated => { if let Some(entry) = worktree.entry_for_id(*entry_id) { if entry.is_file() { let needs_summary = Self::add_to_backlog( Arc::clone(&backlog), digest_db, &txn, entry, ); if !needs_summary.is_empty() { tx.send(needs_summary).await?; } } } } project::PathChange::Removed => { let _db_path = db_key_for_path(path); // TODO delete db entries for deleted files // deleted_entry_ranges_tx // .send((Bound::Included(db_path.clone()), Bound::Included(db_path))) // .await?; } } } Ok(()) }); Backlogged { paths_to_digest: rx, // deleted_entry_ranges: deleted_entry_ranges_rx, task, } } fn digest_files( &self, paths: channel::Receiver, Option)>>, worktree_abs_path: Arc, cx: &App, ) -> MightNeedSummaryFiles { let fs = self.fs.clone(); let (rx, tx) = channel::bounded(2048); let task = cx.spawn(async move |cx| { cx.background_executor() .scoped(|cx| { for _ in 0..cx.num_cpus() { cx.spawn(async { while let Ok(pairs) = paths.recv().await { // Note: we could process all these files concurrently if desired. Might or might not speed things up. for (path, mtime) in pairs { let entry_abs_path = worktree_abs_path.join(&path); // Load the file's contents and compute its hash digest. let unsummarized_file = { let Some(contents) = fs .load(&entry_abs_path) .await .with_context(|| { format!("failed to read path {entry_abs_path:?}") }) .log_err() else { continue; }; let digest = { let mut hasher = blake3::Hasher::new(); // Incorporate both the (relative) file path as well as the contents of the file into the hash. // This is because in some languages and frameworks, identical files can do different things // depending on their paths (e.g. Rails controllers). It's also why we send the path to the model. hasher.update(path.display().to_string().as_bytes()); hasher.update(contents.as_bytes()); hasher.finalize().to_hex() }; UnsummarizedFile { digest, contents, path, mtime, } }; if let Err(err) = rx .send(unsummarized_file) .map_err(|error| anyhow!(error)) .await { log::error!("Error: {:?}", err); return; } } } }); } }) .await; Ok(()) }); MightNeedSummaryFiles { files: tx, task } } fn summarize_files( &self, unsummarized_files: channel::Receiver, cx: &App, ) -> SummarizeFiles { let (summarized_tx, summarized_rx) = channel::bounded(512); let task = cx.spawn(async move |cx| { while let Ok(file) = unsummarized_files.recv().await { log::debug!("Summarizing {:?}", file); let summary = cx .update(|cx| Self::summarize_code(&file.contents, &file.path, cx))? .await .unwrap_or_else(|err| { // Log a warning because we'll continue anyway. // In the future, we may want to try splitting it up into multiple requests and concatenating the summaries, // but this might give bad summaries due to cutting off source code files in the middle. log::warn!("Failed to summarize {} - {:?}", file.path.display(), err); String::new() }); // Note that the summary could be empty because of an error talking to a cloud provider, // e.g. because the context limit was exceeded. In that case, we return Ok(String::new()). if !summary.is_empty() { summarized_tx .send(SummarizedFile { path: file.path.display().to_string(), digest: file.digest, summary, mtime: file.mtime, }) .await? } } Ok(()) }); SummarizeFiles { files: summarized_rx, task, } } fn summarize_code( code: &str, path: &Path, cx: &App, ) -> impl Future> + use<> { let start = Instant::now(); let (summary_model_id, use_cache): (LanguageModelId, bool) = ( "Qwen/Qwen2-7B-Instruct".to_string().into(), // TODO read this from the user's settings. false, // qwen2 doesn't have a cache, but we should probably infer this from the model ); let Some(model) = LanguageModelRegistry::read_global(cx) .available_models(cx) .find(|model| &model.id() == &summary_model_id) else { return cx.background_spawn(async move { anyhow::bail!("Couldn't find the preferred summarization model ({summary_model_id:?}) in the language registry's available models") }); }; let utf8_path = path.to_string_lossy(); const PROMPT_BEFORE_CODE: &str = "Summarize what the code in this file does in 3 sentences, using no newlines or bullet points in the summary:"; let prompt = format!("{PROMPT_BEFORE_CODE}\n{utf8_path}:\n{code}"); log::debug!( "Summarizing code by sending this prompt to {:?}: {:?}", model.name(), &prompt ); let request = LanguageModelRequest { thread_id: None, prompt_id: None, mode: None, intent: None, messages: vec![LanguageModelRequestMessage { role: Role::User, content: vec![prompt.into()], cache: use_cache, }], tools: Vec::new(), tool_choice: None, stop: Vec::new(), temperature: None, thinking_allowed: true, }; let code_len = code.len(); cx.spawn(async move |cx| { let stream = model.stream_completion(request, &cx); cx.background_spawn(async move { let answer: String = stream .await? .filter_map(|event| async { if let Ok(LanguageModelCompletionEvent::Text(text)) = event { Some(text) } else { None } }) .collect() .await; log::info!( "It took {:?} to summarize {:?} bytes of code.", start.elapsed(), code_len ); log::debug!("Summary was: {:?}", &answer); Ok(answer) }) .await // TODO if summarization failed, put it back in the backlog! }) } fn persist_summaries( &self, summaries: channel::Receiver, cx: &App, ) -> Task> { let db_connection = self.db_connection.clone(); let digest_db = self.file_digest_db; let summary_db = self.summary_db; cx.background_spawn(async move { let mut summaries = pin!(summaries.chunks_timeout(4096, Duration::from_secs(2))); while let Some(summaries) = summaries.next().await { let mut txn = db_connection.write_txn()?; for file in &summaries { log::debug!( "Saving summary of {:?} - which is {} bytes of summary for content digest {:?}", &file.path, file.summary.len(), file.digest ); digest_db.put( &mut txn, &file.path, &FileDigest { mtime: file.mtime, digest: file.digest, }, )?; summary_db.put(&mut txn, &file.digest, &file.summary)?; } txn.commit()?; drop(summaries); log::debug!("committed summaries"); } Ok(()) }) } /// Empty out the backlog of files that haven't been resummarized, and resummarize them immediately. pub(crate) fn flush_backlog( &self, worktree_abs_path: Arc, cx: &App, ) -> impl Future> + use<> { let start = Instant::now(); let backlogged = { let (tx, rx) = channel::bounded(512); let needs_summary: Vec<(Arc, Option)> = { let mut backlog = self.backlog.lock(); backlog.drain().collect() }; let task = cx.background_spawn(async move { tx.send(needs_summary).await?; Ok(()) }); Backlogged { paths_to_digest: rx, task, } }; let digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx); let needs_summary = self.check_summary_cache(digest.files, cx); let summaries = self.summarize_files(needs_summary.files, cx); let persist = self.persist_summaries(summaries.files, cx); async move { futures::try_join!( backlogged.task, digest.task, needs_summary.task, summaries.task, persist )?; log::info!("Summarizing backlogged entries took {:?}", start.elapsed()); Ok(()) } } pub(crate) fn backlog_len(&self) -> usize { self.backlog.lock().len() } } fn db_key_for_path(path: &Arc) -> String { path.to_string_lossy().replace('/', "\0") }