diff --git a/crates/collab/src/db/queries/servers.rs b/crates/collab/src/db/queries/servers.rs index f4e01beba1..73deaaffb6 100644 --- a/crates/collab/src/db/queries/servers.rs +++ b/crates/collab/src/db/queries/servers.rs @@ -66,6 +66,87 @@ impl Database { .await } + /// Delete all channel chat participants from previous servers + pub async fn delete_stale_channel_chat_participants( + &self, + environment: &str, + new_server_id: ServerId, + ) -> Result<()> { + self.transaction(|tx| async move { + let stale_server_epochs = self + .stale_server_ids(environment, new_server_id, &tx) + .await?; + + channel_chat_participant::Entity::delete_many() + .filter( + channel_chat_participant::Column::ConnectionServerId + .is_in(stale_server_epochs.iter().copied()), + ) + .exec(&*tx) + .await?; + + Ok(()) + }) + .await + } + + pub async fn clear_old_worktree_entries(&self, server_id: ServerId) -> Result<()> { + self.transaction(|tx| async move { + use sea_orm::Statement; + use sea_orm::sea_query::{Expr, Query}; + + loop { + let delete_query = Query::delete() + .from_table(worktree_entry::Entity) + .and_where( + Expr::tuple([ + Expr::col((worktree_entry::Entity, worktree_entry::Column::ProjectId)) + .into(), + Expr::col((worktree_entry::Entity, worktree_entry::Column::WorktreeId)) + .into(), + Expr::col((worktree_entry::Entity, worktree_entry::Column::Id)).into(), + ]) + .in_subquery( + Query::select() + .columns([ + (worktree_entry::Entity, worktree_entry::Column::ProjectId), + (worktree_entry::Entity, worktree_entry::Column::WorktreeId), + (worktree_entry::Entity, worktree_entry::Column::Id), + ]) + .from(worktree_entry::Entity) + .inner_join( + project::Entity, + Expr::col((project::Entity, project::Column::Id)).equals(( + worktree_entry::Entity, + worktree_entry::Column::ProjectId, + )), + ) + .and_where(project::Column::HostConnectionServerId.ne(server_id)) + .limit(10000) + .to_owned(), + ), + ) + .to_owned(); + + let statement = Statement::from_sql_and_values( + tx.get_database_backend(), + delete_query + .to_string(sea_orm::sea_query::PostgresQueryBuilder) + .as_str(), + vec![], + ); + + let result = tx.execute(statement).await?; + if result.rows_affected() == 0 { + break; + } + } + + Ok(()) + }) + .await + } + /// Deletes any stale servers in the environment that don't match the `new_server_id`. pub async fn delete_stale_servers( &self, @@ -86,7 +167,7 @@ impl Database { .await } - async fn stale_server_ids( + pub async fn stale_server_ids( &self, environment: &str, new_server_id: ServerId, diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 99feffa140..4364d9f677 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -433,6 +433,16 @@ impl Server { tracing::info!("waiting for cleanup timeout"); timeout.await; tracing::info!("cleanup timeout expired, retrieving stale rooms"); + + app_state + .db + .delete_stale_channel_chat_participants( + &app_state.config.zed_environment, + server_id, + ) + .await + .trace_err(); + if let Some((room_ids, channel_ids)) = app_state .db .stale_server_resource_ids(&app_state.config.zed_environment, server_id) @@ -554,6 +564,21 @@ impl Server { } } + app_state + .db + .delete_stale_channel_chat_participants( + &app_state.config.zed_environment, + server_id, + ) + .await + .trace_err(); + + app_state + .db + .clear_old_worktree_entries(server_id) + .await + .trace_err(); + app_state .db .delete_stale_servers(&app_state.config.zed_environment, server_id)