diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 4a9983e600..823990eaf8 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -503,6 +503,11 @@ pub struct RefreshedRoom { pub canceled_calls_to_user_ids: Vec, } +pub struct RefreshedChannelBuffer { + pub connection_ids: Vec, + pub removed_collaborators: Vec, +} + pub struct Project { pub collaborators: Vec, pub worktrees: BTreeMap, diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index 79e20a2622..813255b80e 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -123,10 +123,11 @@ impl Database { // Find the collaborator record for this user's previous lost // connection. Update it with the new connection id. - let Some(self_collaborator) = collaborators - .iter_mut() - .find(|c| c.user_id == user_id && c.connection_lost) - else { + let server_id = ServerId(connection_id.owner_id as i32); + let Some(self_collaborator) = collaborators.iter_mut().find(|c| { + c.user_id == user_id + && (c.connection_lost || c.connection_server_id != server_id) + }) else { continue; }; let old_connection_id = self_collaborator.connection(); @@ -195,6 +196,25 @@ impl Database { .await } + pub async fn refresh_channel_buffer( + &self, + channel_id: ChannelId, + server_id: ServerId, + ) -> Result { + self.transaction(|tx| async move { + let mut connection_ids = Vec::new(); + let mut removed_collaborators = Vec::new(); + + // TODO + + Ok(RefreshedChannelBuffer { + connection_ids, + removed_collaborators, + }) + }) + .await + } + pub async fn leave_channel_buffer( &self, channel_id: ChannelId, diff --git a/crates/collab/src/db/queries/servers.rs b/crates/collab/src/db/queries/servers.rs index 08a2bda16a..2b1d0d2c0c 100644 --- a/crates/collab/src/db/queries/servers.rs +++ b/crates/collab/src/db/queries/servers.rs @@ -14,31 +14,48 @@ impl Database { .await } - pub async fn stale_room_ids( + pub async fn stale_server_resource_ids( &self, environment: &str, new_server_id: ServerId, - ) -> Result> { + ) -> Result<(Vec, Vec)> { self.transaction(|tx| async move { #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] - enum QueryAs { + enum QueryRoomIds { RoomId, } + #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] + enum QueryChannelIds { + ChannelId, + } + let stale_server_epochs = self .stale_server_ids(environment, new_server_id, &tx) .await?; - Ok(room_participant::Entity::find() + let room_ids = room_participant::Entity::find() .select_only() .column(room_participant::Column::RoomId) .distinct() .filter( room_participant::Column::AnsweringConnectionServerId - .is_in(stale_server_epochs), + .is_in(stale_server_epochs.iter().copied()), ) - .into_values::<_, QueryAs>() + .into_values::<_, QueryRoomIds>() .all(&*tx) - .await?) + .await?; + let channel_ids = channel_buffer_collaborator::Entity::find() + .select_only() + .column(channel_buffer_collaborator::Column::ChannelId) + .distinct() + .filter( + channel_buffer_collaborator::Column::ConnectionServerId + .is_in(stale_server_epochs.iter().copied()), + ) + .into_values::<_, QueryChannelIds>() + .all(&*tx) + .await?; + Ok((room_ids, channel_ids)) }) .await } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index d221d1c99e..95307ba725 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -278,13 +278,29 @@ impl Server { tracing::info!("waiting for cleanup timeout"); timeout.await; tracing::info!("cleanup timeout expired, retrieving stale rooms"); - if let Some(room_ids) = app_state + if let Some((room_ids, channel_ids)) = app_state .db - .stale_room_ids(&app_state.config.zed_environment, server_id) + .stale_server_resource_ids(&app_state.config.zed_environment, server_id) .await .trace_err() { tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms"); + + for channel_id in channel_ids { + if let Some(refreshed_channel_buffer) = app_state + .db + .refresh_channel_buffer(channel_id, server_id) + .await + .trace_err() + { + for connection_id in refreshed_channel_buffer.connection_ids { + for message in &refreshed_channel_buffer.removed_collaborators { + peer.send(connection_id, message.clone()).trace_err(); + } + } + } + } + for room_id in room_ids { let mut contacts_to_update = HashSet::default(); let mut canceled_calls_to_user_ids = Vec::new(); diff --git a/crates/collab/src/tests/randomized_integration_tests.rs b/crates/collab/src/tests/randomized_integration_tests.rs index e48753ed41..309fcf7e44 100644 --- a/crates/collab/src/tests/randomized_integration_tests.rs +++ b/crates/collab/src/tests/randomized_integration_tests.rs @@ -307,10 +307,10 @@ async fn apply_server_operation( server.start().await.unwrap(); deterministic.advance_clock(CLEANUP_TIMEOUT); let environment = &server.app_state.config.zed_environment; - let stale_room_ids = server + let (stale_room_ids, _) = server .app_state .db - .stale_room_ids(environment, server.id()) + .stale_server_resource_ids(environment, server.id()) .await .unwrap(); assert_eq!(stale_room_ids, vec![]);