Rebuild SSH installation (#20220)

Closes #ISSUE

This refactors SSH installation to require less shell stuff. We'd like
to
support arbitrary remote hosts, and unfortunately csh/tcsh have quoting
rules
that make it impossible to run multi-line scripts.

The primary changes are:
* The target path now contains the version:
`./zed_server/zed-remote-server-{release_channel}-{version}`
* We do all our processing in a temporary file and `mv` it into place.
* We do fewer calls to `ssh_command` overall. With the previous two
changes we can avoid lock files, and fuser calls. Instead cleanup of old
binaries now happens in `execute_run`.
* We only try to install the remote server when the connection is
established, not on each project open.

This should also put us in a good position if we want to pre-emptively
install new versions when the auto-updater detects an update for the
running version of zed (but that's not wired up yet)

Release Notes:

- Remoting: Fixed remoting when the remote runs `tcsh`
- Remoting: Improved latency of connecting
This commit is contained in:
Conrad Irwin 2024-11-05 13:37:54 -07:00 committed by GitHub
parent 7c72929f0b
commit 87ba5fd7bc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 287 additions and 585 deletions

2
Cargo.lock generated
View file

@ -9562,6 +9562,7 @@ dependencies = [
"itertools 0.13.0",
"log",
"parking_lot",
"paths",
"prost",
"release_channel",
"rpc",
@ -9613,6 +9614,7 @@ dependencies = [
"settings",
"shellexpand 2.1.2",
"smol",
"sysinfo",
"telemetry_events",
"toml 0.8.19",
"util",

View file

@ -432,6 +432,9 @@ impl AutoUpdater {
cx.notify();
}
// If you are packaging Zed and need to override the place it downloads SSH remotes from,
// you can override this function. You should also update get_remote_server_release_url to return
// Ok(None).
pub async fn download_remote_server_release(
os: &str,
arch: &str,
@ -482,7 +485,7 @@ impl AutoUpdater {
release_channel: ReleaseChannel,
version: Option<SemanticVersion>,
cx: &mut AsyncAppContext,
) -> Result<(JsonRelease, String)> {
) -> Result<Option<(String, String)>> {
let this = cx.update(|cx| {
cx.default_global::<GlobalAutoUpdate>()
.0
@ -504,7 +507,7 @@ impl AutoUpdater {
let update_request_body = build_remote_server_update_request_body(cx)?;
let body = serde_json::to_string(&update_request_body)?;
Ok((release, body))
Ok(Some((release.url, body)))
}
async fn get_release(

View file

@ -478,43 +478,17 @@ impl remote::SshClientDelegate for SshClientDelegate {
release_channel: ReleaseChannel,
version: Option<SemanticVersion>,
cx: &mut AsyncAppContext,
) -> Task<Result<(String, String)>> {
) -> Task<Result<Option<(String, String)>>> {
cx.spawn(|mut cx| async move {
let (release, request_body) = AutoUpdater::get_remote_server_release_url(
platform.os,
platform.arch,
release_channel,
version,
&mut cx,
)
.await
.map_err(|e| {
anyhow!(
"Failed to get remote server binary download url (version: {}, os: {}, arch: {}): {}",
version.map(|v| format!("{}", v)).unwrap_or("unknown".to_string()),
platform.os,
platform.arch,
e
)
})?;
Ok((release.url, request_body))
}
)
}
fn remote_server_binary_path(
&self,
platform: SshPlatform,
cx: &mut AsyncAppContext,
) -> Result<PathBuf> {
let release_channel = cx.update(|cx| ReleaseChannel::global(cx))?;
Ok(paths::remote_server_dir_relative().join(format!(
"zed-remote-server-{}-{}-{}",
release_channel.dev_name(),
platform.os,
platform.arch
)))
AutoUpdater::get_remote_server_release_url(
platform.os,
platform.arch,
release_channel,
version,
&mut cx,
)
.await
})
}
}

View file

@ -26,6 +26,7 @@ futures.workspace = true
gpui.workspace = true
itertools.workspace = true
log.workspace = true
paths.workspace = true
parking_lot.workspace = true
prost.workspace = true
rpc = { workspace = true, features = ["gpui"] }

View file

@ -22,6 +22,7 @@ use gpui::{
};
use itertools::Itertools;
use parking_lot::Mutex;
use paths;
use release_channel::{AppCommitSha, AppVersion, ReleaseChannel};
use rpc::{
proto::{self, build_typed_envelope, Envelope, EnvelopedMessage, PeerId, RequestMessage},
@ -42,7 +43,7 @@ use std::{
atomic::{AtomicU32, Ordering::SeqCst},
Arc, Weak,
},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
time::{Duration, Instant},
};
use tempfile::TempDir;
use util::ResultExt;
@ -224,52 +225,19 @@ impl SshPlatform {
}
}
pub enum ServerBinary {
LocalBinary(PathBuf),
ReleaseUrl { url: String, body: String },
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ServerVersion {
Semantic(SemanticVersion),
Commit(String),
}
impl ServerVersion {
pub fn semantic_version(&self) -> Option<SemanticVersion> {
match self {
Self::Semantic(version) => Some(*version),
_ => None,
}
}
}
impl std::fmt::Display for ServerVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Semantic(version) => write!(f, "{}", version),
Self::Commit(commit) => write!(f, "{}", commit),
}
}
}
pub trait SshClientDelegate: Send + Sync {
fn ask_password(
&self,
prompt: String,
cx: &mut AsyncAppContext,
) -> oneshot::Receiver<Result<String>>;
fn remote_server_binary_path(
&self,
platform: SshPlatform,
cx: &mut AsyncAppContext,
) -> Result<PathBuf>;
fn get_download_params(
&self,
platform: SshPlatform,
release_channel: ReleaseChannel,
version: Option<SemanticVersion>,
cx: &mut AsyncAppContext,
) -> Task<Result<(String, String)>>;
) -> Task<Result<Option<(String, String)>>>;
fn download_server_binary_locally(
&self,
@ -290,16 +258,32 @@ impl SshSocket {
let mut command = process::Command::new("ssh");
let to_run = iter::once(&program)
.chain(args.iter())
.map(|token| shlex::try_quote(token).unwrap())
.map(|token| {
// We're trying to work with: sh, bash, zsh, fish, tcsh, ...?
debug_assert!(
!token.contains('\n'),
"multiline arguments do not work in all shells"
);
shlex::try_quote(token).unwrap()
})
.join(" ");
log::debug!("ssh {} {:?}", self.connection_options.ssh_url(), to_run);
self.ssh_options(&mut command)
.arg(self.connection_options.ssh_url())
.arg(to_run);
command
}
fn shell_script(&self, script: impl AsRef<str>) -> process::Command {
return self.ssh_command("sh", &["-c", script.as_ref()]);
async fn run_command(&self, program: &str, args: &[&str]) -> Result<String> {
let output = self.ssh_command(program, args).output().await?;
if output.status.success() {
Ok(String::from_utf8_lossy(&output.stdout).to_string())
} else {
Err(anyhow!(
"failed to run command: {}",
String::from_utf8_lossy(&output.stderr)
))
}
}
fn ssh_options<'a>(&self, command: &'a mut process::Command) -> &'a mut process::Command {
@ -322,18 +306,6 @@ impl SshSocket {
}
}
async fn run_cmd(mut command: process::Command) -> Result<String> {
let output = command.output().await?;
if output.status.success() {
Ok(String::from_utf8_lossy(&output.stdout).to_string())
} else {
Err(anyhow!(
"failed to run command: {}",
String::from_utf8_lossy(&output.stderr)
))
}
}
const MAX_MISSED_HEARTBEATS: usize = 5;
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
@ -569,12 +541,8 @@ impl SshRemoteClient {
})?
.await
.map_err(|e| e.cloned())?;
let remote_binary_path = ssh_connection
.get_remote_binary_path(&delegate, false, &mut cx)
.await?;
let io_task = ssh_connection.start_proxy(
remote_binary_path,
unique_identifier,
false,
incoming_tx,
@ -753,12 +721,7 @@ impl SshRemoteClient {
.await
.map_err(|error| error.cloned())?;
let remote_binary_path = ssh_connection
.get_remote_binary_path(&delegate, true, &mut cx)
.await?;
let io_task = ssh_connection.start_proxy(
remote_binary_path,
unique_identifier,
true,
incoming_tx,
@ -1218,7 +1181,6 @@ trait RemoteConnection: Send + Sync {
#[allow(clippy::too_many_arguments)]
fn start_proxy(
&self,
remote_binary_path: PathBuf,
unique_identifier: String,
reconnect: bool,
incoming_tx: UnboundedSender<Envelope>,
@ -1227,12 +1189,6 @@ trait RemoteConnection: Send + Sync {
delegate: Arc<dyn SshClientDelegate>,
cx: &mut AsyncAppContext,
) -> Task<Result<i32>>;
async fn get_remote_binary_path(
&self,
delegate: &Arc<dyn SshClientDelegate>,
reconnect: bool,
cx: &mut AsyncAppContext,
) -> Result<PathBuf>;
async fn kill(&self) -> Result<()>;
fn has_been_killed(&self) -> bool;
fn ssh_args(&self) -> Vec<String>;
@ -1245,7 +1201,7 @@ trait RemoteConnection: Send + Sync {
struct SshRemoteConnection {
socket: SshSocket,
master_process: Mutex<Option<process::Child>>,
platform: SshPlatform,
remote_binary_path: Option<PathBuf>,
_temp_dir: TempDir,
}
@ -1271,28 +1227,8 @@ impl RemoteConnection for SshRemoteConnection {
fn connection_options(&self) -> SshConnectionOptions {
self.socket.connection_options.clone()
}
async fn get_remote_binary_path(
&self,
delegate: &Arc<dyn SshClientDelegate>,
reconnect: bool,
cx: &mut AsyncAppContext,
) -> Result<PathBuf> {
let platform = self.platform;
let remote_binary_path = delegate.remote_server_binary_path(platform, cx)?;
if !reconnect {
self.ensure_server_binary(&delegate, &remote_binary_path, platform, cx)
.await?;
}
let socket = self.socket.clone();
run_cmd(socket.ssh_command(&remote_binary_path.to_string_lossy(), &["version"])).await?;
Ok(remote_binary_path)
}
fn start_proxy(
&self,
remote_binary_path: PathBuf,
unique_identifier: String,
reconnect: bool,
incoming_tx: UnboundedSender<Envelope>,
@ -1303,6 +1239,10 @@ impl RemoteConnection for SshRemoteConnection {
) -> Task<Result<i32>> {
delegate.set_status(Some("Starting proxy"), cx);
let Some(remote_binary_path) = self.remote_binary_path.clone() else {
return Task::ready(Err(anyhow!("Remote binary path not set")));
};
let mut start_proxy_command = shell_script!(
"exec {binary_path} proxy --identifier {identifier}",
binary_path = &remote_binary_path.to_string_lossy(),
@ -1329,7 +1269,7 @@ impl RemoteConnection for SshRemoteConnection {
let ssh_proxy_process = match self
.socket
.shell_script(start_proxy_command)
.ssh_command("sh", &["-c", &start_proxy_command])
// IMPORTANT: we kill this process when we drop the task that uses it.
.kill_on_drop(true)
.spawn()
@ -1511,8 +1451,33 @@ impl SshRemoteConnection {
socket_path,
};
let os = run_cmd(socket.ssh_command("uname", &["-s"])).await?;
let arch = run_cmd(socket.ssh_command("uname", &["-m"])).await?;
let mut this = Self {
socket,
master_process: Mutex::new(Some(master_process)),
_temp_dir: temp_dir,
remote_binary_path: None,
};
let (release_channel, version, commit) = cx.update(|cx| {
(
ReleaseChannel::global(cx),
AppVersion::global(cx),
AppCommitSha::try_global(cx),
)
})?;
this.remote_binary_path = Some(
this.ensure_server_binary(&delegate, release_channel, version, commit, cx)
.await?,
);
Ok(this)
}
async fn platform(&self) -> Result<SshPlatform> {
let uname = self.socket.run_command("uname", &["-sm"]).await?;
let Some((os, arch)) = uname.split_once(" ") else {
Err(anyhow!("unknown uname: {uname:?}"))?
};
let os = match os.trim() {
"Darwin" => "macos",
@ -1527,14 +1492,7 @@ impl SshRemoteConnection {
Err(anyhow!("unknown uname architecture {arch:?}"))?
};
let platform = SshPlatform { os, arch };
Ok(Self {
socket,
master_process: Mutex::new(Some(master_process)),
platform,
_temp_dir: temp_dir,
})
Ok(SshPlatform { os, arch })
}
fn multiplex(
@ -1639,383 +1597,189 @@ impl SshRemoteConnection {
})
}
#[allow(unused)]
async fn ensure_server_binary(
&self,
delegate: &Arc<dyn SshClientDelegate>,
dst_path: &Path,
platform: SshPlatform,
release_channel: ReleaseChannel,
version: SemanticVersion,
commit: Option<AppCommitSha>,
cx: &mut AsyncAppContext,
) -> Result<()> {
let lock_file = dst_path.with_extension("lock");
let lock_content = {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.context("failed to get timestamp")?
.as_secs();
let source_port = self.get_ssh_source_port().await?;
format!("{} {}", source_port, timestamp)
};
) -> Result<PathBuf> {
let version_str = match release_channel {
ReleaseChannel::Nightly => {
let commit = commit.map(|s| s.0.to_string()).unwrap_or_default();
let lock_stale_age = Duration::from_secs(10 * 60);
let max_wait_time = Duration::from_secs(10 * 60);
let check_interval = Duration::from_secs(5);
let start_time = Instant::now();
loop {
let lock_acquired = self.create_lock_file(&lock_file, &lock_content).await?;
if lock_acquired {
delegate.set_status(Some("Acquired lock file on host"), cx);
let result = self
.update_server_binary_if_needed(delegate, dst_path, platform, cx)
.await;
self.remove_lock_file(&lock_file).await.ok();
return result;
} else {
if let Ok(is_stale) = self.is_lock_stale(&lock_file, &lock_stale_age).await {
if is_stale {
delegate.set_status(
Some("Detected lock file on host being stale. Removing"),
cx,
);
self.remove_lock_file(&lock_file).await?;
continue;
} else {
if start_time.elapsed() > max_wait_time {
return Err(anyhow!("Timeout waiting for lock to be released"));
}
log::info!(
"Found lockfile: {:?}. Will check again in {:?}",
lock_file,
check_interval
);
delegate.set_status(
Some("Waiting for another Zed instance to finish uploading binary"),
cx,
);
smol::Timer::after(check_interval).await;
continue;
}
} else {
// Unable to check lock, assume it's valid and wait
if start_time.elapsed() > max_wait_time {
return Err(anyhow!("Timeout waiting for lock to be released"));
}
smol::Timer::after(check_interval).await;
continue;
}
format!("{}-{}", version, commit)
}
}
}
async fn get_ssh_source_port(&self) -> Result<String> {
let output = run_cmd(self.socket.shell_script("echo $SSH_CLIENT | cut -d' ' -f2"))
.await
.context("failed to get source port from SSH_CLIENT on host")?;
Ok(output.trim().to_string())
}
async fn create_lock_file(&self, lock_file: &Path, content: &str) -> Result<bool> {
let parent_dir = lock_file
.parent()
.ok_or_else(|| anyhow!("Lock file path has no parent directory"))?;
let script = format!(
r#"mkdir -p "{parent_dir}" && [ ! -f "{lock_file}" ] && echo "{content}" > "{lock_file}" && echo "created" || echo "exists""#,
parent_dir = parent_dir.display(),
lock_file = lock_file.display(),
content = content,
ReleaseChannel::Dev => "build".to_string(),
_ => version.to_string(),
};
let binary_name = format!(
"zed-remote-server-{}-{}",
release_channel.dev_name(),
version_str
);
let dst_path = paths::remote_server_dir_relative().join(binary_name);
let tmp_path_gz = PathBuf::from(format!(
"{}-download-{}.gz",
dst_path.to_string_lossy(),
std::process::id()
));
let output = run_cmd(self.socket.shell_script(&script))
.await
.with_context(|| format!("failed to create a lock file at {:?}", lock_file))?;
Ok(output.trim() == "created")
}
fn generate_stale_check_script(lock_file: &Path, max_age: u64) -> String {
shell_script!(
r#"
if [ ! -f "{lock_file}" ]; then
echo "lock file does not exist"
exit 0
fi
read -r port timestamp < "{lock_file}"
# Check if port is still active
if command -v ss >/dev/null 2>&1; then
if ! ss -n | grep -q ":$port[[:space:]]"; then
echo "ss reports port $port is not open"
exit 0
fi
elif command -v netstat >/dev/null 2>&1; then
if ! netstat -n | grep -q ":$port[[:space:]]"; then
echo "netstat reports port $port is not open"
exit 0
fi
fi
# Check timestamp
if [ $(( $(date +%s) - timestamp )) -gt {max_age} ]; then
echo "timestamp in lockfile is too old"
else
echo "recent"
fi"#,
lock_file = &lock_file.to_string_lossy(),
max_age = &max_age.to_string()
)
}
async fn is_lock_stale(&self, lock_file: &Path, max_age: &Duration) -> Result<bool> {
let script = Self::generate_stale_check_script(lock_file, max_age.as_secs());
let output = run_cmd(self.socket.shell_script(script))
.await
.with_context(|| {
format!("failed to check whether lock file {:?} is stale", lock_file)
})?;
let trimmed = output.trim();
let is_stale = trimmed != "recent";
log::info!("checked lockfile for staleness. stale: {is_stale}, output: {trimmed:?}");
Ok(is_stale)
}
async fn remove_lock_file(&self, lock_file: &Path) -> Result<()> {
run_cmd(
self.socket
.ssh_command("rm", &["-f", &lock_file.to_string_lossy()]),
)
.await
.context("failed to remove lock file")?;
Ok(())
}
async fn update_server_binary_if_needed(
&self,
delegate: &Arc<dyn SshClientDelegate>,
dst_path: &Path,
platform: SshPlatform,
cx: &mut AsyncAppContext,
) -> Result<()> {
let current_version = match run_cmd(
self.socket
.ssh_command(&dst_path.to_string_lossy(), &["version"]),
)
.await
{
Ok(version_output) => {
if let Ok(version) = version_output.trim().parse::<SemanticVersion>() {
Some(ServerVersion::Semantic(version))
} else {
Some(ServerVersion::Commit(version_output.trim().to_string()))
}
}
Err(_) => None,
};
let (release_channel, wanted_version) = cx.update(|cx| {
let release_channel = ReleaseChannel::global(cx);
let wanted_version = match release_channel {
ReleaseChannel::Nightly => {
AppCommitSha::try_global(cx).map(|sha| ServerVersion::Commit(sha.0))
}
ReleaseChannel::Dev => None,
_ => Some(ServerVersion::Semantic(AppVersion::global(cx))),
};
(release_channel, wanted_version)
})?;
match (&current_version, &wanted_version) {
(Some(current), Some(wanted)) if current == wanted => {
log::info!("remote development server present and matching client version");
return Ok(());
}
(Some(ServerVersion::Semantic(current)), Some(ServerVersion::Semantic(wanted)))
if current > wanted =>
{
anyhow::bail!("The version of the remote server ({}) is newer than the Zed version ({}). Please update Zed.", current, wanted);
}
_ => {
log::info!("Installing remote development server");
}
}
if self.is_binary_in_use(dst_path).await? {
// When we're not in dev mode, we don't want to switch out the binary if it's
// still open.
// In dev mode, that's fine, since we often kill Zed processes with Ctrl-C and want
// to still replace the binary.
if cfg!(not(debug_assertions)) {
anyhow::bail!("The remote server version ({:?}) does not match the wanted version ({:?}), but is in use by another Zed client so cannot be upgraded.", &current_version, &wanted_version)
} else {
log::info!("Binary is currently in use, ignoring because this is a dev build")
}
}
if wanted_version.is_none() {
if std::env::var("ZED_BUILD_REMOTE_SERVER").is_err() {
if let Some(current_version) = current_version {
log::warn!(
"In development, using cached remote server binary version ({})",
current_version
);
return Ok(());
} else {
anyhow::bail!(
"ZED_BUILD_REMOTE_SERVER is not set, but no remote server exists at ({:?})",
dst_path
)
}
}
#[cfg(debug_assertions)]
{
let src_path = self.build_local(platform, delegate, cx).await?;
return self
.upload_local_server_binary(&src_path, dst_path, delegate, cx)
.await;
}
#[cfg(not(debug_assertions))]
anyhow::bail!("Running development build in release mode, cannot cross compile (unset ZED_BUILD_REMOTE_SERVER)")
}
let upload_binary_over_ssh = self.socket.connection_options.upload_binary_over_ssh;
if !upload_binary_over_ssh {
let (url, body) = delegate
.get_download_params(
platform,
release_channel,
wanted_version.clone().and_then(|v| v.semantic_version()),
cx,
)
#[cfg(debug_assertions)]
if std::env::var("ZED_BUILD_REMOTE_SERVER").is_ok() {
let src_path = self
.build_local(self.platform().await?, delegate, cx)
.await?;
self.upload_local_server_binary(&src_path, &tmp_path_gz, delegate, cx)
.await?;
self.extract_server_binary(&dst_path, &tmp_path_gz, delegate, cx)
.await?;
return Ok(dst_path);
}
match self
.download_binary_on_server(&url, &body, dst_path, delegate, cx)
.await
if self
.socket
.run_command(&dst_path.to_string_lossy(), &["version"])
.await
.is_ok()
{
return Ok(dst_path);
}
let wanted_version = cx.update(|cx| match release_channel {
ReleaseChannel::Nightly => Ok(None),
ReleaseChannel::Dev => {
anyhow::bail!(
"ZED_BUILD_REMOTE_SERVER is not set and no remote server exists at ({:?})",
dst_path
)
}
_ => Ok(Some(AppVersion::global(cx))),
})??;
let platform = self.platform().await?;
if !self.socket.connection_options.upload_binary_over_ssh {
if let Some((url, body)) = delegate
.get_download_params(platform, release_channel, wanted_version, cx)
.await?
{
Ok(_) => return Ok(()),
Err(e) => {
log::error!(
"Failed to download binary on server, attempting to upload server: {}",
e
)
match self
.download_binary_on_server(&url, &body, &tmp_path_gz, delegate, cx)
.await
{
Ok(_) => {
self.extract_server_binary(&dst_path, &tmp_path_gz, delegate, cx)
.await?;
return Ok(dst_path);
}
Err(e) => {
log::error!(
"Failed to download binary on server, attempting to upload server: {}",
e
)
}
}
}
}
let src_path = delegate
.download_server_binary_locally(
platform,
release_channel,
wanted_version.and_then(|v| v.semantic_version()),
cx,
)
.download_server_binary_locally(platform, release_channel, wanted_version, cx)
.await?;
self.upload_local_server_binary(&src_path, dst_path, delegate, cx)
.await
}
async fn is_binary_in_use(&self, binary_path: &Path) -> Result<bool> {
let script = shell_script!(
r#"
if command -v lsof >/dev/null 2>&1; then
if lsof "{binary_path}" >/dev/null 2>&1; then
echo "in_use"
exit 0
fi
elif command -v fuser >/dev/null 2>&1; then
if fuser "{binary_path}" >/dev/null 2>&1; then
echo "in_use"
exit 0
fi
fi
echo "not_in_use"
"#,
binary_path = &binary_path.to_string_lossy(),
);
let output = run_cmd(self.socket.shell_script(script))
.await
.context("failed to check if binary is in use")?;
Ok(output.trim() == "in_use")
self.upload_local_server_binary(&src_path, &tmp_path_gz, delegate, cx)
.await?;
self.extract_server_binary(&dst_path, &tmp_path_gz, delegate, cx)
.await?;
return Ok(dst_path);
}
async fn download_binary_on_server(
&self,
url: &str,
body: &str,
dst_path: &Path,
tmp_path_gz: &Path,
delegate: &Arc<dyn SshClientDelegate>,
cx: &mut AsyncAppContext,
) -> Result<()> {
let mut dst_path_gz = dst_path.to_path_buf();
dst_path_gz.set_extension("gz");
if let Some(parent) = dst_path.parent() {
run_cmd(
self.socket
.ssh_command("mkdir", &["-p", &parent.to_string_lossy()]),
)
.await?;
if let Some(parent) = tmp_path_gz.parent() {
self.socket
.run_command("mkdir", &["-p", &parent.to_string_lossy()])
.await?;
}
delegate.set_status(Some("Downloading remote development server on host"), cx);
let script = shell_script!(
r#"
if command -v curl >/dev/null 2>&1; then
curl -f -L -X GET -H "Content-Type: application/json" -d {body} {url} -o {dst_path} && echo "curl"
elif command -v wget >/dev/null 2>&1; then
wget --max-redirect=5 --method=GET --header="Content-Type: application/json" --body-data={body} {url} -O {dst_path} && echo "wget"
else
echo "Neither curl nor wget is available" >&2
exit 1
fi
"#,
body = body,
url = url,
dst_path = &dst_path_gz.to_string_lossy(),
);
let output = run_cmd(self.socket.shell_script(script))
match self
.socket
.run_command(
"curl",
&[
"-f",
"-L",
"-X",
"GET",
"-H",
"Content-Type: application/json",
"-d",
&body,
&url,
"-o",
&tmp_path_gz.to_string_lossy(),
],
)
.await
.context("Failed to download server binary")?;
{
Ok(_) => {}
Err(e) => {
if self.socket.run_command("which", &["curl"]).await.is_ok() {
return Err(e);
}
if !output.contains("curl") && !output.contains("wget") {
return Err(anyhow!("Failed to download server binary: {}", output));
match self
.socket
.run_command(
"wget",
&[
"--max-redirect=5",
"--method=GET",
"--header=Content-Type: application/json",
"--body-data",
&body,
&url,
"-O",
&tmp_path_gz.to_string_lossy(),
],
)
.await
{
Ok(_) => {}
Err(e) => {
if self.socket.run_command("which", &["wget"]).await.is_ok() {
return Err(e);
} else {
anyhow::bail!("Neither curl nor wget is available");
}
}
}
}
}
self.extract_server_binary(dst_path, &dst_path_gz, delegate, cx)
.await
Ok(())
}
async fn upload_local_server_binary(
&self,
src_path: &Path,
dst_path: &Path,
tmp_path_gz: &Path,
delegate: &Arc<dyn SshClientDelegate>,
cx: &mut AsyncAppContext,
) -> Result<()> {
let mut dst_path_gz = dst_path.to_path_buf();
dst_path_gz.set_extension("gz");
if let Some(parent) = dst_path.parent() {
run_cmd(
self.socket
.ssh_command("mkdir", &["-p", &parent.to_string_lossy()]),
)
.await?;
if let Some(parent) = tmp_path_gz.parent() {
self.socket
.run_command("mkdir", &["-p", &parent.to_string_lossy()])
.await?;
}
let src_stat = fs::metadata(&src_path).await?;
@ -2023,42 +1787,41 @@ impl SshRemoteConnection {
let t0 = Instant::now();
delegate.set_status(Some("Uploading remote development server"), cx);
log::info!("uploading remote development server ({}kb)", size / 1024);
self.upload_file(&src_path, &dst_path_gz)
log::info!(
"uploading remote development server to {:?} ({}kb)",
tmp_path_gz,
size / 1024
);
self.upload_file(&src_path, &tmp_path_gz)
.await
.context("failed to upload server binary")?;
log::info!("uploaded remote development server in {:?}", t0.elapsed());
self.extract_server_binary(dst_path, &dst_path_gz, delegate, cx)
.await
Ok(())
}
async fn extract_server_binary(
&self,
dst_path: &Path,
dst_path_gz: &Path,
tmp_path_gz: &Path,
delegate: &Arc<dyn SshClientDelegate>,
cx: &mut AsyncAppContext,
) -> Result<()> {
delegate.set_status(Some("Extracting remote development server"), cx);
run_cmd(
self.socket
.ssh_command("gunzip", &["-f", &dst_path_gz.to_string_lossy()]),
)
.await?;
let server_mode = 0o755;
delegate.set_status(Some("Marking remote development server executable"), cx);
run_cmd(self.socket.ssh_command(
"chmod",
&[&format!("{:o}", server_mode), &dst_path.to_string_lossy()],
))
.await?;
let script = shell_script!(
"gunzip -f {tmp_path_gz} && chmod {server_mode} {tmp_path} && mv {tmp_path} {dst_path}",
tmp_path_gz = &tmp_path_gz.to_string_lossy(),
tmp_path = &tmp_path_gz.to_string_lossy().strip_suffix(".gz").unwrap(),
server_mode = &format!("{:o}", server_mode),
dst_path = &dst_path.to_string_lossy()
);
self.socket.run_command("sh", &["-c", &script]).await?;
Ok(())
}
async fn upload_file(&self, src_path: &Path, dest_path: &Path) -> Result<()> {
log::debug!("uploading file {:?} to {:?}", src_path, dest_path);
let mut command = process::Command::new("scp");
let output = self
.socket
@ -2574,18 +2337,9 @@ mod fake {
.reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(&cx));
}
async fn get_remote_binary_path(
&self,
_delegate: &Arc<dyn SshClientDelegate>,
_reconnect: bool,
_cx: &mut AsyncAppContext,
) -> Result<PathBuf> {
Ok(PathBuf::new())
}
fn start_proxy(
&self,
_remote_binary_path: PathBuf,
_unique_identifier: String,
_reconnect: bool,
mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
@ -2652,94 +2406,10 @@ mod fake {
_release_channel: ReleaseChannel,
_version: Option<SemanticVersion>,
_cx: &mut AsyncAppContext,
) -> Task<Result<(String, String)>> {
) -> Task<Result<Option<(String, String)>>> {
unreachable!()
}
fn set_status(&self, _: Option<&str>, _: &mut AsyncAppContext) {}
fn remote_server_binary_path(
&self,
_platform: SshPlatform,
_cx: &mut AsyncAppContext,
) -> Result<PathBuf> {
unreachable!()
}
}
}
#[cfg(all(test, unix))]
mod tests {
use super::*;
use std::fs;
use tempfile::TempDir;
fn run_stale_check_script(
lock_file: &Path,
max_age: Duration,
simulate_port_open: Option<&str>,
) -> Result<String> {
let wrapper = format!(
r#"
# Mock ss/netstat commands
ss() {{
# Only handle the -n argument
if [ "$1" = "-n" ]; then
# If we're simulating an open port, output a line containing that port
if [ "{simulated_port}" != "" ]; then
echo "ESTAB 0 0 1.2.3.4:{simulated_port} 5.6.7.8:12345"
fi
fi
}}
netstat() {{
ss "$@"
}}
export -f ss netstat
# Real script starts here
{script}"#,
simulated_port = simulate_port_open.unwrap_or(""),
script = SshRemoteConnection::generate_stale_check_script(lock_file, max_age.as_secs())
);
let output = std::process::Command::new("bash")
.arg("-c")
.arg(&wrapper)
.output()?;
if !output.stderr.is_empty() {
eprintln!("Script stderr: {}", String::from_utf8_lossy(&output.stderr));
}
Ok(String::from_utf8(output.stdout)?.trim().to_string())
}
#[test]
fn test_lock_staleness() -> Result<()> {
let temp_dir = TempDir::new()?;
let lock_file = temp_dir.path().join("test.lock");
// Test 1: No lock file
let output = run_stale_check_script(&lock_file, Duration::from_secs(600), None)?;
assert_eq!(output, "lock file does not exist");
// Test 2: Lock file with port that's not open
fs::write(&lock_file, "54321 1234567890")?;
let output = run_stale_check_script(&lock_file, Duration::from_secs(600), Some("98765"))?;
assert_eq!(output, "ss reports port 54321 is not open");
// Test 3: Lock file with port that is open but old timestamp
let old_timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() - 700; // 700 seconds ago
fs::write(&lock_file, format!("54321 {}", old_timestamp))?;
let output = run_stale_check_script(&lock_file, Duration::from_secs(600), Some("54321"))?;
assert_eq!(output, "timestamp in lockfile is too old");
// Test 4: Lock file with port that is open and recent timestamp
let recent_timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() - 60; // 1 minute ago
fs::write(&lock_file, format!("54321 {}", recent_timestamp))?;
let output = run_stale_check_script(&lock_file, Duration::from_secs(600), Some("54321"))?;
assert_eq!(output, "recent");
Ok(())
}
}

View file

@ -53,6 +53,7 @@ serde_json.workspace = true
settings.workspace = true
shellexpand.workspace = true
smol.workspace = true
sysinfo.workspace = true
telemetry_events.workspace = true
util.workspace = true
worktree.workspace = true

View file

@ -7,7 +7,7 @@ use fs::{Fs, RealFs};
use futures::channel::mpsc;
use futures::{select, select_biased, AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt};
use git::GitHostingProviderRegistry;
use gpui::{AppContext, Context as _, Model, ModelContext, UpdateGlobal as _};
use gpui::{AppContext, Context as _, Model, ModelContext, SemanticVersion, UpdateGlobal as _};
use http_client::{read_proxy_from_env, Uri};
use language::LanguageRegistry;
use node_runtime::{NodeBinaryOptions, NodeRuntime};
@ -31,6 +31,7 @@ use smol::Async;
use smol::{net::unix::UnixListener, stream::StreamExt as _};
use std::ffi::OsStr;
use std::ops::ControlFlow;
use std::str::FromStr;
use std::{env, thread};
use std::{
io::Write,
@ -466,6 +467,10 @@ pub fn execute_run(
handle_panic_requests(&project, &session);
cx.background_executor()
.spawn(async move { cleanup_old_binaries() })
.detach();
mem::forget(project);
});
log::info!("gpui app is shut down. quitting.");
@ -874,3 +879,49 @@ unsafe fn redirect_standard_streams() -> Result<()> {
Ok(())
}
fn cleanup_old_binaries() -> Result<()> {
let server_dir = paths::remote_server_dir_relative();
let release_channel = release_channel::RELEASE_CHANNEL.dev_name();
let prefix = format!("zed-remote-server-{}-", release_channel);
for entry in std::fs::read_dir(server_dir)? {
let path = entry?.path();
if let Some(file_name) = path.file_name() {
if let Some(version) = file_name.to_string_lossy().strip_prefix(&prefix) {
if !is_new_version(version) && !is_file_in_use(file_name) {
log::info!("removing old remote server binary: {:?}", path);
std::fs::remove_file(&path)?;
}
}
}
}
Ok(())
}
fn is_new_version(version: &str) -> bool {
SemanticVersion::from_str(version)
.ok()
.zip(SemanticVersion::from_str(env!("ZED_PKG_VERSION")).ok())
.is_some_and(|(version, current_version)| version >= current_version)
}
fn is_file_in_use(file_name: &OsStr) -> bool {
let info =
sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_processes(
sysinfo::ProcessRefreshKind::new().with_exe(sysinfo::UpdateKind::Always),
));
for process in info.processes().values() {
if process
.exe()
.is_some_and(|exe| exe.file_name().is_some_and(|name| name == file_name))
{
return true;
}
}
false
}