debugger: Fix connections over SSH (#32834)

Before this change, we would see "connection reset" when sending the
initialize
request over SSH in the case that the debug adapter was slow to boot.

(Although we'd have successfully created a connection to the local SSH
port,
trying to read/write from it would not work until the remote end of the
connection had been established)

Fixes  #32575

Release Notes:

- debugger: Fix connecting to a Python debugger over SSH
This commit is contained in:
Conrad Irwin 2025-06-17 00:48:17 -06:00 committed by GitHub
parent baf4abe101
commit 109651e6e9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 443 additions and 486 deletions

View file

@ -8,8 +8,7 @@ use dap_types::{
requests::Request,
};
use futures::channel::oneshot;
use gpui::{AppContext, AsyncApp};
use smol::channel::{Receiver, Sender};
use gpui::AsyncApp;
use std::{
hash::Hash,
sync::atomic::{AtomicU64, Ordering},
@ -44,99 +43,56 @@ impl DebugAdapterClient {
id: SessionId,
binary: DebugAdapterBinary,
message_handler: DapMessageHandler,
cx: AsyncApp,
cx: &mut AsyncApp,
) -> Result<Self> {
let ((server_rx, server_tx), transport_delegate) =
TransportDelegate::start(&binary, cx.clone()).await?;
let transport_delegate = TransportDelegate::start(&binary, cx).await?;
let this = Self {
id,
binary,
transport_delegate,
sequence_count: AtomicU64::new(1),
};
log::info!("Successfully connected to debug adapter");
let client_id = this.id;
// start handling events/reverse requests
cx.background_spawn(Self::handle_receive_messages(
client_id,
server_rx,
server_tx.clone(),
message_handler,
))
.detach();
this.connect(message_handler, cx).await?;
Ok(this)
}
pub async fn reconnect(
pub fn should_reconnect_for_ssh(&self) -> bool {
self.transport_delegate.tcp_arguments().is_some()
&& self.binary.command.as_deref() == Some("ssh")
}
pub async fn connect(
&self,
message_handler: DapMessageHandler,
cx: &mut AsyncApp,
) -> Result<()> {
self.transport_delegate.connect(message_handler, cx).await
}
pub async fn create_child_connection(
&self,
session_id: SessionId,
binary: DebugAdapterBinary,
message_handler: DapMessageHandler,
cx: AsyncApp,
cx: &mut AsyncApp,
) -> Result<Self> {
let binary = match self.transport_delegate.transport() {
crate::transport::Transport::Tcp(tcp_transport) => DebugAdapterBinary {
let binary = if let Some(connection) = self.transport_delegate.tcp_arguments() {
DebugAdapterBinary {
command: None,
arguments: Default::default(),
envs: Default::default(),
cwd: Default::default(),
connection: Some(crate::adapters::TcpArguments {
host: tcp_transport.host,
port: tcp_transport.port,
timeout: Some(tcp_transport.timeout),
}),
connection: Some(connection),
request_args: binary.request_args,
},
_ => self.binary.clone(),
}
} else {
self.binary.clone()
};
Self::start(session_id, binary, message_handler, cx).await
}
async fn handle_receive_messages(
client_id: SessionId,
server_rx: Receiver<Message>,
client_tx: Sender<Message>,
mut message_handler: DapMessageHandler,
) -> Result<()> {
let result = loop {
let message = match server_rx.recv().await {
Ok(message) => message,
Err(e) => break Err(e.into()),
};
match message {
Message::Event(ev) => {
log::debug!("Client {} received event `{}`", client_id.0, &ev);
message_handler(Message::Event(ev))
}
Message::Request(req) => {
log::debug!(
"Client {} received reverse request `{}`",
client_id.0,
&req.command
);
message_handler(Message::Request(req))
}
Message::Response(response) => {
log::debug!("Received response after request timeout: {:#?}", response);
}
}
smol::future::yield_now().await;
};
drop(client_tx);
log::debug!("Handle receive messages dropped");
result
}
/// Send a request to an adapter and get a response back
/// Note: This function will block until a response is sent back from the adapter
pub async fn request<R: Request>(&self, arguments: R::Arguments) -> Result<R::Response> {
@ -152,8 +108,7 @@ impl DebugAdapterClient {
arguments: Some(serialized_arguments),
};
self.transport_delegate
.add_pending_request(sequence_id, callback_tx)
.await;
.add_pending_request(sequence_id, callback_tx);
log::debug!(
"Client {} send `{}` request with sequence_id: {}",
@ -230,8 +185,11 @@ impl DebugAdapterClient {
+ Send
+ FnMut(u64, R::Arguments) -> Result<R::Response, dap_types::ErrorResponse>,
{
let transport = self.transport_delegate.transport().as_fake();
transport.on_request::<R, F>(handler);
self.transport_delegate
.transport
.lock()
.as_fake()
.on_request::<R, F>(handler);
}
#[cfg(any(test, feature = "test-support"))]
@ -250,8 +208,11 @@ impl DebugAdapterClient {
where
F: 'static + Send + Fn(Response),
{
let transport = self.transport_delegate.transport().as_fake();
transport.on_response::<R, F>(handler).await;
self.transport_delegate
.transport
.lock()
.as_fake()
.on_response::<R, F>(handler);
}
#[cfg(any(test, feature = "test-support"))]
@ -308,7 +269,7 @@ mod tests {
},
},
Box::new(|_| panic!("Did not expect to hit this code path")),
cx.to_async(),
&mut cx.to_async(),
)
.await
.unwrap();
@ -390,7 +351,7 @@ mod tests {
);
}
}),
cx.to_async(),
&mut cx.to_async(),
)
.await
.unwrap();
@ -448,7 +409,7 @@ mod tests {
);
}
}),
cx.to_async(),
&mut cx.to_async(),
)
.await
.unwrap();

