Remove unnecessary Arc<Mutex<>> around SshRemoteClient's state
This commit is contained in:
parent
d24cad30f3
commit
6ce1403aec
2 changed files with 30 additions and 34 deletions
|
@ -1307,10 +1307,12 @@ impl Project {
|
||||||
cx.on_release(Self::release),
|
cx.on_release(Self::release),
|
||||||
cx.on_app_quit(|this, cx| {
|
cx.on_app_quit(|this, cx| {
|
||||||
let shutdown = this.ssh_client.take().and_then(|client| {
|
let shutdown = this.ssh_client.take().and_then(|client| {
|
||||||
client.read(cx).shutdown_processes(
|
client.update(cx, |client, cx| {
|
||||||
Some(proto::ShutdownRemoteServer {}),
|
client.shutdown_processes(
|
||||||
cx.background_executor().clone(),
|
Some(proto::ShutdownRemoteServer {}),
|
||||||
)
|
cx.background_executor().clone(),
|
||||||
|
)
|
||||||
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
cx.background_executor().spawn(async move {
|
cx.background_executor().spawn(async move {
|
||||||
|
@ -1662,10 +1664,12 @@ impl Project {
|
||||||
|
|
||||||
fn release(&mut self, cx: &mut App) {
|
fn release(&mut self, cx: &mut App) {
|
||||||
if let Some(client) = self.ssh_client.take() {
|
if let Some(client) = self.ssh_client.take() {
|
||||||
let shutdown = client.read(cx).shutdown_processes(
|
let shutdown = client.update(cx, |client, cx| {
|
||||||
Some(proto::ShutdownRemoteServer {}),
|
client.shutdown_processes(
|
||||||
cx.background_executor().clone(),
|
Some(proto::ShutdownRemoteServer {}),
|
||||||
);
|
cx.background_executor().clone(),
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
cx.background_spawn(async move {
|
cx.background_spawn(async move {
|
||||||
if let Some(shutdown) = shutdown {
|
if let Some(shutdown) = shutdown {
|
||||||
|
|
|
@ -652,7 +652,7 @@ pub struct SshRemoteClient {
|
||||||
unique_identifier: String,
|
unique_identifier: String,
|
||||||
connection_options: SshConnectionOptions,
|
connection_options: SshConnectionOptions,
|
||||||
path_style: PathStyle,
|
path_style: PathStyle,
|
||||||
state: Arc<Mutex<Option<State>>>,
|
state: Option<State>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -729,7 +729,7 @@ impl SshRemoteClient {
|
||||||
unique_identifier: unique_identifier.clone(),
|
unique_identifier: unique_identifier.clone(),
|
||||||
connection_options,
|
connection_options,
|
||||||
path_style,
|
path_style,
|
||||||
state: Arc::new(Mutex::new(Some(State::Connecting))),
|
state: Some(State::Connecting),
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let io_task = ssh_connection.start_proxy(
|
let io_task = ssh_connection.start_proxy(
|
||||||
|
@ -752,7 +752,7 @@ impl SshRemoteClient {
|
||||||
let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
|
let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
|
||||||
|
|
||||||
this.update(cx, |this, _| {
|
this.update(cx, |this, _| {
|
||||||
*this.state.lock() = Some(State::Connected {
|
this.state = Some(State::Connected {
|
||||||
ssh_connection,
|
ssh_connection,
|
||||||
delegate,
|
delegate,
|
||||||
multiplex_task,
|
multiplex_task,
|
||||||
|
@ -782,11 +782,11 @@ impl SshRemoteClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn shutdown_processes<T: RequestMessage>(
|
pub fn shutdown_processes<T: RequestMessage>(
|
||||||
&self,
|
&mut self,
|
||||||
shutdown_request: Option<T>,
|
shutdown_request: Option<T>,
|
||||||
executor: BackgroundExecutor,
|
executor: BackgroundExecutor,
|
||||||
) -> Option<impl Future<Output = ()> + use<T>> {
|
) -> Option<impl Future<Output = ()> + use<T>> {
|
||||||
let state = self.state.lock().take()?;
|
let state = self.state.take()?;
|
||||||
log::info!("shutting down ssh processes");
|
log::info!("shutting down ssh processes");
|
||||||
|
|
||||||
let State::Connected {
|
let State::Connected {
|
||||||
|
@ -821,15 +821,14 @@ impl SshRemoteClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
|
fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
|
||||||
let mut lock = self.state.lock();
|
let can_reconnect = self
|
||||||
|
.state
|
||||||
let can_reconnect = lock
|
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|state| state.can_reconnect())
|
.map(|state| state.can_reconnect())
|
||||||
.unwrap_or(false);
|
.unwrap_or(false);
|
||||||
if !can_reconnect {
|
if !can_reconnect {
|
||||||
log::info!("aborting reconnect, because not in state that allows reconnecting");
|
log::info!("aborting reconnect, because not in state that allows reconnecting");
|
||||||
let error = if let Some(state) = lock.as_ref() {
|
let error = if let Some(state) = self.state.as_ref() {
|
||||||
format!("invalid state, cannot reconnect while in state {state}")
|
format!("invalid state, cannot reconnect while in state {state}")
|
||||||
} else {
|
} else {
|
||||||
"no state set".to_string()
|
"no state set".to_string()
|
||||||
|
@ -837,7 +836,7 @@ impl SshRemoteClient {
|
||||||
anyhow::bail!(error);
|
anyhow::bail!(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
let state = lock.take().unwrap();
|
let state = self.state.take().unwrap();
|
||||||
let (attempts, ssh_connection, delegate) = match state {
|
let (attempts, ssh_connection, delegate) = match state {
|
||||||
State::Connected {
|
State::Connected {
|
||||||
ssh_connection,
|
ssh_connection,
|
||||||
|
@ -874,11 +873,9 @@ impl SshRemoteClient {
|
||||||
"Failed to reconnect to after {} attempts, giving up",
|
"Failed to reconnect to after {} attempts, giving up",
|
||||||
MAX_RECONNECT_ATTEMPTS
|
MAX_RECONNECT_ATTEMPTS
|
||||||
);
|
);
|
||||||
drop(lock);
|
|
||||||
self.set_state(State::ReconnectExhausted, cx);
|
self.set_state(State::ReconnectExhausted, cx);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
drop(lock);
|
|
||||||
|
|
||||||
self.set_state(State::Reconnecting, cx);
|
self.set_state(State::Reconnecting, cx);
|
||||||
|
|
||||||
|
@ -1076,7 +1073,7 @@ impl SshRemoteClient {
|
||||||
missed_heartbeats: usize,
|
missed_heartbeats: usize,
|
||||||
cx: &mut Context<Self>,
|
cx: &mut Context<Self>,
|
||||||
) -> ControlFlow<()> {
|
) -> ControlFlow<()> {
|
||||||
let state = self.state.lock().take().unwrap();
|
let state = self.state.take().unwrap();
|
||||||
let next_state = if missed_heartbeats > 0 {
|
let next_state = if missed_heartbeats > 0 {
|
||||||
state.heartbeat_missed()
|
state.heartbeat_missed()
|
||||||
} else {
|
} else {
|
||||||
|
@ -1139,25 +1136,23 @@ impl SshRemoteClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
|
fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
|
||||||
self.state.lock().as_ref().is_some_and(check)
|
self.state.as_ref().is_some_and(check)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_set_state(&self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
|
fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
|
||||||
let mut lock = self.state.lock();
|
let new_state = self.state.as_ref().and_then(map);
|
||||||
let new_state = lock.as_ref().and_then(map);
|
|
||||||
|
|
||||||
if let Some(new_state) = new_state {
|
if let Some(new_state) = new_state {
|
||||||
lock.replace(new_state);
|
self.state.replace(new_state);
|
||||||
cx.notify();
|
cx.notify();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_state(&self, state: State, cx: &mut Context<Self>) {
|
fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
|
||||||
log::info!("setting state to '{}'", &state);
|
log::info!("setting state to '{}'", &state);
|
||||||
|
|
||||||
let is_reconnect_exhausted = state.is_reconnect_exhausted();
|
let is_reconnect_exhausted = state.is_reconnect_exhausted();
|
||||||
let is_server_not_running = state.is_server_not_running();
|
let is_server_not_running = state.is_server_not_running();
|
||||||
self.state.lock().replace(state);
|
self.state.replace(state);
|
||||||
|
|
||||||
if is_reconnect_exhausted || is_server_not_running {
|
if is_reconnect_exhausted || is_server_not_running {
|
||||||
cx.emit(SshRemoteEvent::Disconnected);
|
cx.emit(SshRemoteEvent::Disconnected);
|
||||||
|
@ -1167,7 +1162,6 @@ impl SshRemoteClient {
|
||||||
|
|
||||||
pub fn ssh_info(&self) -> Option<SshInfo> {
|
pub fn ssh_info(&self) -> Option<SshInfo> {
|
||||||
self.state
|
self.state
|
||||||
.lock()
|
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|state| state.ssh_connection())
|
.and_then(|state| state.ssh_connection())
|
||||||
.map(|ssh_connection| SshInfo {
|
.map(|ssh_connection| SshInfo {
|
||||||
|
@ -1183,8 +1177,7 @@ impl SshRemoteClient {
|
||||||
dest_path: RemotePathBuf,
|
dest_path: RemotePathBuf,
|
||||||
cx: &App,
|
cx: &App,
|
||||||
) -> Task<Result<()>> {
|
) -> Task<Result<()>> {
|
||||||
let state = self.state.lock();
|
let Some(connection) = self.state.as_ref().and_then(|state| state.ssh_connection()) else {
|
||||||
let Some(connection) = state.as_ref().and_then(|state| state.ssh_connection()) else {
|
|
||||||
return Task::ready(Err(anyhow!("no ssh connection")));
|
return Task::ready(Err(anyhow!("no ssh connection")));
|
||||||
};
|
};
|
||||||
connection.upload_directory(src_path, dest_path, cx)
|
connection.upload_directory(src_path, dest_path, cx)
|
||||||
|
@ -1204,7 +1197,6 @@ impl SshRemoteClient {
|
||||||
|
|
||||||
pub fn connection_state(&self) -> ConnectionState {
|
pub fn connection_state(&self) -> ConnectionState {
|
||||||
self.state
|
self.state
|
||||||
.lock()
|
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(ConnectionState::from)
|
.map(ConnectionState::from)
|
||||||
.unwrap_or(ConnectionState::Disconnected)
|
.unwrap_or(ConnectionState::Disconnected)
|
||||||
|
@ -2501,7 +2493,7 @@ impl ChannelClient {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
|
fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
|
||||||
log::debug!("ssh send name:{}", T::NAME);
|
log::debug!("ssh send name:{}", T::NAME);
|
||||||
self.send_dynamic(payload.into_envelope(0, None, None))
|
self.send_dynamic(payload.into_envelope(0, None, None))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue