From 3b7ad6236d7dde386e0a518684e2d2a366629f40 Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Mon, 18 Aug 2025 23:02:51 -0600 Subject: [PATCH] Re-wire history --- crates/acp_thread/src/acp_thread.rs | 1 - crates/acp_thread/src/connection.rs | 36 ++- crates/agent2/src/agent.rs | 261 +++++++++++----------- crates/agent2/src/db.rs | 28 ++- crates/agent2/src/history_store.rs | 96 ++++++-- crates/agent_ui/src/acp/thread_history.rs | 33 +-- crates/agent_ui/src/acp/thread_view.rs | 47 +++- crates/agent_ui/src/agent_panel.rs | 2 + 8 files changed, 294 insertions(+), 210 deletions(-) diff --git a/crates/acp_thread/src/acp_thread.rs b/crates/acp_thread/src/acp_thread.rs index 58e171046f..bb9c2e35ea 100644 --- a/crates/acp_thread/src/acp_thread.rs +++ b/crates/acp_thread/src/acp_thread.rs @@ -937,7 +937,6 @@ impl AcpThread { } pub fn update_title(&mut self, title: SharedString, cx: &mut Context) -> Result<()> { - dbg!("update title", &title); self.title = title; cx.emit(AcpThreadEvent::TitleUpdated); Ok(()) diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index 1e3272a6b0..af653a1c74 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -2,6 +2,7 @@ use crate::{AcpThread, AcpThreadMetadata}; use agent_client_protocol::{self as acp}; use anyhow::Result; use collections::IndexMap; +use futures::channel::mpsc::UnboundedReceiver; use gpui::{Entity, SharedString, Task}; use project::Project; use serde::{Deserialize, Serialize}; @@ -26,25 +27,6 @@ pub trait AgentConnection { cx: &mut App, ) -> Task>>; - // todo!(expose a history trait, and include list_threads and load_thread) - // todo!(write a test) - fn list_threads( - &self, - _cx: &mut App, - ) -> Option>>> { - return None; - } - - fn load_thread( - self: Rc, - _project: Entity, - _cwd: &Path, - _session_id: acp::SessionId, - _cx: &mut App, - ) -> Task>> { - Task::ready(Err(anyhow::anyhow!("load thread not implemented"))) - } - fn auth_methods(&self) -> &[acp::AuthMethod]; fn authenticate(&self, method: acp::AuthMethodId, cx: &mut App) -> Task>; @@ -82,6 +64,10 @@ pub trait AgentConnection { None } + fn history(self: Rc) -> Option> { + None + } + fn into_any(self: Rc) -> Rc; } @@ -99,6 +85,18 @@ pub trait AgentSessionResume { fn run(&self, cx: &mut App) -> Task>; } +pub trait AgentHistory { + fn list_threads(&self, cx: &mut App) -> Task>>; + fn observe_history(&self, cx: &mut App) -> UnboundedReceiver; + fn load_thread( + self: Rc, + _project: Entity, + _cwd: &Path, + _session_id: acp::SessionId, + _cx: &mut App, + ) -> Task>>; +} + #[derive(Debug)] pub struct AuthRequired; diff --git a/crates/agent2/src/agent.rs b/crates/agent2/src/agent.rs index 564511c632..cc3a40f652 100644 --- a/crates/agent2/src/agent.rs +++ b/crates/agent2/src/agent.rs @@ -6,7 +6,7 @@ use crate::{ UserMessageContent, WebSearchTool, templates::Templates, }; use crate::{ThreadsDatabase, generate_session_id}; -use acp_thread::{AcpThread, AcpThreadMetadata, AgentModelSelector}; +use acp_thread::{AcpThread, AcpThreadMetadata, AgentHistory, AgentModelSelector}; use agent_client_protocol as acp; use agent_settings::AgentSettings; use anyhow::{Context as _, Result, anyhow}; @@ -56,7 +56,7 @@ struct Session { thread: Entity, /// The ACP thread that handles protocol communication acp_thread: WeakEntity, - save_task: Task>, + save_task: Task<()>, _subscriptions: Vec, } @@ -173,8 +173,7 @@ pub struct NativeAgent { project: Entity, prompt_store: Option>, thread_database: Arc, - history: watch::Sender>>, - load_history: Task<()>, + history_watchers: Vec>, fs: Arc, _subscriptions: Vec, } @@ -212,7 +211,7 @@ impl NativeAgent { let (project_context_needs_refresh_tx, project_context_needs_refresh_rx) = watch::channel(()); - let mut this = Self { + Self { sessions: HashMap::new(), project_context: Rc::new(RefCell::new(project_context)), project_context_needs_refresh: project_context_needs_refresh_tx, @@ -228,12 +227,9 @@ impl NativeAgent { project, prompt_store, fs, - history: watch::channel(None).0, - load_history: Task::ready(()), + history_watchers: Vec::new(), _subscriptions: subscriptions, - }; - this.reload_history(cx); - this + } }) } @@ -250,7 +246,7 @@ impl NativeAgent { Session { thread: thread.clone(), acp_thread: weak_thread.clone(), - save_task: Task::ready(Ok(())), + save_task: Task::ready(()), _subscriptions: vec![ cx.observe_release(&acp_thread, |this, acp_thread, _cx| { this.sessions.remove(acp_thread.session_id()); @@ -285,35 +281,23 @@ impl NativeAgent { session.save_task = cx.spawn(async move |this, cx| { cx.background_executor().timer(SAVE_THREAD_DEBOUNCE).await; - let db_thread = thread.update(cx, |thread, cx| thread.to_db(cx))?.await; - thread_database.save_thread(id, db_thread).await?; - this.update(cx, |this, cx| this.reload_history(cx))?; - Ok(()) - }); - } - - fn reload_history(&mut self, cx: &mut Context) { - let thread_database = self.thread_database.clone(); - self.load_history = cx.spawn(async move |this, cx| { - let results = cx - .background_spawn(async move { - let results = thread_database.list_threads().await?; - anyhow::Ok( - results - .into_iter() - .map(|thread| AcpThreadMetadata { - agent: NATIVE_AGENT_SERVER_NAME.clone(), - id: thread.id.into(), - title: thread.title, - updated_at: thread.updated_at, - }) - .collect(), - ) + let Some(task) = thread.update(cx, |thread, cx| thread.to_db(cx)).ok() else { + return; + }; + let db_thread = task.await; + let metadata = thread_database + .save_thread(id.clone(), db_thread) + .await + .log_err(); + if let Some(metadata) = metadata { + this.update(cx, |this, _| { + for watcher in this.history_watchers.iter_mut() { + watcher + .unbounded_send(metadata.clone().to_acp(NATIVE_AGENT_SERVER_NAME)) + .log_err(); + } }) - .await; - if let Some(results) = results.log_err() { - this.update(cx, |this, _| this.history.send(Some(results))) - .ok(); + .ok(); } }); } @@ -667,7 +651,6 @@ impl NativeAgentConnection { })??; } ThreadEvent::TitleUpdate(title) => { - dbg!("updating title"); acp_thread .update(cx, |thread, cx| thread.update_title(title, cx))??; } @@ -884,11 +867,106 @@ impl acp_thread::AgentConnection for NativeAgentConnection { Task::ready(Ok(())) } - fn list_threads( + fn model_selector(&self) -> Option> { + Some(Rc::new(self.clone()) as Rc) + } + + fn prompt( &self, + id: Option, + params: acp::PromptRequest, cx: &mut App, - ) -> Option>>> { - Some(self.0.read(cx).history.receiver()) + ) -> Task> { + let id = id.expect("UserMessageId is required"); + let session_id = params.session_id.clone(); + log::info!("Received prompt request for session: {}", session_id); + log::debug!("Prompt blocks count: {}", params.prompt.len()); + + self.run_turn(session_id, cx, |thread, cx| { + let content: Vec = params + .prompt + .into_iter() + .map(Into::into) + .collect::>(); + log::info!("Converted prompt to message: {} chars", content.len()); + log::debug!("Message id: {:?}", id); + log::debug!("Message content: {:?}", content); + + thread.update(cx, |thread, cx| thread.send(id, content, cx)) + }) + } + + fn resume( + &self, + session_id: &acp::SessionId, + _cx: &mut App, + ) -> Option> { + Some(Rc::new(NativeAgentSessionResume { + connection: self.clone(), + session_id: session_id.clone(), + }) as _) + } + + fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) { + log::info!("Cancelling on session: {}", session_id); + self.0.update(cx, |agent, cx| { + if let Some(agent) = agent.sessions.get(session_id) { + agent.thread.update(cx, |thread, cx| thread.cancel(cx)); + } + }); + } + + fn session_editor( + &self, + session_id: &agent_client_protocol::SessionId, + cx: &mut App, + ) -> Option> { + self.0.update(cx, |agent, _cx| { + agent + .sessions + .get(session_id) + .map(|session| Rc::new(NativeAgentSessionEditor(session.thread.clone())) as _) + }) + } + + fn history(self: Rc) -> Option> { + Some(self) + } + + fn into_any(self: Rc) -> Rc { + self + } +} + +struct NativeAgentSessionEditor(Entity); + +impl acp_thread::AgentSessionEditor for NativeAgentSessionEditor { + fn truncate(&self, message_id: acp_thread::UserMessageId, cx: &mut App) -> Task> { + Task::ready( + self.0 + .update(cx, |thread, cx| thread.truncate(message_id, cx)), + ) + } +} + +impl acp_thread::AgentHistory for NativeAgentConnection { + fn list_threads(&self, cx: &mut App) -> Task>> { + let database = self.0.read(cx).thread_database.clone(); + cx.background_executor().spawn(async move { + let threads = database.list_threads().await?; + anyhow::Ok( + threads + .into_iter() + .map(|thread| thread.to_acp(NATIVE_AGENT_SERVER_NAME)) + .collect::>(), + ) + }) + } + + fn observe_history(&self, cx: &mut App) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded(); + self.0.update(cx, |this, _| this.history_watchers.push(tx)); + rx } fn load_thread( @@ -980,83 +1058,6 @@ impl acp_thread::AgentConnection for NativeAgentConnection { Ok(acp_thread) }) } - - fn model_selector(&self) -> Option> { - Some(Rc::new(self.clone()) as Rc) - } - - fn prompt( - &self, - id: Option, - params: acp::PromptRequest, - cx: &mut App, - ) -> Task> { - let id = id.expect("UserMessageId is required"); - let session_id = params.session_id.clone(); - log::info!("Received prompt request for session: {}", session_id); - log::debug!("Prompt blocks count: {}", params.prompt.len()); - - self.run_turn(session_id, cx, |thread, cx| { - let content: Vec = params - .prompt - .into_iter() - .map(Into::into) - .collect::>(); - log::info!("Converted prompt to message: {} chars", content.len()); - log::debug!("Message id: {:?}", id); - log::debug!("Message content: {:?}", content); - - thread.update(cx, |thread, cx| thread.send(id, content, cx)) - }) - } - - fn resume( - &self, - session_id: &acp::SessionId, - _cx: &mut App, - ) -> Option> { - Some(Rc::new(NativeAgentSessionResume { - connection: self.clone(), - session_id: session_id.clone(), - }) as _) - } - - fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) { - log::info!("Cancelling on session: {}", session_id); - self.0.update(cx, |agent, cx| { - if let Some(agent) = agent.sessions.get(session_id) { - agent.thread.update(cx, |thread, cx| thread.cancel(cx)); - } - }); - } - - fn session_editor( - &self, - session_id: &agent_client_protocol::SessionId, - cx: &mut App, - ) -> Option> { - self.0.update(cx, |agent, _cx| { - agent - .sessions - .get(session_id) - .map(|session| Rc::new(NativeAgentSessionEditor(session.thread.clone())) as _) - }) - } - - fn into_any(self: Rc) -> Rc { - self - } -} - -struct NativeAgentSessionEditor(Entity); - -impl acp_thread::AgentSessionEditor for NativeAgentSessionEditor { - fn truncate(&self, message_id: acp_thread::UserMessageId, cx: &mut App) -> Task> { - Task::ready( - self.0 - .update(cx, |thread, cx| thread.truncate(message_id, cx)), - ) - } } struct NativeAgentSessionResume { @@ -1274,16 +1275,22 @@ mod tests { ) .await .unwrap(); - let connection = NativeAgentConnection(agent.clone()); - let history_store = cx.new(|cx| { - let mut store = HistoryStore::new(cx); - store.register_agent(NATIVE_AGENT_SERVER_NAME.clone(), &connection, cx); - store - }); + let connection = Rc::new(NativeAgentConnection(agent.clone())); + let history = connection.clone().history().unwrap(); + let history_store = cx.new(|cx| HistoryStore::get_or_init(cx)); + + history_store + .update(cx, |history_store, cx| { + history_store.load_history(NATIVE_AGENT_SERVER_NAME.clone(), history.as_ref(), cx) + }) + .await + .unwrap(); let acp_thread = cx .update(|cx| { - Rc::new(connection.clone()).new_thread(project.clone(), Path::new(path!("")), cx) + connection + .clone() + .new_thread(project.clone(), Path::new(path!("")), cx) }) .await .unwrap(); diff --git a/crates/agent2/src/db.rs b/crates/agent2/src/db.rs index 67dc8c5e98..43979e8c74 100644 --- a/crates/agent2/src/db.rs +++ b/crates/agent2/src/db.rs @@ -1,4 +1,5 @@ use crate::{AgentMessage, AgentMessageContent, UserMessage, UserMessageContent}; +use acp_thread::{AcpThreadMetadata, AgentServerName}; use agent::thread_store; use agent_client_protocol as acp; use agent_settings::{AgentProfileId, CompletionMode}; @@ -30,6 +31,17 @@ pub struct DbThreadMetadata { pub updated_at: DateTime, } +impl DbThreadMetadata { + pub fn to_acp(self, agent: AgentServerName) -> AcpThreadMetadata { + AcpThreadMetadata { + agent, + id: self.id, + title: self.title, + updated_at: self.updated_at, + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct DbThread { pub title: SharedString, @@ -288,7 +300,7 @@ impl ThreadsDatabase { connection: &Arc>, id: acp::SessionId, thread: DbThread, - ) -> Result<()> { + ) -> Result { let json_data = serde_json::to_string(&thread)?; let title = thread.title.to_string(); let updated_at = thread.updated_at.to_rfc3339(); @@ -303,9 +315,13 @@ impl ThreadsDatabase { INSERT OR REPLACE INTO threads (id, summary, updated_at, data_type, data) VALUES (?, ?, ?, ?, ?) "})?; - insert((id.0, title, updated_at, data_type, data))?; + insert((id.0.clone(), title, updated_at, data_type, data))?; - Ok(()) + Ok(DbThreadMetadata { + id, + title: thread.title, + updated_at: thread.updated_at, + }) } pub fn list_threads(&self) -> Task>> { @@ -360,7 +376,11 @@ impl ThreadsDatabase { }) } - pub fn save_thread(&self, id: acp::SessionId, thread: DbThread) -> Task> { + pub fn save_thread( + &self, + id: acp::SessionId, + thread: DbThread, + ) -> Task> { let connection = self.connection.clone(); self.executor diff --git a/crates/agent2/src/history_store.rs b/crates/agent2/src/history_store.rs index 151cc4e8a9..996702bff7 100644 --- a/crates/agent2/src/history_store.rs +++ b/crates/agent2/src/history_store.rs @@ -1,12 +1,17 @@ use acp_thread::{AcpThreadMetadata, AgentConnection, AgentServerName}; use agent_client_protocol as acp; +use agent_servers::AgentServer; use assistant_context::SavedContextMetadata; use chrono::{DateTime, Utc}; use collections::HashMap; -use gpui::{SharedString, Task, prelude::*}; +use gpui::{Entity, Global, SharedString, Task, prelude::*}; +use project::Project; use serde::{Deserialize, Serialize}; +use ui::App; -use std::{path::Path, sync::Arc, time::Duration}; +use std::{path::Path, rc::Rc, sync::Arc, time::Duration}; + +use crate::NativeAgentServer; const MAX_RECENTLY_OPENED_ENTRIES: usize = 6; const NAVIGATION_HISTORY_PATH: &str = "agent-navigation-history.json"; @@ -59,41 +64,88 @@ enum SerializedRecentOpen { Context(String), } +#[derive(Default)] pub struct AgentHistory { - entries: watch::Receiver>>, - _task: Task<()>, + entries: HashMap, + loaded: bool, } pub struct HistoryStore { agents: HashMap, // todo!() text threads } +// note, we have to share the history store between all windows +// because we only get updates from one connection at a time. +struct GlobalHistoryStore(Entity); +impl Global for GlobalHistoryStore {} impl HistoryStore { - pub fn new(_cx: &mut Context) -> Self { + pub fn get_or_init(project: &Entity, cx: &mut App) -> Entity { + if cx.has_global::() { + return cx.global::().0.clone(); + } + let history_store = cx.new(|cx| HistoryStore::new(cx)); + cx.set_global(GlobalHistoryStore(history_store.clone())); + let root_dir = project + .read(cx) + .visible_worktrees(cx) + .next() + .map(|worktree| worktree.read(cx).abs_path()) + .unwrap_or_else(|| paths::home_dir().as_path().into()); + + let agent = NativeAgentServer::new(project.read(cx).fs().clone()); + let connect = agent.connect(&root_dir, project, cx); + cx.spawn({ + let history_store = history_store.clone(); + async move |cx| { + let connection = connect.await?.history().unwrap(); + history_store + .update(cx, |history_store, cx| { + history_store.load_history(agent.name(), connection.as_ref(), cx) + })? + .await + } + }) + .detach_and_log_err(cx); + history_store + } + + fn new(_cx: &mut Context) -> Self { Self { agents: HashMap::default(), } } - pub fn register_agent( + pub fn update_history(&mut self, entry: AcpThreadMetadata, cx: &mut Context) { + let agent = self + .agents + .entry(entry.agent.clone()) + .or_insert(Default::default()); + + agent.entries.insert(entry.id.clone(), entry); + cx.notify() + } + + pub fn load_history( &mut self, agent_name: AgentServerName, - connection: &dyn AgentConnection, + connection: &dyn acp_thread::AgentHistory, cx: &mut Context, - ) { - let Some(mut history) = connection.list_threads(cx) else { - return; - }; - let history = AgentHistory { - entries: history.clone(), - _task: cx.spawn(async move |this, cx| { - dbg!("loaded", history.borrow().as_ref().map(|b| b.len())); - while history.changed().await.is_ok() { - this.update(cx, |_, cx| cx.notify()).ok(); - } - }), - }; - self.agents.insert(agent_name.clone(), history); + ) -> Task> { + let threads = connection.list_threads(cx); + cx.spawn(async move |this, cx| { + let threads = threads.await?; + + this.update(cx, |this, cx| { + this.agents.insert( + agent_name, + AgentHistory { + loaded: true, + entries: threads.into_iter().map(|t| (t.id.clone(), t)).collect(), + }, + ); + cx.notify() + }) + }) } pub fn entries(&mut self, _cx: &mut Context) -> Vec { @@ -107,7 +159,7 @@ impl HistoryStore { history_entries.extend( self.agents .values_mut() - .flat_map(|history| history.entries.borrow().clone().unwrap_or_default()) // todo!("surface the loading state?") + .flat_map(|history| history.entries.values().cloned()) // todo!("surface the loading state?") .map(HistoryEntry::AcpThread), ); // todo!() include the text threads in here. diff --git a/crates/agent_ui/src/acp/thread_history.rs b/crates/agent_ui/src/acp/thread_history.rs index 344790f26e..d0bf60ad72 100644 --- a/crates/agent_ui/src/acp/thread_history.rs +++ b/crates/agent_ui/src/acp/thread_history.rs @@ -5,8 +5,8 @@ use chrono::{Datelike as _, Local, NaiveDate, TimeDelta}; use editor::{Editor, EditorEvent}; use fuzzy::{StringMatch, StringMatchCandidate}; use gpui::{ - App, Empty, Entity, EventEmitter, FocusHandle, Focusable, ScrollStrategy, Stateful, Task, - UniformListScrollHandle, Window, uniform_list, + App, Empty, Entity, EventEmitter, FocusHandle, Focusable, Global, ScrollStrategy, Stateful, + Task, UniformListScrollHandle, Window, uniform_list, }; use project::Project; use std::{fmt::Display, ops::Range, sync::Arc}; @@ -18,7 +18,7 @@ use ui::{ use util::ResultExt; pub struct AcpThreadHistory { - history_store: Entity, + pub(crate) history_store: Entity, scroll_handle: UniformListScrollHandle, selected_index: usize, hovered_index: Option, @@ -69,37 +69,12 @@ impl AcpThreadHistory { window: &mut Window, cx: &mut Context, ) -> Self { - let history_store = cx.new(|cx| agent2::HistoryStore::new(cx)); - - let agent = NativeAgentServer::new(project.read(cx).fs().clone()); - - let root_dir = project - .read(cx) - .visible_worktrees(cx) - .next() - .map(|worktree| worktree.read(cx).abs_path()) - .unwrap_or_else(|| paths::home_dir().as_path().into()); - - // todo!() reuse this connection for sending messages - let connect = agent.connect(&root_dir, project, cx); - cx.spawn(async move |this, cx| { - let connection = connect.await?; - this.update(cx, |this, cx| { - this.history_store.update(cx, |this, cx| { - this.register_agent(agent.name(), connection.as_ref(), cx) - }) - })?; - // todo!() we must keep it alive - std::mem::forget(connection); - anyhow::Ok(()) - }) - .detach(); - let search_editor = cx.new(|cx| { let mut editor = Editor::single_line(window, cx); editor.set_placeholder_text("Search threads...", cx); editor }); + let history_store = HistoryStore::get_or_init(project, cx); let search_editor_subscription = cx.subscribe(&search_editor, |this, search_editor, event, cx| { diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index 18eeda267d..676739da3b 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -18,6 +18,7 @@ use editor::scroll::Autoscroll; use editor::{Editor, EditorMode, MultiBuffer, PathKey, SelectionEffects}; use file_icons::FileIcons; use fs::Fs; +use futures::StreamExt; use gpui::{ Action, Animation, AnimationExt, App, BorderStyle, ClickEvent, ClipboardItem, EdgesRefinement, Empty, Entity, FocusHandle, Focusable, Hsla, Length, ListOffset, ListState, MouseButton, @@ -123,6 +124,7 @@ pub struct AcpThreadView { editor_expanded: bool, terminal_expanded: bool, editing_message: Option, + history_store: Entity, _cancel_task: Option>, _subscriptions: [Subscription; 3], } @@ -134,6 +136,7 @@ enum ThreadState { Ready { thread: Entity, _subscription: [Subscription; 2], + _history_task: Option>, }, LoadError(LoadError), Unauthenticated { @@ -149,6 +152,7 @@ impl AcpThreadView { agent: Rc, workspace: WeakEntity, project: Entity, + history_store: Entity, thread_store: Entity, text_thread_store: Entity, restore_thread: Option, @@ -196,11 +200,13 @@ impl AcpThreadView { thread_state: Self::initial_state( agent, restore_thread, + history_store.clone(), workspace, project, window, cx, ), + history_store, message_editor, model_selector: None, profile_selector: None, @@ -225,6 +231,7 @@ impl AcpThreadView { fn initial_state( agent: Rc, restore_thread: Option, + history_store: Entity, workspace: WeakEntity, project: Entity, window: &mut Window, @@ -251,6 +258,25 @@ impl AcpThreadView { } }; + let mut history_task = None; + let history = connection.clone().history(); + if let Some(history) = history.clone() { + if let Some(mut history) = cx.update(|_, cx| history.observe_history(cx)).ok() { + history_task = Some(cx.spawn(async move |cx| { + while let Some(update) = history.next().await { + if !history_store + .update(cx, |history_store, cx| { + history_store.update_history(update, cx) + }) + .is_ok() + { + break; + } + } + })); + } + } + // this.update_in(cx, |_this, _window, cx| { // let status = connection.exit_status(cx); // cx.spawn(async move |this, cx| { @@ -264,15 +290,12 @@ impl AcpThreadView { // .detach(); // }) // .ok(); - // + let history = connection.clone().history(); let task = cx.update(|_, cx| { - if let Some(restore_thread) = restore_thread { - connection.clone().load_thread( - project.clone(), - &root_dir, - restore_thread.id, - cx, - ) + if let Some(restore_thread) = restore_thread + && let Some(history) = history + { + history.load_thread(project.clone(), &root_dir, restore_thread.id, cx) } else { connection .clone() @@ -342,6 +365,7 @@ impl AcpThreadView { this.thread_state = ThreadState::Ready { thread, _subscription: [thread_subscription, action_log_subscription], + _history_task: history_task, }; this.profile_selector = this.as_native_thread(cx).map(|thread| { @@ -751,6 +775,7 @@ impl AcpThreadView { this.thread_state = Self::initial_state( agent, None, // todo!() + this.history_store.clone(), this.workspace.clone(), project.clone(), window, @@ -3755,6 +3780,8 @@ pub(crate) mod tests { cx.update(|_window, cx| cx.new(|cx| ThreadStore::fake(project.clone(), cx))); let text_thread_store = cx.update(|_window, cx| cx.new(|cx| TextThreadStore::fake(project.clone(), cx))); + let history_store = + cx.update(|_window, cx| cx.new(|cx| agent2::HistoryStore::get_or_init(cx))); let thread_view = cx.update(|window, cx| { cx.new(|cx| { @@ -3762,6 +3789,7 @@ pub(crate) mod tests { Rc::new(agent), workspace.downgrade(), project, + history_store.clone(), thread_store.clone(), text_thread_store.clone(), None, @@ -3954,6 +3982,8 @@ pub(crate) mod tests { cx.update(|_window, cx| cx.new(|cx| ThreadStore::fake(project.clone(), cx))); let text_thread_store = cx.update(|_window, cx| cx.new(|cx| TextThreadStore::fake(project.clone(), cx))); + let history_store = + cx.update(|_window, cx| cx.new(|cx| agent2::HistoryStore::get_or_init(cx))); let connection = Rc::new(StubAgentConnection::new()); let thread_view = cx.update(|window, cx| { @@ -3962,6 +3992,7 @@ pub(crate) mod tests { Rc::new(StubAgentServer::new(connection.as_ref().clone())), workspace.downgrade(), project.clone(), + history_store, thread_store.clone(), text_thread_store.clone(), None, diff --git a/crates/agent_ui/src/agent_panel.rs b/crates/agent_ui/src/agent_panel.rs index 20e6206fa2..8392c5589b 100644 --- a/crates/agent_ui/src/agent_panel.rs +++ b/crates/agent_ui/src/agent_panel.rs @@ -1031,11 +1031,13 @@ impl AgentPanel { }; this.update_in(cx, |this, window, cx| { + let acp_history_store = this.acp_history.read(cx).history_store.clone(); let thread_view = cx.new(|cx| { crate::acp::AcpThreadView::new( server, workspace.clone(), project, + acp_history_store, thread_store.clone(), text_thread_store.clone(), restore_thread,