assistant2: Add thread persistence (#23582)

This PR adds persistence for threads in Assistant2.

Threads are now persisted to an LMDB database.

Release Notes:

- N/A
This commit is contained in:
Marshall Bowers 2025-01-23 19:09:26 -05:00 committed by GitHub
parent fb63f61755
commit c55cdd0cb9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 371 additions and 249 deletions

3
Cargo.lock generated
View file

@ -456,6 +456,7 @@ dependencies = [
"futures 0.3.31", "futures 0.3.31",
"fuzzy", "fuzzy",
"gpui", "gpui",
"heed",
"html_to_markdown", "html_to_markdown",
"http_client", "http_client",
"indoc", "indoc",
@ -470,6 +471,7 @@ dependencies = [
"menu", "menu",
"multi_buffer", "multi_buffer",
"parking_lot", "parking_lot",
"paths",
"picker", "picker",
"project", "project",
"prompt_library", "prompt_library",
@ -490,7 +492,6 @@ dependencies = [
"time", "time",
"time_format", "time_format",
"ui", "ui",
"unindent",
"util", "util",
"uuid", "uuid",
"workspace", "workspace",

View file

@ -39,6 +39,7 @@ fs.workspace = true
futures.workspace = true futures.workspace = true
fuzzy.workspace = true fuzzy.workspace = true
gpui.workspace = true gpui.workspace = true
heed.workspace = true
html_to_markdown.workspace = true html_to_markdown.workspace = true
http_client.workspace = true http_client.workspace = true
itertools.workspace = true itertools.workspace = true
@ -52,6 +53,7 @@ markdown.workspace = true
menu.workspace = true menu.workspace = true
multi_buffer.workspace = true multi_buffer.workspace = true
parking_lot.workspace = true parking_lot.workspace = true
paths.workspace = true
picker.workspace = true picker.workspace = true
project.workspace = true project.workspace = true
prompt_library.workspace = true prompt_library.workspace = true
@ -71,7 +73,6 @@ theme.workspace = true
time.workspace = true time.workspace = true
time_format.workspace = true time_format.workspace = true
ui.workspace = true ui.workspace = true
unindent.workspace = true
util.workspace = true util.workspace = true
uuid.workspace = true uuid.workspace = true
workspace.workspace = true workspace.workspace = true

View file

@ -16,12 +16,14 @@ use ui::prelude::*;
use workspace::Workspace; use workspace::Workspace;
use crate::thread::{MessageId, Thread, ThreadError, ThreadEvent}; use crate::thread::{MessageId, Thread, ThreadError, ThreadEvent};
use crate::thread_store::ThreadStore;
use crate::ui::ContextPill; use crate::ui::ContextPill;
pub struct ActiveThread { pub struct ActiveThread {
workspace: WeakView<Workspace>, workspace: WeakView<Workspace>,
language_registry: Arc<LanguageRegistry>, language_registry: Arc<LanguageRegistry>,
tools: Arc<ToolWorkingSet>, tools: Arc<ToolWorkingSet>,
thread_store: Model<ThreadStore>,
thread: Model<Thread>, thread: Model<Thread>,
messages: Vec<MessageId>, messages: Vec<MessageId>,
list_state: ListState, list_state: ListState,
@ -33,6 +35,7 @@ pub struct ActiveThread {
impl ActiveThread { impl ActiveThread {
pub fn new( pub fn new(
thread: Model<Thread>, thread: Model<Thread>,
thread_store: Model<ThreadStore>,
workspace: WeakView<Workspace>, workspace: WeakView<Workspace>,
language_registry: Arc<LanguageRegistry>, language_registry: Arc<LanguageRegistry>,
tools: Arc<ToolWorkingSet>, tools: Arc<ToolWorkingSet>,
@ -47,6 +50,7 @@ impl ActiveThread {
workspace, workspace,
language_registry, language_registry,
tools, tools,
thread_store,
thread: thread.clone(), thread: thread.clone(),
messages: Vec::new(), messages: Vec::new(),
rendered_messages_by_id: HashMap::default(), rendered_messages_by_id: HashMap::default(),
@ -192,8 +196,13 @@ impl ActiveThread {
ThreadEvent::ShowError(error) => { ThreadEvent::ShowError(error) => {
self.last_error = Some(error.clone()); self.last_error = Some(error.clone());
} }
ThreadEvent::StreamedCompletion => {} ThreadEvent::StreamedCompletion | ThreadEvent::SummaryChanged => {
ThreadEvent::SummaryChanged => {} self.thread_store
.update(cx, |thread_store, cx| {
thread_store.save_thread(&self.thread, cx)
})
.detach_and_log_err(cx);
}
ThreadEvent::StreamedAssistantText(message_id, text) => { ThreadEvent::StreamedAssistantText(message_id, text) => {
if let Some(markdown) = self.rendered_messages_by_id.get_mut(&message_id) { if let Some(markdown) = self.rendered_messages_by_id.get_mut(&message_id) {
markdown.update(cx, |markdown, cx| { markdown.update(cx, |markdown, cx| {
@ -211,6 +220,12 @@ impl ActiveThread {
self.push_message(message_id, message_text, cx); self.push_message(message_id, message_text, cx);
} }
self.thread_store
.update(cx, |thread_store, cx| {
thread_store.save_thread(&self.thread, cx)
})
.detach_and_log_err(cx);
cx.notify(); cx.notify();
} }
ThreadEvent::UsePendingTools => { ThreadEvent::UsePendingTools => {

View file

@ -179,6 +179,7 @@ impl AssistantPanel {
thread: cx.new_view(|cx| { thread: cx.new_view(|cx| {
ActiveThread::new( ActiveThread::new(
thread.clone(), thread.clone(),
thread_store.clone(),
workspace, workspace,
language_registry, language_registry,
tools.clone(), tools.clone(),
@ -239,6 +240,7 @@ impl AssistantPanel {
self.thread = cx.new_view(|cx| { self.thread = cx.new_view(|cx| {
ActiveThread::new( ActiveThread::new(
thread.clone(), thread.clone(),
self.thread_store.clone(),
self.workspace.clone(), self.workspace.clone(),
self.language_registry.clone(), self.language_registry.clone(),
self.tools.clone(), self.tools.clone(),
@ -361,34 +363,41 @@ impl AssistantPanel {
}) })
} }
pub(crate) fn open_thread(&mut self, thread_id: &ThreadId, cx: &mut ViewContext<Self>) { pub(crate) fn open_thread(
let Some(thread) = self &mut self,
thread_id: &ThreadId,
cx: &mut ViewContext<Self>,
) -> Task<Result<()>> {
let open_thread_task = self
.thread_store .thread_store
.update(cx, |this, cx| this.open_thread(thread_id, cx)) .update(cx, |this, cx| this.open_thread(thread_id, cx));
else {
return;
};
self.active_view = ActiveView::Thread; cx.spawn(|this, mut cx| async move {
self.thread = cx.new_view(|cx| { let thread = open_thread_task.await?;
this.update(&mut cx, |this, cx| {
this.active_view = ActiveView::Thread;
this.thread = cx.new_view(|cx| {
ActiveThread::new( ActiveThread::new(
thread.clone(), thread.clone(),
self.workspace.clone(), this.thread_store.clone(),
self.language_registry.clone(), this.workspace.clone(),
self.tools.clone(), this.language_registry.clone(),
this.tools.clone(),
cx, cx,
) )
}); });
self.message_editor = cx.new_view(|cx| { this.message_editor = cx.new_view(|cx| {
MessageEditor::new( MessageEditor::new(
self.fs.clone(), this.fs.clone(),
self.workspace.clone(), this.workspace.clone(),
self.thread_store.downgrade(), this.thread_store.downgrade(),
thread, thread,
cx, cx,
) )
}); });
self.message_editor.focus_handle(cx).focus(cx); this.message_editor.focus_handle(cx).focus(cx);
})
})
} }
pub(crate) fn open_configuration(&mut self, cx: &mut ViewContext<Self>) { pub(crate) fn open_configuration(&mut self, cx: &mut ViewContext<Self>) {
@ -437,7 +446,8 @@ impl AssistantPanel {
pub(crate) fn delete_thread(&mut self, thread_id: &ThreadId, cx: &mut ViewContext<Self>) { pub(crate) fn delete_thread(&mut self, thread_id: &ThreadId, cx: &mut ViewContext<Self>) {
self.thread_store self.thread_store
.update(cx, |this, cx| this.delete_thread(thread_id, cx)); .update(cx, |this, cx| this.delete_thread(thread_id, cx))
.detach_and_log_err(cx);
} }
} }
@ -655,7 +665,7 @@ impl AssistantPanel {
fn render_thread_empty_state(&self, cx: &mut ViewContext<Self>) -> impl IntoElement { fn render_thread_empty_state(&self, cx: &mut ViewContext<Self>) -> impl IntoElement {
let recent_threads = self let recent_threads = self
.thread_store .thread_store
.update(cx, |this, cx| this.recent_threads(3, cx)); .update(cx, |this, _cx| this.recent_threads(3));
v_flex() v_flex()
.gap_2() .gap_2()

View file

@ -6,10 +6,12 @@ mod thread_context_picker;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use anyhow::{anyhow, Result};
use editor::Editor; use editor::Editor;
use file_context_picker::render_file_context_entry; use file_context_picker::render_file_context_entry;
use gpui::{ use gpui::{
AppContext, DismissEvent, EventEmitter, FocusHandle, FocusableView, View, WeakModel, WeakView, AppContext, DismissEvent, EventEmitter, FocusHandle, FocusableView, Task, View, WeakModel,
WeakView,
}; };
use project::ProjectPath; use project::ProjectPath;
use thread_context_picker::{render_thread_context_entry, ThreadContextEntry}; use thread_context_picker::{render_thread_context_entry, ThreadContextEntry};
@ -237,7 +239,8 @@ impl ContextPicker {
}, },
move |cx| { move |cx| {
context_picker.update(cx, |this, cx| { context_picker.update(cx, |this, cx| {
this.add_recent_thread(thread.clone(), cx); this.add_recent_thread(thread.clone(), cx)
.detach_and_log_err(cx);
}) })
}, },
) )
@ -260,25 +263,32 @@ impl ContextPicker {
cx.notify(); cx.notify();
} }
fn add_recent_thread(&self, thread: ThreadContextEntry, cx: &mut ViewContext<Self>) { fn add_recent_thread(
&self,
thread: ThreadContextEntry,
cx: &mut ViewContext<Self>,
) -> Task<Result<()>> {
let Some(context_store) = self.context_store.upgrade() else { let Some(context_store) = self.context_store.upgrade() else {
return; return Task::ready(Err(anyhow!("context store not available")));
}; };
let Some(thread) = self let Some(thread_store) = self
.thread_store .thread_store
.clone() .as_ref()
.and_then(|this| this.upgrade()) .and_then(|thread_store| thread_store.upgrade())
.and_then(|this| this.update(cx, |this, cx| this.open_thread(&thread.id, cx)))
else { else {
return; return Task::ready(Err(anyhow!("thread store not available")));
}; };
context_store.update(cx, |context_store, cx| { let open_thread_task = thread_store.update(cx, |this, cx| this.open_thread(&thread.id, cx));
cx.spawn(|this, mut cx| async move {
let thread = open_thread_task.await?;
context_store.update(&mut cx, |context_store, cx| {
context_store.add_thread(thread, cx); context_store.add_thread(thread, cx);
}); })?;
cx.notify(); this.update(&mut cx, |_this, cx| cx.notify())
})
} }
fn recent_entries(&self, cx: &mut WindowContext) -> Vec<RecentEntry> { fn recent_entries(&self, cx: &mut WindowContext) -> Vec<RecentEntry> {
@ -332,19 +342,17 @@ impl ContextPicker {
return recent; return recent;
}; };
thread_store.update(cx, |thread_store, cx| { thread_store.update(cx, |thread_store, _cx| {
recent.extend( recent.extend(
thread_store thread_store
.threads(cx) .threads()
.into_iter() .into_iter()
.filter(|thread| !current_threads.contains(thread.read(cx).id())) .filter(|thread| !current_threads.contains(&thread.id))
.take(2) .take(2)
.map(|thread| { .map(|thread| {
let thread = thread.read(cx);
RecentEntry::Thread(ThreadContextEntry { RecentEntry::Thread(ThreadContextEntry {
id: thread.id().clone(), id: thread.id,
summary: thread.summary_or_default(), summary: thread.summary,
}) })
}), }),
) )

View file

@ -99,13 +99,12 @@ impl PickerDelegate for ThreadContextPickerDelegate {
} }
fn update_matches(&mut self, query: String, cx: &mut ViewContext<Picker<Self>>) -> Task<()> { fn update_matches(&mut self, query: String, cx: &mut ViewContext<Picker<Self>>) -> Task<()> {
let Ok(threads) = self.thread_store.update(cx, |this, cx| { let Ok(threads) = self.thread_store.update(cx, |this, _cx| {
this.threads(cx) this.threads()
.into_iter() .into_iter()
.map(|thread| { .map(|thread| ThreadContextEntry {
let id = thread.read(cx).id().clone(); id: thread.id,
let summary = thread.read(cx).summary_or_default(); summary: thread.summary,
ThreadContextEntry { id, summary }
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()
}) else { }) else {
@ -159,19 +158,23 @@ impl PickerDelegate for ThreadContextPickerDelegate {
return; return;
}; };
let Some(thread) = thread_store.update(cx, |this, cx| this.open_thread(&entry.id, cx)) let open_thread_task = thread_store.update(cx, |this, cx| this.open_thread(&entry.id, cx));
else {
return;
};
self.context_store cx.spawn(|this, mut cx| async move {
let thread = open_thread_task.await?;
this.update(&mut cx, |this, cx| {
this.delegate
.context_store
.update(cx, |context_store, cx| context_store.add_thread(thread, cx)) .update(cx, |context_store, cx| context_store.add_thread(thread, cx))
.ok(); .ok();
match self.confirm_behavior { match this.delegate.confirm_behavior {
ConfirmBehavior::KeepOpen => {} ConfirmBehavior::KeepOpen => {}
ConfirmBehavior::Close => self.dismissed(cx), ConfirmBehavior::Close => this.delegate.dismissed(cx),
} }
})
})
.detach_and_log_err(cx);
} }
fn dismissed(&mut self, cx: &mut ViewContext<Picker<Self>>) { fn dismissed(&mut self, cx: &mut ViewContext<Picker<Self>>) {

View file

@ -18,6 +18,7 @@ use util::{post_inc, TryFutureExt as _};
use uuid::Uuid; use uuid::Uuid;
use crate::context::{attach_context_to_message, ContextId, ContextSnapshot}; use crate::context::{attach_context_to_message, ContextId, ContextSnapshot};
use crate::thread_store::SavedThread;
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub enum RequestKind { pub enum RequestKind {
@ -94,6 +95,40 @@ impl Thread {
} }
} }
pub fn from_saved(
id: ThreadId,
saved: SavedThread,
tools: Arc<ToolWorkingSet>,
_cx: &mut ModelContext<Self>,
) -> Self {
let next_message_id = MessageId(saved.messages.len());
Self {
id,
updated_at: saved.updated_at,
summary: Some(saved.summary),
pending_summary: Task::ready(None),
messages: saved
.messages
.into_iter()
.map(|message| Message {
id: message.id,
role: message.role,
text: message.text,
})
.collect(),
next_message_id,
context: BTreeMap::default(),
context_by_message: HashMap::default(),
completion_count: 0,
pending_completions: Vec::new(),
tools,
tool_uses_by_message: HashMap::default(),
tool_results_by_message: HashMap::default(),
pending_tool_uses_by_id: HashMap::default(),
}
}
pub fn id(&self) -> &ThreadId { pub fn id(&self) -> &ThreadId {
&self.id &self.id
} }

View file

@ -5,8 +5,7 @@ use gpui::{
use time::{OffsetDateTime, UtcOffset}; use time::{OffsetDateTime, UtcOffset};
use ui::{prelude::*, IconButtonShape, ListItem, ListItemSpacing, Tooltip}; use ui::{prelude::*, IconButtonShape, ListItem, ListItemSpacing, Tooltip};
use crate::thread::Thread; use crate::thread_store::{SavedThreadMetadata, ThreadStore};
use crate::thread_store::ThreadStore;
use crate::{AssistantPanel, RemoveSelectedThread}; use crate::{AssistantPanel, RemoveSelectedThread};
pub struct ThreadHistory { pub struct ThreadHistory {
@ -33,8 +32,7 @@ impl ThreadHistory {
} }
pub fn select_prev(&mut self, _: &menu::SelectPrev, cx: &mut ViewContext<Self>) { pub fn select_prev(&mut self, _: &menu::SelectPrev, cx: &mut ViewContext<Self>) {
let count = self.thread_store.read(cx).non_empty_len(cx); let count = self.thread_store.read(cx).thread_count();
if count > 0 { if count > 0 {
if self.selected_index == 0 { if self.selected_index == 0 {
self.set_selected_index(count - 1, cx); self.set_selected_index(count - 1, cx);
@ -45,8 +43,7 @@ impl ThreadHistory {
} }
pub fn select_next(&mut self, _: &menu::SelectNext, cx: &mut ViewContext<Self>) { pub fn select_next(&mut self, _: &menu::SelectNext, cx: &mut ViewContext<Self>) {
let count = self.thread_store.read(cx).non_empty_len(cx); let count = self.thread_store.read(cx).thread_count();
if count > 0 { if count > 0 {
if self.selected_index == count - 1 { if self.selected_index == count - 1 {
self.set_selected_index(0, cx); self.set_selected_index(0, cx);
@ -57,14 +54,14 @@ impl ThreadHistory {
} }
fn select_first(&mut self, _: &menu::SelectFirst, cx: &mut ViewContext<Self>) { fn select_first(&mut self, _: &menu::SelectFirst, cx: &mut ViewContext<Self>) {
let count = self.thread_store.read(cx).non_empty_len(cx); let count = self.thread_store.read(cx).thread_count();
if count > 0 { if count > 0 {
self.set_selected_index(0, cx); self.set_selected_index(0, cx);
} }
} }
fn select_last(&mut self, _: &menu::SelectLast, cx: &mut ViewContext<Self>) { fn select_last(&mut self, _: &menu::SelectLast, cx: &mut ViewContext<Self>) {
let count = self.thread_store.read(cx).non_empty_len(cx); let count = self.thread_store.read(cx).thread_count();
if count > 0 { if count > 0 {
self.set_selected_index(count - 1, cx); self.set_selected_index(count - 1, cx);
} }
@ -78,14 +75,11 @@ impl ThreadHistory {
} }
fn confirm(&mut self, _: &menu::Confirm, cx: &mut ViewContext<Self>) { fn confirm(&mut self, _: &menu::Confirm, cx: &mut ViewContext<Self>) {
let threads = self.thread_store.update(cx, |this, cx| this.threads(cx)); let threads = self.thread_store.update(cx, |this, _cx| this.threads());
if let Some(thread) = threads.get(self.selected_index) { if let Some(thread) = threads.get(self.selected_index) {
self.assistant_panel self.assistant_panel
.update(cx, move |this, cx| { .update(cx, move |this, cx| this.open_thread(&thread.id, cx))
let thread_id = thread.read(cx).id().clone();
this.open_thread(&thread_id, cx)
})
.ok(); .ok();
cx.notify(); cx.notify();
@ -93,13 +87,12 @@ impl ThreadHistory {
} }
fn remove_selected_thread(&mut self, _: &RemoveSelectedThread, cx: &mut ViewContext<Self>) { fn remove_selected_thread(&mut self, _: &RemoveSelectedThread, cx: &mut ViewContext<Self>) {
let threads = self.thread_store.update(cx, |this, cx| this.threads(cx)); let threads = self.thread_store.update(cx, |this, _cx| this.threads());
if let Some(thread) = threads.get(self.selected_index) { if let Some(thread) = threads.get(self.selected_index) {
self.assistant_panel self.assistant_panel
.update(cx, |this, cx| { .update(cx, |this, cx| {
let thread_id = thread.read(cx).id().clone(); this.delete_thread(&thread.id, cx);
this.delete_thread(&thread_id, cx);
}) })
.ok(); .ok();
@ -116,7 +109,7 @@ impl FocusableView for ThreadHistory {
impl Render for ThreadHistory { impl Render for ThreadHistory {
fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement { fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
let threads = self.thread_store.update(cx, |this, cx| this.threads(cx)); let threads = self.thread_store.update(cx, |this, _cx| this.threads());
let selected_index = self.selected_index; let selected_index = self.selected_index;
v_flex() v_flex()
@ -172,14 +165,14 @@ impl Render for ThreadHistory {
#[derive(IntoElement)] #[derive(IntoElement)]
pub struct PastThread { pub struct PastThread {
thread: Model<Thread>, thread: SavedThreadMetadata,
assistant_panel: WeakView<AssistantPanel>, assistant_panel: WeakView<AssistantPanel>,
selected: bool, selected: bool,
} }
impl PastThread { impl PastThread {
pub fn new( pub fn new(
thread: Model<Thread>, thread: SavedThreadMetadata,
assistant_panel: WeakView<AssistantPanel>, assistant_panel: WeakView<AssistantPanel>,
selected: bool, selected: bool,
) -> Self { ) -> Self {
@ -193,14 +186,10 @@ impl PastThread {
impl RenderOnce for PastThread { impl RenderOnce for PastThread {
fn render(self, cx: &mut WindowContext) -> impl IntoElement { fn render(self, cx: &mut WindowContext) -> impl IntoElement {
let (id, summary) = { let summary = self.thread.summary;
let thread = self.thread.read(cx);
(thread.id().clone(), thread.summary_or_default())
};
let thread_timestamp = time_format::format_localized_timestamp( let thread_timestamp = time_format::format_localized_timestamp(
OffsetDateTime::from_unix_timestamp(self.thread.read(cx).updated_at().timestamp()) OffsetDateTime::from_unix_timestamp(self.thread.updated_at.timestamp()).unwrap(),
.unwrap(),
OffsetDateTime::now_utc(), OffsetDateTime::now_utc(),
self.assistant_panel self.assistant_panel
.update(cx, |this, _cx| this.local_timezone()) .update(cx, |this, _cx| this.local_timezone())
@ -208,7 +197,7 @@ impl RenderOnce for PastThread {
time_format::TimestampFormat::EnhancedAbsolute, time_format::TimestampFormat::EnhancedAbsolute,
); );
ListItem::new(("past-thread", self.thread.entity_id())) ListItem::new(SharedString::from(self.thread.id.to_string()))
.outlined() .outlined()
.toggle_state(self.selected) .toggle_state(self.selected)
.start_slot( .start_slot(
@ -233,7 +222,7 @@ impl RenderOnce for PastThread {
.tooltip(|cx| Tooltip::text("Delete Thread", cx)) .tooltip(|cx| Tooltip::text("Delete Thread", cx))
.on_click({ .on_click({
let assistant_panel = self.assistant_panel.clone(); let assistant_panel = self.assistant_panel.clone();
let id = id.clone(); let id = self.thread.id.clone();
move |_event, cx| { move |_event, cx| {
assistant_panel assistant_panel
.update(cx, |this, cx| { .update(cx, |this, cx| {
@ -246,11 +235,11 @@ impl RenderOnce for PastThread {
) )
.on_click({ .on_click({
let assistant_panel = self.assistant_panel.clone(); let assistant_panel = self.assistant_panel.clone();
let id = id.clone(); let id = self.thread.id.clone();
move |_event, cx| { move |_event, cx| {
assistant_panel assistant_panel
.update(cx, |this, cx| { .update(cx, |this, cx| {
this.open_thread(&id, cx); this.open_thread(&id, cx).detach_and_log_err(cx);
}) })
.ok(); .ok();
} }

View file

@ -1,16 +1,23 @@
use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use anyhow::Result; use anyhow::{anyhow, Result};
use assistant_tool::{ToolId, ToolWorkingSet}; use assistant_tool::{ToolId, ToolWorkingSet};
use chrono::{DateTime, Utc};
use collections::HashMap; use collections::HashMap;
use context_server::manager::ContextServerManager; use context_server::manager::ContextServerManager;
use context_server::{ContextServerFactoryRegistry, ContextServerTool}; use context_server::{ContextServerFactoryRegistry, ContextServerTool};
use gpui::{prelude::*, AppContext, Model, ModelContext, Task}; use futures::future::{self, BoxFuture, Shared};
use futures::FutureExt as _;
use gpui::{prelude::*, AppContext, BackgroundExecutor, Model, ModelContext, SharedString, Task};
use heed::types::SerdeBincode;
use heed::Database;
use language_model::Role;
use project::Project; use project::Project;
use unindent::Unindent; use serde::{Deserialize, Serialize};
use util::ResultExt as _; use util::ResultExt as _;
use crate::thread::{Thread, ThreadId}; use crate::thread::{MessageId, Thread, ThreadId};
pub struct ThreadStore { pub struct ThreadStore {
#[allow(unused)] #[allow(unused)]
@ -18,7 +25,8 @@ pub struct ThreadStore {
tools: Arc<ToolWorkingSet>, tools: Arc<ToolWorkingSet>,
context_server_manager: Model<ContextServerManager>, context_server_manager: Model<ContextServerManager>,
context_server_tool_ids: HashMap<Arc<str>, Vec<ToolId>>, context_server_tool_ids: HashMap<Arc<str>, Vec<ToolId>>,
threads: Vec<Model<Thread>>, threads: Vec<SavedThreadMetadata>,
database_future: Shared<BoxFuture<'static, Result<Arc<ThreadsDatabase>, Arc<anyhow::Error>>>>,
} }
impl ThreadStore { impl ThreadStore {
@ -35,61 +43,138 @@ impl ThreadStore {
ContextServerManager::new(context_server_factory_registry, project.clone(), cx) ContextServerManager::new(context_server_factory_registry, project.clone(), cx)
}); });
let mut this = Self { let executor = cx.background_executor().clone();
let database_future = executor
.spawn({
let executor = executor.clone();
let database_path = paths::support_dir().join("threads/threads-db.0.mdb");
async move { ThreadsDatabase::new(database_path, executor) }
})
.then(|result| future::ready(result.map(Arc::new).map_err(Arc::new)))
.boxed()
.shared();
let this = Self {
project, project,
tools, tools,
context_server_manager, context_server_manager,
context_server_tool_ids: HashMap::default(), context_server_tool_ids: HashMap::default(),
threads: Vec::new(), threads: Vec::new(),
database_future,
}; };
this.mock_recent_threads(cx);
this.register_context_server_handlers(cx); this.register_context_server_handlers(cx);
this this
})?; })?;
this.update(&mut cx, |this, cx| this.reload(cx))?.await?;
Ok(this) Ok(this)
}) })
} }
/// Returns the number of non-empty threads. /// Returns the number of threads.
pub fn non_empty_len(&self, cx: &AppContext) -> usize { pub fn thread_count(&self) -> usize {
self.threads self.threads.len()
.iter()
.filter(|thread| !thread.read(cx).is_empty())
.count()
} }
pub fn threads(&self, cx: &ModelContext<Self>) -> Vec<Model<Thread>> { pub fn threads(&self) -> Vec<SavedThreadMetadata> {
let mut threads = self let mut threads = self.threads.iter().cloned().collect::<Vec<_>>();
.threads threads.sort_unstable_by_key(|thread| std::cmp::Reverse(thread.updated_at));
.iter()
.filter(|thread| !thread.read(cx).is_empty())
.cloned()
.collect::<Vec<_>>();
threads.sort_unstable_by_key(|thread| std::cmp::Reverse(thread.read(cx).updated_at()));
threads threads
} }
pub fn recent_threads(&self, limit: usize, cx: &ModelContext<Self>) -> Vec<Model<Thread>> { pub fn recent_threads(&self, limit: usize) -> Vec<SavedThreadMetadata> {
self.threads(cx).into_iter().take(limit).collect() self.threads().into_iter().take(limit).collect()
} }
pub fn create_thread(&mut self, cx: &mut ModelContext<Self>) -> Model<Thread> { pub fn create_thread(&mut self, cx: &mut ModelContext<Self>) -> Model<Thread> {
let thread = cx.new_model(|cx| Thread::new(self.tools.clone(), cx)); cx.new_model(|cx| Thread::new(self.tools.clone(), cx))
self.threads.push(thread.clone());
thread
} }
pub fn open_thread(&self, id: &ThreadId, cx: &mut ModelContext<Self>) -> Option<Model<Thread>> { pub fn open_thread(
self.threads &self,
.iter() id: &ThreadId,
.find(|thread| thread.read(cx).id() == id) cx: &mut ModelContext<Self>,
.cloned() ) -> Task<Result<Model<Thread>>> {
let id = id.clone();
let database_future = self.database_future.clone();
cx.spawn(|this, mut cx| async move {
let database = database_future.await.map_err(|err| anyhow!(err))?;
let thread = database
.try_find_thread(id.clone())
.await?
.ok_or_else(|| anyhow!("no thread found with ID: {id:?}"))?;
this.update(&mut cx, |this, cx| {
cx.new_model(|cx| Thread::from_saved(id.clone(), thread, this.tools.clone(), cx))
})
})
} }
pub fn delete_thread(&mut self, id: &ThreadId, cx: &mut ModelContext<Self>) { pub fn save_thread(
self.threads.retain(|thread| thread.read(cx).id() != id); &self,
thread: &Model<Thread>,
cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
let (metadata, thread) = thread.update(cx, |thread, _cx| {
let id = thread.id().clone();
let thread = SavedThread {
summary: thread.summary_or_default(),
updated_at: thread.updated_at(),
messages: thread
.messages()
.map(|message| SavedMessage {
id: message.id,
role: message.role,
text: message.text.clone(),
})
.collect(),
};
(id, thread)
});
let database_future = self.database_future.clone();
cx.spawn(|this, mut cx| async move {
let database = database_future.await.map_err(|err| anyhow!(err))?;
database.save_thread(metadata, thread).await?;
this.update(&mut cx, |this, cx| this.reload(cx))?.await
})
}
pub fn delete_thread(
&mut self,
id: &ThreadId,
cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
let id = id.clone();
let database_future = self.database_future.clone();
cx.spawn(|this, mut cx| async move {
let database = database_future.await.map_err(|err| anyhow!(err))?;
database.delete_thread(id.clone()).await?;
this.update(&mut cx, |this, _cx| {
this.threads.retain(|thread| thread.id != id)
})
})
}
fn reload(&self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
let database_future = self.database_future.clone();
cx.spawn(|this, mut cx| async move {
let threads = database_future
.await
.map_err(|err| anyhow!(err))?
.list_threads()
.await?;
this.update(&mut cx, |this, cx| {
this.threads = threads;
cx.notify();
})
})
} }
fn register_context_server_handlers(&self, cx: &mut ModelContext<Self>) { fn register_context_server_handlers(&self, cx: &mut ModelContext<Self>) {
@ -159,133 +244,108 @@ impl ThreadStore {
} }
} }
impl ThreadStore { #[derive(Debug, Clone, Serialize, Deserialize)]
/// Creates some mocked recent threads for testing purposes. pub struct SavedThreadMetadata {
fn mock_recent_threads(&mut self, cx: &mut ModelContext<Self>) { pub id: ThreadId,
use language_model::Role; pub summary: SharedString,
pub updated_at: DateTime<Utc>,
}
self.threads.push(cx.new_model(|cx| { #[derive(Serialize, Deserialize)]
let mut thread = Thread::new(self.tools.clone(), cx); pub struct SavedThread {
thread.set_summary("Introduction to quantum computing", cx); pub summary: SharedString,
thread.insert_user_message("Hello! Can you help me understand quantum computing?", Vec::new(), cx); pub updated_at: DateTime<Utc>,
thread.insert_message(Role::Assistant, "Of course! I'd be happy to help you understand quantum computing. Quantum computing is a fascinating field that uses the principles of quantum mechanics to process information. Unlike classical computers that use bits (0s and 1s), quantum computers use quantum bits or 'qubits'. These qubits can exist in multiple states simultaneously, a property called superposition. This allows quantum computers to perform certain calculations much faster than classical computers. What specific aspect of quantum computing would you like to know more about?", cx); pub messages: Vec<SavedMessage>,
thread.insert_user_message("That's interesting! Can you explain how quantum entanglement is used in quantum computing?", Vec::new(), cx); }
thread.insert_message(Role::Assistant, "Certainly! Quantum entanglement is a key principle used in quantum computing. When two qubits become entangled, the state of one qubit is directly related to the state of the other, regardless of the distance between them. This property is used in quantum computing to create complex quantum states and to perform operations on multiple qubits simultaneously. Entanglement allows quantum computers to process information in ways that classical computers cannot, potentially solving certain problems much more efficiently. For example, it's crucial in quantum error correction and in algorithms like quantum teleportation, which is important for quantum communication.", cx);
thread
}));
self.threads.push(cx.new_model(|cx| { #[derive(Serialize, Deserialize)]
let mut thread = Thread::new(self.tools.clone(), cx); pub struct SavedMessage {
thread.set_summary("Rust web development and async programming", cx); pub id: MessageId,
thread.insert_user_message("Can you show me an example of Rust code for a simple web server?", Vec::new(), cx); pub role: Role,
thread.insert_message(Role::Assistant, "Certainly! Here's an example of a simple web server in Rust using the `actix-web` framework: pub text: String,
}
```rust struct ThreadsDatabase {
use actix_web::{web, App, HttpResponse, HttpServer, Responder}; executor: BackgroundExecutor,
env: heed::Env,
threads: Database<SerdeBincode<ThreadId>, SerdeBincode<SavedThread>>,
}
async fn hello() -> impl Responder { impl ThreadsDatabase {
HttpResponse::Ok().body(\"Hello, World!\") pub fn new(path: PathBuf, executor: BackgroundExecutor) -> Result<Self> {
} std::fs::create_dir_all(&path)?;
#[actix_web::main] const ONE_GB_IN_BYTES: usize = 1024 * 1024 * 1024;
async fn main() -> std::io::Result<()> { let env = unsafe {
HttpServer::new(|| { heed::EnvOpenOptions::new()
App::new() .map_size(ONE_GB_IN_BYTES)
.route(\"/\", web::get().to(hello)) .max_dbs(1)
.open(path)?
};
let mut txn = env.write_txn()?;
let threads = env.create_database(&mut txn, Some("threads"))?;
txn.commit()?;
Ok(Self {
executor,
env,
threads,
}) })
.bind(\"127.0.0.1:8080\")?
.run()
.await
} }
```
This code creates a basic web server that responds with 'Hello, World!' when you access the root URL. Here's a breakdown of what's happening: pub fn list_threads(&self) -> Task<Result<Vec<SavedThreadMetadata>>> {
let env = self.env.clone();
let threads = self.threads;
1. We import necessary items from the `actix-web` crate. self.executor.spawn(async move {
2. We define an async `hello` function that returns a simple HTTP response. let txn = env.read_txn()?;
3. In the `main` function, we set up the server to listen on `127.0.0.1:8080`. let mut iter = threads.iter(&txn)?;
4. We configure the app to respond to GET requests on the root path with our `hello` function. let mut threads = Vec::new();
while let Some((key, value)) = iter.next().transpose()? {
To run this, you'd need to add `actix-web` to your `Cargo.toml` dependencies: threads.push(SavedThreadMetadata {
id: key,
```toml summary: value.summary,
[dependencies] updated_at: value.updated_at,
actix-web = \"4.0\" });
```
Then you can run the server with `cargo run` and access it at `http://localhost:8080`.".unindent(), cx);
thread.insert_user_message("That's great! Can you explain more about async functions in Rust?", Vec::new(), cx);
thread.insert_message(Role::Assistant, "Certainly! Async functions are a key feature in Rust for writing efficient, non-blocking code, especially for I/O-bound operations. Here's an overview:
1. **Syntax**: Async functions are declared using the `async` keyword:
```rust
async fn my_async_function() -> Result<(), Error> {
// Asynchronous code here
} }
```
2. **Futures**: Async functions return a `Future`. A `Future` represents a value that may not be available yet but will be at some point. Ok(threads)
3. **Await**: Inside an async function, you can use the `.await` syntax to wait for other async operations to complete:
```rust
async fn fetch_data() -> Result<String, Error> {
let response = make_http_request().await?;
let data = process_response(response).await?;
Ok(data)
}
```
4. **Non-blocking**: Async functions allow the runtime to work on other tasks while waiting for I/O or other operations to complete, making efficient use of system resources.
5. **Runtime**: To execute async code, you need a runtime like `tokio` or `async-std`. Actix-web, which we used in the previous example, includes its own runtime.
6. **Error Handling**: Async functions work well with Rust's `?` operator for error handling.
Async programming in Rust provides a powerful way to write concurrent code that's both safe and efficient. It's particularly useful for servers, network programming, and any application that deals with many concurrent operations.".unindent(), cx);
thread
}));
self.threads.push(cx.new_model(|cx| {
let mut thread = Thread::new(self.tools.clone(), cx);
thread.set_summary("Rust code with long lines", cx);
thread.insert_user_message("Could you write me some Rust code with long lines?", Vec::new(), cx);
thread.insert_message(Role::Assistant, r#"Here's some Rust code with some intentionally long lines:
```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let very_long_vector = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25];
let complicated_hashmap: HashMap<String, Vec<(i32, f64, String)>> = [("key1".to_string(), vec![(1, 1.1, "value1".to_string()), (2, 2.2, "value2".to_string())]), ("key2".to_string(), vec![(3, 3.3, "value3".to_string()), (4, 4.4, "value4".to_string())])].iter().cloned().collect();
let nested_structure = Arc::new(Mutex::new(HashMap::new()));
let long_closure = |x: i32, y: i32, z: i32| -> i32 { let result = x * y + z; println!("The result of the long closure calculation is: {}", result); result };
let thread_handles: Vec<_> = (0..10).map(|i| {
let nested_structure_clone = Arc::clone(&nested_structure);
thread::spawn(move || {
let mut lock = nested_structure_clone.lock().unwrap();
lock.entry(format!("thread_{}", i)).or_insert_with(|| HashSet::new()).insert(i * i);
}) })
}).collect();
for handle in thread_handles {
handle.join().unwrap();
} }
println!("The final state of the nested structure is: {:?}", nested_structure.lock().unwrap()); pub fn try_find_thread(&self, id: ThreadId) -> Task<Result<Option<SavedThread>>> {
let env = self.env.clone();
let threads = self.threads;
let complex_expression = very_long_vector.iter().filter(|&&x| x % 2 == 0).map(|&x| x * x).fold(0, |acc, x| acc + x) + long_closure(5, 10, 15); self.executor.spawn(async move {
let txn = env.read_txn()?;
println!("The result of the complex expression is: {}", complex_expression); let thread = threads.get(&txn, &id)?;
Ok(thread)
})
} }
```"#.unindent(), cx);
thread pub fn save_thread(&self, id: ThreadId, thread: SavedThread) -> Task<Result<()>> {
})); let env = self.env.clone();
let threads = self.threads;
self.executor.spawn(async move {
let mut txn = env.write_txn()?;
threads.put(&mut txn, &id, &thread)?;
txn.commit()?;
Ok(())
})
}
pub fn delete_thread(&self, id: ThreadId) -> Task<Result<()>> {
let env = self.env.clone();
let threads = self.threads;
self.executor.spawn(async move {
let mut txn = env.write_txn()?;
threads.delete(&mut txn, &id)?;
txn.commit()?;
Ok(())
})
} }
} }