View file

@ -1,16 +1,19 @@
use anyhow::{Context as _, Result, anyhow, bail};
#[cfg(any(test, feature = "test-support"))]
use async_pipe::{PipeReader, PipeWriter};
use dap_types::{
ErrorResponse,
messages::{Message, Response},
};
use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
use gpui::{AppContext as _, AsyncApp, Task};
use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
use parking_lot::Mutex;
use proto::ErrorExt;
use settings::Settings as _;
use smallvec::SmallVec;
use smol::{
channel::{Receiver, Sender, unbounded},
io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
lock::Mutex,
net::{TcpListener, TcpStream},
};
use std::{
@ -23,7 +26,11 @@ use std::{
use task::TcpArgumentsTemplate;
use util::ConnectionResult;
use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
use crate::{
adapters::{DebugAdapterBinary, TcpArguments},
client::DapMessageHandler,
debugger_settings::DebuggerSettings,
};
pub(crate) type IoMessage = str;
pub(crate) type Command = str;
@ -35,232 +42,152 @@ pub enum LogKind {
Rpc,
}
#[derive(Clone, Copy)]
pub enum IoKind {
StdIn,
StdOut,
StdErr,
}
pub struct TransportPipe {
input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
output: Box<dyn AsyncRead + Unpin + Send + 'static>,
stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
}
impl TransportPipe {
pub fn new(
input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
output: Box<dyn AsyncRead + Unpin + Send + 'static>,
stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
) -> Self {
TransportPipe {
input,
output,
stdout,
stderr,
}
}
}
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
pub enum Transport {
Stdio(StdioTransport),
Tcp(TcpTransport),
pub trait Transport: Send + Sync {
fn has_adapter_logs(&self) -> bool;
fn tcp_arguments(&self) -> Option<TcpArguments>;
fn connect(
&mut self,
) -> Task<
Result<(
Box<dyn AsyncWrite + Unpin + Send + 'static>,
Box<dyn AsyncRead + Unpin + Send + 'static>,
)>,
>;
fn kill(&self);
#[cfg(any(test, feature = "test-support"))]
Fake(FakeTransport),
fn as_fake(&self) -> &FakeTransport {
unreachable!()
}
}
impl Transport {
async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
#[cfg(any(test, feature = "test-support"))]
if cfg!(any(test, feature = "test-support")) {
return FakeTransport::start(cx)
.await
.map(|(transports, fake)| (transports, Self::Fake(fake)));
}
if binary.connection.is_some() {
TcpTransport::start(binary, cx)
.await
.map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
.context("Tried to connect to a debug adapter via TCP transport layer")
} else {
StdioTransport::start(binary, cx)
.await
.map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
.context("Tried to connect to a debug adapter via stdin/stdout transport layer")
}
}
fn has_adapter_logs(&self) -> bool {
match self {
Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
#[cfg(any(test, feature = "test-support"))]
Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
}
}
async fn kill(&self) {
match self {
Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
#[cfg(any(test, feature = "test-support"))]
Transport::Fake(fake_transport) => fake_transport.kill().await,
}
}
async fn start(
binary: &DebugAdapterBinary,
log_handlers: LogHandlers,
cx: &mut AsyncApp,
) -> Result<Box<dyn Transport>> {
#[cfg(any(test, feature = "test-support"))]
pub(crate) fn as_fake(&self) -> &FakeTransport {
match self {
Transport::Fake(fake_transport) => fake_transport,
_ => panic!("Not a fake transport layer"),
}
if cfg!(any(test, feature = "test-support")) {
return Ok(Box::new(FakeTransport::start(cx).await?));
}
if binary.connection.is_some() {
Ok(Box::new(
TcpTransport::start(binary, log_handlers, cx).await?,
))
} else {
Ok(Box::new(
StdioTransport::start(binary, log_handlers, cx).await?,
))
}
}
pub(crate) struct TransportDelegate {
log_handlers: LogHandlers,
current_requests: Requests,
pending_requests: Requests,
transport: Transport,
server_tx: Arc<Mutex<Option<Sender<Message>>>>,
_tasks: Vec<Task<()>>,
pub(crate) transport: Mutex<Box<dyn Transport>>,
server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
tasks: Mutex<Vec<Task<()>>>,
}
impl TransportDelegate {
pub(crate) async fn start(
binary: &DebugAdapterBinary,
cx: AsyncApp,
) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
let mut this = Self {
transport,
pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
let log_handlers: LogHandlers = Default::default();
let transport = start(binary, log_handlers.clone(), cx).await?;
Ok(Self {
transport: Mutex::new(transport),
log_handlers,
server_tx: Default::default(),
log_handlers: Default::default(),
current_requests: Default::default(),
pending_requests: Default::default(),
_tasks: Vec::new(),
};
let messages = this.start_handlers(transport_pipes, cx).await?;
Ok((messages, this))
tasks: Default::default(),
})
}
async fn start_handlers(
&mut self,
mut params: TransportPipe,
cx: AsyncApp,
) -> Result<(Receiver<Message>, Sender<Message>)> {
let (client_tx, server_rx) = unbounded::<Message>();
pub async fn connect(
&self,
message_handler: DapMessageHandler,
cx: &mut AsyncApp,
) -> Result<()> {
let (server_tx, client_rx) = unbounded::<Message>();
self.tasks.lock().clear();
let log_dap_communications =
cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
.with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
.unwrap_or(false);
let connect = self.transport.lock().connect();
let (input, output) = connect.await?;
let log_handler = if log_dap_communications {
Some(self.log_handlers.clone())
} else {
None
};
let adapter_log_handler = log_handler.clone();
cx.update(|cx| {
if let Some(stdout) = params.stdout.take() {
self._tasks.push(cx.background_spawn(async move {
match Self::handle_adapter_log(stdout, adapter_log_handler).await {
ConnectionResult::Timeout => {
log::error!("Timed out when handling debugger log");
}
ConnectionResult::ConnectionReset => {
log::info!("Debugger logs connection closed");
}
ConnectionResult::Result(Ok(())) => {}
ConnectionResult::Result(Err(e)) => {
log::error!("Error handling debugger log: {e}");
}
}
}));
}
let pending_requests = self.pending_requests.clone();
let output_log_handler = log_handler.clone();
self._tasks.push(cx.background_spawn(async move {
match Self::handle_output(
params.output,
client_tx,
let pending_requests = self.pending_requests.clone();
let output_log_handler = log_handler.clone();
{
let mut tasks = self.tasks.lock();
tasks.push(cx.background_spawn(async move {
match Self::recv_from_server(
output,
message_handler,
pending_requests.clone(),
output_log_handler,
)
.await
{
Ok(()) => {}
Err(e) => log::error!("Error handling debugger output: {e}"),
Ok(()) => {
pending_requests.lock().drain().for_each(|(_, request)| {
request
.send(Err(anyhow!("debugger shutdown unexpectedly")))
.ok();
});
}
Err(e) => {
pending_requests.lock().drain().for_each(|(_, request)| {
request.send(Err(e.cloned())).ok();
});
}
}
let mut pending_requests = pending_requests.lock().await;
pending_requests.drain().for_each(|(_, request)| {
request
.send(Err(anyhow!("debugger shutdown unexpectedly")))
.ok();
});
}));
if let Some(stderr) = params.stderr.take() {
let log_handlers = self.log_handlers.clone();
self._tasks.push(cx.background_spawn(async move {
match Self::handle_error(stderr, log_handlers).await {
ConnectionResult::Timeout => {
log::error!("Timed out reading debugger error stream")
}
ConnectionResult::ConnectionReset => {
log::info!("Debugger closed its error stream")
}
ConnectionResult::Result(Ok(())) => {}
ConnectionResult::Result(Err(e)) => {
log::error!("Error handling debugger error: {e}")
}
}
}));
}
let current_requests = self.current_requests.clone();
let pending_requests = self.pending_requests.clone();
let log_handler = log_handler.clone();
self._tasks.push(cx.background_spawn(async move {
match Self::handle_input(
params.input,
client_rx,
current_requests,
pending_requests,
log_handler,
)
.await
{
tasks.push(cx.background_spawn(async move {
match Self::send_to_server(input, client_rx, log_handler).await {
Ok(()) => {}
Err(e) => log::error!("Error handling debugger input: {e}"),
}
}));
})?;
}
{
let mut lock = self.server_tx.lock().await;
*lock = Some(server_tx.clone());
}
Ok((server_rx, server_tx))
Ok(())
}
pub(crate) async fn add_pending_request(
pub(crate) fn tcp_arguments(&self) -> Option<TcpArguments> {
self.transport.lock().tcp_arguments()
}
pub(crate) fn add_pending_request(
&self,
sequence_id: u64,
request: oneshot::Sender<Result<Response>>,
) {
let mut pending_requests = self.pending_requests.lock().await;
let mut pending_requests = self.pending_requests.lock();
pending_requests.insert(sequence_id, request);
}
@ -272,52 +199,41 @@ impl TransportDelegate {
}
}
async fn handle_adapter_log<Stdout>(
stdout: Stdout,
log_handlers: Option<LogHandlers>,
) -> ConnectionResult<()>
where
Stdout: AsyncRead + Unpin + Send + 'static,
{
async fn handle_adapter_log(
stdout: impl AsyncRead + Unpin + Send + 'static,
iokind: IoKind,
log_handlers: LogHandlers,
) {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
let result = loop {
loop {
line.truncate(0);
match reader
.read_line(&mut line)
.await
.context("reading adapter log line")
{
Ok(0) => break ConnectionResult::ConnectionReset,
match reader.read_line(&mut line).await {
Ok(0) => break,
Ok(_) => {}
Err(e) => break ConnectionResult::Result(Err(e)),
}
if let Some(log_handlers) = log_handlers.as_ref() {
for (kind, handler) in log_handlers.lock().iter_mut() {
if matches!(kind, LogKind::Adapter) {
handler(IoKind::StdOut, None, line.as_str());
}
Err(e) => {
log::debug!("handle_adapter_log: {}", e);
break;
}
}
};
log::debug!("Handle adapter log dropped");
result
for (kind, handler) in log_handlers.lock().iter_mut() {
if matches!(kind, LogKind::Adapter) {
handler(iokind, None, line.as_str());
}
}
}
}
fn build_rpc_message(message: String) -> String {
format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
}
async fn handle_input<Stdin>(
async fn send_to_server<Stdin>(
mut server_stdin: Stdin,
client_rx: Receiver<Message>,
current_requests: Requests,
pending_requests: Requests,
log_handlers: Option<LogHandlers>,
) -> Result<()>
where
@ -326,12 +242,6 @@ impl TransportDelegate {
let result = loop {
match client_rx.recv().await {
Ok(message) => {
if let Message::Request(request) = &message {
if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
pending_requests.lock().await.insert(request.seq, sender);
}
}
let command = match &message {
Message::Request(request) => Some(request.command.as_str()),
Message::Response(response) => Some(response.command.as_str()),
@ -371,9 +281,9 @@ impl TransportDelegate {
result
}
async fn handle_output<Stdout>(
async fn recv_from_server<Stdout>(
server_stdout: Stdout,
client_tx: Sender<Message>,
mut message_handler: DapMessageHandler,
pending_requests: Requests,
log_handlers: Option<LogHandlers>,
) -> Result<()>
@ -393,59 +303,25 @@ impl TransportDelegate {
return Ok(());
}
ConnectionResult::Result(Ok(Message::Response(res))) => {
if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
let tx = pending_requests.lock().remove(&res.request_seq);
if let Some(tx) = tx {
if let Err(e) = tx.send(Self::process_response(res)) {
log::trace!("Did not send response `{:?}` for a cancelled", e);
}
} else {
client_tx.send(Message::Response(res)).await?;
message_handler(Message::Response(res))
}
}
ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?,
ConnectionResult::Result(Ok(message)) => message_handler(message),
ConnectionResult::Result(Err(e)) => break Err(e),
}
};
drop(client_tx);
log::debug!("Handle adapter output dropped");
result
}
async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()>
where
Stderr: AsyncRead + Unpin + Send + 'static,
{
log::debug!("Handle error started");
let mut buffer = String::new();
let mut reader = BufReader::new(stderr);
let result = loop {
match reader
.read_line(&mut buffer)
.await
.context("reading error log line")
{
Ok(0) => break ConnectionResult::ConnectionReset,
Ok(_) => {
for (kind, log_handler) in log_handlers.lock().iter_mut() {
if matches!(kind, LogKind::Adapter) {
log_handler(IoKind::StdErr, None, buffer.as_str());
}
}
buffer.truncate(0);
}
Err(error) => break ConnectionResult::Result(Err(error)),
}
};
log::debug!("Handle adapter error dropped");
result
}
fn process_response(response: Response) -> Result<Response> {
if response.success {
Ok(response)
@ -479,14 +355,10 @@ impl TransportDelegate {
loop {
buffer.truncate(0);
match reader
.read_line(buffer)
.await
.with_context(|| "reading a message from server")
{
match reader.read_line(buffer).await {
Ok(0) => return ConnectionResult::ConnectionReset,
Ok(_) => {}
Err(e) => return ConnectionResult::Result(Err(e)),
Err(e) => return ConnectionResult::Result(Err(e.into())),
};
if buffer == "\r\n" {
@ -547,16 +419,8 @@ impl TransportDelegate {
server_tx.close();
}
let mut current_requests = self.current_requests.lock().await;
let mut pending_requests = self.pending_requests.lock().await;
current_requests.clear();
pending_requests.clear();
self.transport.kill().await;
drop(current_requests);
drop(pending_requests);
self.pending_requests.lock().clear();
self.transport.lock().kill();
log::debug!("Shutdown client completed");
@ -564,11 +428,7 @@ impl TransportDelegate {
}
pub fn has_adapter_logs(&self) -> bool {
self.transport.has_adapter_logs()
}
pub fn transport(&self) -> &Transport {
&self.transport
self.transport.lock().has_adapter_logs()
}
pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
@ -581,10 +441,13 @@ impl TransportDelegate {
}
pub struct TcpTransport {
executor: BackgroundExecutor,
pub port: u16,
pub host: Ipv4Addr,
pub timeout: u64,
process: Option<Mutex<Child>>,
process: Arc<Mutex<Option<Child>>>,
_stderr_task: Option<Task<()>>,
_stdout_task: Option<Task<()>>,
}
impl TcpTransport {
@ -604,7 +467,11 @@ impl TcpTransport {
.port())
}
async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
async fn start(
binary: &DebugAdapterBinary,
log_handlers: LogHandlers,
cx: &mut AsyncApp,
) -> Result<Self> {
let connection_args = binary
.connection
.as_ref()
@ -613,7 +480,11 @@ impl TcpTransport {
let host = connection_args.host;
let port = connection_args.port;
let mut process = if let Some(command) = &binary.command {
let mut process = None;
let mut stdout_task = None;
let mut stderr_task = None;
if let Some(command) = &binary.command {
let mut command = util::command::new_std_command(&command);
if let Some(cwd) = &binary.cwd {
@ -623,101 +494,142 @@ impl TcpTransport {
command.args(&binary.arguments);
command.envs(&binary.envs);
Some(
Child::spawn(command, Stdio::null())
.with_context(|| "failed to start debug adapter.")?,
)
} else {
None
};
let mut p = Child::spawn(command, Stdio::null())
.with_context(|| "failed to start debug adapter.")?;
let address = SocketAddrV4::new(host, port);
stdout_task = p.stdout.take().map(|stdout| {
cx.background_executor()
.spawn(TransportDelegate::handle_adapter_log(
stdout,
IoKind::StdOut,
log_handlers.clone(),
))
});
stderr_task = p.stderr.take().map(|stderr| {
cx.background_executor()
.spawn(TransportDelegate::handle_adapter_log(
stderr,
IoKind::StdErr,
log_handlers,
))
});
process = Some(p);
};
let timeout = connection_args.timeout.unwrap_or_else(|| {
cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
.unwrap_or(2000u64)
.unwrap_or(20000u64)
});
let (mut process, (rx, tx)) = select! {
_ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
},
result = cx.spawn(async move |cx| {
loop {
match TcpStream::connect(address).await {
Ok(stream) => return Ok((process, stream.split())),
Err(_) => {
if let Some(p) = &mut process {
if let Ok(Some(_)) = p.try_status() {
let output = process.take().unwrap().into_inner().output().await?;
let output = if output.stderr.is_empty() {
String::from_utf8_lossy(&output.stdout).to_string()
} else {
String::from_utf8_lossy(&output.stderr).to_string()
};
anyhow::bail!("{output}\nerror: process exited before debugger attached.");
}
}
cx.background_executor().timer(Duration::from_millis(100)).await;
}
}
}
}).fuse() => result?
};
log::info!(
"Debug adapter has connected to TCP server {}:{}",
host,
port
);
let stdout = process.as_mut().and_then(|p| p.stdout.take());
let stderr = process.as_mut().and_then(|p| p.stderr.take());
let this = Self {
executor: cx.background_executor().clone(),
port,
host,
process: process.map(Mutex::new),
process: Arc::new(Mutex::new(process)),
timeout,
_stdout_task: stdout_task,
_stderr_task: stderr_task,
};
let pipe = TransportPipe::new(
Box::new(tx),
Box::new(BufReader::new(rx)),
stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
);
Ok((pipe, this))
Ok(this)
}
}
impl Transport for TcpTransport {
fn has_adapter_logs(&self) -> bool {
true
}
async fn kill(&self) {
if let Some(process) = &self.process {
let mut process = process.lock().await;
Child::kill(&mut process);
fn kill(&self) {
if let Some(process) = &mut *self.process.lock() {
process.kill();
}
}
fn tcp_arguments(&self) -> Option<TcpArguments> {
Some(TcpArguments {
host: self.host,
port: self.port,
timeout: Some(self.timeout),
})
}
fn connect(
&mut self,
) -> Task<
Result<(
Box<dyn AsyncWrite + Unpin + Send + 'static>,
Box<dyn AsyncRead + Unpin + Send + 'static>,
)>,
> {
let executor = self.executor.clone();
let timeout = self.timeout;
let address = SocketAddrV4::new(self.host, self.port);
let process = self.process.clone();
executor.clone().spawn(async move {
select! {
_ = executor.timer(Duration::from_millis(timeout)).fuse() => {
anyhow::bail!("Connection to TCP DAP timeout {address}");
},
result = executor.clone().spawn(async move {
loop {
match TcpStream::connect(address).await {
Ok(stream) => {
let (read, write) = stream.split();
return Ok((Box::new(write) as _, Box::new(read) as _))
},
Err(_) => {
let has_process = process.lock().is_some();
if has_process {
let status = process.lock().as_mut().unwrap().try_status();
if let Ok(Some(_)) = status {
let process = process.lock().take().unwrap().into_inner();
let output = process.output().await?;
let output = if output.stderr.is_empty() {
String::from_utf8_lossy(&output.stdout).to_string()
} else {
String::from_utf8_lossy(&output.stderr).to_string()
};
anyhow::bail!("{output}\nerror: process exited before debugger attached.");
}
}
executor.timer(Duration::from_millis(100)).await;
}
}
}
}).fuse() => result
}
})
}
}
impl Drop for TcpTransport {
fn drop(&mut self) {
if let Some(mut p) = self.process.take() {
p.get_mut().kill();
if let Some(mut p) = self.process.lock().take() {
p.kill();
}
}
}
pub struct StdioTransport {
process: Mutex<Child>,
_stderr_task: Option<Task<()>>,
}
impl StdioTransport {
#[allow(dead_code, reason = "This is used in non test builds of Zed")]
async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
// #[allow(dead_code, reason = "This is used in non test builds of Zed")]
async fn start(
binary: &DebugAdapterBinary,
log_handlers: LogHandlers,
cx: &mut AsyncApp,
) -> Result<Self> {
let Some(binary_command) = &binary.command else {
bail!(
"When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
@ -740,42 +652,52 @@ impl StdioTransport {
)
})?;
let stdin = process.stdin.take().context("Failed to open stdin")?;
let stdout = process.stdout.take().context("Failed to open stdout")?;
let stderr = process
.stderr
.take()
.map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
if stderr.is_none() {
bail!(
"Failed to connect to stderr for debug adapter command {}",
&binary_command
);
}
log::info!("Debug adapter has connected to stdio adapter");
let err_task = process.stderr.take().map(|stderr| {
cx.background_spawn(TransportDelegate::handle_adapter_log(
stderr,
IoKind::StdErr,
log_handlers,
))
});
let process = Mutex::new(process);
Ok((
TransportPipe::new(
Box::new(stdin),
Box::new(BufReader::new(stdout)),
None,
stderr,
),
Self { process },
))
Ok(Self {
process,
_stderr_task: err_task,
})
}
}
impl Transport for StdioTransport {
fn has_adapter_logs(&self) -> bool {
false
}
async fn kill(&self) {
let mut process = self.process.lock().await;
Child::kill(&mut process);
fn kill(&self) {
self.process.lock().kill()
}
fn connect(
&mut self,
) -> Task<
Result<(
Box<dyn AsyncWrite + Unpin + Send + 'static>,
Box<dyn AsyncRead + Unpin + Send + 'static>,
)>,
> {
let mut process = self.process.lock();
let result = util::maybe!({
Ok((
Box::new(process.stdin.take().context("Cannot reconnect")?) as _,
Box::new(process.stdout.take().context("Cannot reconnect")?) as _,
))
});
Task::ready(result)
}
fn tcp_arguments(&self) -> Option<TcpArguments> {
None
}
}
@ -795,9 +717,12 @@ type ResponseHandler = Box<dyn Send + Fn(Response)>;
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
// for sending fake response back from adapter side
request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
// for reverse request responses
response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
stdin_writer: Option<PipeWriter>,
stdout_reader: Option<PipeReader>,
}
#[cfg(any(test, feature = "test-support"))]
@ -833,7 +758,7 @@ impl FakeTransport {
);
}
pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
pub fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
where
F: 'static + Send + Fn(Response),
{
@ -842,20 +767,23 @@ impl FakeTransport {
.insert(R::COMMAND, Box::new(handler));
}
async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
let this = Self {
request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
};
async fn start(cx: &mut AsyncApp) -> Result<Self> {
use dap_types::requests::{Request, RunInTerminal, StartDebugging};
use serde_json::json;
let (stdin_writer, stdin_reader) = async_pipe::pipe();
let (stdout_writer, stdout_reader) = async_pipe::pipe();
let this = Self {
request_handlers: Arc::new(Mutex::new(HashMap::default())),
response_handlers: Arc::new(Mutex::new(HashMap::default())),
stdin_writer: Some(stdin_writer),
stdout_reader: Some(stdout_reader),
};
let request_handlers = this.request_handlers.clone();
let response_handlers = this.response_handlers.clone();
let stdout_writer = Arc::new(Mutex::new(stdout_writer));
let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));
cx.background_spawn(async move {
let mut reader = BufReader::new(stdin_reader);
@ -945,17 +873,43 @@ impl FakeTransport {
})
.detach();
Ok((
TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
this,
))
Ok(this)
}
}
#[cfg(any(test, feature = "test-support"))]
impl Transport for FakeTransport {
fn tcp_arguments(&self) -> Option<TcpArguments> {
None
}
fn connect(
&mut self,
) -> Task<
Result<(
Box<dyn AsyncWrite + Unpin + Send + 'static>,
Box<dyn AsyncRead + Unpin + Send + 'static>,
)>,
> {
let result = util::maybe!({
Ok((
Box::new(self.stdin_writer.take().context("Cannot reconnect")?) as _,
Box::new(self.stdout_reader.take().context("Cannot reconnect")?) as _,
))
});
Task::ready(result)
}
fn has_adapter_logs(&self) -> bool {
false
}
async fn kill(&self) {}
fn kill(&self) {}
#[cfg(any(test, feature = "test-support"))]
fn as_fake(&self) -> &FakeTransport {
self
}
}
struct Child {