ssh remote: Shutdown SSH & server process correctly on app quit (#19210)

Release Notes:

- N/A

Co-authored-by: Bennet <bennet@zed.dev>
This commit is contained in:
Thorsten Ball 2024-10-15 11:26:23 +02:00 committed by GitHub
parent 62de03f286
commit be7b24fcf7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 65 additions and 42 deletions

View file

@ -746,17 +746,6 @@ impl Project {
}); });
cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach(); cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
cx.on_release(|this, cx| {
if let Some(ssh_client) = this.ssh_client.as_ref() {
ssh_client
.read(cx)
.to_proto_client()
.send(proto::ShutdownRemoteServer {})
.log_err();
}
})
.detach();
cx.subscribe(&ssh, Self::on_ssh_event).detach(); cx.subscribe(&ssh, Self::on_ssh_event).detach();
cx.observe(&ssh, |_, _, cx| cx.notify()).detach(); cx.observe(&ssh, |_, _, cx| cx.notify()).detach();
@ -769,7 +758,22 @@ impl Project {
join_project_response_message_id: 0, join_project_response_message_id: 0,
client_state: ProjectClientState::Local, client_state: ProjectClientState::Local,
client_subscriptions: Vec::new(), client_subscriptions: Vec::new(),
_subscriptions: vec![cx.on_release(Self::release)], _subscriptions: vec![
cx.on_release(Self::release),
cx.on_app_quit(|this, cx| {
let shutdown = this.ssh_client.take().and_then(|client| {
client
.read(cx)
.shutdown_processes(Some(proto::ShutdownRemoteServer {}))
});
cx.background_executor().spawn(async move {
if let Some(shutdown) = shutdown {
shutdown.await;
}
})
}),
],
active_entry: None, active_entry: None,
snippets, snippets,
languages, languages,
@ -1094,6 +1098,20 @@ impl Project {
} }
fn release(&mut self, cx: &mut AppContext) { fn release(&mut self, cx: &mut AppContext) {
if let Some(client) = self.ssh_client.take() {
let shutdown = client
.read(cx)
.shutdown_processes(Some(proto::ShutdownRemoteServer {}));
cx.background_executor()
.spawn(async move {
if let Some(shutdown) = shutdown {
shutdown.await;
}
})
.detach()
}
match &self.client_state { match &self.client_state {
ProjectClientState::Local => {} ProjectClientState::Local => {}
ProjectClientState::Shared { .. } => { ProjectClientState::Shared { .. } => {

View file

@ -424,12 +424,6 @@ pub struct SshRemoteClient {
state: Arc<Mutex<Option<State>>>, state: Arc<Mutex<Option<State>>>,
} }
impl Drop for SshRemoteClient {
fn drop(&mut self) {
self.shutdown_processes();
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum SshRemoteEvent { pub enum SshRemoteEvent {
Disconnected, Disconnected,
@ -449,19 +443,11 @@ impl SshRemoteClient {
let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>(); let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
let client = cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx))?; let client = cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx))?;
let this = cx.new_model(|cx| { let this = cx.new_model(|_| Self {
cx.on_app_quit(|this: &mut Self, _| {
this.shutdown_processes();
futures::future::ready(())
})
.detach();
Self {
client: client.clone(), client: client.clone(),
unique_identifier: unique_identifier.clone(), unique_identifier: unique_identifier.clone(),
connection_options: connection_options.clone(), connection_options: connection_options.clone(),
state: Arc::new(Mutex::new(Some(State::Connecting))), state: Arc::new(Mutex::new(Some(State::Connecting))),
}
})?; })?;
let (proxy, proxy_incoming_tx, proxy_outgoing_rx) = let (proxy, proxy_incoming_tx, proxy_outgoing_rx) =
@ -506,25 +492,44 @@ impl SshRemoteClient {
}) })
} }
fn shutdown_processes(&self) { pub fn shutdown_processes<T: RequestMessage>(
let Some(state) = self.state.lock().take() else { &self,
return; shutdown_request: Option<T>,
}; ) -> Option<impl Future<Output = ()>> {
let state = self.state.lock().take()?;
log::info!("shutting down ssh processes"); log::info!("shutting down ssh processes");
let State::Connected { let State::Connected {
multiplex_task, multiplex_task,
heartbeat_task, heartbeat_task,
.. ssh_connection,
delegate,
forwarder,
} = state } = state
else { else {
return; return None;
}; };
let client = self.client.clone();
Some(async move {
if let Some(shutdown_request) = shutdown_request {
client.send(shutdown_request).log_err();
// We wait 50ms instead of waiting for a response, because
// waiting for a response would require us to wait on the main thread
// which we want to avoid in an `on_app_quit` callback.
Timer::after(Duration::from_millis(50)).await;
}
// Drop `multiplex_task` because it owns our ssh_proxy_process, which is a // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
// child of master_process. // child of master_process.
drop(multiplex_task); drop(multiplex_task);
// Now drop the rest of state, which kills master process. // Now drop the rest of state, which kills master process.
drop(heartbeat_task); drop(heartbeat_task);
drop(ssh_connection);
drop(delegate);
drop(forwarder);
})
} }
fn reconnect(&mut self, cx: &mut ModelContext<Self>) -> Result<()> { fn reconnect(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {