From 17c9aa181936b55f2953e5c44b9a375a027f6838 Mon Sep 17 00:00:00 2001
From: Nathan Sobo
Date: Wed, 23 Feb 2022 11:56:09 -0700
Subject: [PATCH] Remove ShareWorktree message

Instead, create an empty worktree on guests when a worktree is first
*registered*, then update it via an initial UpdateWorktree message.

This prevents the host from referencing, in definition RPC responses, a
worktree that the guest has not yet observed. We could have waited until
the entire worktree was shared, but that could take a long time, so we
instead create an empty worktree on guests and proceed from there.

We still have randomized test failures as of this commit:

SEED=9519 MAX_PEERS=2 ITERATIONS=10000 OPERATIONS=7 ct -p zed-server test_random_collaboration

Co-Authored-By: Max Brunsfeld
Co-Authored-By: Antonio Scandurra
---
 crates/project/src/project.rs  | 19 +++++++-----
 crates/project/src/worktree.rs | 18 ++++++++---
 crates/rpc/src/proto.rs        |  1 +
 crates/server/src/rpc.rs       | 25 +++++++---------
 crates/server/src/rpc/store.rs | 55 ++++++++++++++++------------------
 5 files changed, 63 insertions(+), 55 deletions(-)

diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs
index 0d655ed87a..1de6b7b131 100644
--- a/crates/project/src/project.rs
+++ b/crates/project/src/project.rs
@@ -169,7 +169,7 @@ impl DiagnosticSummary {
         this
     }
 
-    pub fn to_proto(&self, path: Arc<Path>) -> proto::DiagnosticSummary {
+    pub fn to_proto(&self, path: &Path) -> proto::DiagnosticSummary {
         proto::DiagnosticSummary {
             path: path.to_string_lossy().to_string(),
             error_count: self.error_count as u32,
@@ -195,7 +195,7 @@ impl Project {
         client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updated);
         client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updating);
         client.add_entity_message_handler(Self::handle_remove_collaborator);
-        client.add_entity_message_handler(Self::handle_share_worktree);
+        client.add_entity_message_handler(Self::handle_register_worktree);
         client.add_entity_message_handler(Self::handle_unregister_worktree);
         client.add_entity_message_handler(Self::handle_unshare_project);
         client.add_entity_message_handler(Self::handle_update_buffer_file);
@@ -2347,19 +2347,22 @@ impl Project {
         })
     }
 
-    async fn handle_share_worktree(
+    async fn handle_register_worktree(
         this: ModelHandle<Self>,
-        envelope: TypedEnvelope<proto::ShareWorktree>,
+        envelope: TypedEnvelope<proto::RegisterWorktree>,
         client: Arc<Client>,
         mut cx: AsyncAppContext,
     ) -> Result<()> {
         this.update(&mut cx, |this, cx| {
             let remote_id = this.remote_id().ok_or_else(|| anyhow!("invalid project"))?;
             let replica_id = this.replica_id();
-            let worktree = envelope
-                .payload
-                .worktree
-                .ok_or_else(|| anyhow!("invalid worktree"))?;
+            let worktree = proto::Worktree {
+                id: envelope.payload.worktree_id,
+                root_name: envelope.payload.root_name,
+                entries: Default::default(),
+                diagnostic_summaries: Default::default(),
+                weak: envelope.payload.weak,
+            };
             let (worktree, load_task) =
                 Worktree::remote(remote_id, replica_id, worktree, client, cx);
             this.add_worktree(&worktree, cx);
diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs
index 5fd7551c6d..367db436c7 100644
--- a/crates/project/src/worktree.rs
+++ b/crates/project/src/worktree.rs
@@ -771,12 +771,10 @@ impl LocalWorktree {
         let worktree_id = cx.model_id() as u64;
         let (snapshots_to_send_tx, snapshots_to_send_rx) =
             smol::channel::unbounded::<LocalSnapshot>();
-        let (mut share_tx, mut share_rx) = oneshot::channel();
         let maintain_remote_snapshot = cx.background().spawn({
             let rpc = rpc.clone();
             let snapshot = snapshot.clone();
            let diagnostic_summaries = self.diagnostic_summaries.clone();
-            let weak = self.weak;
             async move {
                 if let Err(error) = rpc
                     .request(proto::UpdateWorktree {
@@ -799,6 +797,14 @@ impl LocalWorktree {
                     let _ = share_tx.try_send(Ok(()));
                 }
 
+                for (path, summary) in diagnostic_summaries.iter() {
+                    rpc.send(proto::UpdateDiagnosticSummary {
+                        project_id,
+                        worktree_id,
+                        summary: Some(summary.to_proto(&path.0)),
+                    })?;
+                }
+
                 let mut prev_snapshot = snapshot;
                 while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                     let message =
@@ -819,7 +825,10 @@ impl LocalWorktree {
         }
 
         async move {
-            share_rx.next().await;
+            share_rx
+                .next()
+                .await
+                .unwrap_or_else(|| Err(anyhow!("share ended")))
         }
     }
@@ -1014,6 +1023,7 @@ impl Snapshot {
 }
 
 impl LocalSnapshot {
+    #[cfg(test)]
     pub(crate) fn to_proto(
         &self,
         diagnostic_summaries: &TreeMap<PathKey, DiagnosticSummary>,
@@ -1031,7 +1041,7 @@ impl LocalSnapshot {
                 .collect(),
             diagnostic_summaries: diagnostic_summaries
                 .iter()
-                .map(|(path, summary)| summary.to_proto(path.0.clone()))
+                .map(|(path, summary)| summary.to_proto(&path.0))
                 .collect(),
             weak,
         }
diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs
index 4ac61377c4..333bbd400e 100644
--- a/crates/rpc/src/proto.rs
+++ b/crates/rpc/src/proto.rs
@@ -262,6 +262,7 @@ entity_messages!(
     UpdateBuffer,
     UpdateBufferFile,
     UpdateDiagnosticSummary,
+    RegisterWorktree,
     UpdateWorktree,
 );
diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs
index 2dc56138ff..791796d494 100644
--- a/crates/server/src/rpc.rs
+++ b/crates/server/src/rpc.rs
@@ -334,16 +334,16 @@ impl Server {
                 replica_id: 0,
                 user_id: joined.project.host_user_id.to_proto(),
             });
-            let worktrees = joined
-                .project
+            let worktrees = share
                 .worktrees
                 .iter()
-                .filter_map(|(id, worktree)| {
-                    worktree.share.as_ref().map(|share| proto::Worktree {
+                .filter_map(|(id, shared_worktree)| {
+                    let worktree = joined.project.worktrees.get(&id)?;
+                    Some(proto::Worktree {
                         id: *id,
                         root_name: worktree.root_name.clone(),
-                        entries: share.entries.values().cloned().collect(),
-                        diagnostic_summaries: share
+                        entries: shared_worktree.entries.values().cloned().collect(),
+                        diagnostic_summaries: shared_worktree
                             .diagnostic_summaries
                             .values()
                             .cloned()
@@ -437,7 +437,6 @@ impl Server {
             Worktree {
                 authorized_user_ids: contact_user_ids.clone(),
                 root_name: request.payload.root_name.clone(),
-                share: None,
                 weak: request.payload.weak,
             },
         )?;
@@ -1164,7 +1163,7 @@ mod tests {
         cell::Cell,
         env,
         ops::Deref,
-        path::Path,
+        path::{Path, PathBuf},
         rc::Rc,
         sync::{
             atomic::{AtomicBool, Ordering::SeqCst},
@@ -2115,16 +2114,14 @@ mod tests {
             let worktree = store
                 .project(project_id)
                 .unwrap()
+                .share
+                .as_ref()
+                .unwrap()
                 .worktrees
                 .get(&worktree_id.to_proto())
                 .unwrap();
 
-            !worktree
-                .share
-                .as_ref()
-                .unwrap()
-                .diagnostic_summaries
-                .is_empty()
+            !worktree.diagnostic_summaries.is_empty()
         })
         .await;
diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs
index ec23c06992..e6c4429b69 100644
--- a/crates/server/src/rpc/store.rs
+++ b/crates/server/src/rpc/store.rs
@@ -30,7 +30,6 @@ pub struct Project {
 pub struct Worktree {
     pub authorized_user_ids: Vec<UserId>,
     pub root_name: String,
-    pub share: Option<WorktreeShare>,
     pub weak: bool,
 }
 
@@ -38,8 +37,10 @@
 pub struct ProjectShare {
     pub guests: HashMap<ConnectionId, (ReplicaId, UserId)>,
     pub active_replica_ids: HashSet<ReplicaId>,
+    pub worktrees: HashMap<u64, WorktreeShare>,
 }
 
+#[derive(Default)]
 pub struct WorktreeShare {
     pub entries: HashMap<u64, proto::Entry>,
     pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
@@ -74,11 +75,6 @@ pub struct LeftProject {
     pub authorized_user_ids: Vec<UserId>,
 }
 
-pub struct SharedWorktree {
-    pub authorized_user_ids: Vec<UserId>,
-    pub connection_ids: Vec<ConnectionId>,
-}
-
 impl Store {
     pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
         self.connections.insert(
@@ -272,6 +268,9 @@ impl Store {
             connection.projects.insert(project_id);
         }
         project.worktrees.insert(worktree_id, worktree);
+        if let Ok(share) = project.share_mut() {
+            share.worktrees.insert(worktree_id, Default::default());
+        }
 
         #[cfg(test)]
         self.check_invariants();
@@ -326,8 +325,9 @@ impl Store {
             .ok_or_else(|| anyhow!("no such worktree"))?;
 
         let mut guest_connection_ids = Vec::new();
-        if let Some(share) = &project.share {
+        if let Ok(share) = project.share_mut() {
             guest_connection_ids.extend(share.guests.keys());
+            share.worktrees.remove(&worktree_id);
         }
 
         for authorized_user_id in &worktree.authorized_user_ids {
@@ -349,7 +349,11 @@ impl Store {
     pub fn share_project(&mut self, project_id: u64, connection_id: ConnectionId) -> bool {
         if let Some(project) = self.projects.get_mut(&project_id) {
             if project.host_connection_id == connection_id {
-                project.share = Some(ProjectShare::default());
+                let mut share = ProjectShare::default();
+                for worktree_id in project.worktrees.keys() {
+                    share.worktrees.insert(*worktree_id, Default::default());
+                }
+                project.share = Some(share);
                 return true;
             }
         }
@@ -380,10 +384,6 @@ impl Store {
             }
         }
 
-        for worktree in project.worktrees.values_mut() {
-            worktree.share.take();
-        }
-
         #[cfg(test)]
         self.check_invariants();
@@ -407,17 +407,16 @@ impl Store {
             .projects
             .get_mut(&project_id)
             .ok_or_else(|| anyhow!("no such project"))?;
-        let worktree = project
-            .worktrees
-            .get_mut(&worktree_id)
-            .ok_or_else(|| anyhow!("no such worktree"))?;
         if project.host_connection_id == connection_id {
-            if let Some(share) = worktree.share.as_mut() {
-                share
-                    .diagnostic_summaries
-                    .insert(summary.path.clone().into(), summary);
-                return Ok(project.connection_ids());
-            }
+            let worktree = project
+                .share_mut()?
+                .worktrees
+                .get_mut(&worktree_id)
+                .ok_or_else(|| anyhow!("no such worktree"))?;
+            worktree
+                .diagnostic_summaries
+                .insert(summary.path.clone().into(), summary);
+            return Ok(project.connection_ids());
         }
 
         Err(anyhow!("no such worktree"))?
@@ -508,18 +507,16 @@ impl Store {
         updated_entries: &[proto::Entry],
     ) -> tide::Result<Vec<ConnectionId>> {
         let project = self.write_project(project_id, connection_id)?;
-        let share = project
+        let worktree = project
+            .share_mut()?
             .worktrees
             .get_mut(&worktree_id)
-            .ok_or_else(|| anyhow!("no such worktree"))?
-            .share
-            .as_mut()
-            .ok_or_else(|| anyhow!("worktree is not shared"))?;
+            .ok_or_else(|| anyhow!("no such worktree"))?;
         for entry_id in removed_entries {
-            share.entries.remove(&entry_id);
+            worktree.entries.remove(&entry_id);
        }
         for entry in updated_entries {
-            share.entries.insert(entry.id, entry.clone());
+            worktree.entries.insert(entry.id, entry.clone());
         }
         Ok(project.connection_ids())
     }
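
The server-side hunks above repeatedly call `project.share_mut()`, a helper whose definition is not included in this patch. The sketch below is an assumption about its shape, not code from this commit: the stand-in types, the use of `anyhow` for the error, and the error message are all hypothetical; only the method name and its Option-to-Result behavior (`project.share_mut()?` and `if let Ok(share) = project.share_mut()`) are visible in the diff.

    use anyhow::{anyhow, Result};

    // Hypothetical, simplified stand-ins for the store types this patch
    // touches; the real definitions live in crates/server/src/rpc/store.rs.
    #[derive(Default)]
    pub struct ProjectShare {
        // guests, active_replica_ids, worktrees, ...
    }

    pub struct Project {
        pub share: Option<ProjectShare>,
    }

    impl Project {
        // Hand out the active share mutably, or fail if the project is not
        // currently shared. This converts the `Option` stored on `Project`
        // into the `Result` the call sites above match on.
        pub fn share_mut(&mut self) -> Result<&mut ProjectShare> {
            self.share
                .as_mut()
                .ok_or_else(|| anyhow!("project is not shared"))
        }
    }

Note how the two code paths cooperate: `register_worktree` inserts an empty `WorktreeShare` when the project is already shared, while `share_project` seeds one for every worktree registered so far, so each worktree ends up with an empty share entry regardless of which operation happens first.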