ssh remoting: Show error if opening connection timed out (#18401)
This shows an error if opening a connection to a remote host didn't work in the timeout of 10s (maybe we'll need to make that configurable in the future? for now it seems fine.)  Release Notes: - N/A --------- Co-authored-by: Bennet <bennet@zed.dev> Co-authored-by: Conrad <conrad@zed.dev>
This commit is contained in:
parent
8559731e0d
commit
5199135b54
2 changed files with 115 additions and 41 deletions
|
@ -129,6 +129,7 @@ pub trait SshClientDelegate {
|
|||
cx: &mut AsyncAppContext,
|
||||
) -> oneshot::Receiver<Result<(PathBuf, SemanticVersion)>>;
|
||||
fn set_status(&self, status: Option<&str>, cx: &mut AsyncAppContext);
|
||||
fn set_error(&self, error_message: String, cx: &mut AsyncAppContext);
|
||||
}
|
||||
|
||||
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
|
||||
|
@ -208,16 +209,16 @@ impl SshSession {
|
|||
|
||||
result = child_stdout.read(&mut stdout_buffer).fuse() => {
|
||||
match result {
|
||||
Ok(len) => {
|
||||
if len == 0 {
|
||||
child_stdin.close().await?;
|
||||
let status = remote_server_child.status().await?;
|
||||
if !status.success() {
|
||||
log::info!("channel exited with status: {status:?}");
|
||||
}
|
||||
return Ok(());
|
||||
Ok(0) => {
|
||||
child_stdin.close().await?;
|
||||
outgoing_rx.close();
|
||||
let status = remote_server_child.status().await?;
|
||||
if !status.success() {
|
||||
log::error!("channel exited with status: {status:?}");
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
Ok(len) => {
|
||||
if len < stdout_buffer.len() {
|
||||
child_stdout.read_exact(&mut stdout_buffer[len..]).await?;
|
||||
}
|
||||
|
@ -419,8 +420,13 @@ impl SshSession {
|
|||
let mut response_channels_lock = self.response_channels.lock();
|
||||
response_channels_lock.insert(MessageId(envelope.id), tx);
|
||||
drop(response_channels_lock);
|
||||
self.outgoing_tx.unbounded_send(envelope).ok();
|
||||
let result = self.outgoing_tx.unbounded_send(envelope);
|
||||
async move {
|
||||
if let Err(error) = &result {
|
||||
log::error!("failed to send message: {}", error);
|
||||
return Err(anyhow!("failed to send message: {}", error));
|
||||
}
|
||||
|
||||
let response = rx.await.context("connection lost")?.0;
|
||||
if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
|
||||
return Err(RpcError::from_proto(error, type_name));
|
||||
|
@ -525,22 +531,25 @@ impl SshClientState {
|
|||
let listener =
|
||||
UnixListener::bind(&askpass_socket).context("failed to create askpass socket")?;
|
||||
|
||||
let askpass_task = cx.spawn(|mut cx| async move {
|
||||
while let Ok((mut stream, _)) = listener.accept().await {
|
||||
let mut buffer = Vec::new();
|
||||
let mut reader = BufReader::new(&mut stream);
|
||||
if reader.read_until(b'\0', &mut buffer).await.is_err() {
|
||||
buffer.clear();
|
||||
}
|
||||
let password_prompt = String::from_utf8_lossy(&buffer);
|
||||
if let Some(password) = delegate
|
||||
.ask_password(password_prompt.to_string(), &mut cx)
|
||||
.await
|
||||
.context("failed to get ssh password")
|
||||
.and_then(|p| p)
|
||||
.log_err()
|
||||
{
|
||||
stream.write_all(password.as_bytes()).await.log_err();
|
||||
let askpass_task = cx.spawn({
|
||||
let delegate = delegate.clone();
|
||||
|mut cx| async move {
|
||||
while let Ok((mut stream, _)) = listener.accept().await {
|
||||
let mut buffer = Vec::new();
|
||||
let mut reader = BufReader::new(&mut stream);
|
||||
if reader.read_until(b'\0', &mut buffer).await.is_err() {
|
||||
buffer.clear();
|
||||
}
|
||||
let password_prompt = String::from_utf8_lossy(&buffer);
|
||||
if let Some(password) = delegate
|
||||
.ask_password(password_prompt.to_string(), &mut cx)
|
||||
.await
|
||||
.context("failed to get ssh password")
|
||||
.and_then(|p| p)
|
||||
.log_err()
|
||||
{
|
||||
stream.write_all(password.as_bytes()).await.log_err();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -575,7 +584,22 @@ impl SshClientState {
|
|||
// has completed.
|
||||
let stdout = master_process.stdout.as_mut().unwrap();
|
||||
let mut output = Vec::new();
|
||||
stdout.read_to_end(&mut output).await?;
|
||||
let connection_timeout = std::time::Duration::from_secs(10);
|
||||
let result = read_with_timeout(stdout, connection_timeout, &mut output).await;
|
||||
if let Err(e) = result {
|
||||
let error_message = if e.kind() == std::io::ErrorKind::TimedOut {
|
||||
format!(
|
||||
"Failed to connect to host. Timed out after {:?}.",
|
||||
connection_timeout
|
||||
)
|
||||
} else {
|
||||
format!("Failed to connect to host: {}.", e)
|
||||
};
|
||||
|
||||
delegate.set_error(error_message, cx);
|
||||
return Err(e.into());
|
||||
}
|
||||
|
||||
drop(askpass_task);
|
||||
|
||||
if master_process.try_status()?.is_some() {
|
||||
|
@ -716,6 +740,29 @@ impl SshClientState {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn read_with_timeout(
|
||||
stdout: &mut process::ChildStdout,
|
||||
timeout: std::time::Duration,
|
||||
output: &mut Vec<u8>,
|
||||
) -> Result<(), std::io::Error> {
|
||||
smol::future::or(
|
||||
async {
|
||||
stdout.read_to_end(output).await?;
|
||||
Ok::<_, std::io::Error>(())
|
||||
},
|
||||
async {
|
||||
smol::Timer::after(timeout).await;
|
||||
|
||||
Err(std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"Read operation timed out",
|
||||
))
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
impl Drop for SshClientState {
|
||||
fn drop(&mut self) {
|
||||
if let Err(error) = self.master_process.kill() {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue