From d1b4384f80765bf5cd434b6f56c134b5aa69ecba Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 23 Feb 2022 19:04:22 +0100 Subject: [PATCH] WIP --- crates/project/src/worktree.rs | 114 +++++++++++++++++---------------- crates/rpc/proto/zed.proto | 7 +- crates/rpc/src/proto.rs | 3 - crates/server/src/rpc.rs | 81 ++++++++--------------- crates/server/src/rpc/store.rs | 30 --------- 5 files changed, 86 insertions(+), 149 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 23dd1895f4..5fd7551c6d 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -43,7 +43,7 @@ use std::{ time::{Duration, SystemTime}, }; use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap}; -use util::ResultExt; +use util::{ResultExt, TryFutureExt}; lazy_static! { static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore"); @@ -137,7 +137,7 @@ enum Registration { struct ShareState { project_id: u64, snapshots_tx: Sender, - _maintain_remote_snapshot: Option>, + _maintain_remote_snapshot: Option>>, } #[derive(Default, Deserialize)] @@ -737,6 +737,7 @@ impl LocalWorktree { worktree_id: self.id().to_proto(), root_name: self.root_name().to_string(), authorized_logins: self.authorized_logins(), + weak: self.weak, }; cx.spawn(|this, mut cx| async move { let response = client.request(register_message).await; @@ -760,61 +761,66 @@ impl LocalWorktree { &mut self, project_id: u64, cx: &mut ModelContext, - ) -> Task> { + ) -> impl Future> { + let (mut share_tx, mut share_rx) = oneshot::channel(); if self.share.is_some() { - return Task::ready(Ok(())); + let _ = share_tx.try_send(Ok(())); + } else { + let snapshot = self.snapshot(); + let rpc = self.client.clone(); + let worktree_id = cx.model_id() as u64; + let (snapshots_to_send_tx, snapshots_to_send_rx) = + smol::channel::unbounded::(); + let (mut share_tx, mut share_rx) = oneshot::channel(); + let maintain_remote_snapshot = cx.background().spawn({ + let rpc = rpc.clone(); + let snapshot = snapshot.clone(); + let diagnostic_summaries = self.diagnostic_summaries.clone(); + let weak = self.weak; + async move { + if let Err(error) = rpc + .request(proto::UpdateWorktree { + project_id, + worktree_id, + root_name: snapshot.root_name().to_string(), + updated_entries: snapshot + .entries_by_path + .iter() + .filter(|e| !e.is_ignored) + .map(Into::into) + .collect(), + removed_entries: Default::default(), + }) + .await + { + let _ = share_tx.try_send(Err(error)); + return Err(anyhow!("failed to send initial update worktree")); + } else { + let _ = share_tx.try_send(Ok(())); + } + + let mut prev_snapshot = snapshot; + while let Ok(snapshot) = snapshots_to_send_rx.recv().await { + let message = + snapshot.build_update(&prev_snapshot, project_id, worktree_id, false); + rpc.request(message).await?; + prev_snapshot = snapshot; + } + + Ok::<_, anyhow::Error>(()) + } + .log_err() + }); + self.share = Some(ShareState { + project_id, + snapshots_tx: snapshots_to_send_tx, + _maintain_remote_snapshot: Some(maintain_remote_snapshot), + }); } - let snapshot = self.snapshot(); - let rpc = self.client.clone(); - let worktree_id = cx.model_id() as u64; - let (snapshots_to_send_tx, snapshots_to_send_rx) = - smol::channel::unbounded::(); - let (mut share_tx, mut share_rx) = oneshot::channel(); - let maintain_remote_snapshot = cx.background().spawn({ - let rpc = rpc.clone(); - let snapshot = snapshot.clone(); - let diagnostic_summaries = self.diagnostic_summaries.clone(); - let weak = self.weak; - async move { - if let Err(error) = rpc - .request(proto::ShareWorktree { - project_id, - worktree: Some(snapshot.to_proto(&diagnostic_summaries, weak)), - }) - .await - { - let _ = share_tx.try_send(Err(error)); - return; - } else { - let _ = share_tx.try_send(Ok(())); - } - - let mut prev_snapshot = snapshot; - while let Ok(snapshot) = snapshots_to_send_rx.recv().await { - let message = - snapshot.build_update(&prev_snapshot, project_id, worktree_id, false); - match rpc.request(message).await { - Ok(_) => { - prev_snapshot = snapshot; - } - Err(err) => log::error!("error sending snapshot diff {}", err), - } - } - } - }); - self.share = Some(ShareState { - project_id, - snapshots_tx: snapshots_to_send_tx, - _maintain_remote_snapshot: Some(maintain_remote_snapshot), - }); - - cx.foreground().spawn(async move { - match share_rx.next().await { - Some(result) => result, - None => Err(anyhow!("unshared before sharing completed")), - } - }) + async move { + share_rx.next().await; + } } pub fn unshare(&mut self) { diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index cbc2fa6d51..ebdb39942d 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -34,7 +34,6 @@ message Envelope { RegisterWorktree register_worktree = 28; UnregisterWorktree unregister_worktree = 29; - ShareWorktree share_worktree = 30; UpdateWorktree update_worktree = 31; UpdateDiagnosticSummary update_diagnostic_summary = 32; DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 33; @@ -132,6 +131,7 @@ message RegisterWorktree { uint64 worktree_id = 2; string root_name = 3; repeated string authorized_logins = 4; + bool weak = 5; } message UnregisterWorktree { @@ -139,11 +139,6 @@ message UnregisterWorktree { uint64 worktree_id = 2; } -message ShareWorktree { - uint64 project_id = 1; - Worktree worktree = 2; -} - message UpdateWorktree { uint64 project_id = 1; uint64 worktree_id = 2; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 018f4aed0b..4ac61377c4 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -188,7 +188,6 @@ messages!( (SendChannelMessage, Foreground), (SendChannelMessageResponse, Foreground), (ShareProject, Foreground), - (ShareWorktree, Foreground), (Test, Foreground), (UnregisterProject, Foreground), (UnregisterWorktree, Foreground), @@ -228,7 +227,6 @@ request_messages!( (SaveBuffer, BufferSaved), (SendChannelMessage, SendChannelMessageResponse), (ShareProject, Ack), - (ShareWorktree, Ack), (Test, Test), (UpdateBuffer, Ack), (UpdateWorktree, Ack), @@ -259,7 +257,6 @@ entity_messages!( PrepareRename, RemoveProjectCollaborator, SaveBuffer, - ShareWorktree, UnregisterWorktree, UnshareProject, UpdateBuffer, diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 3e4645b55b..2dc56138ff 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -16,7 +16,7 @@ use rpc::{ Connection, ConnectionId, Peer, TypedEnvelope, }; use sha1::{Digest as _, Sha1}; -use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant}; +use std::{any::TypeId, future::Future, sync::Arc, time::Instant}; use store::{Store, Worktree}; use surf::StatusCode; use tide::log; @@ -73,7 +73,6 @@ impl Server { .add_message_handler(Server::leave_project) .add_request_handler(Server::register_worktree) .add_message_handler(Server::unregister_worktree) - .add_request_handler(Server::share_worktree) .add_request_handler(Server::update_worktree) .add_message_handler(Server::update_diagnostic_summary) .add_message_handler(Server::disk_based_diagnostics_updating) @@ -419,23 +418,34 @@ impl Server { let mut contact_user_ids = HashSet::default(); contact_user_ids.insert(host_user_id); - for github_login in request.payload.authorized_logins { - let contact_user_id = self.app_state.db.create_user(&github_login, false).await?; + for github_login in &request.payload.authorized_logins { + let contact_user_id = self.app_state.db.create_user(github_login, false).await?; contact_user_ids.insert(contact_user_id); } let contact_user_ids = contact_user_ids.into_iter().collect::>(); - self.state_mut().register_worktree( - request.payload.project_id, - request.payload.worktree_id, - request.sender_id, - Worktree { - authorized_user_ids: contact_user_ids.clone(), - root_name: request.payload.root_name, - share: None, - weak: false, - }, - )?; + let guest_connection_ids; + { + let mut state = self.state_mut(); + guest_connection_ids = state + .read_project(request.payload.project_id, request.sender_id)? + .guest_connection_ids(); + state.register_worktree( + request.payload.project_id, + request.payload.worktree_id, + request.sender_id, + Worktree { + authorized_user_ids: contact_user_ids.clone(), + root_name: request.payload.root_name.clone(), + share: None, + weak: request.payload.weak, + }, + )?; + } + broadcast(request.sender_id, guest_connection_ids, |connection_id| { + self.peer + .forward_send(request.sender_id, connection_id, request.payload.clone()) + })?; self.update_contacts_for_users(&contact_user_ids)?; Ok(proto::Ack {}) } @@ -462,47 +472,6 @@ impl Server { Ok(()) } - async fn share_worktree( - mut self: Arc, - mut request: TypedEnvelope, - ) -> tide::Result { - let worktree = request - .payload - .worktree - .as_mut() - .ok_or_else(|| anyhow!("missing worktree"))?; - let entries = worktree - .entries - .iter() - .map(|entry| (entry.id, entry.clone())) - .collect(); - let diagnostic_summaries = worktree - .diagnostic_summaries - .iter() - .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone())) - .collect(); - - let shared_worktree = self.state_mut().share_worktree( - request.payload.project_id, - worktree.id, - request.sender_id, - entries, - diagnostic_summaries, - )?; - - broadcast( - request.sender_id, - shared_worktree.connection_ids, - |connection_id| { - self.peer - .forward_send(request.sender_id, connection_id, request.payload.clone()) - }, - )?; - self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?; - - Ok(proto::Ack {}) - } - async fn update_worktree( mut self: Arc, request: TypedEnvelope, diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs index 5cb0a0e1db..ec23c06992 100644 --- a/crates/server/src/rpc/store.rs +++ b/crates/server/src/rpc/store.rs @@ -396,36 +396,6 @@ impl Store { } } - pub fn share_worktree( - &mut self, - project_id: u64, - worktree_id: u64, - connection_id: ConnectionId, - entries: HashMap, - diagnostic_summaries: BTreeMap, - ) -> tide::Result { - let project = self - .projects - .get_mut(&project_id) - .ok_or_else(|| anyhow!("no such project"))?; - let worktree = project - .worktrees - .get_mut(&worktree_id) - .ok_or_else(|| anyhow!("no such worktree"))?; - if project.host_connection_id == connection_id && project.share.is_some() { - worktree.share = Some(WorktreeShare { - entries, - diagnostic_summaries, - }); - Ok(SharedWorktree { - authorized_user_ids: project.authorized_user_ids(), - connection_ids: project.guest_connection_ids(), - }) - } else { - Err(anyhow!("no such worktree"))? - } - } - pub fn update_diagnostic_summary( &mut self, project_id: u64,