From e0c860c42a3669f7daabb7d902c6ad76a19d3da9 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Mon, 7 Jul 2025 14:28:59 -0400 Subject: [PATCH] debugger: Fix issues with restarting sessions (#33932) Restarting sessions was broken in #33273 when we moved away from calling `kill` in the shutdown sequence. This PR re-adds that `kill` call so that old debug adapter processes will be cleaned up when sessions are restarted within Zed. This doesn't re-introduce the issue that motivated the original changes to the shutdown sequence, because we still send Disconnect/Terminate to debug adapters when quitting Zed without killing the process directly. We also now remove manually-restarted sessions eagerly from the session list. Closes #33916 Release Notes: - debugger: Fixed not being able to restart sessions for Debugpy and other adapters that communicate over TCP. - debugger: Fixed debug adapter processes not being cleaned up. --------- Co-authored-by: Remco Smits --- crates/dap/src/client.rs | 8 ++- crates/dap/src/transport.rs | 65 ++++++++++---------- crates/debugger_ui/src/debugger_panel.rs | 18 +++--- crates/debugger_ui/src/tests/attach_modal.rs | 10 +-- crates/project/src/debugger/session.rs | 43 +++++++++---- 5 files changed, 82 insertions(+), 62 deletions(-) diff --git a/crates/dap/src/client.rs b/crates/dap/src/client.rs index 4515e2a1d7..ff082e3b76 100644 --- a/crates/dap/src/client.rs +++ b/crates/dap/src/client.rs @@ -2,7 +2,7 @@ use crate::{ adapters::DebugAdapterBinary, transport::{IoKind, LogKind, TransportDelegate}, }; -use anyhow::Result; +use anyhow::{Context as _, Result}; use dap_types::{ messages::{Message, Response}, requests::Request, @@ -108,7 +108,11 @@ impl DebugAdapterClient { arguments: Some(serialized_arguments), }; self.transport_delegate - .add_pending_request(sequence_id, callback_tx); + .pending_requests + .lock() + .as_mut() + .context("client is closed")? + .insert(sequence_id, callback_tx); log::debug!( "Client {} send `{}` request with sequence_id: {}", diff --git a/crates/dap/src/transport.rs b/crates/dap/src/transport.rs index 9576608ab0..14370f66e4 100644 --- a/crates/dap/src/transport.rs +++ b/crates/dap/src/transport.rs @@ -49,7 +49,6 @@ pub enum IoKind { StdErr, } -type Requests = Arc>>>>; type LogHandlers = Arc>>; pub trait Transport: Send + Sync { @@ -93,18 +92,14 @@ async fn start( pub(crate) struct TransportDelegate { log_handlers: LogHandlers, - pub(crate) pending_requests: Requests, + // TODO this should really be some kind of associative channel + pub(crate) pending_requests: + Arc>>>>>, pub(crate) transport: Mutex>, pub(crate) server_tx: smol::lock::Mutex>>, tasks: Mutex>>, } -impl Drop for TransportDelegate { - fn drop(&mut self) { - self.transport.lock().kill() - } -} - impl TransportDelegate { pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result { let log_handlers: LogHandlers = Default::default(); @@ -113,7 +108,7 @@ impl TransportDelegate { transport: Mutex::new(transport), log_handlers, server_tx: Default::default(), - pending_requests: Default::default(), + pending_requests: Arc::new(Mutex::new(Some(HashMap::default()))), tasks: Default::default(), }) } @@ -154,16 +149,26 @@ impl TransportDelegate { .await { Ok(()) => { - pending_requests.lock().drain().for_each(|(_, request)| { - request - .send(Err(anyhow!("debugger shutdown unexpectedly"))) - .ok(); - }); + pending_requests + .lock() + .take() + .into_iter() + .flatten() + .for_each(|(_, request)| { + request + .send(Err(anyhow!("debugger shutdown unexpectedly"))) + .ok(); + }); } Err(e) => { - pending_requests.lock().drain().for_each(|(_, request)| { - request.send(Err(e.cloned())).ok(); - }); + pending_requests + .lock() + .take() + .into_iter() + .flatten() + .for_each(|(_, request)| { + request.send(Err(e.cloned())).ok(); + }); } } })); @@ -188,15 +193,6 @@ impl TransportDelegate { self.transport.lock().tcp_arguments() } - pub(crate) fn add_pending_request( - &self, - sequence_id: u64, - request: oneshot::Sender>, - ) { - let mut pending_requests = self.pending_requests.lock(); - pending_requests.insert(sequence_id, request); - } - pub(crate) async fn send_message(&self, message: Message) -> Result<()> { if let Some(server_tx) = self.server_tx.lock().await.as_ref() { server_tx.send(message).await.context("sending message") @@ -290,7 +286,7 @@ impl TransportDelegate { async fn recv_from_server( server_stdout: Stdout, mut message_handler: DapMessageHandler, - pending_requests: Requests, + pending_requests: Arc>>>>>, log_handlers: Option, ) -> Result<()> where @@ -300,16 +296,21 @@ impl TransportDelegate { let mut reader = BufReader::new(server_stdout); let result = loop { - match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref()) - .await - { + let result = + Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref()) + .await; + match result { ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"), ConnectionResult::ConnectionReset => { log::info!("Debugger closed the connection"); - return Ok(()); + break Ok(()); } ConnectionResult::Result(Ok(Message::Response(res))) => { - let tx = pending_requests.lock().remove(&res.request_seq); + let tx = pending_requests + .lock() + .as_mut() + .context("client is closed")? + .remove(&res.request_seq); if let Some(tx) = tx { if let Err(e) = tx.send(Self::process_response(res)) { log::trace!("Did not send response `{:?}` for a cancelled", e); diff --git a/crates/debugger_ui/src/debugger_panel.rs b/crates/debugger_ui/src/debugger_panel.rs index d03e8c5225..37df989c0b 100644 --- a/crates/debugger_ui/src/debugger_panel.rs +++ b/crates/debugger_ui/src/debugger_panel.rs @@ -33,7 +33,7 @@ use std::sync::{Arc, LazyLock}; use task::{DebugScenario, TaskContext}; use tree_sitter::{Query, StreamingIterator as _}; use ui::{ContextMenu, Divider, PopoverMenuHandle, Tooltip, prelude::*}; -use util::maybe; +use util::{ResultExt, maybe}; use workspace::SplitDirection; use workspace::{ Pane, Workspace, @@ -363,11 +363,17 @@ impl DebugPanel { let label = curr_session.read(cx).label().clone(); let adapter = curr_session.read(cx).adapter().clone(); let binary = curr_session.read(cx).binary().cloned().unwrap(); - let task = curr_session.update(cx, |session, cx| session.shutdown(cx)); let task_context = curr_session.read(cx).task_context().clone(); + let curr_session_id = curr_session.read(cx).session_id(); + self.sessions + .retain(|session| session.read(cx).session_id(cx) != curr_session_id); + let task = dap_store_handle.update(cx, |dap_store, cx| { + dap_store.shutdown_session(curr_session_id, cx) + }); + cx.spawn_in(window, async move |this, cx| { - task.await; + task.await.log_err(); let (session, task) = dap_store_handle.update(cx, |dap_store, cx| { let session = dap_store.new_session(label, adapter, task_context, None, cx); @@ -1298,9 +1304,7 @@ impl Panel for DebugPanel { impl Render for DebugPanel { fn render(&mut self, window: &mut Window, cx: &mut Context) -> impl IntoElement { - let has_sessions = self.sessions.len() > 0; let this = cx.weak_entity(); - debug_assert_eq!(has_sessions, self.active_session.is_some()); if self .active_session @@ -1487,8 +1491,8 @@ impl Render for DebugPanel { })) }) .map(|this| { - if has_sessions { - this.children(self.active_session.clone()) + if let Some(active_session) = self.active_session.clone() { + this.child(active_session) } else { let docked_to_bottom = self.position(window, cx) == DockPosition::Bottom; let welcome_experience = v_flex() diff --git a/crates/debugger_ui/src/tests/attach_modal.rs b/crates/debugger_ui/src/tests/attach_modal.rs index 1395915305..906a7a0d4b 100644 --- a/crates/debugger_ui/src/tests/attach_modal.rs +++ b/crates/debugger_ui/src/tests/attach_modal.rs @@ -27,7 +27,7 @@ async fn test_direct_attach_to_process(executor: BackgroundExecutor, cx: &mut Te let workspace = init_test_workspace(&project, cx).await; let cx = &mut VisualTestContext::from_window(*workspace, cx); - let session = start_debug_session_with( + let _session = start_debug_session_with( &workspace, cx, DebugTaskDefinition { @@ -59,14 +59,6 @@ async fn test_direct_attach_to_process(executor: BackgroundExecutor, cx: &mut Te assert!(workspace.active_modal::(cx).is_none()); }) .unwrap(); - - let shutdown_session = project.update(cx, |project, cx| { - project.dap_store().update(cx, |dap_store, cx| { - dap_store.shutdown_session(session.read(cx).session_id(), cx) - }) - }); - - shutdown_session.await.unwrap(); } #[gpui::test] diff --git a/crates/project/src/debugger/session.rs b/crates/project/src/debugger/session.rs index 9ab83610f0..3190254af8 100644 --- a/crates/project/src/debugger/session.rs +++ b/crates/project/src/debugger/session.rs @@ -677,6 +677,7 @@ pub struct Session { ignore_breakpoints: bool, exception_breakpoints: BTreeMap, background_tasks: Vec>, + restart_task: Option>, task_context: TaskContext, } @@ -838,6 +839,7 @@ impl Session { loaded_sources: Vec::default(), threads: IndexMap::default(), background_tasks: Vec::default(), + restart_task: None, locations: Default::default(), is_session_terminated: false, ignore_breakpoints: false, @@ -1870,18 +1872,30 @@ impl Session { } pub fn restart(&mut self, args: Option, cx: &mut Context) { - if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() { - self.request( - RestartCommand { - raw: args.unwrap_or(Value::Null), - }, - Self::fallback_to_manual_restart, - cx, - ) - .detach(); - } else { - cx.emit(SessionStateEvent::Restart); + if self.restart_task.is_some() || self.as_running().is_none() { + return; } + + let supports_dap_restart = + self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated(); + + self.restart_task = Some(cx.spawn(async move |this, cx| { + let _ = this.update(cx, |session, cx| { + if supports_dap_restart { + session + .request( + RestartCommand { + raw: args.unwrap_or(Value::Null), + }, + Self::fallback_to_manual_restart, + cx, + ) + .detach(); + } else { + cx.emit(SessionStateEvent::Restart); + } + }); + })); } pub fn shutdown(&mut self, cx: &mut Context) -> Task<()> { @@ -1919,8 +1933,13 @@ impl Session { cx.emit(SessionStateEvent::Shutdown); - cx.spawn(async move |_, _| { + cx.spawn(async move |this, cx| { task.await; + let _ = this.update(cx, |this, _| { + if let Some(adapter_client) = this.adapter_client() { + adapter_client.kill(); + } + }); }) }