debugger: Fix issues with restarting sessions (#33932)

Restarting sessions was broken in #33273 when we moved away from calling
`kill` in the shutdown sequence. This PR re-adds that `kill` call so
that old debug adapter processes will be cleaned up when sessions are
restarted within Zed. This doesn't re-introduce the issue that motivated
the original changes to the shutdown sequence, because we still send
Disconnect/Terminate to debug adapters when quitting Zed without killing
the process directly.

We also now remove manually-restarted sessions eagerly from the session
list.

Closes #33916 

Release Notes:

- debugger: Fixed not being able to restart sessions for Debugpy and
other adapters that communicate over TCP.
- debugger: Fixed debug adapter processes not being cleaned up.

---------

Co-authored-by: Remco Smits <djsmits12@gmail.com>
This commit is contained in:
Cole Miller 2025-07-07 14:28:59 -04:00 committed by GitHub
parent 2a6ef006f4
commit e0c860c42a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 82 additions and 62 deletions

View file

@ -2,7 +2,7 @@ use crate::{
adapters::DebugAdapterBinary, adapters::DebugAdapterBinary,
transport::{IoKind, LogKind, TransportDelegate}, transport::{IoKind, LogKind, TransportDelegate},
}; };
use anyhow::Result; use anyhow::{Context as _, Result};
use dap_types::{ use dap_types::{
messages::{Message, Response}, messages::{Message, Response},
requests::Request, requests::Request,
@ -108,7 +108,11 @@ impl DebugAdapterClient {
arguments: Some(serialized_arguments), arguments: Some(serialized_arguments),
}; };
self.transport_delegate self.transport_delegate
.add_pending_request(sequence_id, callback_tx); .pending_requests
.lock()
.as_mut()
.context("client is closed")?
.insert(sequence_id, callback_tx);
log::debug!( log::debug!(
"Client {} send `{}` request with sequence_id: {}", "Client {} send `{}` request with sequence_id: {}",

View file

@ -49,7 +49,6 @@ pub enum IoKind {
StdErr, StdErr,
} }
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>; type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
pub trait Transport: Send + Sync { pub trait Transport: Send + Sync {
@ -93,18 +92,14 @@ async fn start(
pub(crate) struct TransportDelegate { pub(crate) struct TransportDelegate {
log_handlers: LogHandlers, log_handlers: LogHandlers,
pub(crate) pending_requests: Requests, // TODO this should really be some kind of associative channel
pub(crate) pending_requests:
Arc<Mutex<Option<HashMap<u64, oneshot::Sender<Result<Response>>>>>>,
pub(crate) transport: Mutex<Box<dyn Transport>>, pub(crate) transport: Mutex<Box<dyn Transport>>,
pub(crate) server_tx: smol::lock::Mutex<Option<Sender<Message>>>, pub(crate) server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
tasks: Mutex<Vec<Task<()>>>, tasks: Mutex<Vec<Task<()>>>,
} }
impl Drop for TransportDelegate {
fn drop(&mut self) {
self.transport.lock().kill()
}
}
impl TransportDelegate { impl TransportDelegate {
pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> { pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
let log_handlers: LogHandlers = Default::default(); let log_handlers: LogHandlers = Default::default();
@ -113,7 +108,7 @@ impl TransportDelegate {
transport: Mutex::new(transport), transport: Mutex::new(transport),
log_handlers, log_handlers,
server_tx: Default::default(), server_tx: Default::default(),
pending_requests: Default::default(), pending_requests: Arc::new(Mutex::new(Some(HashMap::default()))),
tasks: Default::default(), tasks: Default::default(),
}) })
} }
@ -154,16 +149,26 @@ impl TransportDelegate {
.await .await
{ {
Ok(()) => { Ok(()) => {
pending_requests.lock().drain().for_each(|(_, request)| { pending_requests
request .lock()
.send(Err(anyhow!("debugger shutdown unexpectedly"))) .take()
.ok(); .into_iter()
}); .flatten()
.for_each(|(_, request)| {
request
.send(Err(anyhow!("debugger shutdown unexpectedly")))
.ok();
});
} }
Err(e) => { Err(e) => {
pending_requests.lock().drain().for_each(|(_, request)| { pending_requests
request.send(Err(e.cloned())).ok(); .lock()
}); .take()
.into_iter()
.flatten()
.for_each(|(_, request)| {
request.send(Err(e.cloned())).ok();
});
} }
} }
})); }));
@ -188,15 +193,6 @@ impl TransportDelegate {
self.transport.lock().tcp_arguments() self.transport.lock().tcp_arguments()
} }
pub(crate) fn add_pending_request(
&self,
sequence_id: u64,
request: oneshot::Sender<Result<Response>>,
) {
let mut pending_requests = self.pending_requests.lock();
pending_requests.insert(sequence_id, request);
}
pub(crate) async fn send_message(&self, message: Message) -> Result<()> { pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
if let Some(server_tx) = self.server_tx.lock().await.as_ref() { if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
server_tx.send(message).await.context("sending message") server_tx.send(message).await.context("sending message")
@ -290,7 +286,7 @@ impl TransportDelegate {
async fn recv_from_server<Stdout>( async fn recv_from_server<Stdout>(
server_stdout: Stdout, server_stdout: Stdout,
mut message_handler: DapMessageHandler, mut message_handler: DapMessageHandler,
pending_requests: Requests, pending_requests: Arc<Mutex<Option<HashMap<u64, oneshot::Sender<Result<Response>>>>>>,
log_handlers: Option<LogHandlers>, log_handlers: Option<LogHandlers>,
) -> Result<()> ) -> Result<()>
where where
@ -300,16 +296,21 @@ impl TransportDelegate {
let mut reader = BufReader::new(server_stdout); let mut reader = BufReader::new(server_stdout);
let result = loop { let result = loop {
match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref()) let result =
.await Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
{ .await;
match result {
ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"), ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
ConnectionResult::ConnectionReset => { ConnectionResult::ConnectionReset => {
log::info!("Debugger closed the connection"); log::info!("Debugger closed the connection");
return Ok(()); break Ok(());
} }
ConnectionResult::Result(Ok(Message::Response(res))) => { ConnectionResult::Result(Ok(Message::Response(res))) => {
let tx = pending_requests.lock().remove(&res.request_seq); let tx = pending_requests
.lock()
.as_mut()
.context("client is closed")?
.remove(&res.request_seq);
if let Some(tx) = tx { if let Some(tx) = tx {
if let Err(e) = tx.send(Self::process_response(res)) { if let Err(e) = tx.send(Self::process_response(res)) {
log::trace!("Did not send response `{:?}` for a cancelled", e); log::trace!("Did not send response `{:?}` for a cancelled", e);

View file

@ -33,7 +33,7 @@ use std::sync::{Arc, LazyLock};
use task::{DebugScenario, TaskContext}; use task::{DebugScenario, TaskContext};
use tree_sitter::{Query, StreamingIterator as _}; use tree_sitter::{Query, StreamingIterator as _};
use ui::{ContextMenu, Divider, PopoverMenuHandle, Tooltip, prelude::*}; use ui::{ContextMenu, Divider, PopoverMenuHandle, Tooltip, prelude::*};
use util::maybe; use util::{ResultExt, maybe};
use workspace::SplitDirection; use workspace::SplitDirection;
use workspace::{ use workspace::{
Pane, Workspace, Pane, Workspace,
@ -363,11 +363,17 @@ impl DebugPanel {
let label = curr_session.read(cx).label().clone(); let label = curr_session.read(cx).label().clone();
let adapter = curr_session.read(cx).adapter().clone(); let adapter = curr_session.read(cx).adapter().clone();
let binary = curr_session.read(cx).binary().cloned().unwrap(); let binary = curr_session.read(cx).binary().cloned().unwrap();
let task = curr_session.update(cx, |session, cx| session.shutdown(cx));
let task_context = curr_session.read(cx).task_context().clone(); let task_context = curr_session.read(cx).task_context().clone();
let curr_session_id = curr_session.read(cx).session_id();
self.sessions
.retain(|session| session.read(cx).session_id(cx) != curr_session_id);
let task = dap_store_handle.update(cx, |dap_store, cx| {
dap_store.shutdown_session(curr_session_id, cx)
});
cx.spawn_in(window, async move |this, cx| { cx.spawn_in(window, async move |this, cx| {
task.await; task.await.log_err();
let (session, task) = dap_store_handle.update(cx, |dap_store, cx| { let (session, task) = dap_store_handle.update(cx, |dap_store, cx| {
let session = dap_store.new_session(label, adapter, task_context, None, cx); let session = dap_store.new_session(label, adapter, task_context, None, cx);
@ -1298,9 +1304,7 @@ impl Panel for DebugPanel {
impl Render for DebugPanel { impl Render for DebugPanel {
fn render(&mut self, window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement { fn render(&mut self, window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
let has_sessions = self.sessions.len() > 0;
let this = cx.weak_entity(); let this = cx.weak_entity();
debug_assert_eq!(has_sessions, self.active_session.is_some());
if self if self
.active_session .active_session
@ -1487,8 +1491,8 @@ impl Render for DebugPanel {
})) }))
}) })
.map(|this| { .map(|this| {
if has_sessions { if let Some(active_session) = self.active_session.clone() {
this.children(self.active_session.clone()) this.child(active_session)
} else { } else {
let docked_to_bottom = self.position(window, cx) == DockPosition::Bottom; let docked_to_bottom = self.position(window, cx) == DockPosition::Bottom;
let welcome_experience = v_flex() let welcome_experience = v_flex()

View file

@ -27,7 +27,7 @@ async fn test_direct_attach_to_process(executor: BackgroundExecutor, cx: &mut Te
let workspace = init_test_workspace(&project, cx).await; let workspace = init_test_workspace(&project, cx).await;
let cx = &mut VisualTestContext::from_window(*workspace, cx); let cx = &mut VisualTestContext::from_window(*workspace, cx);
let session = start_debug_session_with( let _session = start_debug_session_with(
&workspace, &workspace,
cx, cx,
DebugTaskDefinition { DebugTaskDefinition {
@ -59,14 +59,6 @@ async fn test_direct_attach_to_process(executor: BackgroundExecutor, cx: &mut Te
assert!(workspace.active_modal::<AttachModal>(cx).is_none()); assert!(workspace.active_modal::<AttachModal>(cx).is_none());
}) })
.unwrap(); .unwrap();
let shutdown_session = project.update(cx, |project, cx| {
project.dap_store().update(cx, |dap_store, cx| {
dap_store.shutdown_session(session.read(cx).session_id(), cx)
})
});
shutdown_session.await.unwrap();
} }
#[gpui::test] #[gpui::test]

View file

@ -677,6 +677,7 @@ pub struct Session {
ignore_breakpoints: bool, ignore_breakpoints: bool,
exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>, exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
background_tasks: Vec<Task<()>>, background_tasks: Vec<Task<()>>,
restart_task: Option<Task<()>>,
task_context: TaskContext, task_context: TaskContext,
} }
@ -838,6 +839,7 @@ impl Session {
loaded_sources: Vec::default(), loaded_sources: Vec::default(),
threads: IndexMap::default(), threads: IndexMap::default(),
background_tasks: Vec::default(), background_tasks: Vec::default(),
restart_task: None,
locations: Default::default(), locations: Default::default(),
is_session_terminated: false, is_session_terminated: false,
ignore_breakpoints: false, ignore_breakpoints: false,
@ -1870,18 +1872,30 @@ impl Session {
} }
pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) { pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() { if self.restart_task.is_some() || self.as_running().is_none() {
self.request( return;
RestartCommand {
raw: args.unwrap_or(Value::Null),
},
Self::fallback_to_manual_restart,
cx,
)
.detach();
} else {
cx.emit(SessionStateEvent::Restart);
} }
let supports_dap_restart =
self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
self.restart_task = Some(cx.spawn(async move |this, cx| {
let _ = this.update(cx, |session, cx| {
if supports_dap_restart {
session
.request(
RestartCommand {
raw: args.unwrap_or(Value::Null),
},
Self::fallback_to_manual_restart,
cx,
)
.detach();
} else {
cx.emit(SessionStateEvent::Restart);
}
});
}));
} }
pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> { pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
@ -1919,8 +1933,13 @@ impl Session {
cx.emit(SessionStateEvent::Shutdown); cx.emit(SessionStateEvent::Shutdown);
cx.spawn(async move |_, _| { cx.spawn(async move |this, cx| {
task.await; task.await;
let _ = this.update(cx, |this, _| {
if let Some(adapter_client) = this.adapter_client() {
adapter_client.kill();
}
});
}) })
} }