diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index e62f834fbf..347db6a71a 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -43,8 +43,10 @@ CREATE TABLE "projects" ( "id" INTEGER PRIMARY KEY, "room_id" INTEGER REFERENCES rooms (id) NOT NULL, "host_user_id" INTEGER REFERENCES users (id) NOT NULL, - "host_connection_id" INTEGER NOT NULL + "host_connection_id" INTEGER NOT NULL, + "host_connection_epoch" TEXT NOT NULL ); +CREATE INDEX "index_projects_on_host_connection_epoch" ON "projects" ("host_connection_epoch"); CREATE TABLE "worktrees" ( "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, @@ -100,22 +102,28 @@ CREATE TABLE "project_collaborators" ( "id" INTEGER PRIMARY KEY, "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "connection_id" INTEGER NOT NULL, + "connection_epoch" TEXT NOT NULL, "user_id" INTEGER NOT NULL, "replica_id" INTEGER NOT NULL, "is_host" BOOLEAN NOT NULL ); CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id"); CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id"); +CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch"); CREATE TABLE "room_participants" ( "id" INTEGER PRIMARY KEY, "room_id" INTEGER NOT NULL REFERENCES rooms (id), "user_id" INTEGER NOT NULL REFERENCES users (id), "answering_connection_id" INTEGER, + "answering_connection_epoch" TEXT, "location_kind" INTEGER, "location_project_id" INTEGER REFERENCES projects (id), "initial_project_id" INTEGER REFERENCES projects (id), "calling_user_id" INTEGER NOT NULL REFERENCES users (id), - "calling_connection_id" INTEGER NOT NULL + "calling_connection_id" INTEGER NOT NULL, + "calling_connection_epoch" TEXT NOT NULL ); CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id"); +CREATE INDEX "index_room_participants_on_answering_connection_epoch" ON "room_participants" ("answering_connection_epoch"); +CREATE INDEX "index_room_participants_on_calling_connection_epoch" ON "room_participants" ("calling_connection_epoch"); diff --git a/crates/collab/migrations/20221111092550_reconnection_support.sql b/crates/collab/migrations/20221111092550_reconnection_support.sql index d23dbfa046..6278fa7a59 100644 --- a/crates/collab/migrations/20221111092550_reconnection_support.sql +++ b/crates/collab/migrations/20221111092550_reconnection_support.sql @@ -6,7 +6,9 @@ CREATE TABLE IF NOT EXISTS "rooms" ( ALTER TABLE "projects" ADD "room_id" INTEGER REFERENCES rooms (id), ADD "host_connection_id" INTEGER, + ADD "host_connection_epoch" UUID, DROP COLUMN "unregistered"; +CREATE INDEX "index_projects_on_host_connection_epoch" ON "projects" ("host_connection_epoch"); CREATE TABLE "worktrees" ( "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, @@ -62,22 +64,28 @@ CREATE TABLE "project_collaborators" ( "id" SERIAL PRIMARY KEY, "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "connection_id" INTEGER NOT NULL, + "connection_epoch" UUID NOT NULL, "user_id" INTEGER NOT NULL, "replica_id" INTEGER NOT NULL, "is_host" BOOLEAN NOT NULL ); CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id"); CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id"); +CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch"); CREATE TABLE "room_participants" ( "id" SERIAL PRIMARY KEY, "room_id" INTEGER NOT NULL REFERENCES rooms (id), "user_id" INTEGER NOT NULL REFERENCES users (id), "answering_connection_id" INTEGER, + "answering_connection_epoch" UUID, "location_kind" INTEGER, "location_project_id" INTEGER REFERENCES projects (id), "initial_project_id" INTEGER REFERENCES projects (id), "calling_user_id" INTEGER NOT NULL REFERENCES users (id), - "calling_connection_id" INTEGER NOT NULL + "calling_connection_id" INTEGER NOT NULL, + "calling_connection_epoch" UUID NOT NULL ); CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id"); +CREATE INDEX "index_room_participants_on_answering_connection_epoch" ON "room_participants" ("answering_connection_epoch"); +CREATE INDEX "index_room_participants_on_calling_connection_epoch" ON "room_participants" ("calling_connection_epoch"); diff --git a/crates/collab/src/bin/seed.rs b/crates/collab/src/bin/seed.rs index 2f7c61147c..9860b8be84 100644 --- a/crates/collab/src/bin/seed.rs +++ b/crates/collab/src/bin/seed.rs @@ -1,4 +1,4 @@ -use collab::{db, Error, Result}; +use collab::db; use db::{ConnectOptions, Database, UserId}; use serde::{de::DeserializeOwned, Deserialize}; use std::fmt::Write; diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 7395a7cc76..05d6274108 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -47,6 +47,7 @@ pub struct Database { background: Option>, #[cfg(test)] runtime: Option, + epoch: Uuid, } impl Database { @@ -59,6 +60,7 @@ impl Database { background: None, #[cfg(test)] runtime: None, + epoch: Uuid::new_v4(), }) } @@ -103,6 +105,30 @@ impl Database { Ok(new_migrations) } + pub async fn clear_stale_data(&self) -> Result<()> { + self.transact(|tx| async { + project_collaborator::Entity::delete_many() + .filter(project_collaborator::Column::ConnectionEpoch.ne(self.epoch)) + .exec(&tx) + .await?; + room_participant::Entity::delete_many() + .filter( + room_participant::Column::AnsweringConnectionEpoch + .ne(self.epoch) + .or(room_participant::Column::CallingConnectionEpoch.ne(self.epoch)), + ) + .exec(&tx) + .await?; + project::Entity::delete_many() + .filter(project::Column::HostConnectionEpoch.ne(self.epoch)) + .exec(&tx) + .await?; + tx.commit().await?; + Ok(()) + }) + .await + } + // users pub async fn create_user( @@ -983,8 +1009,10 @@ impl Database { room_id: ActiveValue::set(room_id), user_id: ActiveValue::set(user_id), answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), + answering_connection_epoch: ActiveValue::set(Some(self.epoch)), calling_user_id: ActiveValue::set(user_id), calling_connection_id: ActiveValue::set(connection_id.0 as i32), + calling_connection_epoch: ActiveValue::set(self.epoch), ..Default::default() } .insert(&tx) @@ -1010,6 +1038,7 @@ impl Database { user_id: ActiveValue::set(called_user_id), calling_user_id: ActiveValue::set(calling_user_id), calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32), + calling_connection_epoch: ActiveValue::set(self.epoch), initial_project_id: ActiveValue::set(initial_project_id), ..Default::default() } @@ -1127,6 +1156,7 @@ impl Database { ) .set(room_participant::ActiveModel { answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), + answering_connection_epoch: ActiveValue::set(Some(self.epoch)), ..Default::default() }) .exec(&tx) @@ -1489,6 +1519,7 @@ impl Database { room_id: ActiveValue::set(participant.room_id), host_user_id: ActiveValue::set(participant.user_id), host_connection_id: ActiveValue::set(connection_id.0 as i32), + host_connection_epoch: ActiveValue::set(self.epoch), ..Default::default() } .insert(&tx) @@ -1513,6 +1544,7 @@ impl Database { project_collaborator::ActiveModel { project_id: ActiveValue::set(project.id), connection_id: ActiveValue::set(connection_id.0 as i32), + connection_epoch: ActiveValue::set(self.epoch), user_id: ActiveValue::set(participant.user_id), replica_id: ActiveValue::set(ReplicaId(0)), is_host: ActiveValue::set(true), @@ -1832,6 +1864,7 @@ impl Database { let new_collaborator = project_collaborator::ActiveModel { project_id: ActiveValue::set(project_id), connection_id: ActiveValue::set(connection_id.0 as i32), + connection_epoch: ActiveValue::set(self.epoch), user_id: ActiveValue::set(participant.user_id), replica_id: ActiveValue::set(replica_id), is_host: ActiveValue::set(false), diff --git a/crates/collab/src/db/project.rs b/crates/collab/src/db/project.rs index b109ddc4b8..971a8fcefb 100644 --- a/crates/collab/src/db/project.rs +++ b/crates/collab/src/db/project.rs @@ -9,6 +9,7 @@ pub struct Model { pub room_id: RoomId, pub host_user_id: UserId, pub host_connection_id: i32, + pub host_connection_epoch: Uuid, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/collab/src/db/project_collaborator.rs b/crates/collab/src/db/project_collaborator.rs index 097272fcda..5db307f5df 100644 --- a/crates/collab/src/db/project_collaborator.rs +++ b/crates/collab/src/db/project_collaborator.rs @@ -8,6 +8,7 @@ pub struct Model { pub id: ProjectCollaboratorId, pub project_id: ProjectId, pub connection_id: i32, + pub connection_epoch: Uuid, pub user_id: UserId, pub replica_id: ReplicaId, pub is_host: bool, diff --git a/crates/collab/src/db/room_participant.rs b/crates/collab/src/db/room_participant.rs index c7c804581b..783f45aa93 100644 --- a/crates/collab/src/db/room_participant.rs +++ b/crates/collab/src/db/room_participant.rs @@ -9,11 +9,13 @@ pub struct Model { pub room_id: RoomId, pub user_id: UserId, pub answering_connection_id: Option, + pub answering_connection_epoch: Option, pub location_kind: Option, pub location_project_id: Option, pub initial_project_id: Option, pub calling_user_id: UserId, pub calling_connection_id: i32, + pub calling_connection_epoch: Uuid, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index 42ffe50ea3..a288e0f3ce 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -52,6 +52,8 @@ async fn main() -> Result<()> { init_tracing(&config); let state = AppState::new(config).await?; + state.db.clear_stale_data().await?; + let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port)) .expect("failed to bind TCP listener");