SSH reconnect reliability (#19398)
Release Notes: - SSH Remoting: Fix message reliability across restarts --------- Co-authored-by: Nathan <nathan@zed.dev>
This commit is contained in:
parent
be474a6d6f
commit
98ecb43b2d
9 changed files with 420 additions and 79 deletions
|
@ -19,6 +19,7 @@ test-support = ["fs/test-support"]
|
|||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
collections.workspace = true
|
||||
fs.workspace = true
|
||||
futures.workspace = true
|
||||
|
|
|
@ -6,6 +6,7 @@ use crate::{
|
|||
proxy::ProxyLaunchError,
|
||||
};
|
||||
use anyhow::{anyhow, Context as _, Result};
|
||||
use async_trait::async_trait;
|
||||
use collections::HashMap;
|
||||
use futures::{
|
||||
channel::{
|
||||
|
@ -31,6 +32,7 @@ use smol::{
|
|||
};
|
||||
use std::{
|
||||
any::TypeId,
|
||||
collections::VecDeque,
|
||||
ffi::OsStr,
|
||||
fmt,
|
||||
ops::ControlFlow,
|
||||
|
@ -276,7 +278,7 @@ async fn run_cmd(command: &mut process::Command) -> Result<String> {
|
|||
}
|
||||
}
|
||||
|
||||
struct ChannelForwarder {
|
||||
pub struct ChannelForwarder {
|
||||
quit_tx: UnboundedSender<()>,
|
||||
forwarding_task: Task<(UnboundedSender<Envelope>, UnboundedReceiver<Envelope>)>,
|
||||
}
|
||||
|
@ -347,7 +349,7 @@ const MAX_RECONNECT_ATTEMPTS: usize = 3;
|
|||
enum State {
|
||||
Connecting,
|
||||
Connected {
|
||||
ssh_connection: SshRemoteConnection,
|
||||
ssh_connection: Box<dyn SshRemoteProcess>,
|
||||
delegate: Arc<dyn SshClientDelegate>,
|
||||
forwarder: ChannelForwarder,
|
||||
|
||||
|
@ -357,7 +359,7 @@ enum State {
|
|||
HeartbeatMissed {
|
||||
missed_heartbeats: usize,
|
||||
|
||||
ssh_connection: SshRemoteConnection,
|
||||
ssh_connection: Box<dyn SshRemoteProcess>,
|
||||
delegate: Arc<dyn SshClientDelegate>,
|
||||
forwarder: ChannelForwarder,
|
||||
|
||||
|
@ -366,7 +368,7 @@ enum State {
|
|||
},
|
||||
Reconnecting,
|
||||
ReconnectFailed {
|
||||
ssh_connection: SshRemoteConnection,
|
||||
ssh_connection: Box<dyn SshRemoteProcess>,
|
||||
delegate: Arc<dyn SshClientDelegate>,
|
||||
forwarder: ChannelForwarder,
|
||||
|
||||
|
@ -392,11 +394,11 @@ impl fmt::Display for State {
|
|||
}
|
||||
|
||||
impl State {
|
||||
fn ssh_connection(&self) -> Option<&SshRemoteConnection> {
|
||||
fn ssh_connection(&self) -> Option<&dyn SshRemoteProcess> {
|
||||
match self {
|
||||
Self::Connected { ssh_connection, .. } => Some(ssh_connection),
|
||||
Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection),
|
||||
Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection),
|
||||
Self::Connected { ssh_connection, .. } => Some(ssh_connection.as_ref()),
|
||||
Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.as_ref()),
|
||||
Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.as_ref()),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
@ -541,23 +543,19 @@ impl SshRemoteClient {
|
|||
let (proxy, proxy_incoming_tx, proxy_outgoing_rx) =
|
||||
ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx);
|
||||
|
||||
let (ssh_connection, ssh_proxy_process) = Self::establish_connection(
|
||||
let (ssh_connection, io_task) = Self::establish_connection(
|
||||
unique_identifier,
|
||||
false,
|
||||
connection_options,
|
||||
proxy_incoming_tx,
|
||||
proxy_outgoing_rx,
|
||||
connection_activity_tx,
|
||||
delegate.clone(),
|
||||
&mut cx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let multiplex_task = Self::multiplex(
|
||||
this.downgrade(),
|
||||
ssh_proxy_process,
|
||||
proxy_incoming_tx,
|
||||
proxy_outgoing_rx,
|
||||
connection_activity_tx,
|
||||
&mut cx,
|
||||
);
|
||||
let multiplex_task = Self::monitor(this.downgrade(), io_task, &cx);
|
||||
|
||||
if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
|
||||
log::error!("failed to establish connection: {}", error);
|
||||
|
@ -703,30 +701,24 @@ impl SshRemoteClient {
|
|||
};
|
||||
}
|
||||
|
||||
if let Err(error) = ssh_connection.master_process.kill() {
|
||||
if let Err(error) = ssh_connection.kill().await.context("Failed to kill ssh process") {
|
||||
failed!(error, attempts, ssh_connection, delegate, forwarder);
|
||||
};
|
||||
|
||||
if let Err(error) = ssh_connection
|
||||
.master_process
|
||||
.status()
|
||||
.await
|
||||
.context("Failed to kill ssh process")
|
||||
{
|
||||
failed!(error, attempts, ssh_connection, delegate, forwarder);
|
||||
}
|
||||
|
||||
let connection_options = ssh_connection.socket.connection_options.clone();
|
||||
let connection_options = ssh_connection.connection_options();
|
||||
|
||||
let (incoming_tx, outgoing_rx) = forwarder.into_channels().await;
|
||||
let (forwarder, proxy_incoming_tx, proxy_outgoing_rx) =
|
||||
ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx);
|
||||
let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
|
||||
|
||||
let (ssh_connection, ssh_process) = match Self::establish_connection(
|
||||
let (ssh_connection, io_task) = match Self::establish_connection(
|
||||
identifier,
|
||||
true,
|
||||
connection_options,
|
||||
proxy_incoming_tx,
|
||||
proxy_outgoing_rx,
|
||||
connection_activity_tx,
|
||||
delegate.clone(),
|
||||
&mut cx,
|
||||
)
|
||||
|
@ -738,16 +730,9 @@ impl SshRemoteClient {
|
|||
}
|
||||
};
|
||||
|
||||
let multiplex_task = Self::multiplex(
|
||||
this.clone(),
|
||||
ssh_process,
|
||||
proxy_incoming_tx,
|
||||
proxy_outgoing_rx,
|
||||
connection_activity_tx,
|
||||
&mut cx,
|
||||
);
|
||||
let multiplex_task = Self::monitor(this.clone(), io_task, &cx);
|
||||
|
||||
if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
|
||||
if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
|
||||
failed!(error, attempts, ssh_connection, delegate, forwarder);
|
||||
};
|
||||
|
||||
|
@ -911,18 +896,17 @@ impl SshRemoteClient {
|
|||
}
|
||||
|
||||
fn multiplex(
|
||||
this: WeakModel<Self>,
|
||||
mut ssh_proxy_process: Child,
|
||||
incoming_tx: UnboundedSender<Envelope>,
|
||||
mut outgoing_rx: UnboundedReceiver<Envelope>,
|
||||
mut connection_activity_tx: Sender<()>,
|
||||
cx: &AsyncAppContext,
|
||||
) -> Task<Result<()>> {
|
||||
) -> Task<Result<Option<i32>>> {
|
||||
let mut child_stderr = ssh_proxy_process.stderr.take().unwrap();
|
||||
let mut child_stdout = ssh_proxy_process.stdout.take().unwrap();
|
||||
let mut child_stdin = ssh_proxy_process.stdin.take().unwrap();
|
||||
|
||||
let io_task = cx.background_executor().spawn(async move {
|
||||
cx.background_executor().spawn(async move {
|
||||
let mut stdin_buffer = Vec::new();
|
||||
let mut stdout_buffer = Vec::new();
|
||||
let mut stderr_buffer = Vec::new();
|
||||
|
@ -1001,8 +985,14 @@ impl SshRemoteClient {
|
|||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
fn monitor(
|
||||
this: WeakModel<Self>,
|
||||
io_task: Task<Result<Option<i32>>>,
|
||||
cx: &AsyncAppContext,
|
||||
) -> Task<Result<()>> {
|
||||
cx.spawn(|mut cx| async move {
|
||||
let result = io_task.await;
|
||||
|
||||
|
@ -1061,21 +1051,40 @@ impl SshRemoteClient {
|
|||
cx.notify();
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn establish_connection(
|
||||
unique_identifier: String,
|
||||
reconnect: bool,
|
||||
connection_options: SshConnectionOptions,
|
||||
proxy_incoming_tx: UnboundedSender<Envelope>,
|
||||
proxy_outgoing_rx: UnboundedReceiver<Envelope>,
|
||||
connection_activity_tx: Sender<()>,
|
||||
delegate: Arc<dyn SshClientDelegate>,
|
||||
cx: &mut AsyncAppContext,
|
||||
) -> Result<(SshRemoteConnection, Child)> {
|
||||
) -> Result<(Box<dyn SshRemoteProcess>, Task<Result<Option<i32>>>)> {
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
if let Some(fake) = fake::SshRemoteConnection::new(&connection_options) {
|
||||
let io_task = fake::SshRemoteConnection::multiplex(
|
||||
fake.connection_options(),
|
||||
proxy_incoming_tx,
|
||||
proxy_outgoing_rx,
|
||||
connection_activity_tx,
|
||||
cx,
|
||||
)
|
||||
.await;
|
||||
return Ok((fake, io_task));
|
||||
}
|
||||
|
||||
let ssh_connection =
|
||||
SshRemoteConnection::new(connection_options, delegate.clone(), cx).await?;
|
||||
|
||||
let platform = ssh_connection.query_platform().await?;
|
||||
let remote_binary_path = delegate.remote_server_binary_path(platform, cx)?;
|
||||
ssh_connection
|
||||
.ensure_server_binary(&delegate, &remote_binary_path, platform, cx)
|
||||
.await?;
|
||||
if !reconnect {
|
||||
ssh_connection
|
||||
.ensure_server_binary(&delegate, &remote_binary_path, platform, cx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let socket = ssh_connection.socket.clone();
|
||||
run_cmd(socket.ssh_command(&remote_binary_path).arg("version")).await?;
|
||||
|
@ -1100,7 +1109,15 @@ impl SshRemoteClient {
|
|||
.spawn()
|
||||
.context("failed to spawn remote server")?;
|
||||
|
||||
Ok((ssh_connection, ssh_proxy_process))
|
||||
let io_task = Self::multiplex(
|
||||
ssh_proxy_process,
|
||||
proxy_incoming_tx,
|
||||
proxy_outgoing_rx,
|
||||
connection_activity_tx,
|
||||
&cx,
|
||||
);
|
||||
|
||||
Ok((Box::new(ssh_connection), io_task))
|
||||
}
|
||||
|
||||
pub fn subscribe_to_entity<E: 'static>(&self, remote_id: u64, entity: &Model<E>) {
|
||||
|
@ -1112,7 +1129,7 @@ impl SshRemoteClient {
|
|||
.lock()
|
||||
.as_ref()
|
||||
.and_then(|state| state.ssh_connection())
|
||||
.map(|ssh_connection| ssh_connection.socket.ssh_args())
|
||||
.map(|ssh_connection| ssh_connection.ssh_args())
|
||||
}
|
||||
|
||||
pub fn proto_client(&self) -> AnyProtoClient {
|
||||
|
@ -1127,7 +1144,6 @@ impl SshRemoteClient {
|
|||
self.connection_options.clone()
|
||||
}
|
||||
|
||||
#[cfg(not(any(test, feature = "test-support")))]
|
||||
pub fn connection_state(&self) -> ConnectionState {
|
||||
self.state
|
||||
.lock()
|
||||
|
@ -1136,37 +1152,69 @@ impl SshRemoteClient {
|
|||
.unwrap_or(ConnectionState::Disconnected)
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub fn connection_state(&self) -> ConnectionState {
|
||||
ConnectionState::Connected
|
||||
}
|
||||
|
||||
pub fn is_disconnected(&self) -> bool {
|
||||
self.connection_state() == ConnectionState::Disconnected
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub fn fake(
|
||||
client_cx: &mut gpui::TestAppContext,
|
||||
pub fn simulate_disconnect(&self, cx: &mut AppContext) -> Task<()> {
|
||||
use gpui::BorrowAppContext;
|
||||
|
||||
let port = self.connection_options().port.unwrap();
|
||||
|
||||
let disconnect =
|
||||
cx.update_global(|c: &mut fake::GlobalConnections, _cx| c.take(port).into_channels());
|
||||
cx.spawn(|mut cx| async move {
|
||||
let (input_rx, output_tx) = disconnect.await;
|
||||
let (forwarder, _, _) = ChannelForwarder::new(input_rx, output_tx, &mut cx);
|
||||
cx.update_global(|c: &mut fake::GlobalConnections, _cx| c.replace(port, forwarder))
|
||||
.unwrap()
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub fn fake_server(
|
||||
server_cx: &mut gpui::TestAppContext,
|
||||
) -> (Model<Self>, Arc<ChannelClient>) {
|
||||
use gpui::Context;
|
||||
) -> (ChannelForwarder, Arc<ChannelClient>) {
|
||||
server_cx.update(|cx| {
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
|
||||
let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
|
||||
|
||||
let (server_to_client_tx, server_to_client_rx) = mpsc::unbounded();
|
||||
let (client_to_server_tx, client_to_server_rx) = mpsc::unbounded();
|
||||
// We use the forwarder on the server side (in production we only use one on the client side)
|
||||
// the idea is that we can simulate a disconnect/reconnect by just messing with the forwarder.
|
||||
let (forwarder, _, _) =
|
||||
ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx.to_async());
|
||||
|
||||
(
|
||||
client_cx.update(|cx| {
|
||||
let client = ChannelClient::new(server_to_client_rx, client_to_server_tx, cx);
|
||||
cx.new_model(|_| Self {
|
||||
client,
|
||||
unique_identifier: "fake".to_string(),
|
||||
connection_options: SshConnectionOptions::default(),
|
||||
state: Arc::new(Mutex::new(None)),
|
||||
})
|
||||
}),
|
||||
server_cx.update(|cx| ChannelClient::new(client_to_server_rx, server_to_client_tx, cx)),
|
||||
)
|
||||
let client = ChannelClient::new(incoming_rx, outgoing_tx, cx);
|
||||
(forwarder, client)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub async fn fake_client(
|
||||
forwarder: ChannelForwarder,
|
||||
client_cx: &mut gpui::TestAppContext,
|
||||
) -> Model<Self> {
|
||||
use gpui::BorrowAppContext;
|
||||
client_cx
|
||||
.update(|cx| {
|
||||
let port = cx.update_default_global(|c: &mut fake::GlobalConnections, _cx| {
|
||||
c.push(forwarder)
|
||||
});
|
||||
|
||||
Self::new(
|
||||
"fake".to_string(),
|
||||
SshConnectionOptions {
|
||||
host: "<fake>".to_string(),
|
||||
port: Some(port),
|
||||
..Default::default()
|
||||
},
|
||||
Arc::new(fake::Delegate),
|
||||
cx,
|
||||
)
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1176,6 +1224,13 @@ impl From<SshRemoteClient> for AnyProtoClient {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
trait SshRemoteProcess: Send + Sync {
|
||||
async fn kill(&mut self) -> Result<()>;
|
||||
fn ssh_args(&self) -> Vec<String>;
|
||||
fn connection_options(&self) -> SshConnectionOptions;
|
||||
}
|
||||
|
||||
struct SshRemoteConnection {
|
||||
socket: SshSocket,
|
||||
master_process: process::Child,
|
||||
|
@ -1190,6 +1245,25 @@ impl Drop for SshRemoteConnection {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl SshRemoteProcess for SshRemoteConnection {
|
||||
async fn kill(&mut self) -> Result<()> {
|
||||
self.master_process.kill()?;
|
||||
|
||||
self.master_process.status().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ssh_args(&self) -> Vec<String> {
|
||||
self.socket.ssh_args()
|
||||
}
|
||||
|
||||
fn connection_options(&self) -> SshConnectionOptions {
|
||||
self.socket.connection_options.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl SshRemoteConnection {
|
||||
#[cfg(not(unix))]
|
||||
async fn new(
|
||||
|
@ -1472,8 +1546,10 @@ type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, ones
|
|||
pub struct ChannelClient {
|
||||
next_message_id: AtomicU32,
|
||||
outgoing_tx: mpsc::UnboundedSender<Envelope>,
|
||||
response_channels: ResponseChannels, // Lock
|
||||
message_handlers: Mutex<ProtoMessageHandlerSet>, // Lock
|
||||
buffer: Mutex<VecDeque<Envelope>>,
|
||||
response_channels: ResponseChannels,
|
||||
message_handlers: Mutex<ProtoMessageHandlerSet>,
|
||||
max_received: AtomicU32,
|
||||
}
|
||||
|
||||
impl ChannelClient {
|
||||
|
@ -1485,8 +1561,10 @@ impl ChannelClient {
|
|||
let this = Arc::new(Self {
|
||||
outgoing_tx,
|
||||
next_message_id: AtomicU32::new(0),
|
||||
max_received: AtomicU32::new(0),
|
||||
response_channels: ResponseChannels::default(),
|
||||
message_handlers: Default::default(),
|
||||
buffer: Mutex::new(VecDeque::new()),
|
||||
});
|
||||
|
||||
Self::start_handling_messages(this.clone(), incoming_rx, cx);
|
||||
|
@ -1507,6 +1585,25 @@ impl ChannelClient {
|
|||
let Some(this) = this.upgrade() else {
|
||||
return anyhow::Ok(());
|
||||
};
|
||||
if let Some(ack_id) = incoming.ack_id {
|
||||
let mut buffer = this.buffer.lock();
|
||||
while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
|
||||
buffer.pop_front();
|
||||
}
|
||||
}
|
||||
if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload {
|
||||
{
|
||||
let buffer = this.buffer.lock();
|
||||
for envelope in buffer.iter() {
|
||||
this.outgoing_tx.unbounded_send(envelope.clone()).ok();
|
||||
}
|
||||
}
|
||||
let response = proto::Ack{}.into_envelope(0, Some(incoming.id), None);
|
||||
this.send_dynamic(response).ok();
|
||||
continue;
|
||||
}
|
||||
|
||||
this.max_received.store(incoming.id, SeqCst);
|
||||
|
||||
if let Some(request_id) = incoming.responding_to {
|
||||
let request_id = MessageId(request_id);
|
||||
|
@ -1583,6 +1680,23 @@ impl ChannelClient {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn resync(&self, timeout: Duration) -> Result<()> {
|
||||
smol::future::or(
|
||||
async {
|
||||
self.request(proto::FlushBufferedMessages {}).await?;
|
||||
for envelope in self.buffer.lock().iter() {
|
||||
self.outgoing_tx.unbounded_send(envelope.clone()).ok();
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
async {
|
||||
smol::Timer::after(timeout).await;
|
||||
Err(anyhow!("Timeout detected"))
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn ping(&self, timeout: Duration) -> Result<()> {
|
||||
smol::future::or(
|
||||
async {
|
||||
|
@ -1612,7 +1726,8 @@ impl ChannelClient {
|
|||
let mut response_channels_lock = self.response_channels.lock();
|
||||
response_channels_lock.insert(MessageId(envelope.id), tx);
|
||||
drop(response_channels_lock);
|
||||
let result = self.outgoing_tx.unbounded_send(envelope);
|
||||
|
||||
let result = self.send_buffered(envelope);
|
||||
async move {
|
||||
if let Err(error) = &result {
|
||||
log::error!("failed to send message: {}", error);
|
||||
|
@ -1629,6 +1744,12 @@ impl ChannelClient {
|
|||
|
||||
pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
|
||||
envelope.id = self.next_message_id.fetch_add(1, SeqCst);
|
||||
self.send_buffered(envelope)
|
||||
}
|
||||
|
||||
pub fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
|
||||
envelope.ack_id = Some(self.max_received.load(SeqCst));
|
||||
self.buffer.lock().push_back(envelope.clone());
|
||||
self.outgoing_tx.unbounded_send(envelope)?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -1659,3 +1780,165 @@ impl ProtoClient for ChannelClient {
|
|||
false
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
mod fake {
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use futures::{
|
||||
channel::{
|
||||
mpsc::{self, Sender},
|
||||
oneshot,
|
||||
},
|
||||
select_biased, FutureExt, SinkExt, StreamExt,
|
||||
};
|
||||
use gpui::{AsyncAppContext, BorrowAppContext, Global, SemanticVersion, Task};
|
||||
use rpc::proto::Envelope;
|
||||
|
||||
use super::{
|
||||
ChannelForwarder, SshClientDelegate, SshConnectionOptions, SshPlatform, SshRemoteProcess,
|
||||
};
|
||||
|
||||
pub(super) struct SshRemoteConnection {
|
||||
connection_options: SshConnectionOptions,
|
||||
}
|
||||
|
||||
impl SshRemoteConnection {
|
||||
pub(super) fn new(
|
||||
connection_options: &SshConnectionOptions,
|
||||
) -> Option<Box<dyn SshRemoteProcess>> {
|
||||
if connection_options.host == "<fake>" {
|
||||
return Some(Box::new(Self {
|
||||
connection_options: connection_options.clone(),
|
||||
}));
|
||||
}
|
||||
return None;
|
||||
}
|
||||
pub(super) async fn multiplex(
|
||||
connection_options: SshConnectionOptions,
|
||||
mut client_tx: mpsc::UnboundedSender<Envelope>,
|
||||
mut client_rx: mpsc::UnboundedReceiver<Envelope>,
|
||||
mut connection_activity_tx: Sender<()>,
|
||||
cx: &mut AsyncAppContext,
|
||||
) -> Task<Result<Option<i32>>> {
|
||||
let (server_tx, server_rx) = cx
|
||||
.update(|cx| {
|
||||
cx.update_global(|conns: &mut GlobalConnections, _| {
|
||||
conns.take(connection_options.port.unwrap())
|
||||
})
|
||||
})
|
||||
.unwrap()
|
||||
.into_channels()
|
||||
.await;
|
||||
|
||||
let (forwarder, mut proxy_tx, mut proxy_rx) =
|
||||
ChannelForwarder::new(server_tx, server_rx, cx);
|
||||
|
||||
cx.update(|cx| {
|
||||
cx.update_global(|conns: &mut GlobalConnections, _| {
|
||||
conns.replace(connection_options.port.unwrap(), forwarder)
|
||||
})
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
cx.background_executor().spawn(async move {
|
||||
loop {
|
||||
select_biased! {
|
||||
server_to_client = proxy_rx.next().fuse() => {
|
||||
let Some(server_to_client) = server_to_client else {
|
||||
return Ok(Some(1))
|
||||
};
|
||||
connection_activity_tx.try_send(()).ok();
|
||||
client_tx.send(server_to_client).await.ok();
|
||||
}
|
||||
client_to_server = client_rx.next().fuse() => {
|
||||
let Some(client_to_server) = client_to_server else {
|
||||
return Ok(None)
|
||||
};
|
||||
proxy_tx.send(client_to_server).await.ok();
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl SshRemoteProcess for SshRemoteConnection {
|
||||
async fn kill(&mut self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ssh_args(&self) -> Vec<String> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn connection_options(&self) -> SshConnectionOptions {
|
||||
self.connection_options.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(super) struct GlobalConnections(Vec<Option<ChannelForwarder>>);
|
||||
impl Global for GlobalConnections {}
|
||||
|
||||
impl GlobalConnections {
|
||||
pub(super) fn push(&mut self, forwarder: ChannelForwarder) -> u16 {
|
||||
self.0.push(Some(forwarder));
|
||||
self.0.len() as u16 - 1
|
||||
}
|
||||
|
||||
pub(super) fn take(&mut self, port: u16) -> ChannelForwarder {
|
||||
self.0
|
||||
.get_mut(port as usize)
|
||||
.expect("no fake server for port")
|
||||
.take()
|
||||
.expect("fake server is already borrowed")
|
||||
}
|
||||
pub(super) fn replace(&mut self, port: u16, forwarder: ChannelForwarder) {
|
||||
let ret = self
|
||||
.0
|
||||
.get_mut(port as usize)
|
||||
.expect("no fake server for port")
|
||||
.replace(forwarder);
|
||||
if ret.is_some() {
|
||||
panic!("fake server is already replaced");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) struct Delegate;
|
||||
|
||||
impl SshClientDelegate for Delegate {
|
||||
fn ask_password(
|
||||
&self,
|
||||
_: String,
|
||||
_: &mut AsyncAppContext,
|
||||
) -> oneshot::Receiver<Result<String>> {
|
||||
unreachable!()
|
||||
}
|
||||
fn remote_server_binary_path(
|
||||
&self,
|
||||
_: SshPlatform,
|
||||
_: &mut AsyncAppContext,
|
||||
) -> Result<PathBuf> {
|
||||
unreachable!()
|
||||
}
|
||||
fn get_server_binary(
|
||||
&self,
|
||||
_: SshPlatform,
|
||||
_: &mut AsyncAppContext,
|
||||
) -> oneshot::Receiver<Result<(PathBuf, SemanticVersion)>> {
|
||||
unreachable!()
|
||||
}
|
||||
fn set_status(&self, _: Option<&str>, _: &mut AsyncAppContext) {
|
||||
unreachable!()
|
||||
}
|
||||
fn set_error(&self, _: String, _: &mut AsyncAppContext) {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue