
Release Notes:

- Allow Anthropic custom models to override `temperature`.

This change also centralizes the defaulting of `temperature` inside each model's `into_x` call, instead of it being sprinkled around the code.
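The note above describes moving the `temperature` default out of scattered call sites and into each model's `into_x` conversion; the `temperature: None` in `summarize_code` below relies on exactly that kind of centralized defaulting. Here is a minimal sketch of that shape, using invented names (`Request`, `ProviderRequest`, `into_provider`) rather than Zed's actual Anthropic types:

// Minimal sketch of centralized temperature defaulting; the types and
// method names are illustrative, not Zed's real API.
struct Request {
    // `None` means "use the model's default" rather than a hardcoded value.
    temperature: Option<f32>,
}

struct ProviderRequest {
    temperature: f32,
}

impl Request {
    // The default lives here, at the single conversion point, so a custom
    // model's configured temperature (passed in as `default_temperature`)
    // applies whenever the request itself didn't specify one.
    fn into_provider(self, default_temperature: f32) -> ProviderRequest {
        ProviderRequest {
            temperature: self.temperature.unwrap_or(default_temperature),
        }
    }
}

fn main() {
    let request = Request { temperature: None };
    // A custom model overriding the default to 0.7 takes effect here.
    assert_eq!(request.into_provider(0.7).temperature, 0.7);
}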
use anyhow::{anyhow, Context as _, Result};
use arrayvec::ArrayString;
use fs::Fs;
use futures::{stream::StreamExt, TryFutureExt};
use futures_batch::ChunksTimeoutStreamExt;
use gpui::{AppContext, Model, Task};
use heed::{
    types::{SerdeBincode, Str},
    RoTxn,
};
use language_model::{
    LanguageModelCompletionEvent, LanguageModelId, LanguageModelRegistry, LanguageModelRequest,
    LanguageModelRequestMessage, Role,
};
use log;
use parking_lot::Mutex;
use project::{Entry, UpdatedEntriesSet, Worktree};
use serde::{Deserialize, Serialize};
use smol::channel;
use std::{
    future::Future,
    path::Path,
    sync::Arc,
    time::{Duration, Instant, SystemTime},
};
use util::ResultExt;
use worktree::Snapshot;

use crate::{indexing::IndexingEntrySet, summary_backlog::SummaryBacklog};

#[derive(Serialize, Deserialize, Debug)]
pub struct FileSummary {
    pub filename: String,
    pub summary: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct UnsummarizedFile {
    // Path to the file on disk
    path: Arc<Path>,
    // The mtime of the file on disk
    mtime: Option<SystemTime>,
    // BLAKE3 hash of the source file's contents
    digest: Blake3Digest,
    // The source file's contents
    contents: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct SummarizedFile {
    // Path to the file on disk
    path: String,
    // The mtime of the file on disk
    mtime: Option<SystemTime>,
    // BLAKE3 hash of the source file's contents
    digest: Blake3Digest,
    // The LLM's summary of the file's contents
    summary: String,
}

/// This is what blake3's to_hex() method returns - see https://docs.rs/blake3/1.5.3/src/blake3/lib.rs.html#246
pub type Blake3Digest = ArrayString<{ blake3::OUT_LEN * 2 }>;

#[derive(Debug, Serialize, Deserialize)]
pub struct FileDigest {
    pub mtime: Option<SystemTime>,
    pub digest: Blake3Digest,
}

struct NeedsSummary {
    files: channel::Receiver<UnsummarizedFile>,
    task: Task<Result<()>>,
}

struct SummarizeFiles {
    files: channel::Receiver<SummarizedFile>,
    task: Task<Result<()>>,
}

pub struct SummaryIndex {
    worktree: Model<Worktree>,
    fs: Arc<dyn Fs>,
    db_connection: heed::Env,
    file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>, // Key: file path. Val: BLAKE3 digest of its contents.
    summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>, // Key: BLAKE3 digest of a file's contents. Val: LLM summary of those contents.
    backlog: Arc<Mutex<SummaryBacklog>>,
    _entry_ids_being_indexed: Arc<IndexingEntrySet>, // TODO can this be removed?
}

struct Backlogged {
    paths_to_digest: channel::Receiver<Vec<(Arc<Path>, Option<SystemTime>)>>,
    task: Task<Result<()>>,
}

struct MightNeedSummaryFiles {
    files: channel::Receiver<UnsummarizedFile>,
    task: Task<Result<()>>,
}

impl SummaryIndex {
    pub fn new(
        worktree: Model<Worktree>,
        fs: Arc<dyn Fs>,
        db_connection: heed::Env,
        file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
        summary_db: heed::Database<SerdeBincode<Blake3Digest>, Str>,
        _entry_ids_being_indexed: Arc<IndexingEntrySet>,
    ) -> Self {
        Self {
            worktree,
            fs,
            db_connection,
            file_digest_db,
            summary_db,
            _entry_ids_being_indexed,
            backlog: Default::default(),
        }
    }

    pub fn file_digest_db(&self) -> heed::Database<Str, SerdeBincode<FileDigest>> {
        self.file_digest_db
    }

    pub fn summary_db(&self) -> heed::Database<SerdeBincode<Blake3Digest>, Str> {
        self.summary_db
    }

    pub fn index_entries_changed_on_disk(
        &self,
        is_auto_available: bool,
        cx: &AppContext,
    ) -> impl Future<Output = Result<()>> {
        let start = Instant::now();
        let backlogged;
        let digest;
        let needs_summary;
        let summaries;
        let persist;

        if is_auto_available {
            let worktree = self.worktree.read(cx).snapshot();
            let worktree_abs_path = worktree.abs_path().clone();

            backlogged = self.scan_entries(worktree, cx);
            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
            needs_summary = self.check_summary_cache(digest.files, cx);
            summaries = self.summarize_files(needs_summary.files, cx);
            persist = self.persist_summaries(summaries.files, cx);
        } else {
            // This feature is only staff-shipped, so make the rest of these no-ops.
            backlogged = Backlogged {
                paths_to_digest: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            digest = MightNeedSummaryFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            needs_summary = NeedsSummary {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            summaries = SummarizeFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            persist = Task::ready(Ok(()));
        }

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            if is_auto_available {
                log::info!(
                    "Summarizing everything that changed on disk took {:?}",
                    start.elapsed()
                );
            }

            Ok(())
        }
    }

    pub fn index_updated_entries(
        &mut self,
        updated_entries: UpdatedEntriesSet,
        is_auto_available: bool,
        cx: &AppContext,
    ) -> impl Future<Output = Result<()>> {
        let start = Instant::now();
        let backlogged;
        let digest;
        let needs_summary;
        let summaries;
        let persist;

        if is_auto_available {
            let worktree = self.worktree.read(cx).snapshot();
            let worktree_abs_path = worktree.abs_path().clone();

            backlogged = self.scan_updated_entries(worktree, updated_entries.clone(), cx);
            digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
            needs_summary = self.check_summary_cache(digest.files, cx);
            summaries = self.summarize_files(needs_summary.files, cx);
            persist = self.persist_summaries(summaries.files, cx);
        } else {
            // This feature is only staff-shipped, so make the rest of these no-ops.
            backlogged = Backlogged {
                paths_to_digest: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            digest = MightNeedSummaryFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            needs_summary = NeedsSummary {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            summaries = SummarizeFiles {
                files: channel::unbounded().1,
                task: Task::ready(Ok(())),
            };
            persist = Task::ready(Ok(()));
        }

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            log::info!("Summarizing updated entries took {:?}", start.elapsed());

            Ok(())
        }
    }

    fn check_summary_cache(
        &self,
        mut might_need_summary: channel::Receiver<UnsummarizedFile>,
        cx: &AppContext,
    ) -> NeedsSummary {
        let db_connection = self.db_connection.clone();
        let db = self.summary_db;
        let (needs_summary_tx, needs_summary_rx) = channel::bounded(512);
        let task = cx.background_executor().spawn(async move {
            while let Some(file) = might_need_summary.next().await {
                let txn = db_connection
                    .read_txn()
                    .context("Failed to create read transaction for checking which hashes are in summary cache")?;

                match db.get(&txn, &file.digest) {
                    Ok(opt_answer) => {
                        if opt_answer.is_none() {
                            // It's not in the summary cache db, so we need to summarize it.
                            log::debug!("File {:?} (digest {:?}) was NOT in the db cache and needs to be resummarized.", file.path.display(), &file.digest);
                            needs_summary_tx.send(file).await?;
                        } else {
                            log::debug!("File {:?} (digest {:?}) was in the db cache and does not need to be resummarized.", file.path.display(), &file.digest);
                        }
                    }
                    Err(err) => {
                        log::error!("Reading from the summaries database failed: {:?}", err);
                    }
                }
            }

            Ok(())
        });

        NeedsSummary {
            files: needs_summary_rx,
            task,
        }
    }

    fn scan_entries(&self, worktree: Snapshot, cx: &AppContext) -> Backlogged {
        let (tx, rx) = channel::bounded(512);
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let backlog = Arc::clone(&self.backlog);
        let task = cx.background_executor().spawn(async move {
            let txn = db_connection
                .read_txn()
                .context("failed to create read transaction")?;

            for entry in worktree.files(false, 0) {
                let needs_summary =
                    Self::add_to_backlog(Arc::clone(&backlog), digest_db, &txn, entry);

                if !needs_summary.is_empty() {
                    tx.send(needs_summary).await?;
                }
            }

            // TODO delete db entries for deleted files

            Ok(())
        });

        Backlogged {
            paths_to_digest: rx,
            task,
        }
    }

    fn add_to_backlog(
        backlog: Arc<Mutex<SummaryBacklog>>,
        digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
        txn: &RoTxn<'_>,
        entry: &Entry,
    ) -> Vec<(Arc<Path>, Option<SystemTime>)> {
        let entry_db_key = db_key_for_path(&entry.path);

        match digest_db.get(&txn, &entry_db_key) {
            Ok(opt_saved_digest) => {
                // The file path is the same, but the mtime is different. (Or there was no mtime.)
                // It needs updating, so add it to the backlog! Then, if the backlog is full,
                // drain it and summarize its contents.
                if entry.mtime != opt_saved_digest.and_then(|digest| digest.mtime) {
                    let mut backlog = backlog.lock();

                    log::info!(
                        "Inserting {:?} ({:?} bytes) into backlog",
                        &entry.path,
                        entry.size,
                    );
                    backlog.insert(Arc::clone(&entry.path), entry.size, entry.mtime);

                    if backlog.needs_drain() {
                        log::info!("Draining summary backlog...");
                        return backlog.drain().collect();
                    }
                }
            }
            Err(err) => {
                log::error!(
                    "Error trying to get file digest db entry {:?}: {:?}",
                    &entry_db_key,
                    err
                );
            }
        }

        Vec::new()
    }

    fn scan_updated_entries(
        &self,
        worktree: Snapshot,
        updated_entries: UpdatedEntriesSet,
        cx: &AppContext,
    ) -> Backlogged {
        log::info!("Scanning for updated entries that might need summarization...");
        let (tx, rx) = channel::bounded(512);
        // let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let backlog = Arc::clone(&self.backlog);
        let task = cx.background_executor().spawn(async move {
            let txn = db_connection
                .read_txn()
                .context("failed to create read transaction")?;

            for (path, entry_id, status) in updated_entries.iter() {
                match status {
                    project::PathChange::Loaded
                    | project::PathChange::Added
                    | project::PathChange::Updated
                    | project::PathChange::AddedOrUpdated => {
                        if let Some(entry) = worktree.entry_for_id(*entry_id) {
                            if entry.is_file() {
                                let needs_summary = Self::add_to_backlog(
                                    Arc::clone(&backlog),
                                    digest_db,
                                    &txn,
                                    entry,
                                );

                                if !needs_summary.is_empty() {
                                    tx.send(needs_summary).await?;
                                }
                            }
                        }
                    }
                    project::PathChange::Removed => {
                        let _db_path = db_key_for_path(path);
                        // TODO delete db entries for deleted files
                        // deleted_entry_ranges_tx
                        //     .send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
                        //     .await?;
                    }
                }
            }

            Ok(())
        });

        Backlogged {
            paths_to_digest: rx,
            // deleted_entry_ranges: deleted_entry_ranges_rx,
            task,
        }
    }

    fn digest_files(
        &self,
        paths: channel::Receiver<Vec<(Arc<Path>, Option<SystemTime>)>>,
        worktree_abs_path: Arc<Path>,
        cx: &AppContext,
    ) -> MightNeedSummaryFiles {
        let fs = self.fs.clone();
        let (tx, rx) = channel::bounded(2048);
        let task = cx.spawn(|cx| async move {
            cx.background_executor()
                .scoped(|cx| {
                    for _ in 0..cx.num_cpus() {
                        cx.spawn(async {
                            while let Ok(pairs) = paths.recv().await {
                                // Note: we could process all these files concurrently if desired.
                                // Might or might not speed things up.
                                for (path, mtime) in pairs {
                                    let entry_abs_path = worktree_abs_path.join(&path);

                                    // Load the file's contents and compute its hash digest.
                                    let unsummarized_file = {
                                        let Some(contents) = fs
                                            .load(&entry_abs_path)
                                            .await
                                            .with_context(|| {
                                                format!("failed to read path {entry_abs_path:?}")
                                            })
                                            .log_err()
                                        else {
                                            continue;
                                        };

                                        let digest = {
                                            let mut hasher = blake3::Hasher::new();
                                            // Incorporate both the (relative) file path as well as the contents of the file into the hash.
                                            // This is because in some languages and frameworks, identical files can do different things
                                            // depending on their paths (e.g. Rails controllers). It's also why we send the path to the model.
                                            hasher.update(path.display().to_string().as_bytes());
                                            hasher.update(contents.as_bytes());
                                            hasher.finalize().to_hex()
                                        };

                                        UnsummarizedFile {
                                            digest,
                                            contents,
                                            path,
                                            mtime,
                                        }
                                    };

                                    if let Err(err) = tx
                                        .send(unsummarized_file)
                                        .map_err(|error| anyhow!(error))
                                        .await
                                    {
                                        log::error!("Error: {:?}", err);

                                        return;
                                    }
                                }
                            }
                        });
                    }
                })
                .await;
            Ok(())
        });

        MightNeedSummaryFiles { files: rx, task }
    }

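    // The note inside `digest_files` above mentions that the files in each batch
    // could be hashed concurrently. A self-contained sketch of what that could
    // look like (hypothetical: this helper is not part of the original file and
    // is not called anywhere; the real pipeline hashes inline, one file at a time):
    #[allow(dead_code)]
    async fn digest_pairs_concurrently(
        pairs: Vec<(Arc<Path>, String)>,
    ) -> Vec<(Arc<Path>, Blake3Digest)> {
        use futures::stream::{self, StreamExt};

        stream::iter(pairs)
            .map(|(path, contents)| async move {
                // Same hashing scheme as `digest_files`: path first, then contents.
                let mut hasher = blake3::Hasher::new();
                hasher.update(path.display().to_string().as_bytes());
                hasher.update(contents.as_bytes());
                (path, hasher.finalize().to_hex())
            })
            // Hash up to 8 files at a time; results arrive in completion order.
            .buffer_unordered(8)
            .collect()
            .await
    }
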
    fn summarize_files(
        &self,
        mut unsummarized_files: channel::Receiver<UnsummarizedFile>,
        cx: &AppContext,
    ) -> SummarizeFiles {
        let (summarized_tx, summarized_rx) = channel::bounded(512);
        let task = cx.spawn(|cx| async move {
            while let Some(file) = unsummarized_files.next().await {
                log::debug!("Summarizing {:?}", file);
                let summary = cx
                    .update(|cx| Self::summarize_code(&file.contents, &file.path, cx))?
                    .await
                    .unwrap_or_else(|err| {
                        // Log a warning because we'll continue anyway.
                        // In the future, we may want to try splitting it up into multiple requests and concatenating the summaries,
                        // but this might give bad summaries due to cutting off source code files in the middle.
                        log::warn!("Failed to summarize {} - {:?}", file.path.display(), err);

                        String::new()
                    });

                // Note that the summary could be empty because of an error talking to a cloud provider,
                // e.g. because the context limit was exceeded. In that case, `summary` is the empty
                // string substituted by the `unwrap_or_else` above, so we skip persisting it.
                if !summary.is_empty() {
                    summarized_tx
                        .send(SummarizedFile {
                            path: file.path.display().to_string(),
                            digest: file.digest,
                            summary,
                            mtime: file.mtime,
                        })
                        .await?
                }
            }

            Ok(())
        });

        SummarizeFiles {
            files: summarized_rx,
            task,
        }
    }

    fn summarize_code(
        code: &str,
        path: &Path,
        cx: &AppContext,
    ) -> impl Future<Output = Result<String>> {
        let start = Instant::now();
        let (summary_model_id, use_cache): (LanguageModelId, bool) = (
            "Qwen/Qwen2-7B-Instruct".to_string().into(), // TODO read this from the user's settings.
            false, // qwen2 doesn't have a cache, but we should probably infer this from the model
        );
        let Some(model) = LanguageModelRegistry::read_global(cx)
            .available_models(cx)
            .find(|model| &model.id() == &summary_model_id)
        else {
            return cx.background_executor().spawn(async move {
                Err(anyhow!("Couldn't find the preferred summarization model ({:?}) in the language registry's available models", summary_model_id))
            });
        };
        let utf8_path = path.to_string_lossy();
        const PROMPT_BEFORE_CODE: &str = "Summarize what the code in this file does in 3 sentences, using no newlines or bullet points in the summary:";
        let prompt = format!("{PROMPT_BEFORE_CODE}\n{utf8_path}:\n{code}");

        log::debug!(
            "Summarizing code by sending this prompt to {:?}: {:?}",
            model.name(),
            &prompt
        );

        let request = LanguageModelRequest {
            messages: vec![LanguageModelRequestMessage {
                role: Role::User,
                content: vec![prompt.into()],
                cache: use_cache,
            }],
            tools: Vec::new(),
            stop: Vec::new(),
            temperature: None,
        };

        let code_len = code.len();
        cx.spawn(|cx| async move {
            let stream = model.stream_completion(request, &cx);
            cx.background_executor()
                .spawn(async move {
                    let answer: String = stream
                        .await?
                        .filter_map(|event| async {
                            if let Ok(LanguageModelCompletionEvent::Text(text)) = event {
                                Some(text)
                            } else {
                                None
                            }
                        })
                        .collect()
                        .await;

                    log::info!(
                        "It took {:?} to summarize {:?} bytes of code.",
                        start.elapsed(),
                        code_len
                    );

                    log::debug!("Summary was: {:?}", &answer);

                    Ok(answer)
                })
                .await

            // TODO if summarization failed, put it back in the backlog!
        })
    }

    fn persist_summaries(
        &self,
        summaries: channel::Receiver<SummarizedFile>,
        cx: &AppContext,
    ) -> Task<Result<()>> {
        let db_connection = self.db_connection.clone();
        let digest_db = self.file_digest_db;
        let summary_db = self.summary_db;
        cx.background_executor().spawn(async move {
            let mut summaries = summaries.chunks_timeout(4096, Duration::from_secs(2));
            while let Some(summaries) = summaries.next().await {
                let mut txn = db_connection.write_txn()?;
                for file in &summaries {
                    log::debug!(
                        "Saving summary of {:?} - which is {} bytes of summary for content digest {:?}",
                        &file.path,
                        file.summary.len(),
                        file.digest
                    );
                    digest_db.put(
                        &mut txn,
                        // Use the same key encoding that `add_to_backlog` uses for
                        // its lookups (see `db_key_for_path`); storing the raw
                        // display path would never match those reads.
                        &file.path.replace('/', "\0"),
                        &FileDigest {
                            mtime: file.mtime,
                            digest: file.digest,
                        },
                    )?;
                    summary_db.put(&mut txn, &file.digest, &file.summary)?;
                }
                txn.commit()?;

                drop(summaries);
                log::debug!("committed summaries");
            }

            Ok(())
        })
    }

    /// Empty out the backlog of files that haven't been resummarized, and resummarize them immediately.
    pub(crate) fn flush_backlog(
        &self,
        worktree_abs_path: Arc<Path>,
        cx: &AppContext,
    ) -> impl Future<Output = Result<()>> {
        let start = Instant::now();
        let backlogged = {
            let (tx, rx) = channel::bounded(512);
            let needs_summary: Vec<(Arc<Path>, Option<SystemTime>)> = {
                let mut backlog = self.backlog.lock();

                backlog.drain().collect()
            };

            let task = cx.background_executor().spawn(async move {
                tx.send(needs_summary).await?;
                Ok(())
            });

            Backlogged {
                paths_to_digest: rx,
                task,
            }
        };

        let digest = self.digest_files(backlogged.paths_to_digest, worktree_abs_path, cx);
        let needs_summary = self.check_summary_cache(digest.files, cx);
        let summaries = self.summarize_files(needs_summary.files, cx);
        let persist = self.persist_summaries(summaries.files, cx);

        async move {
            futures::try_join!(
                backlogged.task,
                digest.task,
                needs_summary.task,
                summaries.task,
                persist
            )?;

            log::info!("Summarizing backlogged entries took {:?}", start.elapsed());

            Ok(())
        }
    }

    pub(crate) fn backlog_len(&self) -> usize {
        self.backlog.lock().len()
    }
}

/// Encode a worktree-relative path as a database key. NUL can't appear in a
/// path component, so replacing separators with it is unambiguous and keeps
/// the keys for a directory's entries grouped together in key order.
fn db_key_for_path(path: &Arc<Path>) -> String {
    path.to_string_lossy().replace('/', "\0")
}
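
// A small test sketch (not part of the original file) pinning down the key
// encoding above: each path separator becomes a NUL byte.
#[cfg(test)]
mod db_key_tests {
    use super::db_key_for_path;
    use std::{path::Path, sync::Arc};

    #[test]
    fn replaces_separators_with_nul() {
        let path: Arc<Path> = Arc::from(Path::new("src/indexing/mod.rs"));
        assert_eq!(db_key_for_path(&path), "src\0indexing\0mod.rs");
    }
}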