diff --git a/crates/dap/src/client.rs b/crates/dap/src/client.rs index b4dad41162..22926b68cb 100644 --- a/crates/dap/src/client.rs +++ b/crates/dap/src/client.rs @@ -8,8 +8,7 @@ use dap_types::{ requests::Request, }; use futures::channel::oneshot; -use gpui::{AppContext, AsyncApp}; -use smol::channel::{Receiver, Sender}; +use gpui::AsyncApp; use std::{ hash::Hash, sync::atomic::{AtomicU64, Ordering}, @@ -44,99 +43,56 @@ impl DebugAdapterClient { id: SessionId, binary: DebugAdapterBinary, message_handler: DapMessageHandler, - cx: AsyncApp, + cx: &mut AsyncApp, ) -> Result { - let ((server_rx, server_tx), transport_delegate) = - TransportDelegate::start(&binary, cx.clone()).await?; + let transport_delegate = TransportDelegate::start(&binary, cx).await?; let this = Self { id, binary, transport_delegate, sequence_count: AtomicU64::new(1), }; - log::info!("Successfully connected to debug adapter"); - - let client_id = this.id; - - // start handling events/reverse requests - cx.background_spawn(Self::handle_receive_messages( - client_id, - server_rx, - server_tx.clone(), - message_handler, - )) - .detach(); + this.connect(message_handler, cx).await?; Ok(this) } - pub async fn reconnect( + pub fn should_reconnect_for_ssh(&self) -> bool { + self.transport_delegate.tcp_arguments().is_some() + && self.binary.command.as_deref() == Some("ssh") + } + + pub async fn connect( + &self, + message_handler: DapMessageHandler, + cx: &mut AsyncApp, + ) -> Result<()> { + self.transport_delegate.connect(message_handler, cx).await + } + + pub async fn create_child_connection( &self, session_id: SessionId, binary: DebugAdapterBinary, message_handler: DapMessageHandler, - cx: AsyncApp, + cx: &mut AsyncApp, ) -> Result { - let binary = match self.transport_delegate.transport() { - crate::transport::Transport::Tcp(tcp_transport) => DebugAdapterBinary { + let binary = if let Some(connection) = self.transport_delegate.tcp_arguments() { + DebugAdapterBinary { command: None, arguments: Default::default(), envs: Default::default(), cwd: Default::default(), - connection: Some(crate::adapters::TcpArguments { - host: tcp_transport.host, - port: tcp_transport.port, - timeout: Some(tcp_transport.timeout), - }), + connection: Some(connection), request_args: binary.request_args, - }, - _ => self.binary.clone(), + } + } else { + self.binary.clone() }; Self::start(session_id, binary, message_handler, cx).await } - async fn handle_receive_messages( - client_id: SessionId, - server_rx: Receiver, - client_tx: Sender, - mut message_handler: DapMessageHandler, - ) -> Result<()> { - let result = loop { - let message = match server_rx.recv().await { - Ok(message) => message, - Err(e) => break Err(e.into()), - }; - match message { - Message::Event(ev) => { - log::debug!("Client {} received event `{}`", client_id.0, &ev); - - message_handler(Message::Event(ev)) - } - Message::Request(req) => { - log::debug!( - "Client {} received reverse request `{}`", - client_id.0, - &req.command - ); - - message_handler(Message::Request(req)) - } - Message::Response(response) => { - log::debug!("Received response after request timeout: {:#?}", response); - } - } - - smol::future::yield_now().await; - }; - - drop(client_tx); - - log::debug!("Handle receive messages dropped"); - - result - } - /// Send a request to an adapter and get a response back /// Note: This function will block until a response is sent back from the adapter pub async fn request(&self, arguments: R::Arguments) -> Result { @@ -152,8 +108,7 @@ impl DebugAdapterClient { arguments: Some(serialized_arguments), }; self.transport_delegate - .add_pending_request(sequence_id, callback_tx) - .await; + .add_pending_request(sequence_id, callback_tx); log::debug!( "Client {} send `{}` request with sequence_id: {}", @@ -230,8 +185,11 @@ impl DebugAdapterClient { + Send + FnMut(u64, R::Arguments) -> Result, { - let transport = self.transport_delegate.transport().as_fake(); - transport.on_request::(handler); + self.transport_delegate + .transport + .lock() + .as_fake() + .on_request::(handler); } #[cfg(any(test, feature = "test-support"))] @@ -250,8 +208,11 @@ impl DebugAdapterClient { where F: 'static + Send + Fn(Response), { - let transport = self.transport_delegate.transport().as_fake(); - transport.on_response::(handler).await; + self.transport_delegate + .transport + .lock() + .as_fake() + .on_response::(handler); } #[cfg(any(test, feature = "test-support"))] @@ -308,7 +269,7 @@ mod tests { }, }, Box::new(|_| panic!("Did not expect to hit this code path")), - cx.to_async(), + &mut cx.to_async(), ) .await .unwrap(); @@ -390,7 +351,7 @@ mod tests { ); } }), - cx.to_async(), + &mut cx.to_async(), ) .await .unwrap(); @@ -448,7 +409,7 @@ mod tests { ); } }), - cx.to_async(), + &mut cx.to_async(), ) .await .unwrap(); diff --git a/crates/dap/src/transport.rs b/crates/dap/src/transport.rs index 19d4c1674e..5390f2b36d 100644 --- a/crates/dap/src/transport.rs +++ b/crates/dap/src/transport.rs @@ -1,16 +1,19 @@ use anyhow::{Context as _, Result, anyhow, bail}; +#[cfg(any(test, feature = "test-support"))] +use async_pipe::{PipeReader, PipeWriter}; use dap_types::{ ErrorResponse, messages::{Message, Response}, }; use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select}; -use gpui::{AppContext as _, AsyncApp, Task}; +use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task}; +use parking_lot::Mutex; +use proto::ErrorExt; use settings::Settings as _; use smallvec::SmallVec; use smol::{ channel::{Receiver, Sender, unbounded}, io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader}, - lock::Mutex, net::{TcpListener, TcpStream}, }; use std::{ @@ -23,7 +26,11 @@ use std::{ use task::TcpArgumentsTemplate; use util::ConnectionResult; -use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings}; +use crate::{ + adapters::{DebugAdapterBinary, TcpArguments}, + client::DapMessageHandler, + debugger_settings::DebuggerSettings, +}; pub(crate) type IoMessage = str; pub(crate) type Command = str; @@ -35,232 +42,152 @@ pub enum LogKind { Rpc, } +#[derive(Clone, Copy)] pub enum IoKind { StdIn, StdOut, StdErr, } -pub struct TransportPipe { - input: Box, - output: Box, - stdout: Option>, - stderr: Option>, -} - -impl TransportPipe { - pub fn new( - input: Box, - output: Box, - stdout: Option>, - stderr: Option>, - ) -> Self { - TransportPipe { - input, - output, - stdout, - stderr, - } - } -} - type Requests = Arc>>>>; -type LogHandlers = Arc>>; +type LogHandlers = Arc>>; -pub enum Transport { - Stdio(StdioTransport), - Tcp(TcpTransport), +pub trait Transport: Send + Sync { + fn has_adapter_logs(&self) -> bool; + fn tcp_arguments(&self) -> Option; + fn connect( + &mut self, + ) -> Task< + Result<( + Box, + Box, + )>, + >; + fn kill(&self); #[cfg(any(test, feature = "test-support"))] - Fake(FakeTransport), + fn as_fake(&self) -> &FakeTransport { + unreachable!() + } } -impl Transport { - async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> { - #[cfg(any(test, feature = "test-support"))] - if cfg!(any(test, feature = "test-support")) { - return FakeTransport::start(cx) - .await - .map(|(transports, fake)| (transports, Self::Fake(fake))); - } - - if binary.connection.is_some() { - TcpTransport::start(binary, cx) - .await - .map(|(transports, tcp)| (transports, Self::Tcp(tcp))) - .context("Tried to connect to a debug adapter via TCP transport layer") - } else { - StdioTransport::start(binary, cx) - .await - .map(|(transports, stdio)| (transports, Self::Stdio(stdio))) - .context("Tried to connect to a debug adapter via stdin/stdout transport layer") - } - } - - fn has_adapter_logs(&self) -> bool { - match self { - Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(), - Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(), - #[cfg(any(test, feature = "test-support"))] - Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(), - } - } - - async fn kill(&self) { - match self { - Transport::Stdio(stdio_transport) => stdio_transport.kill().await, - Transport::Tcp(tcp_transport) => tcp_transport.kill().await, - #[cfg(any(test, feature = "test-support"))] - Transport::Fake(fake_transport) => fake_transport.kill().await, - } - } - +async fn start( + binary: &DebugAdapterBinary, + log_handlers: LogHandlers, + cx: &mut AsyncApp, +) -> Result> { #[cfg(any(test, feature = "test-support"))] - pub(crate) fn as_fake(&self) -> &FakeTransport { - match self { - Transport::Fake(fake_transport) => fake_transport, - _ => panic!("Not a fake transport layer"), - } + if cfg!(any(test, feature = "test-support")) { + return Ok(Box::new(FakeTransport::start(cx).await?)); + } + + if binary.connection.is_some() { + Ok(Box::new( + TcpTransport::start(binary, log_handlers, cx).await?, + )) + } else { + Ok(Box::new( + StdioTransport::start(binary, log_handlers, cx).await?, + )) } } pub(crate) struct TransportDelegate { log_handlers: LogHandlers, - current_requests: Requests, pending_requests: Requests, - transport: Transport, - server_tx: Arc>>>, - _tasks: Vec>, + pub(crate) transport: Mutex>, + server_tx: smol::lock::Mutex>>, + tasks: Mutex>>, } impl TransportDelegate { - pub(crate) async fn start( - binary: &DebugAdapterBinary, - cx: AsyncApp, - ) -> Result<((Receiver, Sender), Self)> { - let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?; - let mut this = Self { - transport, + pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result { + let log_handlers: LogHandlers = Default::default(); + let transport = start(binary, log_handlers.clone(), cx).await?; + Ok(Self { + transport: Mutex::new(transport), + log_handlers, server_tx: Default::default(), - log_handlers: Default::default(), - current_requests: Default::default(), pending_requests: Default::default(), - _tasks: Vec::new(), - }; - let messages = this.start_handlers(transport_pipes, cx).await?; - Ok((messages, this)) + tasks: Default::default(), + }) } - async fn start_handlers( - &mut self, - mut params: TransportPipe, - cx: AsyncApp, - ) -> Result<(Receiver, Sender)> { - let (client_tx, server_rx) = unbounded::(); + pub async fn connect( + &self, + message_handler: DapMessageHandler, + cx: &mut AsyncApp, + ) -> Result<()> { let (server_tx, client_rx) = unbounded::(); + self.tasks.lock().clear(); let log_dap_communications = cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications) .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false") .unwrap_or(false); + let connect = self.transport.lock().connect(); + let (input, output) = connect.await?; + let log_handler = if log_dap_communications { Some(self.log_handlers.clone()) } else { None }; - let adapter_log_handler = log_handler.clone(); - cx.update(|cx| { - if let Some(stdout) = params.stdout.take() { - self._tasks.push(cx.background_spawn(async move { - match Self::handle_adapter_log(stdout, adapter_log_handler).await { - ConnectionResult::Timeout => { - log::error!("Timed out when handling debugger log"); - } - ConnectionResult::ConnectionReset => { - log::info!("Debugger logs connection closed"); - } - ConnectionResult::Result(Ok(())) => {} - ConnectionResult::Result(Err(e)) => { - log::error!("Error handling debugger log: {e}"); - } - } - })); - } - - let pending_requests = self.pending_requests.clone(); - let output_log_handler = log_handler.clone(); - self._tasks.push(cx.background_spawn(async move { - match Self::handle_output( - params.output, - client_tx, + let pending_requests = self.pending_requests.clone(); + let output_log_handler = log_handler.clone(); + { + let mut tasks = self.tasks.lock(); + tasks.push(cx.background_spawn(async move { + match Self::recv_from_server( + output, + message_handler, pending_requests.clone(), output_log_handler, ) .await { - Ok(()) => {} - Err(e) => log::error!("Error handling debugger output: {e}"), + Ok(()) => { + pending_requests.lock().drain().for_each(|(_, request)| { + request + .send(Err(anyhow!("debugger shutdown unexpectedly"))) + .ok(); + }); + } + Err(e) => { + pending_requests.lock().drain().for_each(|(_, request)| { + request.send(Err(e.cloned())).ok(); + }); + } } - let mut pending_requests = pending_requests.lock().await; - pending_requests.drain().for_each(|(_, request)| { - request - .send(Err(anyhow!("debugger shutdown unexpectedly"))) - .ok(); - }); })); - if let Some(stderr) = params.stderr.take() { - let log_handlers = self.log_handlers.clone(); - self._tasks.push(cx.background_spawn(async move { - match Self::handle_error(stderr, log_handlers).await { - ConnectionResult::Timeout => { - log::error!("Timed out reading debugger error stream") - } - ConnectionResult::ConnectionReset => { - log::info!("Debugger closed its error stream") - } - ConnectionResult::Result(Ok(())) => {} - ConnectionResult::Result(Err(e)) => { - log::error!("Error handling debugger error: {e}") - } - } - })); - } - - let current_requests = self.current_requests.clone(); - let pending_requests = self.pending_requests.clone(); - let log_handler = log_handler.clone(); - self._tasks.push(cx.background_spawn(async move { - match Self::handle_input( - params.input, - client_rx, - current_requests, - pending_requests, - log_handler, - ) - .await - { + tasks.push(cx.background_spawn(async move { + match Self::send_to_server(input, client_rx, log_handler).await { Ok(()) => {} Err(e) => log::error!("Error handling debugger input: {e}"), } })); - })?; + } { let mut lock = self.server_tx.lock().await; *lock = Some(server_tx.clone()); } - Ok((server_rx, server_tx)) + Ok(()) } - pub(crate) async fn add_pending_request( + pub(crate) fn tcp_arguments(&self) -> Option { + self.transport.lock().tcp_arguments() + } + + pub(crate) fn add_pending_request( &self, sequence_id: u64, request: oneshot::Sender>, ) { - let mut pending_requests = self.pending_requests.lock().await; + let mut pending_requests = self.pending_requests.lock(); pending_requests.insert(sequence_id, request); } @@ -272,52 +199,41 @@ impl TransportDelegate { } } - async fn handle_adapter_log( - stdout: Stdout, - log_handlers: Option, - ) -> ConnectionResult<()> - where - Stdout: AsyncRead + Unpin + Send + 'static, - { + async fn handle_adapter_log( + stdout: impl AsyncRead + Unpin + Send + 'static, + iokind: IoKind, + log_handlers: LogHandlers, + ) { let mut reader = BufReader::new(stdout); let mut line = String::new(); - let result = loop { + loop { line.truncate(0); - match reader - .read_line(&mut line) - .await - .context("reading adapter log line") - { - Ok(0) => break ConnectionResult::ConnectionReset, + match reader.read_line(&mut line).await { + Ok(0) => break, Ok(_) => {} - Err(e) => break ConnectionResult::Result(Err(e)), - } - - if let Some(log_handlers) = log_handlers.as_ref() { - for (kind, handler) in log_handlers.lock().iter_mut() { - if matches!(kind, LogKind::Adapter) { - handler(IoKind::StdOut, None, line.as_str()); - } + Err(e) => { + log::debug!("handle_adapter_log: {}", e); + break; } } - }; - log::debug!("Handle adapter log dropped"); - - result + for (kind, handler) in log_handlers.lock().iter_mut() { + if matches!(kind, LogKind::Adapter) { + handler(iokind, None, line.as_str()); + } + } + } } fn build_rpc_message(message: String) -> String { format!("Content-Length: {}\r\n\r\n{}", message.len(), message) } - async fn handle_input( + async fn send_to_server( mut server_stdin: Stdin, client_rx: Receiver, - current_requests: Requests, - pending_requests: Requests, log_handlers: Option, ) -> Result<()> where @@ -326,12 +242,6 @@ impl TransportDelegate { let result = loop { match client_rx.recv().await { Ok(message) => { - if let Message::Request(request) = &message { - if let Some(sender) = current_requests.lock().await.remove(&request.seq) { - pending_requests.lock().await.insert(request.seq, sender); - } - } - let command = match &message { Message::Request(request) => Some(request.command.as_str()), Message::Response(response) => Some(response.command.as_str()), @@ -371,9 +281,9 @@ impl TransportDelegate { result } - async fn handle_output( + async fn recv_from_server( server_stdout: Stdout, - client_tx: Sender, + mut message_handler: DapMessageHandler, pending_requests: Requests, log_handlers: Option, ) -> Result<()> @@ -393,59 +303,25 @@ impl TransportDelegate { return Ok(()); } ConnectionResult::Result(Ok(Message::Response(res))) => { - if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) { + let tx = pending_requests.lock().remove(&res.request_seq); + if let Some(tx) = tx { if let Err(e) = tx.send(Self::process_response(res)) { log::trace!("Did not send response `{:?}` for a cancelled", e); } } else { - client_tx.send(Message::Response(res)).await?; + message_handler(Message::Response(res)) } } - ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?, + ConnectionResult::Result(Ok(message)) => message_handler(message), ConnectionResult::Result(Err(e)) => break Err(e), } }; - drop(client_tx); log::debug!("Handle adapter output dropped"); result } - async fn handle_error(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()> - where - Stderr: AsyncRead + Unpin + Send + 'static, - { - log::debug!("Handle error started"); - let mut buffer = String::new(); - - let mut reader = BufReader::new(stderr); - - let result = loop { - match reader - .read_line(&mut buffer) - .await - .context("reading error log line") - { - Ok(0) => break ConnectionResult::ConnectionReset, - Ok(_) => { - for (kind, log_handler) in log_handlers.lock().iter_mut() { - if matches!(kind, LogKind::Adapter) { - log_handler(IoKind::StdErr, None, buffer.as_str()); - } - } - - buffer.truncate(0); - } - Err(error) => break ConnectionResult::Result(Err(error)), - } - }; - - log::debug!("Handle adapter error dropped"); - - result - } - fn process_response(response: Response) -> Result { if response.success { Ok(response) @@ -479,14 +355,10 @@ impl TransportDelegate { loop { buffer.truncate(0); - match reader - .read_line(buffer) - .await - .with_context(|| "reading a message from server") - { + match reader.read_line(buffer).await { Ok(0) => return ConnectionResult::ConnectionReset, Ok(_) => {} - Err(e) => return ConnectionResult::Result(Err(e)), + Err(e) => return ConnectionResult::Result(Err(e.into())), }; if buffer == "\r\n" { @@ -547,16 +419,8 @@ impl TransportDelegate { server_tx.close(); } - let mut current_requests = self.current_requests.lock().await; - let mut pending_requests = self.pending_requests.lock().await; - - current_requests.clear(); - pending_requests.clear(); - - self.transport.kill().await; - - drop(current_requests); - drop(pending_requests); + self.pending_requests.lock().clear(); + self.transport.lock().kill(); log::debug!("Shutdown client completed"); @@ -564,11 +428,7 @@ impl TransportDelegate { } pub fn has_adapter_logs(&self) -> bool { - self.transport.has_adapter_logs() - } - - pub fn transport(&self) -> &Transport { - &self.transport + self.transport.lock().has_adapter_logs() } pub fn add_log_handler(&self, f: F, kind: LogKind) @@ -581,10 +441,13 @@ impl TransportDelegate { } pub struct TcpTransport { + executor: BackgroundExecutor, pub port: u16, pub host: Ipv4Addr, pub timeout: u64, - process: Option>, + process: Arc>>, + _stderr_task: Option>, + _stdout_task: Option>, } impl TcpTransport { @@ -604,7 +467,11 @@ impl TcpTransport { .port()) } - async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> { + async fn start( + binary: &DebugAdapterBinary, + log_handlers: LogHandlers, + cx: &mut AsyncApp, + ) -> Result { let connection_args = binary .connection .as_ref() @@ -613,7 +480,11 @@ impl TcpTransport { let host = connection_args.host; let port = connection_args.port; - let mut process = if let Some(command) = &binary.command { + let mut process = None; + let mut stdout_task = None; + let mut stderr_task = None; + + if let Some(command) = &binary.command { let mut command = util::command::new_std_command(&command); if let Some(cwd) = &binary.cwd { @@ -623,101 +494,142 @@ impl TcpTransport { command.args(&binary.arguments); command.envs(&binary.envs); - Some( - Child::spawn(command, Stdio::null()) - .with_context(|| "failed to start debug adapter.")?, - ) - } else { - None - }; + let mut p = Child::spawn(command, Stdio::null()) + .with_context(|| "failed to start debug adapter.")?; - let address = SocketAddrV4::new(host, port); + stdout_task = p.stdout.take().map(|stdout| { + cx.background_executor() + .spawn(TransportDelegate::handle_adapter_log( + stdout, + IoKind::StdOut, + log_handlers.clone(), + )) + }); + stderr_task = p.stderr.take().map(|stderr| { + cx.background_executor() + .spawn(TransportDelegate::handle_adapter_log( + stderr, + IoKind::StdErr, + log_handlers, + )) + }); + process = Some(p); + }; let timeout = connection_args.timeout.unwrap_or_else(|| { cx.update(|cx| DebuggerSettings::get_global(cx).timeout) - .unwrap_or(2000u64) + .unwrap_or(20000u64) }); - let (mut process, (rx, tx)) = select! { - _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => { - anyhow::bail!("Connection to TCP DAP timeout {host}:{port}"); - }, - result = cx.spawn(async move |cx| { - loop { - match TcpStream::connect(address).await { - Ok(stream) => return Ok((process, stream.split())), - Err(_) => { - if let Some(p) = &mut process { - if let Ok(Some(_)) = p.try_status() { - let output = process.take().unwrap().into_inner().output().await?; - let output = if output.stderr.is_empty() { - String::from_utf8_lossy(&output.stdout).to_string() - } else { - String::from_utf8_lossy(&output.stderr).to_string() - }; - anyhow::bail!("{output}\nerror: process exited before debugger attached."); - } - } - - cx.background_executor().timer(Duration::from_millis(100)).await; - } - } - } - }).fuse() => result? - }; - log::info!( "Debug adapter has connected to TCP server {}:{}", host, port ); - let stdout = process.as_mut().and_then(|p| p.stdout.take()); - let stderr = process.as_mut().and_then(|p| p.stderr.take()); let this = Self { + executor: cx.background_executor().clone(), port, host, - process: process.map(Mutex::new), + process: Arc::new(Mutex::new(process)), timeout, + _stdout_task: stdout_task, + _stderr_task: stderr_task, }; - let pipe = TransportPipe::new( - Box::new(tx), - Box::new(BufReader::new(rx)), - stdout.map(|s| Box::new(s) as Box), - stderr.map(|s| Box::new(s) as Box), - ); - - Ok((pipe, this)) + Ok(this) } +} +impl Transport for TcpTransport { fn has_adapter_logs(&self) -> bool { true } - async fn kill(&self) { - if let Some(process) = &self.process { - let mut process = process.lock().await; - Child::kill(&mut process); + fn kill(&self) { + if let Some(process) = &mut *self.process.lock() { + process.kill(); } } + + fn tcp_arguments(&self) -> Option { + Some(TcpArguments { + host: self.host, + port: self.port, + timeout: Some(self.timeout), + }) + } + + fn connect( + &mut self, + ) -> Task< + Result<( + Box, + Box, + )>, + > { + let executor = self.executor.clone(); + let timeout = self.timeout; + let address = SocketAddrV4::new(self.host, self.port); + let process = self.process.clone(); + executor.clone().spawn(async move { + select! { + _ = executor.timer(Duration::from_millis(timeout)).fuse() => { + anyhow::bail!("Connection to TCP DAP timeout {address}"); + }, + result = executor.clone().spawn(async move { + loop { + match TcpStream::connect(address).await { + Ok(stream) => { + let (read, write) = stream.split(); + return Ok((Box::new(write) as _, Box::new(read) as _)) + }, + Err(_) => { + let has_process = process.lock().is_some(); + if has_process { + let status = process.lock().as_mut().unwrap().try_status(); + if let Ok(Some(_)) = status { + let process = process.lock().take().unwrap().into_inner(); + let output = process.output().await?; + let output = if output.stderr.is_empty() { + String::from_utf8_lossy(&output.stdout).to_string() + } else { + String::from_utf8_lossy(&output.stderr).to_string() + }; + anyhow::bail!("{output}\nerror: process exited before debugger attached."); + } + } + + executor.timer(Duration::from_millis(100)).await; + } + } + } + }).fuse() => result + } + }) + } } impl Drop for TcpTransport { fn drop(&mut self) { - if let Some(mut p) = self.process.take() { - p.get_mut().kill(); + if let Some(mut p) = self.process.lock().take() { + p.kill(); } } } pub struct StdioTransport { process: Mutex, + _stderr_task: Option>, } impl StdioTransport { - #[allow(dead_code, reason = "This is used in non test builds of Zed")] - async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> { + // #[allow(dead_code, reason = "This is used in non test builds of Zed")] + async fn start( + binary: &DebugAdapterBinary, + log_handlers: LogHandlers, + cx: &mut AsyncApp, + ) -> Result { let Some(binary_command) = &binary.command else { bail!( "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed." @@ -740,42 +652,52 @@ impl StdioTransport { ) })?; - let stdin = process.stdin.take().context("Failed to open stdin")?; - let stdout = process.stdout.take().context("Failed to open stdout")?; - let stderr = process - .stderr - .take() - .map(|io_err| Box::new(io_err) as Box); - - if stderr.is_none() { - bail!( - "Failed to connect to stderr for debug adapter command {}", - &binary_command - ); - } - - log::info!("Debug adapter has connected to stdio adapter"); + let err_task = process.stderr.take().map(|stderr| { + cx.background_spawn(TransportDelegate::handle_adapter_log( + stderr, + IoKind::StdErr, + log_handlers, + )) + }); let process = Mutex::new(process); - Ok(( - TransportPipe::new( - Box::new(stdin), - Box::new(BufReader::new(stdout)), - None, - stderr, - ), - Self { process }, - )) + Ok(Self { + process, + _stderr_task: err_task, + }) } +} +impl Transport for StdioTransport { fn has_adapter_logs(&self) -> bool { false } - async fn kill(&self) { - let mut process = self.process.lock().await; - Child::kill(&mut process); + fn kill(&self) { + self.process.lock().kill() + } + + fn connect( + &mut self, + ) -> Task< + Result<( + Box, + Box, + )>, + > { + let mut process = self.process.lock(); + let result = util::maybe!({ + Ok(( + Box::new(process.stdin.take().context("Cannot reconnect")?) as _, + Box::new(process.stdout.take().context("Cannot reconnect")?) as _, + )) + }); + Task::ready(result) + } + + fn tcp_arguments(&self) -> Option { + None } } @@ -795,9 +717,12 @@ type ResponseHandler = Box; #[cfg(any(test, feature = "test-support"))] pub struct FakeTransport { // for sending fake response back from adapter side - request_handlers: Arc>>, + request_handlers: Arc>>, // for reverse request responses - response_handlers: Arc>>, + response_handlers: Arc>>, + + stdin_writer: Option, + stdout_reader: Option, } #[cfg(any(test, feature = "test-support"))] @@ -833,7 +758,7 @@ impl FakeTransport { ); } - pub async fn on_response(&self, handler: F) + pub fn on_response(&self, handler: F) where F: 'static + Send + Fn(Response), { @@ -842,20 +767,23 @@ impl FakeTransport { .insert(R::COMMAND, Box::new(handler)); } - async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> { - let this = Self { - request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())), - response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())), - }; + async fn start(cx: &mut AsyncApp) -> Result { use dap_types::requests::{Request, RunInTerminal, StartDebugging}; use serde_json::json; let (stdin_writer, stdin_reader) = async_pipe::pipe(); let (stdout_writer, stdout_reader) = async_pipe::pipe(); + let this = Self { + request_handlers: Arc::new(Mutex::new(HashMap::default())), + response_handlers: Arc::new(Mutex::new(HashMap::default())), + stdin_writer: Some(stdin_writer), + stdout_reader: Some(stdout_reader), + }; + let request_handlers = this.request_handlers.clone(); let response_handlers = this.response_handlers.clone(); - let stdout_writer = Arc::new(Mutex::new(stdout_writer)); + let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer)); cx.background_spawn(async move { let mut reader = BufReader::new(stdin_reader); @@ -945,17 +873,43 @@ impl FakeTransport { }) .detach(); - Ok(( - TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None), - this, - )) + Ok(this) + } +} + +#[cfg(any(test, feature = "test-support"))] +impl Transport for FakeTransport { + fn tcp_arguments(&self) -> Option { + None + } + + fn connect( + &mut self, + ) -> Task< + Result<( + Box, + Box, + )>, + > { + let result = util::maybe!({ + Ok(( + Box::new(self.stdin_writer.take().context("Cannot reconnect")?) as _, + Box::new(self.stdout_reader.take().context("Cannot reconnect")?) as _, + )) + }); + Task::ready(result) } fn has_adapter_logs(&self) -> bool { false } - async fn kill(&self) {} + fn kill(&self) {} + + #[cfg(any(test, feature = "test-support"))] + fn as_fake(&self) -> &FakeTransport { + self + } } struct Child { diff --git a/crates/project/src/debugger/session.rs b/crates/project/src/debugger/session.rs index 0d627f6dad..116c07d625 100644 --- a/crates/project/src/debugger/session.rs +++ b/crates/project/src/debugger/session.rs @@ -29,6 +29,7 @@ use dap::{ StartDebuggingRequestArgumentsRequest, }; use futures::SinkExt; +use futures::channel::mpsc::UnboundedSender; use futures::channel::{mpsc, oneshot}; use futures::{FutureExt, future::Shared}; use gpui::{ @@ -139,6 +140,7 @@ pub struct RunningMode { executor: BackgroundExecutor, is_started: bool, has_ever_stopped: bool, + messages_tx: UnboundedSender, } fn client_source(abs_path: &Path) -> dap::Source { @@ -163,34 +165,35 @@ impl RunningMode { worktree: WeakEntity, binary: DebugAdapterBinary, messages_tx: futures::channel::mpsc::UnboundedSender, - cx: AsyncApp, + cx: &mut AsyncApp, ) -> Result { - let message_handler = Box::new(move |message| { - messages_tx.unbounded_send(message).ok(); + let message_handler = Box::new({ + let messages_tx = messages_tx.clone(); + move |message| { + messages_tx.unbounded_send(message).ok(); + } }); - let client = Arc::new( - if let Some(client) = parent_session - .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok()) - .flatten() - { - client - .reconnect(session_id, binary.clone(), message_handler, cx.clone()) - .await? - } else { - DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone()) - .await? - }, - ); + let client = if let Some(client) = parent_session + .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok()) + .flatten() + { + client + .create_child_connection(session_id, binary.clone(), message_handler, cx) + .await? + } else { + DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await? + }; Ok(Self { - client, + client: Arc::new(client), worktree, tmp_breakpoint: None, binary, executor: cx.background_executor().clone(), is_started: false, has_ever_stopped: false, + messages_tx, }) } @@ -481,6 +484,22 @@ impl RunningMode { }) } + fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option>> { + let client = self.client.clone(); + let messages_tx = self.messages_tx.clone(); + let message_handler = Box::new(move |message| { + messages_tx.unbounded_send(message).ok(); + }); + if client.should_reconnect_for_ssh() { + Some(cx.spawn(async move |cx| { + client.connect(message_handler, cx).await?; + anyhow::Ok(()) + })) + } else { + None + } + } + fn request(&self, request: R) -> Task> where ::Response: 'static, @@ -855,7 +874,7 @@ impl Session { worktree.downgrade(), binary.clone(), message_tx, - cx.clone(), + cx, ) .await?; this.update(cx, |this, cx| { @@ -1131,35 +1150,58 @@ impl Session { pub(super) fn request_initialize(&mut self, cx: &mut Context) -> Task> { let adapter_id = self.adapter().to_string(); let request = Initialize { adapter_id }; - match &self.mode { - Mode::Running(local_mode) => { - let capabilities = local_mode.request(request); - cx.spawn(async move |this, cx| { - let capabilities = capabilities.await?; - this.update(cx, |session, cx| { - session.capabilities = capabilities; - let filters = session - .capabilities - .exception_breakpoint_filters - .clone() - .unwrap_or_default(); - for filter in filters { - let default = filter.default.unwrap_or_default(); - session - .exception_breakpoints - .entry(filter.filter.clone()) - .or_insert_with(|| (filter, default)); - } - cx.emit(SessionEvent::CapabilitiesLoaded); - })?; - Ok(()) - }) - } - Mode::Building => Task::ready(Err(anyhow!( + let Mode::Running(running) = &self.mode else { + return Task::ready(Err(anyhow!( "Cannot send initialize request, task still building" - ))), - } + ))); + }; + let mut response = running.request(request.clone()); + + cx.spawn(async move |this, cx| { + loop { + let capabilities = response.await; + match capabilities { + Err(e) => { + let Ok(Some(reconnect)) = this.update(cx, |this, cx| { + this.as_running() + .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async())) + }) else { + return Err(e); + }; + log::info!("Failed to connect to debug adapter: {}, retrying...", e); + reconnect.await?; + + let Ok(Some(r)) = this.update(cx, |this, _| { + this.as_running() + .map(|running| running.request(request.clone())) + }) else { + return Err(e); + }; + response = r + } + Ok(capabilities) => { + this.update(cx, |session, cx| { + session.capabilities = capabilities; + let filters = session + .capabilities + .exception_breakpoint_filters + .clone() + .unwrap_or_default(); + for filter in filters { + let default = filter.default.unwrap_or_default(); + session + .exception_breakpoints + .entry(filter.filter.clone()) + .or_insert_with(|| (filter, default)); + } + cx.emit(SessionEvent::CapabilitiesLoaded); + })?; + return Ok(()); + } + } + } + }) } pub(super) fn initialize_sequence(