diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 6460a0900f..7527a69326 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -66,7 +66,7 @@ impl Entity for Room { fn release(&mut self, _: &mut MutableAppContext) { if self.status.is_online() { log::info!("room was released, sending leave message"); - self.client.send(proto::LeaveRoom {}).log_err(); + let _ = self.client.send(proto::LeaveRoom {}); } } } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 62db247b9f..92d4935b23 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -672,15 +672,17 @@ impl<'a> Drop for ConnectionPoolGuard<'a> { } fn broadcast( - sender_id: ConnectionId, + sender_id: Option, receiver_ids: impl IntoIterator, mut f: F, ) where F: FnMut(ConnectionId) -> anyhow::Result<()>, { for receiver_id in receiver_ids { - if receiver_id != sender_id { - f(receiver_id).trace_err(); + if Some(receiver_id) != sender_id { + if let Err(error) = f(receiver_id) { + tracing::error!("failed to send to {:?} {}", receiver_id, error); + } } } } @@ -998,7 +1000,7 @@ async fn rejoin_room( } broadcast( - session.connection_id, + Some(session.connection_id), project .collaborators .iter() @@ -1279,7 +1281,7 @@ async fn unshare_project(message: proto::UnshareProject, session: Session) -> Re .await?; broadcast( - session.connection_id, + Some(session.connection_id), guest_connection_ids.iter().copied(), |conn_id| session.peer.send(conn_id, message.clone()), ); @@ -1430,7 +1432,7 @@ async fn update_project( .update_project(project_id, session.connection_id, &request.worktrees) .await?; broadcast( - session.connection_id, + Some(session.connection_id), guest_connection_ids.iter().copied(), |connection_id| { session @@ -1456,7 +1458,7 @@ async fn update_worktree( .await?; broadcast( - session.connection_id, + Some(session.connection_id), guest_connection_ids.iter().copied(), |connection_id| { session @@ -1479,7 +1481,7 @@ async fn update_diagnostic_summary( .await?; broadcast( - session.connection_id, + Some(session.connection_id), guest_connection_ids.iter().copied(), |connection_id| { session @@ -1502,7 +1504,7 @@ async fn start_language_server( .await?; broadcast( - session.connection_id, + Some(session.connection_id), guest_connection_ids.iter().copied(), |connection_id| { session @@ -1525,7 +1527,7 @@ async fn update_language_server( .project_connection_ids(project_id, session.connection_id) .await?; broadcast( - session.connection_id, + Some(session.connection_id), project_connection_ids.iter().copied(), |connection_id| { session @@ -1600,11 +1602,15 @@ async fn save_buffer( let project_connection_ids = collaborators .iter() .map(|collaborator| collaborator.connection_id); - broadcast(host_connection_id, project_connection_ids, |conn_id| { - session - .peer - .forward_send(host_connection_id, conn_id, response_payload.clone()) - }); + broadcast( + Some(host_connection_id), + project_connection_ids, + |conn_id| { + session + .peer + .forward_send(host_connection_id, conn_id, response_payload.clone()) + }, + ); response.send(response_payload)?; Ok(()) } @@ -1637,7 +1643,7 @@ async fn update_buffer( session.executor.record_backtrace(); broadcast( - session.connection_id, + Some(session.connection_id), project_connection_ids.iter().copied(), |connection_id| { session @@ -1658,7 +1664,7 @@ async fn update_buffer_file(request: proto::UpdateBufferFile, session: Session) .await?; broadcast( - session.connection_id, + Some(session.connection_id), project_connection_ids.iter().copied(), |connection_id| { session @@ -1677,7 +1683,7 @@ async fn buffer_reloaded(request: proto::BufferReloaded, session: Session) -> Re .project_connection_ids(project_id, session.connection_id) .await?; broadcast( - session.connection_id, + Some(session.connection_id), project_connection_ids.iter().copied(), |connection_id| { session @@ -1696,7 +1702,7 @@ async fn buffer_saved(request: proto::BufferSaved, session: Session) -> Result<( .project_connection_ids(project_id, session.connection_id) .await?; broadcast( - session.connection_id, + Some(session.connection_id), project_connection_ids.iter().copied(), |connection_id| { session @@ -1988,7 +1994,7 @@ async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> R .project_connection_ids(project_id, session.connection_id) .await?; broadcast( - session.connection_id, + Some(session.connection_id), project_connection_ids.iter().copied(), |connection_id| { session @@ -2098,21 +2104,20 @@ fn contact_for_user( } fn room_updated(room: &proto::Room, peer: &Peer) { - for participant in &room.participants { - if let Some(peer_id) = participant - .peer_id - .ok_or_else(|| anyhow!("invalid participant peer id")) - .trace_err() - { + broadcast( + None, + room.participants + .iter() + .filter_map(|participant| Some(participant.peer_id?.into())), + |peer_id| { peer.send( peer_id.into(), proto::RoomUpdated { room: Some(room.clone()), }, ) - .trace_err(); - } - } + }, + ); } async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> { diff --git a/crates/collab/src/tests/randomized_integration_tests.rs b/crates/collab/src/tests/randomized_integration_tests.rs index 530ca05af8..a42d4f7d32 100644 --- a/crates/collab/src/tests/randomized_integration_tests.rs +++ b/crates/collab/src/tests/randomized_integration_tests.rs @@ -290,10 +290,11 @@ async fn test_random_collaboration( assert_eq!( guest_snapshot.entries(false).collect::>(), host_snapshot.entries(false).collect::>(), - "{} has different snapshot than the host for worktree {} ({:?})", + "{} has different snapshot than the host for worktree {} ({:?}) and project {:?}", client.username, id, - host_snapshot.abs_path() + host_snapshot.abs_path(), + host_project.read_with(host_cx, |project, _| project.remote_id()) ); assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id()); } diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 0dff99e77e..271d0f242b 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1037,7 +1037,7 @@ impl Project { if update_project.is_ok() { for worktree in worktrees { worktree.update(&mut cx, |worktree, cx| { - let worktree = &mut worktree.as_local_mut().unwrap(); + let worktree = worktree.as_local_mut().unwrap(); worktree.share(project_id, cx).detach_and_log_err(cx) }); } @@ -1062,17 +1062,7 @@ impl Project { cx: &mut ModelContext, ) -> Result<()> { self.set_collaborators_from_proto(message.collaborators, cx)?; - for worktree in self.worktrees.iter() { - if let Some(worktree) = worktree.upgrade(&cx) { - worktree.update(cx, |worktree, _| { - if let Some(worktree) = worktree.as_local_mut() { - worktree.reshare() - } else { - Ok(()) - } - })?; - } - } + let _ = self.metadata_changed(cx); Ok(()) } @@ -6259,18 +6249,14 @@ impl Entity for Project { fn release(&mut self, _: &mut gpui::MutableAppContext) { match &self.client_state { Some(ProjectClientState::Local { remote_id, .. }) => { - self.client - .send(proto::UnshareProject { - project_id: *remote_id, - }) - .log_err(); + let _ = self.client.send(proto::UnshareProject { + project_id: *remote_id, + }); } Some(ProjectClientState::Remote { remote_id, .. }) => { - self.client - .send(proto::LeaveProject { - project_id: *remote_id, - }) - .log_err(); + let _ = self.client.send(proto::LeaveProject { + project_id: *remote_id, + }); } _ => {} } diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 3a8c69c704..f22e915785 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -166,7 +166,7 @@ enum ScanState { struct ShareState { project_id: u64, snapshots_tx: watch::Sender, - reshared: watch::Sender<()>, + resume_updates: watch::Sender<()>, _maintain_remote_snapshot: Task>, } @@ -975,12 +975,12 @@ impl LocalWorktree { pub fn share(&mut self, project_id: u64, cx: &mut ModelContext) -> Task> { let (share_tx, share_rx) = oneshot::channel(); - if self.share.is_some() { + if let Some(share) = self.share.as_mut() { let _ = share_tx.send(()); + *share.resume_updates.borrow_mut() = (); } else { let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot()); - let (reshared_tx, mut reshared_rx) = watch::channel(); - let _ = reshared_rx.try_recv(); + let (resume_updates_tx, mut resume_updates_rx) = watch::channel(); let worktree_id = cx.model_id() as u64; for (path, summary) in self.diagnostic_summaries.iter() { @@ -1022,10 +1022,11 @@ impl LocalWorktree { let update = snapshot.build_update(&prev_snapshot, project_id, worktree_id, true); for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) { + let _ = resume_updates_rx.try_recv(); while let Err(error) = client.request(update.clone()).await { log::error!("failed to send worktree update: {}", error); - log::info!("waiting for worktree to be reshared"); - if reshared_rx.next().await.is_none() { + log::info!("waiting to resume updates"); + if resume_updates_rx.next().await.is_none() { return Ok(()); } } @@ -1046,7 +1047,7 @@ impl LocalWorktree { self.share = Some(ShareState { project_id, snapshots_tx, - reshared: reshared_tx, + resume_updates: resume_updates_tx, _maintain_remote_snapshot, }); } @@ -1059,15 +1060,6 @@ impl LocalWorktree { self.share.take(); } - pub fn reshare(&mut self) -> Result<()> { - let share = self - .share - .as_mut() - .ok_or_else(|| anyhow!("can't reshare a worktree that wasn't shared"))?; - *share.reshared.borrow_mut() = (); - Ok(()) - } - pub fn is_shared(&self) -> bool { self.share.is_some() }