diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 14a3109517..9480297c0e 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -355,6 +355,7 @@ impl Room { if let Some(handle) = project.upgrade(cx) { let project = handle.read(cx); if let Some(project_id) = project.remote_id() { + projects.insert(project_id, handle.clone()); rejoined_projects.push(proto::RejoinProject { project_id, worktrees: project @@ -387,10 +388,18 @@ impl Room { this.status = RoomStatus::Online; this.apply_room_update(room_proto, cx)?; - for shared_project in response.reshared_projects { - if let Some(project) = projects.get(&shared_project.id) { + for reshared_project in response.reshared_projects { + if let Some(project) = projects.get(&reshared_project.id) { project.update(cx, |project, cx| { - project.reshared(shared_project, cx).log_err(); + project.reshared(reshared_project, cx).log_err(); + }); + } + } + + for rejoined_project in response.rejoined_projects { + if let Some(project) = projects.get(&rejoined_project.id) { + project.update(cx, |project, cx| { + project.rejoined(rejoined_project, cx).log_err(); }); } } diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 6679922855..14be4e2732 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1347,6 +1347,7 @@ impl Database { user_id: UserId, connection_id: ConnectionId, ) -> Result { + println!("=============="); todo!() } @@ -1573,11 +1574,8 @@ impl Database { .await } - pub async fn connection_lost( - &self, - connection: ConnectionId, - ) -> Result>> { - self.room_transaction(|tx| async move { + pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> { + self.transaction(|tx| async move { let participant = room_participant::Entity::find() .filter( Condition::all() @@ -1593,7 +1591,6 @@ impl Database { .one(&*tx) .await? .ok_or_else(|| anyhow!("not a participant in any room"))?; - let room_id = participant.room_id; room_participant::Entity::update(room_participant::ActiveModel { answering_connection_lost: ActiveValue::set(true), @@ -1602,63 +1599,7 @@ impl Database { .exec(&*tx) .await?; - let guest_collaborators_and_projects = project_collaborator::Entity::find() - .find_also_related(project::Entity) - .filter( - Condition::all() - .add(project_collaborator::Column::IsHost.eq(false)) - .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32)) - .add( - project_collaborator::Column::ConnectionServerId - .eq(connection.owner_id as i32), - ), - ) - .all(&*tx) - .await?; - - project_collaborator::Entity::delete_many() - .filter( - project_collaborator::Column::Id - .is_in(guest_collaborators_and_projects.iter().map(|e| e.0.id)), - ) - .exec(&*tx) - .await?; - - let mut left_projects = Vec::new(); - for (_, project) in guest_collaborators_and_projects { - let Some(project) = project else { continue }; - let collaborators = project - .find_related(project_collaborator::Entity) - .all(&*tx) - .await?; - let connection_ids = collaborators - .into_iter() - .map(|collaborator| ConnectionId { - id: collaborator.connection_id as u32, - owner_id: collaborator.connection_server_id.0 as u32, - }) - .collect(); - - left_projects.push(LeftProject { - id: project.id, - host_user_id: project.host_user_id, - host_connection_id: project.host_connection()?, - connection_ids, - }); - } - - project::Entity::delete_many() - .filter( - Condition::all() - .add(project::Column::HostConnectionId.eq(connection.id as i32)) - .add( - project::Column::HostConnectionServerId.eq(connection.owner_id as i32), - ), - ) - .exec(&*tx) - .await?; - - Ok((room_id, left_projects)) + Ok(()) }) .await } diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index eeb9015876..2b41143d37 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -1351,19 +1351,27 @@ async fn test_host_reconnect( .unwrap(); let project_b = client_b.build_remote_project(project_id, cx_b).await; - assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared())); + deterministic.run_until_parked(); + + let worktree_id = worktree_a.read_with(cx_a, |worktree, _| { + assert!(worktree.as_local().unwrap().is_shared()); + worktree.id() + }); // Drop client A's connection. server.forbid_connections(); server.disconnect_client(client_a.peer_id().unwrap()); deterministic.advance_clock(RECEIVE_TIMEOUT); project_a.read_with(cx_a, |project, _| { - assert!(project.collaborators().is_empty()) + assert!(project.is_shared()); + assert_eq!(project.collaborators().len(), 1); + }); + project_b.read_with(cx_b, |project, _| { + assert!(!project.is_read_only()); + assert_eq!(project.collaborators().len(), 1); }); - project_a.read_with(cx_a, |project, _| assert!(!project.is_shared())); - project_b.read_with(cx_b, |project, _| assert!(project.is_read_only())); worktree_a.read_with(cx_a, |tree, _| { - assert!(!tree.as_local().unwrap().is_shared()) + assert!(tree.as_local().unwrap().is_shared()) }); // While disconnected, add and remove files from the client A's project. @@ -1393,9 +1401,60 @@ async fn test_host_reconnect( // Client A reconnects. Their project is re-shared, and client B re-joins it. server.allow_connections(); - deterministic.advance_clock(RECEIVE_TIMEOUT); - project_a.read_with(cx_a, |project, _| assert!(project.is_shared())); - project_b.read_with(cx_b, |project, _| assert!(!project.is_read_only())); + client_a + .authenticate_and_connect(false, &cx_a.to_async()) + .await + .unwrap(); + deterministic.run_until_parked(); + project_a.read_with(cx_a, |project, cx| { + assert!(project.is_shared()); + assert_eq!( + worktree_a + .read(cx) + .snapshot() + .paths() + .map(|p| p.to_str().unwrap()) + .collect::>(), + vec![ + "a.txt", + "b.txt", + "subdir1", + "subdir1/c.txt", + "subdir1/d.txt", + "subdir1/e.txt", + "subdir2", + "subdir2/f.txt", + "subdir2/g.txt", + "subdir2/h.txt", + "subdir2/i.txt" + ] + ); + }); + project_b.read_with(cx_b, |project, cx| { + assert!(!project.is_read_only()); + let worktree_b = project.worktree_for_id(worktree_id, cx).unwrap(); + assert_eq!( + worktree_b + .read(cx) + .snapshot() + .paths() + .map(|p| p.to_str().unwrap()) + .collect::>(), + vec![ + "a.txt", + "b.txt", + "subdir1", + "subdir1/c.txt", + "subdir1/d.txt", + "subdir1/e.txt", + "subdir2", + "subdir2/f.txt", + "subdir2/g.txt", + "subdir2/h.txt", + "subdir2/i.txt" + ] + ); + }); } #[gpui::test(iterations = 10)] @@ -6169,7 +6228,6 @@ async fn test_random_collaboration( let mut user_ids = Vec::new(); let mut op_start_signals = Vec::new(); let mut next_entity_id = 100000; - let mut can_disconnect = rng.lock().gen_bool(0.2); let mut operations = 0; while operations < max_operations { diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index beeb666da6..d75605d49a 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -799,17 +799,12 @@ async fn sign_out( .await .remove_connection(session.connection_id)?; - if let Some(mut left_projects) = session + session .db() .await .connection_lost(session.connection_id) .await - .trace_err() - { - for left_project in mem::take(&mut *left_projects) { - project_left(&left_project, &session); - } - } + .trace_err(); futures::select_biased! { _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index e71971c3d7..c260fd9449 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -156,7 +156,6 @@ enum ProjectClientState { sharing_has_stopped: bool, remote_id: u64, replica_id: ReplicaId, - _detect_unshare: Task>, }, } @@ -495,21 +494,6 @@ impl Project { sharing_has_stopped: false, remote_id, replica_id, - _detect_unshare: cx.spawn_weak(move |this, mut cx| { - async move { - let mut status = client.status(); - let is_connected = - status.next().await.map_or(false, |s| s.is_connected()); - // Even if we're initially connected, any future change of the status means we momentarily disconnected. - if !is_connected || status.next().await.is_some() { - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| this.disconnected_from_host(cx)) - } - } - Ok(()) - } - .log_err() - }), }), language_servers: Default::default(), language_server_ids: Default::default(), @@ -1100,6 +1084,15 @@ impl Project { Ok(()) } + pub fn rejoined( + &mut self, + message: proto::RejoinedProject, + cx: &mut ModelContext, + ) -> Result<()> { + self.set_collaborators_from_proto(message.collaborators, cx)?; + Ok(()) + } + pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec { self.worktrees(cx) .map(|worktree| {