From a5492b3ea6b00577a8461223844f1a3c47ac1d4f Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Fri, 18 Oct 2024 16:08:56 -0600 Subject: [PATCH] Revert "SSH reconnect reliability (#19398)" (#19440) This reverts commit 98ecb43b2dac3d70f43d745ab32be5a3d4bf323b. Tests fail on main?! Closes #ISSUE Release Notes: - N/A --- Cargo.lock | 1 - .../remote_editing_collaboration_tests.rs | 3 +- crates/project/src/project.rs | 4 - crates/proto/proto/zed.proto | 8 +- crates/proto/src/macros.rs | 1 - crates/proto/src/proto.rs | 2 - crates/remote/Cargo.toml | 1 - crates/remote/src/ssh_session.rs | 437 +++--------------- .../remote_server/src/remote_editing_tests.rs | 46 +- 9 files changed, 80 insertions(+), 423 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f32d24dbfb..a27005e5d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9119,7 +9119,6 @@ name = "remote" version = "0.1.0" dependencies = [ "anyhow", - "async-trait", "collections", "fs", "futures 0.3.30", diff --git a/crates/collab/src/tests/remote_editing_collaboration_tests.rs b/crates/collab/src/tests/remote_editing_collaboration_tests.rs index a23aadf5e0..dae3345755 100644 --- a/crates/collab/src/tests/remote_editing_collaboration_tests.rs +++ b/crates/collab/src/tests/remote_editing_collaboration_tests.rs @@ -26,7 +26,7 @@ async fn test_sharing_an_ssh_remote_project( .await; // Set up project on remote FS - let (forwarder, server_ssh) = SshRemoteClient::fake_server(server_cx); + let (client_ssh, server_ssh) = SshRemoteClient::fake(cx_a, server_cx); let remote_fs = FakeFs::new(server_cx.executor()); remote_fs .insert_tree( @@ -67,7 +67,6 @@ async fn test_sharing_an_ssh_remote_project( ) }); - let client_ssh = SshRemoteClient::fake_client(forwarder, cx_a).await; let (project_a, worktree_id) = client_a .build_ssh_project("/code/project1", client_ssh, cx_a) .await; diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 8ea9e78cb7..70d5962647 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1243,10 +1243,6 @@ impl Project { self.client.clone() } - pub fn ssh_client(&self) -> Option> { - self.ssh_client.clone() - } - pub fn user_store(&self) -> Model { self.user_store.clone() } diff --git a/crates/proto/proto/zed.proto b/crates/proto/proto/zed.proto index e089fcdeba..09891de6fe 100644 --- a/crates/proto/proto/zed.proto +++ b/crates/proto/proto/zed.proto @@ -12,7 +12,6 @@ message Envelope { uint32 id = 1; optional uint32 responding_to = 2; optional PeerId original_sender_id = 3; - optional uint32 ack_id = 266; oneof payload { Hello hello = 4; @@ -296,9 +295,7 @@ message Envelope { OpenServerSettings open_server_settings = 263; GetPermalinkToLine get_permalink_to_line = 264; - GetPermalinkToLineResponse get_permalink_to_line_response = 265; - - FlushBufferedMessages flush_buffered_messages = 267; + GetPermalinkToLineResponse get_permalink_to_line_response = 265; // current max } reserved 87 to 88; @@ -2524,6 +2521,3 @@ message GetPermalinkToLine { message GetPermalinkToLineResponse { string permalink = 1; } - -message FlushBufferedMessages {} -message FlushBufferedMessagesResponse {} diff --git a/crates/proto/src/macros.rs b/crates/proto/src/macros.rs index 2ce0c0df25..4fdbfff81b 100644 --- a/crates/proto/src/macros.rs +++ b/crates/proto/src/macros.rs @@ -32,7 +32,6 @@ macro_rules! messages { responding_to, original_sender_id, payload: Some(envelope::Payload::$name(self)), - ack_id: None, } } diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs index 8179473fea..ffbbeb49c2 100644 --- a/crates/proto/src/proto.rs +++ b/crates/proto/src/proto.rs @@ -372,7 +372,6 @@ messages!( (OpenServerSettings, Foreground), (GetPermalinkToLine, Foreground), (GetPermalinkToLineResponse, Foreground), - (FlushBufferedMessages, Foreground), ); request_messages!( @@ -499,7 +498,6 @@ request_messages!( (RemoveWorktree, Ack), (OpenServerSettings, OpenBufferResponse), (GetPermalinkToLine, GetPermalinkToLineResponse), - (FlushBufferedMessages, Ack), ); entity_messages!( diff --git a/crates/remote/Cargo.toml b/crates/remote/Cargo.toml index 937a69ee59..b8c5f34cc5 100644 --- a/crates/remote/Cargo.toml +++ b/crates/remote/Cargo.toml @@ -19,7 +19,6 @@ test-support = ["fs/test-support"] [dependencies] anyhow.workspace = true -async-trait.workspace = true collections.workspace = true fs.workspace = true futures.workspace = true diff --git a/crates/remote/src/ssh_session.rs b/crates/remote/src/ssh_session.rs index 74ee837e46..f7ef74ce39 100644 --- a/crates/remote/src/ssh_session.rs +++ b/crates/remote/src/ssh_session.rs @@ -6,7 +6,6 @@ use crate::{ proxy::ProxyLaunchError, }; use anyhow::{anyhow, Context as _, Result}; -use async_trait::async_trait; use collections::HashMap; use futures::{ channel::{ @@ -32,7 +31,6 @@ use smol::{ }; use std::{ any::TypeId, - collections::VecDeque, ffi::OsStr, fmt, ops::ControlFlow, @@ -278,7 +276,7 @@ async fn run_cmd(command: &mut process::Command) -> Result { } } -pub struct ChannelForwarder { +struct ChannelForwarder { quit_tx: UnboundedSender<()>, forwarding_task: Task<(UnboundedSender, UnboundedReceiver)>, } @@ -349,7 +347,7 @@ const MAX_RECONNECT_ATTEMPTS: usize = 3; enum State { Connecting, Connected { - ssh_connection: Box, + ssh_connection: SshRemoteConnection, delegate: Arc, forwarder: ChannelForwarder, @@ -359,7 +357,7 @@ enum State { HeartbeatMissed { missed_heartbeats: usize, - ssh_connection: Box, + ssh_connection: SshRemoteConnection, delegate: Arc, forwarder: ChannelForwarder, @@ -368,7 +366,7 @@ enum State { }, Reconnecting, ReconnectFailed { - ssh_connection: Box, + ssh_connection: SshRemoteConnection, delegate: Arc, forwarder: ChannelForwarder, @@ -394,11 +392,11 @@ impl fmt::Display for State { } impl State { - fn ssh_connection(&self) -> Option<&dyn SshRemoteProcess> { + fn ssh_connection(&self) -> Option<&SshRemoteConnection> { match self { - Self::Connected { ssh_connection, .. } => Some(ssh_connection.as_ref()), - Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.as_ref()), - Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.as_ref()), + Self::Connected { ssh_connection, .. } => Some(ssh_connection), + Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection), + Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection), _ => None, } } @@ -543,19 +541,23 @@ impl SshRemoteClient { let (proxy, proxy_incoming_tx, proxy_outgoing_rx) = ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx); - let (ssh_connection, io_task) = Self::establish_connection( + let (ssh_connection, ssh_proxy_process) = Self::establish_connection( unique_identifier, false, connection_options, - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, delegate.clone(), &mut cx, ) .await?; - let multiplex_task = Self::monitor(this.downgrade(), io_task, &cx); + let multiplex_task = Self::multiplex( + this.downgrade(), + ssh_proxy_process, + proxy_incoming_tx, + proxy_outgoing_rx, + connection_activity_tx, + &mut cx, + ); if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await { log::error!("failed to establish connection: {}", error); @@ -701,24 +703,30 @@ impl SshRemoteClient { }; } - if let Err(error) = ssh_connection.kill().await.context("Failed to kill ssh process") { + if let Err(error) = ssh_connection.master_process.kill() { failed!(error, attempts, ssh_connection, delegate, forwarder); }; - let connection_options = ssh_connection.connection_options(); + if let Err(error) = ssh_connection + .master_process + .status() + .await + .context("Failed to kill ssh process") + { + failed!(error, attempts, ssh_connection, delegate, forwarder); + } + + let connection_options = ssh_connection.socket.connection_options.clone(); let (incoming_tx, outgoing_rx) = forwarder.into_channels().await; let (forwarder, proxy_incoming_tx, proxy_outgoing_rx) = ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx); let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1); - let (ssh_connection, io_task) = match Self::establish_connection( + let (ssh_connection, ssh_process) = match Self::establish_connection( identifier, true, connection_options, - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, delegate.clone(), &mut cx, ) @@ -730,9 +738,16 @@ impl SshRemoteClient { } }; - let multiplex_task = Self::monitor(this.clone(), io_task, &cx); + let multiplex_task = Self::multiplex( + this.clone(), + ssh_process, + proxy_incoming_tx, + proxy_outgoing_rx, + connection_activity_tx, + &mut cx, + ); - if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await { + if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await { failed!(error, attempts, ssh_connection, delegate, forwarder); }; @@ -896,17 +911,18 @@ impl SshRemoteClient { } fn multiplex( + this: WeakModel, mut ssh_proxy_process: Child, incoming_tx: UnboundedSender, mut outgoing_rx: UnboundedReceiver, mut connection_activity_tx: Sender<()>, cx: &AsyncAppContext, - ) -> Task>> { + ) -> Task> { let mut child_stderr = ssh_proxy_process.stderr.take().unwrap(); let mut child_stdout = ssh_proxy_process.stdout.take().unwrap(); let mut child_stdin = ssh_proxy_process.stdin.take().unwrap(); - cx.background_executor().spawn(async move { + let io_task = cx.background_executor().spawn(async move { let mut stdin_buffer = Vec::new(); let mut stdout_buffer = Vec::new(); let mut stderr_buffer = Vec::new(); @@ -985,14 +1001,8 @@ impl SshRemoteClient { } } } - }) - } + }); - fn monitor( - this: WeakModel, - io_task: Task>>, - cx: &AsyncAppContext, - ) -> Task> { cx.spawn(|mut cx| async move { let result = io_task.await; @@ -1051,40 +1061,21 @@ impl SshRemoteClient { cx.notify(); } - #[allow(clippy::too_many_arguments)] async fn establish_connection( unique_identifier: String, reconnect: bool, connection_options: SshConnectionOptions, - proxy_incoming_tx: UnboundedSender, - proxy_outgoing_rx: UnboundedReceiver, - connection_activity_tx: Sender<()>, delegate: Arc, cx: &mut AsyncAppContext, - ) -> Result<(Box, Task>>)> { - #[cfg(any(test, feature = "test-support"))] - if let Some(fake) = fake::SshRemoteConnection::new(&connection_options) { - let io_task = fake::SshRemoteConnection::multiplex( - fake.connection_options(), - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, - cx, - ) - .await; - return Ok((fake, io_task)); - } - + ) -> Result<(SshRemoteConnection, Child)> { let ssh_connection = SshRemoteConnection::new(connection_options, delegate.clone(), cx).await?; let platform = ssh_connection.query_platform().await?; let remote_binary_path = delegate.remote_server_binary_path(platform, cx)?; - if !reconnect { - ssh_connection - .ensure_server_binary(&delegate, &remote_binary_path, platform, cx) - .await?; - } + ssh_connection + .ensure_server_binary(&delegate, &remote_binary_path, platform, cx) + .await?; let socket = ssh_connection.socket.clone(); run_cmd(socket.ssh_command(&remote_binary_path).arg("version")).await?; @@ -1109,15 +1100,7 @@ impl SshRemoteClient { .spawn() .context("failed to spawn remote server")?; - let io_task = Self::multiplex( - ssh_proxy_process, - proxy_incoming_tx, - proxy_outgoing_rx, - connection_activity_tx, - &cx, - ); - - Ok((Box::new(ssh_connection), io_task)) + Ok((ssh_connection, ssh_proxy_process)) } pub fn subscribe_to_entity(&self, remote_id: u64, entity: &Model) { @@ -1129,7 +1112,7 @@ impl SshRemoteClient { .lock() .as_ref() .and_then(|state| state.ssh_connection()) - .map(|ssh_connection| ssh_connection.ssh_args()) + .map(|ssh_connection| ssh_connection.socket.ssh_args()) } pub fn proto_client(&self) -> AnyProtoClient { @@ -1144,6 +1127,7 @@ impl SshRemoteClient { self.connection_options.clone() } + #[cfg(not(any(test, feature = "test-support")))] pub fn connection_state(&self) -> ConnectionState { self.state .lock() @@ -1152,69 +1136,37 @@ impl SshRemoteClient { .unwrap_or(ConnectionState::Disconnected) } + #[cfg(any(test, feature = "test-support"))] + pub fn connection_state(&self) -> ConnectionState { + ConnectionState::Connected + } + pub fn is_disconnected(&self) -> bool { self.connection_state() == ConnectionState::Disconnected } #[cfg(any(test, feature = "test-support"))] - pub fn simulate_disconnect(&self, cx: &mut AppContext) -> Task<()> { - use gpui::BorrowAppContext; - - let port = self.connection_options().port.unwrap(); - - let disconnect = - cx.update_global(|c: &mut fake::GlobalConnections, _cx| c.take(port).into_channels()); - cx.spawn(|mut cx| async move { - let (input_rx, output_tx) = disconnect.await; - let (forwarder, _, _) = ChannelForwarder::new(input_rx, output_tx, &mut cx); - cx.update_global(|c: &mut fake::GlobalConnections, _cx| c.replace(port, forwarder)) - .unwrap() - }) - } - - #[cfg(any(test, feature = "test-support"))] - pub fn fake_server( - server_cx: &mut gpui::TestAppContext, - ) -> (ChannelForwarder, Arc) { - server_cx.update(|cx| { - let (outgoing_tx, outgoing_rx) = mpsc::unbounded::(); - let (incoming_tx, incoming_rx) = mpsc::unbounded::(); - - // We use the forwarder on the server side (in production we only use one on the client side) - // the idea is that we can simulate a disconnect/reconnect by just messing with the forwarder. - let (forwarder, _, _) = - ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx.to_async()); - - let client = ChannelClient::new(incoming_rx, outgoing_tx, cx); - (forwarder, client) - }) - } - - #[cfg(any(test, feature = "test-support"))] - pub async fn fake_client( - forwarder: ChannelForwarder, + pub fn fake( client_cx: &mut gpui::TestAppContext, - ) -> Model { - use gpui::BorrowAppContext; - client_cx - .update(|cx| { - let port = cx.update_default_global(|c: &mut fake::GlobalConnections, _cx| { - c.push(forwarder) - }); + server_cx: &mut gpui::TestAppContext, + ) -> (Model, Arc) { + use gpui::Context; - Self::new( - "fake".to_string(), - SshConnectionOptions { - host: "".to_string(), - port: Some(port), - ..Default::default() - }, - Arc::new(fake::Delegate), - cx, - ) - }) - .await - .unwrap() + let (server_to_client_tx, server_to_client_rx) = mpsc::unbounded(); + let (client_to_server_tx, client_to_server_rx) = mpsc::unbounded(); + + ( + client_cx.update(|cx| { + let client = ChannelClient::new(server_to_client_rx, client_to_server_tx, cx); + cx.new_model(|_| Self { + client, + unique_identifier: "fake".to_string(), + connection_options: SshConnectionOptions::default(), + state: Arc::new(Mutex::new(None)), + }) + }), + server_cx.update(|cx| ChannelClient::new(client_to_server_rx, server_to_client_tx, cx)), + ) } } @@ -1224,13 +1176,6 @@ impl From for AnyProtoClient { } } -#[async_trait] -trait SshRemoteProcess: Send + Sync { - async fn kill(&mut self) -> Result<()>; - fn ssh_args(&self) -> Vec; - fn connection_options(&self) -> SshConnectionOptions; -} - struct SshRemoteConnection { socket: SshSocket, master_process: process::Child, @@ -1245,25 +1190,6 @@ impl Drop for SshRemoteConnection { } } -#[async_trait] -impl SshRemoteProcess for SshRemoteConnection { - async fn kill(&mut self) -> Result<()> { - self.master_process.kill()?; - - self.master_process.status().await?; - - Ok(()) - } - - fn ssh_args(&self) -> Vec { - self.socket.ssh_args() - } - - fn connection_options(&self) -> SshConnectionOptions { - self.socket.connection_options.clone() - } -} - impl SshRemoteConnection { #[cfg(not(unix))] async fn new( @@ -1546,10 +1472,8 @@ type ResponseChannels = Mutex, - buffer: Mutex>, - response_channels: ResponseChannels, - message_handlers: Mutex, - max_received: AtomicU32, + response_channels: ResponseChannels, // Lock + message_handlers: Mutex, // Lock } impl ChannelClient { @@ -1561,10 +1485,8 @@ impl ChannelClient { let this = Arc::new(Self { outgoing_tx, next_message_id: AtomicU32::new(0), - max_received: AtomicU32::new(0), response_channels: ResponseChannels::default(), message_handlers: Default::default(), - buffer: Mutex::new(VecDeque::new()), }); Self::start_handling_messages(this.clone(), incoming_rx, cx); @@ -1585,27 +1507,6 @@ impl ChannelClient { let Some(this) = this.upgrade() else { return anyhow::Ok(()); }; - if let Some(ack_id) = incoming.ack_id { - let mut buffer = this.buffer.lock(); - while buffer.front().is_some_and(|msg| msg.id <= ack_id) { - buffer.pop_front(); - } - } - if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = - &incoming.payload - { - { - let buffer = this.buffer.lock(); - for envelope in buffer.iter() { - this.outgoing_tx.unbounded_send(envelope.clone()).ok(); - } - } - let response = proto::Ack {}.into_envelope(0, Some(incoming.id), None); - this.send_dynamic(response).ok(); - continue; - } - - this.max_received.store(incoming.id, SeqCst); if let Some(request_id) = incoming.responding_to { let request_id = MessageId(request_id); @@ -1682,23 +1583,6 @@ impl ChannelClient { } } - pub async fn resync(&self, timeout: Duration) -> Result<()> { - smol::future::or( - async { - self.request(proto::FlushBufferedMessages {}).await?; - for envelope in self.buffer.lock().iter() { - self.outgoing_tx.unbounded_send(envelope.clone()).ok(); - } - Ok(()) - }, - async { - smol::Timer::after(timeout).await; - Err(anyhow!("Timeout detected")) - }, - ) - .await - } - pub async fn ping(&self, timeout: Duration) -> Result<()> { smol::future::or( async { @@ -1728,8 +1612,7 @@ impl ChannelClient { let mut response_channels_lock = self.response_channels.lock(); response_channels_lock.insert(MessageId(envelope.id), tx); drop(response_channels_lock); - - let result = self.send_buffered(envelope); + let result = self.outgoing_tx.unbounded_send(envelope); async move { if let Err(error) = &result { log::error!("failed to send message: {}", error); @@ -1746,12 +1629,6 @@ impl ChannelClient { pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> { envelope.id = self.next_message_id.fetch_add(1, SeqCst); - self.send_buffered(envelope) - } - - pub fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> { - envelope.ack_id = Some(self.max_received.load(SeqCst)); - self.buffer.lock().push_back(envelope.clone()); self.outgoing_tx.unbounded_send(envelope)?; Ok(()) } @@ -1782,165 +1659,3 @@ impl ProtoClient for ChannelClient { false } } - -#[cfg(any(test, feature = "test-support"))] -mod fake { - use std::path::PathBuf; - - use anyhow::Result; - use async_trait::async_trait; - use futures::{ - channel::{ - mpsc::{self, Sender}, - oneshot, - }, - select_biased, FutureExt, SinkExt, StreamExt, - }; - use gpui::{AsyncAppContext, BorrowAppContext, Global, SemanticVersion, Task}; - use rpc::proto::Envelope; - - use super::{ - ChannelForwarder, SshClientDelegate, SshConnectionOptions, SshPlatform, SshRemoteProcess, - }; - - pub(super) struct SshRemoteConnection { - connection_options: SshConnectionOptions, - } - - impl SshRemoteConnection { - pub(super) fn new( - connection_options: &SshConnectionOptions, - ) -> Option> { - if connection_options.host == "" { - return Some(Box::new(Self { - connection_options: connection_options.clone(), - })); - } - return None; - } - pub(super) async fn multiplex( - connection_options: SshConnectionOptions, - mut client_tx: mpsc::UnboundedSender, - mut client_rx: mpsc::UnboundedReceiver, - mut connection_activity_tx: Sender<()>, - cx: &mut AsyncAppContext, - ) -> Task>> { - let (server_tx, server_rx) = cx - .update(|cx| { - cx.update_global(|conns: &mut GlobalConnections, _| { - conns.take(connection_options.port.unwrap()) - }) - }) - .unwrap() - .into_channels() - .await; - - let (forwarder, mut proxy_tx, mut proxy_rx) = - ChannelForwarder::new(server_tx, server_rx, cx); - - cx.update(|cx| { - cx.update_global(|conns: &mut GlobalConnections, _| { - conns.replace(connection_options.port.unwrap(), forwarder) - }) - }) - .unwrap(); - - cx.background_executor().spawn(async move { - loop { - select_biased! { - server_to_client = proxy_rx.next().fuse() => { - let Some(server_to_client) = server_to_client else { - return Ok(Some(1)) - }; - connection_activity_tx.try_send(()).ok(); - client_tx.send(server_to_client).await.ok(); - } - client_to_server = client_rx.next().fuse() => { - let Some(client_to_server) = client_to_server else { - return Ok(None) - }; - proxy_tx.send(client_to_server).await.ok(); - - } - } - } - }) - } - } - - #[async_trait] - impl SshRemoteProcess for SshRemoteConnection { - async fn kill(&mut self) -> Result<()> { - Ok(()) - } - - fn ssh_args(&self) -> Vec { - Vec::new() - } - - fn connection_options(&self) -> SshConnectionOptions { - self.connection_options.clone() - } - } - - #[derive(Default)] - pub(super) struct GlobalConnections(Vec>); - impl Global for GlobalConnections {} - - impl GlobalConnections { - pub(super) fn push(&mut self, forwarder: ChannelForwarder) -> u16 { - self.0.push(Some(forwarder)); - self.0.len() as u16 - 1 - } - - pub(super) fn take(&mut self, port: u16) -> ChannelForwarder { - self.0 - .get_mut(port as usize) - .expect("no fake server for port") - .take() - .expect("fake server is already borrowed") - } - pub(super) fn replace(&mut self, port: u16, forwarder: ChannelForwarder) { - let ret = self - .0 - .get_mut(port as usize) - .expect("no fake server for port") - .replace(forwarder); - if ret.is_some() { - panic!("fake server is already replaced"); - } - } - } - - pub(super) struct Delegate; - - impl SshClientDelegate for Delegate { - fn ask_password( - &self, - _: String, - _: &mut AsyncAppContext, - ) -> oneshot::Receiver> { - unreachable!() - } - fn remote_server_binary_path( - &self, - _: SshPlatform, - _: &mut AsyncAppContext, - ) -> Result { - unreachable!() - } - fn get_server_binary( - &self, - _: SshPlatform, - _: &mut AsyncAppContext, - ) -> oneshot::Receiver> { - unreachable!() - } - fn set_status(&self, _: Option<&str>, _: &mut AsyncAppContext) { - unreachable!() - } - fn set_error(&self, _: String, _: &mut AsyncAppContext) { - unreachable!() - } - } -} diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs index bfe58931c9..41065ad550 100644 --- a/crates/remote_server/src/remote_editing_tests.rs +++ b/crates/remote_server/src/remote_editing_tests.rs @@ -641,47 +641,6 @@ async fn test_open_server_settings(cx: &mut TestAppContext, server_cx: &mut Test }) } -#[gpui::test(iterations = 10)] -async fn test_reconnect(cx: &mut TestAppContext, server_cx: &mut TestAppContext) { - let (project, _headless, fs) = init_test(cx, server_cx).await; - - let (worktree, _) = project - .update(cx, |project, cx| { - project.find_or_create_worktree("/code/project1", true, cx) - }) - .await - .unwrap(); - - let worktree_id = worktree.read_with(cx, |worktree, _| worktree.id()); - let buffer = project - .update(cx, |project, cx| { - project.open_buffer((worktree_id, Path::new("src/lib.rs")), cx) - }) - .await - .unwrap(); - - buffer.update(cx, |buffer, cx| { - assert_eq!(buffer.text(), "fn one() -> usize { 1 }"); - let ix = buffer.text().find('1').unwrap(); - buffer.edit([(ix..ix + 1, "100")], None, cx); - }); - - let client = cx.read(|cx| project.read(cx).ssh_client().unwrap()); - client - .update(cx, |client, cx| client.simulate_disconnect(cx)) - .detach(); - - project - .update(cx, |project, cx| project.save_buffer(buffer.clone(), cx)) - .await - .unwrap(); - - assert_eq!( - fs.load("/code/project1/src/lib.rs".as_ref()).await.unwrap(), - "fn one() -> usize { 100 }" - ); -} - fn init_logger() { if std::env::var("RUST_LOG").is_ok() { env_logger::try_init().ok(); @@ -692,9 +651,9 @@ async fn init_test( cx: &mut TestAppContext, server_cx: &mut TestAppContext, ) -> (Model, Model, Arc) { + let (ssh_remote_client, ssh_server_client) = SshRemoteClient::fake(cx, server_cx); init_logger(); - let (forwarder, ssh_server_client) = SshRemoteClient::fake_server(server_cx); let fs = FakeFs::new(server_cx.executor()); fs.insert_tree( "/code", @@ -735,9 +694,8 @@ async fn init_test( cx, ) }); + let project = build_project(ssh_remote_client, cx); - let ssh = SshRemoteClient::fake_client(forwarder, cx).await; - let project = build_project(ssh, cx); project .update(cx, { let headless = headless.clone();