From b7469f5bc346ca9723a3595f6f00c5fe6a3f7a6c Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Tue, 5 Aug 2025 19:10:51 -0300 Subject: [PATCH] Fix ACP connection and thread leak (#35670) When you switched away from an ACP thread, the `AcpThreadView` entity (and thus thread, and subprocess) was leaked. This happened because we were using `cx.processor` for the `list` state callback, which uses a strong reference. This PR changes the callback so that it holds a weak reference, and adds some tests and assertions at various levels to make sure we don't reintroduce the leak in the future. Release Notes: - N/A --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- crates/agent_servers/src/acp/v0.rs | 1 + crates/agent_servers/src/acp/v1.rs | 15 ++++++++----- crates/agent_servers/src/claude.rs | 5 ++--- crates/agent_servers/src/e2e_tests.rs | 27 ++++++++++++++++++++++++ crates/agent_ui/src/acp/thread_view.rs | 29 ++++++++++++++++++-------- crates/agent_ui/src/agent_panel.rs | 17 ++++++++------- 8 files changed, 73 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4cf5a68f1d..4803c0de8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,9 +137,9 @@ dependencies = [ [[package]] name = "agent-client-protocol" -version = "0.0.18" +version = "0.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8e4c1dccb35e69d32566f0d11948d902f9942fc3f038821816c1150cf5925f4" +checksum = "12dbfec3d27680337ed9d3064eecafe97acf0b0f190148bb4e29d96707c9e403" dependencies = [ "anyhow", "futures 0.3.31", diff --git a/Cargo.toml b/Cargo.toml index 733db92ce9..05ceb3bd14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -423,7 +423,7 @@ zlog_settings = { path = "crates/zlog_settings" } # agentic-coding-protocol = "0.0.10" -agent-client-protocol = "0.0.18" +agent-client-protocol = "0.0.20" aho-corasick = "1.1" alacritty_terminal = { git = "https://github.com/zed-industries/alacritty.git", branch = "add-hush-login-flag" } any_vec = "0.14" diff --git a/crates/agent_servers/src/acp/v0.rs b/crates/agent_servers/src/acp/v0.rs index 3dcda4ce8d..fda28fa176 100644 --- a/crates/agent_servers/src/acp/v0.rs +++ b/crates/agent_servers/src/acp/v0.rs @@ -380,6 +380,7 @@ impl AcpConnection { let stdin = child.stdin.take().unwrap(); let stdout = child.stdout.take().unwrap(); + log::trace!("Spawned (pid: {})", child.id()); let foreground_executor = cx.foreground_executor().clone(); diff --git a/crates/agent_servers/src/acp/v1.rs b/crates/agent_servers/src/acp/v1.rs index a4f0e996b5..0b6fa1c48b 100644 --- a/crates/agent_servers/src/acp/v1.rs +++ b/crates/agent_servers/src/acp/v1.rs @@ -19,7 +19,6 @@ pub struct AcpConnection { sessions: Rc>>, auth_methods: Vec, _io_task: Task>, - _child: smol::process::Child, } pub struct AcpSession { @@ -47,6 +46,7 @@ impl AcpConnection { let stdout = child.stdout.take().expect("Failed to take stdout"); let stdin = child.stdin.take().expect("Failed to take stdin"); + log::trace!("Spawned (pid: {})", child.id()); let sessions = Rc::new(RefCell::new(HashMap::default())); @@ -61,7 +61,11 @@ impl AcpConnection { } }); - let io_task = cx.background_spawn(io_task); + let io_task = cx.background_spawn(async move { + io_task.await?; + drop(child); + Ok(()) + }); let response = connection .initialize(acp::InitializeRequest { @@ -84,7 +88,6 @@ impl AcpConnection { connection: connection.into(), server_name, sessions, - _child: child, _io_task: io_task, }) } @@ -155,8 +158,10 @@ impl AgentConnection for AcpConnection { fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task> { let conn = self.connection.clone(); - cx.foreground_executor() - .spawn(async move { Ok(conn.prompt(params).await?) }) + cx.foreground_executor().spawn(async move { + conn.prompt(params).await?; + Ok(()) + }) } fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) { diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index 9040b83085..b097c0345c 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -125,8 +125,7 @@ impl AgentConnection for ClaudeAgentConnection { session_id.clone(), &mcp_config_path, &cwd, - ) - .await?; + )?; let pid = child.id(); log::trace!("Spawned (pid: {})", pid); @@ -262,7 +261,7 @@ enum ClaudeSessionMode { Resume, } -async fn spawn_claude( +fn spawn_claude( command: &AgentServerCommand, mode: ClaudeSessionMode, session_id: acp::SessionId, diff --git a/crates/agent_servers/src/e2e_tests.rs b/crates/agent_servers/src/e2e_tests.rs index a60aefb7b9..05f874bd30 100644 --- a/crates/agent_servers/src/e2e_tests.rs +++ b/crates/agent_servers/src/e2e_tests.rs @@ -311,6 +311,27 @@ pub async fn test_cancel(server: impl AgentServer + 'static, cx: &mut TestAppCon }); } +pub async fn test_thread_drop(server: impl AgentServer + 'static, cx: &mut TestAppContext) { + let fs = init_test(cx).await; + let project = Project::test(fs, [], cx).await; + let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await; + + thread + .update(cx, |thread, cx| thread.send_raw("Hello from test!", cx)) + .await + .unwrap(); + + thread.read_with(cx, |thread, _| { + assert!(thread.entries().len() >= 2, "Expected at least 2 entries"); + }); + + let weak_thread = thread.downgrade(); + drop(thread); + + cx.executor().run_until_parked(); + assert!(!weak_thread.is_upgradable()); +} + #[macro_export] macro_rules! common_e2e_tests { ($server:expr, allow_option_id = $allow_option_id:expr) => { @@ -351,6 +372,12 @@ macro_rules! common_e2e_tests { async fn cancel(cx: &mut ::gpui::TestAppContext) { $crate::e2e_tests::test_cancel($server, cx).await; } + + #[::gpui::test] + #[cfg_attr(not(feature = "e2e"), ignore)] + async fn thread_drop(cx: &mut ::gpui::TestAppContext) { + $crate::e2e_tests::test_thread_drop($server, cx).await; + } } }; } diff --git a/crates/agent_ui/src/acp/thread_view.rs b/crates/agent_ui/src/acp/thread_view.rs index cecf989a23..7c6f315fb6 100644 --- a/crates/agent_ui/src/acp/thread_view.rs +++ b/crates/agent_ui/src/acp/thread_view.rs @@ -169,12 +169,13 @@ impl AcpThreadView { let mention_set = mention_set.clone(); - let list_state = ListState::new( - 0, - gpui::ListAlignment::Bottom, - px(2048.0), - cx.processor({ - move |this: &mut Self, index: usize, window, cx| { + let list_state = ListState::new(0, gpui::ListAlignment::Bottom, px(2048.0), { + let this = cx.entity().downgrade(); + move |index: usize, window, cx| { + let Some(this) = this.upgrade() else { + return Empty.into_any(); + }; + this.update(cx, |this, cx| { let Some((entry, len)) = this.thread().and_then(|thread| { let entries = &thread.read(cx).entries(); Some((entries.get(index)?, entries.len())) @@ -182,9 +183,9 @@ impl AcpThreadView { return Empty.into_any(); }; this.render_entry(index, len, entry, window, cx) - } - }), - ); + }) + } + }); Self { agent: agent.clone(), @@ -2719,6 +2720,16 @@ mod tests { use super::*; + #[gpui::test] + async fn test_drop(cx: &mut TestAppContext) { + init_test(cx); + + let (thread_view, _cx) = setup_thread_view(StubAgentServer::default(), cx).await; + let weak_view = thread_view.downgrade(); + drop(thread_view); + assert!(!weak_view.is_upgradable()); + } + #[gpui::test] async fn test_notification_for_stop_event(cx: &mut TestAppContext) { init_test(cx); diff --git a/crates/agent_ui/src/agent_panel.rs b/crates/agent_ui/src/agent_panel.rs index 5f3315f69a..717778bb98 100644 --- a/crates/agent_ui/src/agent_panel.rs +++ b/crates/agent_ui/src/agent_panel.rs @@ -970,13 +970,7 @@ impl AgentPanel { ) }); - this.set_active_view( - ActiveView::ExternalAgentThread { - thread_view: thread_view.clone(), - }, - window, - cx, - ); + this.set_active_view(ActiveView::ExternalAgentThread { thread_view }, window, cx); }) }) .detach_and_log_err(cx); @@ -1477,6 +1471,7 @@ impl AgentPanel { let current_is_special = current_is_history || current_is_config; let new_is_special = new_is_history || new_is_config; + let mut old_acp_thread = None; match &self.active_view { ActiveView::Thread { thread, .. } => { @@ -1488,6 +1483,9 @@ impl AgentPanel { }); } } + ActiveView::ExternalAgentThread { thread_view } => { + old_acp_thread.replace(thread_view.downgrade()); + } _ => {} } @@ -1518,6 +1516,11 @@ impl AgentPanel { self.active_view = new_view; } + debug_assert!( + old_acp_thread.map_or(true, |thread| !thread.is_upgradable()), + "AcpThreadView leaked" + ); + self.acp_message_history.borrow_mut().reset_position(); self.focus_handle(cx).focus(window);