diff --git a/Cargo.lock b/Cargo.lock index a27005e5d1..f32d24dbfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9119,6 +9119,7 @@ name = "remote" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "collections", "fs", "futures 0.3.30", diff --git a/crates/collab/src/tests/remote_editing_collaboration_tests.rs b/crates/collab/src/tests/remote_editing_collaboration_tests.rs index dae3345755..a23aadf5e0 100644 --- a/crates/collab/src/tests/remote_editing_collaboration_tests.rs +++ b/crates/collab/src/tests/remote_editing_collaboration_tests.rs @@ -26,7 +26,7 @@ async fn test_sharing_an_ssh_remote_project( .await; // Set up project on remote FS - let (client_ssh, server_ssh) = SshRemoteClient::fake(cx_a, server_cx); + let (forwarder, server_ssh) = SshRemoteClient::fake_server(server_cx); let remote_fs = FakeFs::new(server_cx.executor()); remote_fs .insert_tree( @@ -67,6 +67,7 @@ async fn test_sharing_an_ssh_remote_project( ) }); + let client_ssh = SshRemoteClient::fake_client(forwarder, cx_a).await; let (project_a, worktree_id) = client_a .build_ssh_project("/code/project1", client_ssh, cx_a) .await; diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 70d5962647..8ea9e78cb7 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1243,6 +1243,10 @@ impl Project { self.client.clone() } + pub fn ssh_client(&self) -> Option> { + self.ssh_client.clone() + } + pub fn user_store(&self) -> Model { self.user_store.clone() } diff --git a/crates/proto/proto/zed.proto b/crates/proto/proto/zed.proto index 09891de6fe..e089fcdeba 100644 --- a/crates/proto/proto/zed.proto +++ b/crates/proto/proto/zed.proto @@ -12,6 +12,7 @@ message Envelope { uint32 id = 1; optional uint32 responding_to = 2; optional PeerId original_sender_id = 3; + optional uint32 ack_id = 266; oneof payload { Hello hello = 4; @@ -295,7 +296,9 @@ message Envelope { OpenServerSettings open_server_settings = 263; GetPermalinkToLine get_permalink_to_line = 264; - GetPermalinkToLineResponse get_permalink_to_line_response = 265; // current max + GetPermalinkToLineResponse get_permalink_to_line_response = 265; + + FlushBufferedMessages flush_buffered_messages = 267; } reserved 87 to 88; @@ -2521,3 +2524,6 @@ message GetPermalinkToLine { message GetPermalinkToLineResponse { string permalink = 1; } + +message FlushBufferedMessages {} +message FlushBufferedMessagesResponse {} diff --git a/crates/proto/src/macros.rs b/crates/proto/src/macros.rs index 4fdbfff81b..2ce0c0df25 100644 --- a/crates/proto/src/macros.rs +++ b/crates/proto/src/macros.rs @@ -32,6 +32,7 @@ macro_rules! messages { responding_to, original_sender_id, payload: Some(envelope::Payload::$name(self)), + ack_id: None, } } diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs index ffbbeb49c2..8179473fea 100644 --- a/crates/proto/src/proto.rs +++ b/crates/proto/src/proto.rs @@ -372,6 +372,7 @@ messages!( (OpenServerSettings, Foreground), (GetPermalinkToLine, Foreground), (GetPermalinkToLineResponse, Foreground), + (FlushBufferedMessages, Foreground), ); request_messages!( @@ -498,6 +499,7 @@ request_messages!( (RemoveWorktree, Ack), (OpenServerSettings, OpenBufferResponse), (GetPermalinkToLine, GetPermalinkToLineResponse), + (FlushBufferedMessages, Ack), ); entity_messages!( diff --git a/crates/remote/Cargo.toml b/crates/remote/Cargo.toml index b8c5f34cc5..937a69ee59 100644 --- a/crates/remote/Cargo.toml +++ b/crates/remote/Cargo.toml @@ -19,6 +19,7 @@ test-support = ["fs/test-support"] [dependencies] anyhow.workspace = true +async-trait.workspace = true collections.workspace = true fs.workspace = true futures.workspace = true diff --git a/crates/remote/src/ssh_session.rs b/crates/remote/src/ssh_session.rs index f7ef74ce39..e4d292d2f8 100644 --- a/crates/remote/src/ssh_session.rs +++ b/crates/remote/src/ssh_session.rs @@ -6,6 +6,7 @@ use crate::{ proxy::ProxyLaunchError, }; use anyhow::{anyhow, Context as _, Result}; +use async_trait::async_trait; use collections::HashMap; use futures::{ channel::{ @@ -31,6 +32,7 @@ use smol::{ }; use std::{ any::TypeId, + collections::VecDeque, ffi::OsStr, fmt, ops::ControlFlow, @@ -276,7 +278,7 @@ async fn run_cmd(command: &mut process::Command) -> Result { } } -struct ChannelForwarder { +pub struct ChannelForwarder { quit_tx: UnboundedSender<()>, forwarding_task: Task<(UnboundedSender, UnboundedReceiver)>, } @@ -347,7 +349,7 @@ const MAX_RECONNECT_ATTEMPTS: usize = 3; enum State { Connecting, Connected { - ssh_connection: SshRemoteConnection, + ssh_connection: Box, delegate: Arc, forwarder: ChannelForwarder, @@ -357,7 +359,7 @@ enum State { HeartbeatMissed { missed_heartbeats: usize, - ssh_connection: SshRemoteConnection, + ssh_connection: Box, delegate: Arc, forwarder: ChannelForwarder, @@ -366,7 +368,7 @@ enum State { }, Reconnecting, ReconnectFailed { - ssh_connection: SshRemoteConnection, + ssh_connection: Box, delegate: Arc, forwarder: ChannelForwarder, @@ -392,11 +394,11 @@ impl fmt::Display for State { } impl State { - fn ssh_connection(&self) -> Option<&SshRemoteConnection> { + fn ssh_connection(&self) -> Option<&dyn SshRemoteProcess> { match self { - Self::Connected { ssh_connection, .. } => Some(ssh_connection), - Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection), - Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection), + Self::Connected { ssh_connection, .. } => Some(ssh_connection.as_ref()), + Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.as_ref()), + Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.as_ref()), _ => None, } } @@ -541,23 +543,19 @@ impl SshRemoteClient { let (proxy, proxy_incoming_tx, proxy_outgoing_rx) = ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx); - let (ssh_connection, ssh_proxy_process) = Self::establish_connection( + let (ssh_connection, io_task) = Self::establish_connection( unique_identifier, false, connection_options, + proxy_incoming_tx, + proxy_outgoing_rx, + connection_activity_tx, delegate.clone(), &mut cx, ) .await?; - let multiplex_task = Self::multiplex( - this.downgrade(), - ssh_proxy_process, - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, - &mut cx, - ); + let multiplex_task = Self::monitor(this.downgrade(), io_task, &cx); if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await { log::error!("failed to establish connection: {}", error); @@ -703,30 +701,24 @@ impl SshRemoteClient { }; } - if let Err(error) = ssh_connection.master_process.kill() { + if let Err(error) = ssh_connection.kill().await.context("Failed to kill ssh process") { failed!(error, attempts, ssh_connection, delegate, forwarder); }; - if let Err(error) = ssh_connection - .master_process - .status() - .await - .context("Failed to kill ssh process") - { - failed!(error, attempts, ssh_connection, delegate, forwarder); - } - - let connection_options = ssh_connection.socket.connection_options.clone(); + let connection_options = ssh_connection.connection_options(); let (incoming_tx, outgoing_rx) = forwarder.into_channels().await; let (forwarder, proxy_incoming_tx, proxy_outgoing_rx) = ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx); let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1); - let (ssh_connection, ssh_process) = match Self::establish_connection( + let (ssh_connection, io_task) = match Self::establish_connection( identifier, true, connection_options, + proxy_incoming_tx, + proxy_outgoing_rx, + connection_activity_tx, delegate.clone(), &mut cx, ) @@ -738,16 +730,9 @@ impl SshRemoteClient { } }; - let multiplex_task = Self::multiplex( - this.clone(), - ssh_process, - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, - &mut cx, - ); + let multiplex_task = Self::monitor(this.clone(), io_task, &cx); - if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await { + if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await { failed!(error, attempts, ssh_connection, delegate, forwarder); }; @@ -911,18 +896,17 @@ impl SshRemoteClient { } fn multiplex( - this: WeakModel, mut ssh_proxy_process: Child, incoming_tx: UnboundedSender, mut outgoing_rx: UnboundedReceiver, mut connection_activity_tx: Sender<()>, cx: &AsyncAppContext, - ) -> Task> { + ) -> Task>> { let mut child_stderr = ssh_proxy_process.stderr.take().unwrap(); let mut child_stdout = ssh_proxy_process.stdout.take().unwrap(); let mut child_stdin = ssh_proxy_process.stdin.take().unwrap(); - let io_task = cx.background_executor().spawn(async move { + cx.background_executor().spawn(async move { let mut stdin_buffer = Vec::new(); let mut stdout_buffer = Vec::new(); let mut stderr_buffer = Vec::new(); @@ -1001,8 +985,14 @@ impl SshRemoteClient { } } } - }); + }) + } + fn monitor( + this: WeakModel, + io_task: Task>>, + cx: &AsyncAppContext, + ) -> Task> { cx.spawn(|mut cx| async move { let result = io_task.await; @@ -1061,21 +1051,40 @@ impl SshRemoteClient { cx.notify(); } + #[allow(clippy::too_many_arguments)] async fn establish_connection( unique_identifier: String, reconnect: bool, connection_options: SshConnectionOptions, + proxy_incoming_tx: UnboundedSender, + proxy_outgoing_rx: UnboundedReceiver, + connection_activity_tx: Sender<()>, delegate: Arc, cx: &mut AsyncAppContext, - ) -> Result<(SshRemoteConnection, Child)> { + ) -> Result<(Box, Task>>)> { + #[cfg(any(test, feature = "test-support"))] + if let Some(fake) = fake::SshRemoteConnection::new(&connection_options) { + let io_task = fake::SshRemoteConnection::multiplex( + fake.connection_options(), + proxy_incoming_tx, + proxy_outgoing_rx, + connection_activity_tx, + cx, + ) + .await; + return Ok((fake, io_task)); + } + let ssh_connection = SshRemoteConnection::new(connection_options, delegate.clone(), cx).await?; let platform = ssh_connection.query_platform().await?; let remote_binary_path = delegate.remote_server_binary_path(platform, cx)?; - ssh_connection - .ensure_server_binary(&delegate, &remote_binary_path, platform, cx) - .await?; + if !reconnect { + ssh_connection + .ensure_server_binary(&delegate, &remote_binary_path, platform, cx) + .await?; + } let socket = ssh_connection.socket.clone(); run_cmd(socket.ssh_command(&remote_binary_path).arg("version")).await?; @@ -1100,7 +1109,15 @@ impl SshRemoteClient { .spawn() .context("failed to spawn remote server")?; - Ok((ssh_connection, ssh_proxy_process)) + let io_task = Self::multiplex( + ssh_proxy_process, + proxy_incoming_tx, + proxy_outgoing_rx, + connection_activity_tx, + &cx, + ); + + Ok((Box::new(ssh_connection), io_task)) } pub fn subscribe_to_entity(&self, remote_id: u64, entity: &Model) { @@ -1112,7 +1129,7 @@ impl SshRemoteClient { .lock() .as_ref() .and_then(|state| state.ssh_connection()) - .map(|ssh_connection| ssh_connection.socket.ssh_args()) + .map(|ssh_connection| ssh_connection.ssh_args()) } pub fn proto_client(&self) -> AnyProtoClient { @@ -1127,7 +1144,6 @@ impl SshRemoteClient { self.connection_options.clone() } - #[cfg(not(any(test, feature = "test-support")))] pub fn connection_state(&self) -> ConnectionState { self.state .lock() @@ -1136,37 +1152,69 @@ impl SshRemoteClient { .unwrap_or(ConnectionState::Disconnected) } - #[cfg(any(test, feature = "test-support"))] - pub fn connection_state(&self) -> ConnectionState { - ConnectionState::Connected - } - pub fn is_disconnected(&self) -> bool { self.connection_state() == ConnectionState::Disconnected } #[cfg(any(test, feature = "test-support"))] - pub fn fake( - client_cx: &mut gpui::TestAppContext, + pub fn simulate_disconnect(&self, cx: &mut AppContext) -> Task<()> { + use gpui::BorrowAppContext; + + let port = self.connection_options().port.unwrap(); + + let disconnect = + cx.update_global(|c: &mut fake::GlobalConnections, _cx| c.take(port).into_channels()); + cx.spawn(|mut cx| async move { + let (input_rx, output_tx) = disconnect.await; + let (forwarder, _, _) = ChannelForwarder::new(input_rx, output_tx, &mut cx); + cx.update_global(|c: &mut fake::GlobalConnections, _cx| c.replace(port, forwarder)) + .unwrap() + }) + } + + #[cfg(any(test, feature = "test-support"))] + pub fn fake_server( server_cx: &mut gpui::TestAppContext, - ) -> (Model, Arc) { - use gpui::Context; + ) -> (ChannelForwarder, Arc) { + server_cx.update(|cx| { + let (outgoing_tx, outgoing_rx) = mpsc::unbounded::(); + let (incoming_tx, incoming_rx) = mpsc::unbounded::(); - let (server_to_client_tx, server_to_client_rx) = mpsc::unbounded(); - let (client_to_server_tx, client_to_server_rx) = mpsc::unbounded(); + // We use the forwarder on the server side (in production we only use one on the client side) + // the idea is that we can simulate a disconnect/reconnect by just messing with the forwarder. + let (forwarder, _, _) = + ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx.to_async()); - ( - client_cx.update(|cx| { - let client = ChannelClient::new(server_to_client_rx, client_to_server_tx, cx); - cx.new_model(|_| Self { - client, - unique_identifier: "fake".to_string(), - connection_options: SshConnectionOptions::default(), - state: Arc::new(Mutex::new(None)), - }) - }), - server_cx.update(|cx| ChannelClient::new(client_to_server_rx, server_to_client_tx, cx)), - ) + let client = ChannelClient::new(incoming_rx, outgoing_tx, cx); + (forwarder, client) + }) + } + + #[cfg(any(test, feature = "test-support"))] + pub async fn fake_client( + forwarder: ChannelForwarder, + client_cx: &mut gpui::TestAppContext, + ) -> Model { + use gpui::BorrowAppContext; + client_cx + .update(|cx| { + let port = cx.update_default_global(|c: &mut fake::GlobalConnections, _cx| { + c.push(forwarder) + }); + + Self::new( + "fake".to_string(), + SshConnectionOptions { + host: "".to_string(), + port: Some(port), + ..Default::default() + }, + Arc::new(fake::Delegate), + cx, + ) + }) + .await + .unwrap() } } @@ -1176,6 +1224,13 @@ impl From for AnyProtoClient { } } +#[async_trait] +trait SshRemoteProcess: Send + Sync { + async fn kill(&mut self) -> Result<()>; + fn ssh_args(&self) -> Vec; + fn connection_options(&self) -> SshConnectionOptions; +} + struct SshRemoteConnection { socket: SshSocket, master_process: process::Child, @@ -1190,6 +1245,25 @@ impl Drop for SshRemoteConnection { } } +#[async_trait] +impl SshRemoteProcess for SshRemoteConnection { + async fn kill(&mut self) -> Result<()> { + self.master_process.kill()?; + + self.master_process.status().await?; + + Ok(()) + } + + fn ssh_args(&self) -> Vec { + self.socket.ssh_args() + } + + fn connection_options(&self) -> SshConnectionOptions { + self.socket.connection_options.clone() + } +} + impl SshRemoteConnection { #[cfg(not(unix))] async fn new( @@ -1472,8 +1546,10 @@ type ResponseChannels = Mutex, - response_channels: ResponseChannels, // Lock - message_handlers: Mutex, // Lock + buffer: Mutex>, + response_channels: ResponseChannels, + message_handlers: Mutex, + max_received: AtomicU32, } impl ChannelClient { @@ -1485,8 +1561,10 @@ impl ChannelClient { let this = Arc::new(Self { outgoing_tx, next_message_id: AtomicU32::new(0), + max_received: AtomicU32::new(0), response_channels: ResponseChannels::default(), message_handlers: Default::default(), + buffer: Mutex::new(VecDeque::new()), }); Self::start_handling_messages(this.clone(), incoming_rx, cx); @@ -1507,6 +1585,25 @@ impl ChannelClient { let Some(this) = this.upgrade() else { return anyhow::Ok(()); }; + if let Some(ack_id) = incoming.ack_id { + let mut buffer = this.buffer.lock(); + while buffer.front().is_some_and(|msg| msg.id <= ack_id) { + buffer.pop_front(); + } + } + if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload { + { + let buffer = this.buffer.lock(); + for envelope in buffer.iter() { + this.outgoing_tx.unbounded_send(envelope.clone()).ok(); + } + } + let response = proto::Ack{}.into_envelope(0, Some(incoming.id), None); + this.send_dynamic(response).ok(); + continue; + } + + this.max_received.store(incoming.id, SeqCst); if let Some(request_id) = incoming.responding_to { let request_id = MessageId(request_id); @@ -1583,6 +1680,23 @@ impl ChannelClient { } } + pub async fn resync(&self, timeout: Duration) -> Result<()> { + smol::future::or( + async { + self.request(proto::FlushBufferedMessages {}).await?; + for envelope in self.buffer.lock().iter() { + self.outgoing_tx.unbounded_send(envelope.clone()).ok(); + } + Ok(()) + }, + async { + smol::Timer::after(timeout).await; + Err(anyhow!("Timeout detected")) + }, + ) + .await + } + pub async fn ping(&self, timeout: Duration) -> Result<()> { smol::future::or( async { @@ -1612,7 +1726,8 @@ impl ChannelClient { let mut response_channels_lock = self.response_channels.lock(); response_channels_lock.insert(MessageId(envelope.id), tx); drop(response_channels_lock); - let result = self.outgoing_tx.unbounded_send(envelope); + + let result = self.send_buffered(envelope); async move { if let Err(error) = &result { log::error!("failed to send message: {}", error); @@ -1629,6 +1744,12 @@ impl ChannelClient { pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> { envelope.id = self.next_message_id.fetch_add(1, SeqCst); + self.send_buffered(envelope) + } + + pub fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> { + envelope.ack_id = Some(self.max_received.load(SeqCst)); + self.buffer.lock().push_back(envelope.clone()); self.outgoing_tx.unbounded_send(envelope)?; Ok(()) } @@ -1659,3 +1780,165 @@ impl ProtoClient for ChannelClient { false } } + +#[cfg(any(test, feature = "test-support"))] +mod fake { + use std::path::PathBuf; + + use anyhow::Result; + use async_trait::async_trait; + use futures::{ + channel::{ + mpsc::{self, Sender}, + oneshot, + }, + select_biased, FutureExt, SinkExt, StreamExt, + }; + use gpui::{AsyncAppContext, BorrowAppContext, Global, SemanticVersion, Task}; + use rpc::proto::Envelope; + + use super::{ + ChannelForwarder, SshClientDelegate, SshConnectionOptions, SshPlatform, SshRemoteProcess, + }; + + pub(super) struct SshRemoteConnection { + connection_options: SshConnectionOptions, + } + + impl SshRemoteConnection { + pub(super) fn new( + connection_options: &SshConnectionOptions, + ) -> Option> { + if connection_options.host == "" { + return Some(Box::new(Self { + connection_options: connection_options.clone(), + })); + } + return None; + } + pub(super) async fn multiplex( + connection_options: SshConnectionOptions, + mut client_tx: mpsc::UnboundedSender, + mut client_rx: mpsc::UnboundedReceiver, + mut connection_activity_tx: Sender<()>, + cx: &mut AsyncAppContext, + ) -> Task>> { + let (server_tx, server_rx) = cx + .update(|cx| { + cx.update_global(|conns: &mut GlobalConnections, _| { + conns.take(connection_options.port.unwrap()) + }) + }) + .unwrap() + .into_channels() + .await; + + let (forwarder, mut proxy_tx, mut proxy_rx) = + ChannelForwarder::new(server_tx, server_rx, cx); + + cx.update(|cx| { + cx.update_global(|conns: &mut GlobalConnections, _| { + conns.replace(connection_options.port.unwrap(), forwarder) + }) + }) + .unwrap(); + + cx.background_executor().spawn(async move { + loop { + select_biased! { + server_to_client = proxy_rx.next().fuse() => { + let Some(server_to_client) = server_to_client else { + return Ok(Some(1)) + }; + connection_activity_tx.try_send(()).ok(); + client_tx.send(server_to_client).await.ok(); + } + client_to_server = client_rx.next().fuse() => { + let Some(client_to_server) = client_to_server else { + return Ok(None) + }; + proxy_tx.send(client_to_server).await.ok(); + + } + } + } + }) + } + } + + #[async_trait] + impl SshRemoteProcess for SshRemoteConnection { + async fn kill(&mut self) -> Result<()> { + Ok(()) + } + + fn ssh_args(&self) -> Vec { + Vec::new() + } + + fn connection_options(&self) -> SshConnectionOptions { + self.connection_options.clone() + } + } + + #[derive(Default)] + pub(super) struct GlobalConnections(Vec>); + impl Global for GlobalConnections {} + + impl GlobalConnections { + pub(super) fn push(&mut self, forwarder: ChannelForwarder) -> u16 { + self.0.push(Some(forwarder)); + self.0.len() as u16 - 1 + } + + pub(super) fn take(&mut self, port: u16) -> ChannelForwarder { + self.0 + .get_mut(port as usize) + .expect("no fake server for port") + .take() + .expect("fake server is already borrowed") + } + pub(super) fn replace(&mut self, port: u16, forwarder: ChannelForwarder) { + let ret = self + .0 + .get_mut(port as usize) + .expect("no fake server for port") + .replace(forwarder); + if ret.is_some() { + panic!("fake server is already replaced"); + } + } + } + + pub(super) struct Delegate; + + impl SshClientDelegate for Delegate { + fn ask_password( + &self, + _: String, + _: &mut AsyncAppContext, + ) -> oneshot::Receiver> { + unreachable!() + } + fn remote_server_binary_path( + &self, + _: SshPlatform, + _: &mut AsyncAppContext, + ) -> Result { + unreachable!() + } + fn get_server_binary( + &self, + _: SshPlatform, + _: &mut AsyncAppContext, + ) -> oneshot::Receiver> { + unreachable!() + } + fn set_status(&self, _: Option<&str>, _: &mut AsyncAppContext) { + unreachable!() + } + fn set_error(&self, _: String, _: &mut AsyncAppContext) { + unreachable!() + } + } +} diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs index 41065ad550..bfe58931c9 100644 --- a/crates/remote_server/src/remote_editing_tests.rs +++ b/crates/remote_server/src/remote_editing_tests.rs @@ -641,6 +641,47 @@ async fn test_open_server_settings(cx: &mut TestAppContext, server_cx: &mut Test }) } +#[gpui::test(iterations = 10)] +async fn test_reconnect(cx: &mut TestAppContext, server_cx: &mut TestAppContext) { + let (project, _headless, fs) = init_test(cx, server_cx).await; + + let (worktree, _) = project + .update(cx, |project, cx| { + project.find_or_create_worktree("/code/project1", true, cx) + }) + .await + .unwrap(); + + let worktree_id = worktree.read_with(cx, |worktree, _| worktree.id()); + let buffer = project + .update(cx, |project, cx| { + project.open_buffer((worktree_id, Path::new("src/lib.rs")), cx) + }) + .await + .unwrap(); + + buffer.update(cx, |buffer, cx| { + assert_eq!(buffer.text(), "fn one() -> usize { 1 }"); + let ix = buffer.text().find('1').unwrap(); + buffer.edit([(ix..ix + 1, "100")], None, cx); + }); + + let client = cx.read(|cx| project.read(cx).ssh_client().unwrap()); + client + .update(cx, |client, cx| client.simulate_disconnect(cx)) + .detach(); + + project + .update(cx, |project, cx| project.save_buffer(buffer.clone(), cx)) + .await + .unwrap(); + + assert_eq!( + fs.load("/code/project1/src/lib.rs".as_ref()).await.unwrap(), + "fn one() -> usize { 100 }" + ); +} + fn init_logger() { if std::env::var("RUST_LOG").is_ok() { env_logger::try_init().ok(); @@ -651,9 +692,9 @@ async fn init_test( cx: &mut TestAppContext, server_cx: &mut TestAppContext, ) -> (Model, Model, Arc) { - let (ssh_remote_client, ssh_server_client) = SshRemoteClient::fake(cx, server_cx); init_logger(); + let (forwarder, ssh_server_client) = SshRemoteClient::fake_server(server_cx); let fs = FakeFs::new(server_cx.executor()); fs.insert_tree( "/code", @@ -694,8 +735,9 @@ async fn init_test( cx, ) }); - let project = build_project(ssh_remote_client, cx); + let ssh = SshRemoteClient::fake_client(forwarder, cx).await; + let project = build_project(ssh, cx); project .update(cx, { let headless = headless.clone();