diff --git a/crates/dap/src/transport.rs b/crates/dap/src/transport.rs index ac51ca7195..dc7d6a0d30 100644 --- a/crates/dap/src/transport.rs +++ b/crates/dap/src/transport.rs @@ -4,7 +4,7 @@ use dap_types::{ messages::{Message, Response}, }; use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select}; -use gpui::AsyncApp; +use gpui::{AppContext as _, AsyncApp, Task}; use settings::Settings as _; use smallvec::SmallVec; use smol::{ @@ -22,7 +22,7 @@ use std::{ time::Duration, }; use task::TcpArgumentsTemplate; -use util::{ResultExt as _, TryFutureExt}; +use util::{ConnectionResult, ResultExt as _}; use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings}; @@ -126,7 +126,7 @@ pub(crate) struct TransportDelegate { pending_requests: Requests, transport: Transport, server_tx: Arc>>>, - _tasks: Vec>>, + _tasks: Vec>, } impl TransportDelegate { @@ -141,7 +141,7 @@ impl TransportDelegate { log_handlers: Default::default(), current_requests: Default::default(), pending_requests: Default::default(), - _tasks: Default::default(), + _tasks: Vec::new(), }; let messages = this.start_handlers(transport_pipes, cx).await?; Ok((messages, this)) @@ -166,45 +166,76 @@ impl TransportDelegate { None }; + let adapter_log_handler = log_handler.clone(); cx.update(|cx| { if let Some(stdout) = params.stdout.take() { - self._tasks.push( - cx.background_executor() - .spawn(Self::handle_adapter_log(stdout, log_handler.clone()).log_err()), - ); + self._tasks.push(cx.background_spawn(async move { + match Self::handle_adapter_log(stdout, adapter_log_handler).await { + ConnectionResult::Timeout => { + log::error!("Timed out when handling debugger log"); + } + ConnectionResult::ConnectionReset => { + log::info!("Debugger logs connection closed"); + } + ConnectionResult::Result(Ok(())) => {} + ConnectionResult::Result(Err(e)) => { + log::error!("Error handling debugger log: {e}"); + } + } + })); } - self._tasks.push( - cx.background_executor().spawn( - Self::handle_output( - params.output, - client_tx, - self.pending_requests.clone(), - log_handler.clone(), - ) - .log_err(), - ), - ); + let pending_requests = self.pending_requests.clone(); + let output_log_handler = log_handler.clone(); + self._tasks.push(cx.background_spawn(async move { + match Self::handle_output( + params.output, + client_tx, + pending_requests, + output_log_handler, + ) + .await + { + Ok(()) => {} + Err(e) => log::error!("Error handling debugger output: {e}"), + } + })); if let Some(stderr) = params.stderr.take() { - self._tasks.push( - cx.background_executor() - .spawn(Self::handle_error(stderr, self.log_handlers.clone()).log_err()), - ); + let log_handlers = self.log_handlers.clone(); + self._tasks.push(cx.background_spawn(async move { + match Self::handle_error(stderr, log_handlers).await { + ConnectionResult::Timeout => { + log::error!("Timed out reading debugger error stream") + } + ConnectionResult::ConnectionReset => { + log::info!("Debugger closed its error stream") + } + ConnectionResult::Result(Ok(())) => {} + ConnectionResult::Result(Err(e)) => { + log::error!("Error handling debugger error: {e}") + } + } + })); } - self._tasks.push( - cx.background_executor().spawn( - Self::handle_input( - params.input, - client_rx, - self.current_requests.clone(), - self.pending_requests.clone(), - log_handler.clone(), - ) - .log_err(), - ), - ); + let current_requests = self.current_requests.clone(); + let pending_requests = self.pending_requests.clone(); + let log_handler = log_handler.clone(); + self._tasks.push(cx.background_spawn(async move { + match Self::handle_input( + params.input, + client_rx, + current_requests, + pending_requests, + log_handler, + ) + .await + { + Ok(()) => {} + Err(e) => log::error!("Error handling debugger input: {e}"), + } + })); })?; { @@ -235,7 +266,7 @@ impl TransportDelegate { async fn handle_adapter_log( stdout: Stdout, log_handlers: Option, - ) -> Result<()> + ) -> ConnectionResult<()> where Stdout: AsyncRead + Unpin + Send + 'static, { @@ -245,13 +276,14 @@ impl TransportDelegate { let result = loop { line.truncate(0); - let bytes_read = match reader.read_line(&mut line).await { - Ok(bytes_read) => bytes_read, - Err(e) => break Err(e.into()), - }; - - if bytes_read == 0 { - anyhow::bail!("Debugger log stream closed"); + match reader + .read_line(&mut line) + .await + .context("reading adapter log line") + { + Ok(0) => break ConnectionResult::ConnectionReset, + Ok(_) => {} + Err(e) => break ConnectionResult::Result(Err(e)), } if let Some(log_handlers) = log_handlers.as_ref() { @@ -337,35 +369,35 @@ impl TransportDelegate { let mut reader = BufReader::new(server_stdout); let result = loop { - let message = - Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref()) - .await; - - match message { - Ok(Message::Response(res)) => { + match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref()) + .await + { + ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"), + ConnectionResult::ConnectionReset => { + log::info!("Debugger closed the connection"); + return Ok(()); + } + ConnectionResult::Result(Ok(Message::Response(res))) => { if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) { if let Err(e) = tx.send(Self::process_response(res)) { log::trace!("Did not send response `{:?}` for a cancelled", e); } } else { client_tx.send(Message::Response(res)).await?; - }; + } } - Ok(message) => { - client_tx.send(message).await?; - } - Err(e) => break Err(e), + ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?, + ConnectionResult::Result(Err(e)) => break Err(e), } }; drop(client_tx); - log::debug!("Handle adapter output dropped"); result } - async fn handle_error(stderr: Stderr, log_handlers: LogHandlers) -> Result<()> + async fn handle_error(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()> where Stderr: AsyncRead + Unpin + Send + 'static, { @@ -375,8 +407,12 @@ impl TransportDelegate { let mut reader = BufReader::new(stderr); let result = loop { - match reader.read_line(&mut buffer).await { - Ok(0) => anyhow::bail!("debugger error stream closed"), + match reader + .read_line(&mut buffer) + .await + .context("reading error log line") + { + Ok(0) => break ConnectionResult::ConnectionReset, Ok(_) => { for (kind, log_handler) in log_handlers.lock().iter_mut() { if matches!(kind, LogKind::Adapter) { @@ -386,7 +422,7 @@ impl TransportDelegate { buffer.truncate(0); } - Err(error) => break Err(error.into()), + Err(error) => break ConnectionResult::Result(Err(error)), } }; @@ -420,7 +456,7 @@ impl TransportDelegate { reader: &mut BufReader, buffer: &mut String, log_handlers: Option<&LogHandlers>, - ) -> Result + ) -> ConnectionResult where Stdout: AsyncRead + Unpin + Send + 'static, { @@ -428,48 +464,58 @@ impl TransportDelegate { loop { buffer.truncate(0); - if reader + match reader .read_line(buffer) .await - .with_context(|| "reading a message from server")? - == 0 + .with_context(|| "reading a message from server") { - anyhow::bail!("debugger reader stream closed, last string output: '{buffer}'"); + Ok(0) => return ConnectionResult::ConnectionReset, + Ok(_) => {} + Err(e) => return ConnectionResult::Result(Err(e)), }; if buffer == "\r\n" { break; } - let parts = buffer.trim().split_once(": "); - - match parts { - Some(("Content-Length", value)) => { - content_length = Some(value.parse().context("invalid content length")?); + if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") { + match value.parse().context("invalid content length") { + Ok(length) => content_length = Some(length), + Err(e) => return ConnectionResult::Result(Err(e)), } - _ => {} } } - let content_length = content_length.context("missing content length")?; + let content_length = match content_length.context("missing content length") { + Ok(length) => length, + Err(e) => return ConnectionResult::Result(Err(e)), + }; let mut content = vec![0; content_length]; - reader + if let Err(e) = reader .read_exact(&mut content) .await - .with_context(|| "reading after a loop")?; + .with_context(|| "reading after a loop") + { + return ConnectionResult::Result(Err(e)); + } - let message = std::str::from_utf8(&content).context("invalid utf8 from server")?; + let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") { + Ok(str) => str, + Err(e) => return ConnectionResult::Result(Err(e)), + }; if let Some(log_handlers) = log_handlers { for (kind, log_handler) in log_handlers.lock().iter_mut() { if matches!(kind, LogKind::Rpc) { - log_handler(IoKind::StdOut, &message); + log_handler(IoKind::StdOut, message_str); } } } - Ok(serde_json::from_str::(message)?) + ConnectionResult::Result( + serde_json::from_str::(message_str).context("deserializing server message"), + ) } pub async fn shutdown(&self) -> Result<()> { @@ -777,71 +823,31 @@ impl FakeTransport { let response_handlers = this.response_handlers.clone(); let stdout_writer = Arc::new(Mutex::new(stdout_writer)); - cx.background_executor() - .spawn(async move { - let mut reader = BufReader::new(stdin_reader); - let mut buffer = String::new(); + cx.background_spawn(async move { + let mut reader = BufReader::new(stdin_reader); + let mut buffer = String::new(); - loop { - let message = - TransportDelegate::receive_server_message(&mut reader, &mut buffer, None) - .await; - - match message { - Err(error) => { - break anyhow::anyhow!(error); - } - Ok(message) => { - match message { - Message::Request(request) => { - // redirect reverse requests to stdout writer/reader - if request.command == RunInTerminal::COMMAND - || request.command == StartDebugging::COMMAND - { - let message = - serde_json::to_string(&Message::Request(request)) - .unwrap(); - - let mut writer = stdout_writer.lock().await; - writer - .write_all( - TransportDelegate::build_rpc_message(message) - .as_bytes(), - ) - .await - .unwrap(); - writer.flush().await.unwrap(); - } else { - let response = if let Some(handle) = request_handlers - .lock() - .get_mut(request.command.as_str()) - { - handle( - request.seq, - request.arguments.unwrap_or(json!({})), - ) - } else { - panic!("No request handler for {}", request.command); - }; - let message = - serde_json::to_string(&Message::Response(response)) - .unwrap(); - - let mut writer = stdout_writer.lock().await; - - writer - .write_all( - TransportDelegate::build_rpc_message(message) - .as_bytes(), - ) - .await - .unwrap(); - writer.flush().await.unwrap(); - } - } - Message::Event(event) => { + loop { + match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None) + .await + { + ConnectionResult::Timeout => { + anyhow::bail!("Timed out when connecting to debugger"); + } + ConnectionResult::ConnectionReset => { + log::info!("Debugger closed the connection"); + break Ok(()); + } + ConnectionResult::Result(Err(e)) => break Err(e), + ConnectionResult::Result(Ok(message)) => { + match message { + Message::Request(request) => { + // redirect reverse requests to stdout writer/reader + if request.command == RunInTerminal::COMMAND + || request.command == StartDebugging::COMMAND + { let message = - serde_json::to_string(&Message::Event(event)).unwrap(); + serde_json::to_string(&Message::Request(request)).unwrap(); let mut writer = stdout_writer.lock().await; writer @@ -852,22 +858,58 @@ impl FakeTransport { .await .unwrap(); writer.flush().await.unwrap(); - } - Message::Response(response) => { - if let Some(handle) = - response_handlers.lock().get(response.command.as_str()) + } else { + let response = if let Some(handle) = + request_handlers.lock().get_mut(request.command.as_str()) { - handle(response); + handle(request.seq, request.arguments.unwrap_or(json!({}))) } else { - log::error!("No response handler for {}", response.command); - } + panic!("No request handler for {}", request.command); + }; + let message = + serde_json::to_string(&Message::Response(response)) + .unwrap(); + + let mut writer = stdout_writer.lock().await; + + writer + .write_all( + TransportDelegate::build_rpc_message(message) + .as_bytes(), + ) + .await + .unwrap(); + writer.flush().await.unwrap(); + } + } + Message::Event(event) => { + let message = + serde_json::to_string(&Message::Event(event)).unwrap(); + + let mut writer = stdout_writer.lock().await; + writer + .write_all( + TransportDelegate::build_rpc_message(message).as_bytes(), + ) + .await + .unwrap(); + writer.flush().await.unwrap(); + } + Message::Response(response) => { + if let Some(handle) = + response_handlers.lock().get(response.command.as_str()) + { + handle(response); + } else { + log::error!("No response handler for {}", response.command); } } } } } - }) - .detach(); + } + }) + .detach(); Ok(( TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),