Add the ability to edit remote directories over SSH (#14530)

This is a first step towards allowing you to edit remote projects
directly over SSH. We'll start with a pretty bare-bones feature set, and
incrementally add further features.

### Todo

Distribution
* [x] Build nightly releases of `zed-remote-server` binaries
    * [x] linux (arm + x86)
    * [x] mac (arm + x86)
* [x] Build stable + preview releases of `zed-remote-server`
* [x] download and cache remote server binaries as needed when opening
ssh project
* [x] ensure server has the latest version of the binary


Auth
* [x] allow specifying password at the command line
* [x] auth via ssh keys
* [x] UI password prompt

Features
* [x] upload remote server binary to server automatically
* [x] opening directories
* [x] tracking file system updates
* [x] opening, editing, saving buffers
* [ ] file operations (rename, delete, create)
* [ ] git diffs
* [ ] project search

Release Notes:

- N/A

---------

Co-authored-by: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com>
This commit is contained in:
Max Brunsfeld 2024-07-19 10:27:26 -07:00 committed by GitHub
parent 7733bf686b
commit b9a53ffa0b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
50 changed files with 2194 additions and 250 deletions

View file

@ -13,7 +13,6 @@ use futures::{
mpsc::{self, UnboundedSender},
oneshot,
},
future::BoxFuture,
select_biased,
stream::select,
task::Poll,
@ -129,13 +128,11 @@ struct ScanRequest {
pub struct RemoteWorktree {
snapshot: Snapshot,
background_snapshot: Arc<Mutex<Snapshot>>,
background_snapshot: Arc<Mutex<(Snapshot, Vec<proto::UpdateWorktree>)>>,
project_id: u64,
client: AnyProtoClient,
updates_tx: Option<UnboundedSender<proto::UpdateWorktree>>,
update_observer: Arc<
Mutex<Option<Box<dyn Send + FnMut(proto::UpdateWorktree) -> BoxFuture<'static, bool>>>>,
>,
update_observer: Option<mpsc::UnboundedSender<proto::UpdateWorktree>>,
snapshot_subscriptions: VecDeque<(usize, oneshot::Sender<()>)>,
replica_id: ReplicaId,
visible: bool,
@ -463,10 +460,9 @@ impl Worktree {
Arc::from(PathBuf::from(worktree.abs_path)),
);
let (updates_tx, mut updates_rx) = mpsc::unbounded();
let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
let background_snapshot = Arc::new(Mutex::new((snapshot.clone(), Vec::new())));
let (background_updates_tx, mut background_updates_rx) = mpsc::unbounded();
let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel();
let update_observer = Arc::new(Mutex::new(None));
let worktree = RemoteWorktree {
client,
@ -474,36 +470,45 @@ impl Worktree {
replica_id,
snapshot,
background_snapshot: background_snapshot.clone(),
update_observer: update_observer.clone(),
updates_tx: Some(updates_tx),
updates_tx: Some(background_updates_tx),
update_observer: None,
snapshot_subscriptions: Default::default(),
visible: worktree.visible,
disconnected: false,
};
// Apply updates to a separate snapshto in a background task, then
// send them to a foreground task which updates the model.
cx.background_executor()
.spawn(async move {
while let Some(update) = updates_rx.next().await {
let call = update_observer
.lock()
.as_mut()
.map(|observer| (observer)(update.clone()));
if let Some(call) = call {
call.await;
}
if let Err(error) = background_snapshot.lock().apply_remote_update(update) {
log::error!("error applying worktree update: {}", error);
while let Some(update) = background_updates_rx.next().await {
{
let mut lock = background_snapshot.lock();
if let Err(error) = lock.0.apply_remote_update(update.clone()) {
log::error!("error applying worktree update: {}", error);
}
lock.1.push(update);
}
snapshot_updated_tx.send(()).await.ok();
}
})
.detach();
// On the foreground task, update to the latest snapshot and notify
// any update observer of all updates that led to that snapshot.
cx.spawn(|this, mut cx| async move {
while (snapshot_updated_rx.recv().await).is_some() {
this.update(&mut cx, |this, cx| {
let this = this.as_remote_mut().unwrap();
this.snapshot = this.background_snapshot.lock().clone();
{
let mut lock = this.background_snapshot.lock();
this.snapshot = lock.0.clone();
if let Some(tx) = &this.update_observer {
for update in lock.1.drain(..) {
tx.unbounded_send(update).ok();
}
}
};
cx.emit(Event::UpdatedEntries(Arc::from([])));
cx.notify();
while let Some((scan_id, _)) = this.snapshot_subscriptions.front() {
@ -631,11 +636,7 @@ impl Worktree {
{
match self {
Worktree::Local(this) => this.observe_updates(project_id, cx, callback),
Worktree::Remote(this) => {
this.update_observer
.lock()
.replace(Box::new(move |update| callback(update).boxed()));
}
Worktree::Remote(this) => this.observe_updates(project_id, cx, callback),
}
}
@ -645,7 +646,7 @@ impl Worktree {
this.update_observer.take();
}
Worktree::Remote(this) => {
this.update_observer.lock().take();
this.update_observer.take();
}
}
}
@ -654,7 +655,7 @@ impl Worktree {
pub fn has_update_observer(&self) -> bool {
match self {
Worktree::Local(this) => this.update_observer.is_some(),
Worktree::Remote(this) => this.update_observer.lock().is_some(),
Worktree::Remote(this) => this.update_observer.is_some(),
}
}
@ -739,24 +740,7 @@ impl Worktree {
) -> Option<Task<Result<()>>> {
match self {
Worktree::Local(this) => this.delete_entry(entry_id, trash, cx),
Worktree::Remote(this) => {
let response = this.client.request(proto::DeleteProjectEntry {
project_id: this.project_id,
entry_id: entry_id.to_proto(),
use_trash: trash,
});
Some(cx.spawn(move |this, mut cx| async move {
let response = response.await?;
this.update(&mut cx, move |worktree, cx| {
worktree.as_remote_mut().unwrap().delete_entry(
entry_id,
response.worktree_scan_id as usize,
cx,
)
})?
.await
}))
}
Worktree::Remote(this) => this.delete_entry(entry_id, trash, cx),
}
}
@ -769,36 +753,7 @@ impl Worktree {
let new_path = new_path.into();
match self {
Worktree::Local(this) => this.rename_entry(entry_id, new_path, cx),
Worktree::Remote(this) => {
let response = this.client.request(proto::RenameProjectEntry {
project_id: this.project_id,
entry_id: entry_id.to_proto(),
new_path: new_path.to_string_lossy().into(),
});
cx.spawn(move |this, mut cx| async move {
let response = response.await?;
match response.entry {
Some(entry) => this
.update(&mut cx, |this, cx| {
this.as_remote_mut().unwrap().insert_entry(
entry,
response.worktree_scan_id as usize,
cx,
)
})?
.await
.map(CreatedEntry::Included),
None => {
let abs_path = this.update(&mut cx, |worktree, _| {
worktree
.absolutize(&new_path)
.with_context(|| format!("absolutizing {new_path:?}"))
})??;
Ok(CreatedEntry::Excluded { abs_path })
}
}
})
}
Worktree::Remote(this) => this.rename_entry(entry_id, new_path, cx),
}
}
@ -1825,6 +1780,40 @@ impl RemoteWorktree {
}
}
fn observe_updates<F, Fut>(
&mut self,
project_id: u64,
cx: &mut ModelContext<Worktree>,
callback: F,
) where
F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut,
Fut: 'static + Send + Future<Output = bool>,
{
let (tx, mut rx) = mpsc::unbounded();
let initial_update = self
.snapshot
.build_initial_update(project_id, self.id().to_proto());
self.updates_tx = Some(tx);
cx.spawn(|this, mut cx| async move {
let mut update = initial_update;
loop {
if !callback(update).await {
break;
}
if let Some(next_update) = rx.next().await {
update = next_update;
} else {
break;
}
}
this.update(&mut cx, |this, _| {
let this = this.as_remote_mut().unwrap();
this.updates_tx.take();
})
})
.detach();
}
fn observed_snapshot(&self, scan_id: usize) -> bool {
self.completed_scan_id >= scan_id
}
@ -1861,7 +1850,7 @@ impl RemoteWorktree {
wait_for_snapshot.await?;
this.update(&mut cx, |worktree, _| {
let worktree = worktree.as_remote_mut().unwrap();
let mut snapshot = worktree.background_snapshot.lock();
let snapshot = &mut worktree.background_snapshot.lock().0;
let entry = snapshot.insert_entry(entry);
worktree.snapshot = snapshot.clone();
entry
@ -1871,20 +1860,67 @@ impl RemoteWorktree {
fn delete_entry(
&mut self,
id: ProjectEntryId,
scan_id: usize,
entry_id: ProjectEntryId,
trash: bool,
cx: &mut ModelContext<Worktree>,
) -> Task<Result<()>> {
let wait_for_snapshot = self.wait_for_snapshot(scan_id);
) -> Option<Task<Result<()>>> {
let response = self.client.request(proto::DeleteProjectEntry {
project_id: self.project_id,
entry_id: entry_id.to_proto(),
use_trash: trash,
});
Some(cx.spawn(move |this, mut cx| async move {
let response = response.await?;
let scan_id = response.worktree_scan_id as usize;
this.update(&mut cx, move |this, _| {
this.as_remote_mut().unwrap().wait_for_snapshot(scan_id)
})?
.await?;
this.update(&mut cx, |this, _| {
let this = this.as_remote_mut().unwrap();
let snapshot = &mut this.background_snapshot.lock().0;
snapshot.delete_entry(entry_id);
this.snapshot = snapshot.clone();
})
}))
}
fn rename_entry(
&mut self,
entry_id: ProjectEntryId,
new_path: impl Into<Arc<Path>>,
cx: &mut ModelContext<Worktree>,
) -> Task<Result<CreatedEntry>> {
let new_path = new_path.into();
let response = self.client.request(proto::RenameProjectEntry {
project_id: self.project_id,
entry_id: entry_id.to_proto(),
new_path: new_path.to_string_lossy().into(),
});
cx.spawn(move |this, mut cx| async move {
wait_for_snapshot.await?;
this.update(&mut cx, |worktree, _| {
let worktree = worktree.as_remote_mut().unwrap();
let mut snapshot = worktree.background_snapshot.lock();
snapshot.delete_entry(id);
worktree.snapshot = snapshot.clone();
})?;
Ok(())
let response = response.await?;
match response.entry {
Some(entry) => this
.update(&mut cx, |this, cx| {
this.as_remote_mut().unwrap().insert_entry(
entry,
response.worktree_scan_id as usize,
cx,
)
})?
.await
.map(CreatedEntry::Included),
None => {
let abs_path = this.update(&mut cx, |worktree, _| {
worktree
.absolutize(&new_path)
.with_context(|| format!("absolutizing {new_path:?}"))
})??;
Ok(CreatedEntry::Excluded { abs_path })
}
}
})
}
}
@ -1912,6 +1948,35 @@ impl Snapshot {
&self.abs_path
}
fn build_initial_update(&self, project_id: u64, worktree_id: u64) -> proto::UpdateWorktree {
let mut updated_entries = self
.entries_by_path
.iter()
.map(proto::Entry::from)
.collect::<Vec<_>>();
updated_entries.sort_unstable_by_key(|e| e.id);
let mut updated_repositories = self
.repository_entries
.values()
.map(proto::RepositoryEntry::from)
.collect::<Vec<_>>();
updated_repositories.sort_unstable_by_key(|e| e.work_directory_id);
proto::UpdateWorktree {
project_id,
worktree_id,
abs_path: self.abs_path().to_string_lossy().into(),
root_name: self.root_name().to_string(),
updated_entries,
removed_entries: Vec::new(),
scan_id: self.scan_id as u64,
is_last_update: self.completed_scan_id == self.scan_id,
updated_repositories,
removed_repositories: Vec::new(),
}
}
pub fn absolutize(&self, path: &Path) -> Result<PathBuf> {
if path
.components()
@ -1978,6 +2043,12 @@ impl Snapshot {
}
pub(crate) fn apply_remote_update(&mut self, mut update: proto::UpdateWorktree) -> Result<()> {
log::trace!(
"applying remote worktree update. {} entries updated, {} removed",
update.updated_entries.len(),
update.removed_entries.len()
);
let mut entries_by_path_edits = Vec::new();
let mut entries_by_id_edits = Vec::new();
@ -2372,35 +2443,6 @@ impl LocalSnapshot {
}
}
fn build_initial_update(&self, project_id: u64, worktree_id: u64) -> proto::UpdateWorktree {
let mut updated_entries = self
.entries_by_path
.iter()
.map(proto::Entry::from)
.collect::<Vec<_>>();
updated_entries.sort_unstable_by_key(|e| e.id);
let mut updated_repositories = self
.repository_entries
.values()
.map(proto::RepositoryEntry::from)
.collect::<Vec<_>>();
updated_repositories.sort_unstable_by_key(|e| e.work_directory_id);
proto::UpdateWorktree {
project_id,
worktree_id,
abs_path: self.abs_path().to_string_lossy().into(),
root_name: self.root_name().to_string(),
updated_entries,
removed_entries: Vec::new(),
scan_id: self.scan_id as u64,
is_last_update: self.completed_scan_id == self.scan_id,
updated_repositories,
removed_repositories: Vec::new(),
}
}
fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
if entry.is_file() && entry.path.file_name() == Some(&GITIGNORE) {
let abs_path = self.abs_path.join(&entry.path);
@ -2999,9 +3041,9 @@ impl language::File for File {
self
}
fn to_proto(&self) -> rpc::proto::File {
fn to_proto(&self, cx: &AppContext) -> rpc::proto::File {
rpc::proto::File {
worktree_id: self.worktree.entity_id().as_u64(),
worktree_id: self.worktree.read(cx).id().to_proto(),
entry_id: self.entry_id.map(|id| id.to_proto()),
path: self.path.to_string_lossy().into(),
mtime: self.mtime.map(|time| time.into()),