From 6c58a4f885a8761ecbb8cae3f06a34a64b6afe8f Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 14 Dec 2022 17:34:24 -0800 Subject: [PATCH] Fix stale server queries, use foreign keys from connectionsn to servers --- crates/collab/.env.toml | 1 + .../20221109000000_test_schema.sql | 10 +- ...4346_change_epoch_from_uuid_to_integer.sql | 34 ++-- crates/collab/src/db.rs | 151 +++++++++--------- crates/collab/src/db/project.rs | 4 +- crates/collab/src/db/project_collaborator.rs | 4 +- crates/collab/src/db/room_participant.rs | 6 +- crates/collab/src/db/server.rs | 4 +- crates/collab/src/db/tests.rs | 18 +-- crates/collab/src/rpc.rs | 18 +-- 10 files changed, 128 insertions(+), 122 deletions(-) diff --git a/crates/collab/.env.toml b/crates/collab/.env.toml index 1945d9cb66..b4a6694e5e 100644 --- a/crates/collab/.env.toml +++ b/crates/collab/.env.toml @@ -2,6 +2,7 @@ DATABASE_URL = "postgres://postgres@localhost/zed" HTTP_PORT = 8080 API_TOKEN = "secret" INVITE_LINK_PREFIX = "http://localhost:3000/invites/" +ZED_ENVIRONMENT = "development" LIVE_KIT_SERVER = "http://localhost:7880" LIVE_KIT_KEY = "devkey" LIVE_KIT_SECRET = "secret" diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index fc048be48e..4ae6f54e37 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -44,7 +44,7 @@ CREATE TABLE "projects" ( "room_id" INTEGER REFERENCES rooms (id) NOT NULL, "host_user_id" INTEGER REFERENCES users (id) NOT NULL, "host_connection_id" INTEGER NOT NULL, - "host_connection_epoch" INTEGER NOT NULL, + "host_connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE, "unregistered" BOOLEAN NOT NULL DEFAULT FALSE ); CREATE INDEX "index_projects_on_host_connection_epoch" ON "projects" ("host_connection_epoch"); @@ -103,7 +103,7 @@ CREATE TABLE "project_collaborators" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "connection_id" INTEGER NOT NULL, - "connection_epoch" INTEGER NOT NULL, + "connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE, "user_id" INTEGER NOT NULL, "replica_id" INTEGER NOT NULL, "is_host" BOOLEAN NOT NULL @@ -119,14 +119,14 @@ CREATE TABLE "room_participants" ( "room_id" INTEGER NOT NULL REFERENCES rooms (id), "user_id" INTEGER NOT NULL REFERENCES users (id), "answering_connection_id" INTEGER, - "answering_connection_epoch" INTEGER, + "answering_connection_server_id" INTEGER REFERENCES servers (id) ON DELETE CASCADE, "answering_connection_lost" BOOLEAN NOT NULL, "location_kind" INTEGER, "location_project_id" INTEGER, "initial_project_id" INTEGER, "calling_user_id" INTEGER NOT NULL REFERENCES users (id), "calling_connection_id" INTEGER NOT NULL, - "calling_connection_epoch" INTEGER NOT NULL + "calling_connection_server_id" INTEGER REFERENCES servers (id) ON DELETE SET NULL ); CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id"); CREATE INDEX "index_room_participants_on_room_id" ON "room_participants" ("room_id"); @@ -136,6 +136,6 @@ CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_parti CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_epoch" ON "room_participants" ("answering_connection_id", "answering_connection_epoch"); CREATE TABLE "servers" ( - "epoch" INTEGER PRIMARY KEY AUTOINCREMENT, + "id" INTEGER PRIMARY KEY AUTOINCREMENT, "environment" VARCHAR NOT NULL ); diff --git a/crates/collab/migrations/20221214144346_change_epoch_from_uuid_to_integer.sql b/crates/collab/migrations/20221214144346_change_epoch_from_uuid_to_integer.sql index addd5451e8..e9878a96e2 100644 --- a/crates/collab/migrations/20221214144346_change_epoch_from_uuid_to_integer.sql +++ b/crates/collab/migrations/20221214144346_change_epoch_from_uuid_to_integer.sql @@ -1,14 +1,22 @@ -ALTER TABLE "projects" - ALTER COLUMN "host_connection_epoch" TYPE INTEGER USING -1; - -ALTER TABLE "project_collaborators" - ALTER COLUMN "connection_epoch" TYPE INTEGER USING -1; - -ALTER TABLE "room_participants" - ALTER COLUMN "answering_connection_epoch" TYPE INTEGER USING -1, - ALTER COLUMN "calling_connection_epoch" TYPE INTEGER USING -1; - -CREATE TABLE "servers" ( - "epoch" SERIAL PRIMARY KEY, - "environment" VARCHAR NOT NULL +CREATE TABLE servers ( + id SERIAL PRIMARY KEY, + environment VARCHAR NOT NULL ); + +DELETE FROM projects; +ALTER TABLE projects + DROP COLUMN host_connection_epoch, + ADD COLUMN host_connection_server_id INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE; + +DELETE FROM project_collaborators; +ALTER TABLE project_collaborators + DROP COLUMN connection_epoch, + ADD COLUMN connection_server_id INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE; + +DELETE FROM room_participants; +ALTER TABLE room_participants + DROP COLUMN answering_connection_epoch, + DROP COLUMN calling_connection_epoch, + ADD COLUMN answering_connection_server_id INTEGER REFERENCES servers (id) ON DELETE CASCADE, + ADD COLUMN calling_connection_server_id INTEGER REFERENCES servers (id) ON DELETE SET NULL; + diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 46ed2ce9f7..71faf16b45 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -110,7 +110,7 @@ impl Database { Ok(new_migrations) } - pub async fn create_server(&self, environment: &str) -> Result { + pub async fn create_server(&self, environment: &str) -> Result { self.transaction(|tx| async move { let server = server::ActiveModel { environment: ActiveValue::set(environment.into()), @@ -118,30 +118,29 @@ impl Database { } .insert(&*tx) .await?; - Ok(server.epoch) + Ok(server.id) }) .await } pub async fn delete_stale_projects( &self, - new_epoch: ServerEpoch, + new_epoch: ServerId, environment: &str, ) -> Result<()> { self.transaction(|tx| async move { - let stale_server_epochs = self - .stale_server_epochs(environment, new_epoch, &tx) - .await?; + let stale_server_epochs = self.stale_server_ids(environment, new_epoch, &tx).await?; project_collaborator::Entity::delete_many() .filter( - project_collaborator::Column::ConnectionEpoch + project_collaborator::Column::ConnectionServerId .is_in(stale_server_epochs.iter().copied()), ) .exec(&*tx) .await?; project::Entity::delete_many() .filter( - project::Column::HostConnectionEpoch.is_in(stale_server_epochs.iter().copied()), + project::Column::HostConnectionServerId + .is_in(stale_server_epochs.iter().copied()), ) .exec(&*tx) .await?; @@ -152,7 +151,7 @@ impl Database { pub async fn stale_room_ids( &self, - new_epoch: ServerEpoch, + new_epoch: ServerId, environment: &str, ) -> Result> { self.transaction(|tx| async move { @@ -161,15 +160,14 @@ impl Database { RoomId, } - let stale_server_epochs = self - .stale_server_epochs(environment, new_epoch, &tx) - .await?; + let stale_server_epochs = self.stale_server_ids(environment, new_epoch, &tx).await?; Ok(room_participant::Entity::find() .select_only() .column(room_participant::Column::RoomId) .distinct() .filter( - room_participant::Column::AnsweringConnectionEpoch.is_in(stale_server_epochs), + room_participant::Column::AnsweringConnectionServerId + .is_in(stale_server_epochs), ) .into_values::<_, QueryAs>() .all(&*tx) @@ -181,13 +179,13 @@ impl Database { pub async fn refresh_room( &self, room_id: RoomId, - new_epoch: ServerEpoch, + new_epoch: ServerId, ) -> Result> { self.room_transaction(|tx| async move { let stale_participant_filter = Condition::all() .add(room_participant::Column::RoomId.eq(room_id)) .add(room_participant::Column::AnsweringConnectionId.is_not_null()) - .add(room_participant::Column::AnsweringConnectionEpoch.ne(new_epoch)); + .add(room_participant::Column::AnsweringConnectionServerId.ne(new_epoch)); let stale_participant_user_ids = room_participant::Entity::find() .filter(stale_participant_filter.clone()) @@ -231,19 +229,13 @@ impl Database { .await } - pub async fn delete_stale_servers( - &self, - new_epoch: ServerEpoch, - environment: &str, - ) -> Result<()> { + pub async fn delete_stale_servers(&self, new_epoch: ServerId, environment: &str) -> Result<()> { self.transaction(|tx| async move { server::Entity::delete_many() .filter( - Condition::all().add( - server::Column::Environment - .eq(environment) - .add(server::Column::Epoch.ne(new_epoch)), - ), + Condition::all() + .add(server::Column::Environment.eq(environment)) + .add(server::Column::Id.ne(new_epoch)), ) .exec(&*tx) .await?; @@ -252,26 +244,21 @@ impl Database { .await } - async fn stale_server_epochs( + async fn stale_server_ids( &self, environment: &str, - new_epoch: ServerEpoch, + new_epoch: ServerId, tx: &DatabaseTransaction, - ) -> Result> { + ) -> Result> { let stale_servers = server::Entity::find() .filter( - Condition::all().add( - server::Column::Environment - .eq(environment) - .add(server::Column::Epoch.ne(new_epoch)), - ), + Condition::all() + .add(server::Column::Environment.eq(environment)) + .add(server::Column::Id.ne(new_epoch)), ) .all(&*tx) .await?; - Ok(stale_servers - .into_iter() - .map(|server| server.epoch) - .collect()) + Ok(stale_servers.into_iter().map(|server| server.id).collect()) } // users @@ -1167,11 +1154,15 @@ impl Database { room_id: ActiveValue::set(room_id), user_id: ActiveValue::set(user_id), answering_connection_id: ActiveValue::set(Some(connection.id as i32)), - answering_connection_epoch: ActiveValue::set(Some(connection.epoch as i32)), + answering_connection_server_id: ActiveValue::set(Some(ServerId( + connection.epoch as i32, + ))), answering_connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(user_id), calling_connection_id: ActiveValue::set(connection.id as i32), - calling_connection_epoch: ActiveValue::set(connection.epoch as i32), + calling_connection_server_id: ActiveValue::set(Some(ServerId( + connection.epoch as i32, + ))), ..Default::default() } .insert(&*tx) @@ -1198,7 +1189,9 @@ impl Database { answering_connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(calling_user_id), calling_connection_id: ActiveValue::set(calling_connection.id as i32), - calling_connection_epoch: ActiveValue::set(calling_connection.epoch as i32), + calling_connection_server_id: ActiveValue::set(Some(ServerId( + calling_connection.epoch as i32, + ))), initial_project_id: ActiveValue::set(initial_project_id), ..Default::default() } @@ -1280,7 +1273,7 @@ impl Database { .eq(calling_connection.id as i32), ) .add( - room_participant::Column::CallingConnectionEpoch + room_participant::Column::CallingConnectionServerId .eq(calling_connection.epoch as i32), ) .add(room_participant::Column::AnsweringConnectionId.is_null()), @@ -1320,14 +1313,16 @@ impl Database { .add(room_participant::Column::AnsweringConnectionId.is_null()) .add(room_participant::Column::AnsweringConnectionLost.eq(true)) .add( - room_participant::Column::AnsweringConnectionEpoch + room_participant::Column::AnsweringConnectionServerId .ne(connection.epoch as i32), ), ), ) .set(room_participant::ActiveModel { answering_connection_id: ActiveValue::set(Some(connection.id as i32)), - answering_connection_epoch: ActiveValue::set(Some(connection.epoch as i32)), + answering_connection_server_id: ActiveValue::set(Some(ServerId( + connection.epoch as i32, + ))), answering_connection_lost: ActiveValue::set(false), ..Default::default() }) @@ -1353,7 +1348,7 @@ impl Database { .eq(connection.id as i32), ) .add( - room_participant::Column::AnsweringConnectionEpoch + room_participant::Column::AnsweringConnectionServerId .eq(connection.epoch as i32), ), ) @@ -1376,7 +1371,7 @@ impl Database { .eq(connection.id as i32), ) .add( - room_participant::Column::CallingConnectionEpoch + room_participant::Column::CallingConnectionServerId .eq(connection.epoch as i32), ) .add(room_participant::Column::AnsweringConnectionId.is_null()), @@ -1412,7 +1407,7 @@ impl Database { project_collaborator::Column::ConnectionId.eq(connection.id as i32), ) .add( - project_collaborator::Column::ConnectionEpoch + project_collaborator::Column::ConnectionServerId .eq(connection.epoch as i32), ), ) @@ -1437,7 +1432,7 @@ impl Database { }); let collaborator_connection_id = ConnectionId { - epoch: collaborator.connection_epoch as u32, + epoch: collaborator.connection_server_id.0 as u32, id: collaborator.connection_id as u32, }; if collaborator_connection_id != connection { @@ -1459,7 +1454,7 @@ impl Database { project_collaborator::Column::ConnectionId.eq(connection.id as i32), ) .add( - project_collaborator::Column::ConnectionEpoch + project_collaborator::Column::ConnectionServerId .eq(connection.epoch as i32), ), ) @@ -1472,7 +1467,9 @@ impl Database { Condition::all() .add(project::Column::RoomId.eq(room_id)) .add(project::Column::HostConnectionId.eq(connection.id as i32)) - .add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)), + .add( + project::Column::HostConnectionServerId.eq(connection.epoch as i32), + ), ) .exec(&*tx) .await?; @@ -1538,7 +1535,7 @@ impl Database { .eq(connection.id as i32), ) .add( - room_participant::Column::AnsweringConnectionEpoch + room_participant::Column::AnsweringConnectionServerId .eq(connection.epoch as i32), ), ) @@ -1573,7 +1570,7 @@ impl Database { .eq(connection.id as i32), ) .add( - room_participant::Column::AnsweringConnectionEpoch + room_participant::Column::AnsweringConnectionServerId .eq(connection.epoch as i32), ), ) @@ -1595,7 +1592,7 @@ impl Database { Condition::all() .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32)) .add( - project_collaborator::Column::ConnectionEpoch + project_collaborator::Column::ConnectionServerId .eq(connection.epoch as i32), ), ) @@ -1606,7 +1603,7 @@ impl Database { Condition::all() .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32)) .add( - project_collaborator::Column::ConnectionEpoch + project_collaborator::Column::ConnectionServerId .eq(connection.epoch as i32), ), ) @@ -1624,7 +1621,7 @@ impl Database { .into_iter() .map(|collaborator| ConnectionId { id: collaborator.connection_id as u32, - epoch: collaborator.connection_epoch as u32, + epoch: collaborator.connection_server_id.0 as u32, }) .collect(); @@ -1633,7 +1630,7 @@ impl Database { host_user_id: project.host_user_id, host_connection_id: ConnectionId { id: project.host_connection_id as u32, - epoch: project.host_connection_epoch as u32, + epoch: project.host_connection_server_id.0 as u32, }, connection_ids, }); @@ -1644,7 +1641,7 @@ impl Database { .filter( Condition::all() .add(project::Column::HostConnectionId.eq(connection.id as i32)) - .add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)), + .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)), ) .exec(&*tx) .await?; @@ -1696,9 +1693,9 @@ impl Database { let mut pending_participants = Vec::new(); while let Some(db_participant) = db_participants.next().await { let db_participant = db_participant?; - if let Some((answering_connection_id, answering_connection_epoch)) = db_participant + if let Some((answering_connection_id, answering_connection_server_id)) = db_participant .answering_connection_id - .zip(db_participant.answering_connection_epoch) + .zip(db_participant.answering_connection_server_id) { let location = match ( db_participant.location_kind, @@ -1720,7 +1717,7 @@ impl Database { }; let answering_connection = ConnectionId { - epoch: answering_connection_epoch as u32, + epoch: answering_connection_server_id.0 as u32, id: answering_connection_id as u32, }; participants.insert( @@ -1751,7 +1748,7 @@ impl Database { while let Some(row) = db_projects.next().await { let (db_project, db_worktree) = row?; let host_connection = ConnectionId { - epoch: db_project.host_connection_epoch as u32, + epoch: db_project.host_connection_server_id.0 as u32, id: db_project.host_connection_id as u32, }; if let Some(participant) = participants.get_mut(&host_connection) { @@ -1820,7 +1817,7 @@ impl Database { .eq(connection.id as i32), ) .add( - room_participant::Column::AnsweringConnectionEpoch + room_participant::Column::AnsweringConnectionServerId .eq(connection.epoch as i32), ), ) @@ -1835,7 +1832,7 @@ impl Database { room_id: ActiveValue::set(participant.room_id), host_user_id: ActiveValue::set(participant.user_id), host_connection_id: ActiveValue::set(connection.id as i32), - host_connection_epoch: ActiveValue::set(connection.epoch as i32), + host_connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)), ..Default::default() } .insert(&*tx) @@ -1860,7 +1857,7 @@ impl Database { project_collaborator::ActiveModel { project_id: ActiveValue::set(project.id), connection_id: ActiveValue::set(connection.id as i32), - connection_epoch: ActiveValue::set(connection.epoch as i32), + connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)), user_id: ActiveValue::set(participant.user_id), replica_id: ActiveValue::set(ReplicaId(0)), is_host: ActiveValue::set(true), @@ -1888,7 +1885,7 @@ impl Database { .await? .ok_or_else(|| anyhow!("project not found"))?; let host_connection = ConnectionId { - epoch: project.host_connection_epoch as u32, + epoch: project.host_connection_server_id.0 as u32, id: project.host_connection_id as u32, }; if host_connection == connection { @@ -1916,7 +1913,7 @@ impl Database { .filter( Condition::all() .add(project::Column::HostConnectionId.eq(connection.id as i32)) - .add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)), + .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)), ) .one(&*tx) .await? @@ -1974,7 +1971,7 @@ impl Database { .filter( Condition::all() .add(project::Column::HostConnectionId.eq(connection.id as i32)) - .add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)), + .add(project::Column::HostConnectionServerId.eq(connection.epoch as i32)), ) .one(&*tx) .await? @@ -2071,7 +2068,7 @@ impl Database { .await? .ok_or_else(|| anyhow!("no such project"))?; let host_connection = ConnectionId { - epoch: project.host_connection_epoch as u32, + epoch: project.host_connection_server_id.0 as u32, id: project.host_connection_id as u32, }; if host_connection != connection { @@ -2128,7 +2125,7 @@ impl Database { .await? .ok_or_else(|| anyhow!("no such project"))?; let host_connection = ConnectionId { - epoch: project.host_connection_epoch as u32, + epoch: project.host_connection_server_id.0 as u32, id: project.host_connection_id as u32, }; if host_connection != connection { @@ -2173,7 +2170,7 @@ impl Database { .eq(connection.id as i32), ) .add( - room_participant::Column::AnsweringConnectionEpoch + room_participant::Column::AnsweringConnectionServerId .eq(connection.epoch as i32), ), ) @@ -2204,7 +2201,7 @@ impl Database { let new_collaborator = project_collaborator::ActiveModel { project_id: ActiveValue::set(project_id), connection_id: ActiveValue::set(connection.id as i32), - connection_epoch: ActiveValue::set(connection.epoch as i32), + connection_server_id: ActiveValue::set(ServerId(connection.epoch as i32)), user_id: ActiveValue::set(participant.user_id), replica_id: ActiveValue::set(replica_id), is_host: ActiveValue::set(false), @@ -2315,7 +2312,7 @@ impl Database { .add(project_collaborator::Column::ProjectId.eq(project_id)) .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32)) .add( - project_collaborator::Column::ConnectionEpoch + project_collaborator::Column::ConnectionServerId .eq(connection.epoch as i32), ), ) @@ -2336,7 +2333,7 @@ impl Database { let connection_ids = collaborators .into_iter() .map(|collaborator| ConnectionId { - epoch: collaborator.connection_epoch as u32, + epoch: collaborator.connection_server_id.0 as u32, id: collaborator.connection_id as u32, }) .collect(); @@ -2345,7 +2342,7 @@ impl Database { id: project_id, host_user_id: project.host_user_id, host_connection_id: ConnectionId { - epoch: project.host_connection_epoch as u32, + epoch: project.host_connection_server_id.0 as u32, id: project.host_connection_id as u32, }, connection_ids, @@ -2372,7 +2369,7 @@ impl Database { if collaborators.iter().any(|collaborator| { let collaborator_connection = ConnectionId { - epoch: collaborator.connection_epoch as u32, + epoch: collaborator.connection_server_id.0 as u32, id: collaborator.connection_id as u32, }; collaborator_connection == connection @@ -2404,7 +2401,7 @@ impl Database { while let Some(participant) = participants.next().await { let participant = participant?; connection_ids.insert(ConnectionId { - epoch: participant.connection_epoch as u32, + epoch: participant.connection_server_id.0 as u32, id: participant.connection_id as u32, }); } @@ -2436,7 +2433,7 @@ impl Database { while let Some(participant) = participants.next().await { let participant = participant?; guest_connection_ids.push(ConnectionId { - epoch: participant.connection_epoch as u32, + epoch: participant.connection_server_id.0 as u32, id: participant.connection_id as u32, }); } @@ -2817,7 +2814,7 @@ id_type!(RoomParticipantId); id_type!(ProjectId); id_type!(ProjectCollaboratorId); id_type!(ReplicaId); -id_type!(ServerEpoch); +id_type!(ServerId); id_type!(SignupId); id_type!(UserId); diff --git a/crates/collab/src/db/project.rs b/crates/collab/src/db/project.rs index b8cf321e51..1279d7d449 100644 --- a/crates/collab/src/db/project.rs +++ b/crates/collab/src/db/project.rs @@ -1,4 +1,4 @@ -use super::{ProjectId, RoomId, UserId}; +use super::{ProjectId, RoomId, ServerId, UserId}; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] @@ -9,7 +9,7 @@ pub struct Model { pub room_id: RoomId, pub host_user_id: UserId, pub host_connection_id: i32, - pub host_connection_epoch: i32, + pub host_connection_server_id: ServerId, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/collab/src/db/project_collaborator.rs b/crates/collab/src/db/project_collaborator.rs index e12b113ff9..a1a99d1170 100644 --- a/crates/collab/src/db/project_collaborator.rs +++ b/crates/collab/src/db/project_collaborator.rs @@ -1,4 +1,4 @@ -use super::{ProjectCollaboratorId, ProjectId, ReplicaId, UserId}; +use super::{ProjectCollaboratorId, ProjectId, ReplicaId, ServerId, UserId}; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] @@ -8,7 +8,7 @@ pub struct Model { pub id: ProjectCollaboratorId, pub project_id: ProjectId, pub connection_id: i32, - pub connection_epoch: i32, + pub connection_server_id: ServerId, pub user_id: UserId, pub replica_id: ReplicaId, pub is_host: bool, diff --git a/crates/collab/src/db/room_participant.rs b/crates/collab/src/db/room_participant.rs index 265febd545..f939a3bfb8 100644 --- a/crates/collab/src/db/room_participant.rs +++ b/crates/collab/src/db/room_participant.rs @@ -1,4 +1,4 @@ -use super::{ProjectId, RoomId, RoomParticipantId, UserId}; +use super::{ProjectId, RoomId, RoomParticipantId, ServerId, UserId}; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] @@ -9,14 +9,14 @@ pub struct Model { pub room_id: RoomId, pub user_id: UserId, pub answering_connection_id: Option, - pub answering_connection_epoch: Option, + pub answering_connection_server_id: Option, pub answering_connection_lost: bool, pub location_kind: Option, pub location_project_id: Option, pub initial_project_id: Option, pub calling_user_id: UserId, pub calling_connection_id: i32, - pub calling_connection_epoch: i32, + pub calling_connection_server_id: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/collab/src/db/server.rs b/crates/collab/src/db/server.rs index 6735770a0f..e3905f2448 100644 --- a/crates/collab/src/db/server.rs +++ b/crates/collab/src/db/server.rs @@ -1,11 +1,11 @@ -use super::ServerEpoch; +use super::ServerId; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] #[sea_orm(table_name = "servers")] pub struct Model { #[sea_orm(primary_key)] - pub epoch: ServerEpoch, + pub id: ServerId, pub environment: String, } diff --git a/crates/collab/src/db/tests.rs b/crates/collab/src/db/tests.rs index b6b8a780a7..c229f0a617 100644 --- a/crates/collab/src/db/tests.rs +++ b/crates/collab/src/db/tests.rs @@ -410,6 +410,8 @@ test_both_dbs!( test_project_count_sqlite, db, { + let epoch = db.create_server("test").await.unwrap().0 as u32; + let user1 = db .create_user( &format!("admin@example.com"), @@ -436,7 +438,7 @@ test_both_dbs!( .unwrap(); let room_id = RoomId::from_proto( - db.create_room(user1.user_id, ConnectionId { epoch: 0, id: 0 }, "") + db.create_room(user1.user_id, ConnectionId { epoch, id: 0 }, "") .await .unwrap() .id, @@ -444,36 +446,34 @@ test_both_dbs!( db.call( room_id, user1.user_id, - ConnectionId { epoch: 0, id: 0 }, + ConnectionId { epoch, id: 0 }, user2.user_id, None, ) .await .unwrap(); - db.join_room(room_id, user2.user_id, ConnectionId { epoch: 0, id: 1 }) + db.join_room(room_id, user2.user_id, ConnectionId { epoch, id: 1 }) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 0); - db.share_project(room_id, ConnectionId { epoch: 0, id: 1 }, &[]) + db.share_project(room_id, ConnectionId { epoch, id: 1 }, &[]) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 1); - db.share_project(room_id, ConnectionId { epoch: 0, id: 1 }, &[]) + db.share_project(room_id, ConnectionId { epoch, id: 1 }, &[]) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 2); // Projects shared by admins aren't counted. - db.share_project(room_id, ConnectionId { epoch: 0, id: 0 }, &[]) + db.share_project(room_id, ConnectionId { epoch, id: 0 }, &[]) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 2); - db.leave_room(ConnectionId { epoch: 0, id: 1 }) - .await - .unwrap(); + db.leave_room(ConnectionId { epoch, id: 1 }).await.unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 0); } ); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index d3a1c0ebf1..9ed50e802c 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2,7 +2,7 @@ mod connection_pool; use crate::{ auth, - db::{self, Database, ProjectId, RoomId, ServerEpoch, User, UserId}, + db::{self, Database, ProjectId, RoomId, ServerId, User, UserId}, executor::Executor, AppState, Result, }; @@ -138,7 +138,7 @@ impl Deref for DbHandle { } pub struct Server { - epoch: parking_lot::Mutex, + epoch: parking_lot::Mutex, peer: Arc, pub(crate) connection_pool: Arc>, app_state: Arc, @@ -169,7 +169,7 @@ where } impl Server { - pub fn new(epoch: ServerEpoch, app_state: Arc, executor: Executor) -> Arc { + pub fn new(epoch: ServerId, app_state: Arc, executor: Executor) -> Arc { let mut server = Self { epoch: parking_lot::Mutex::new(epoch), peer: Peer::new(epoch.0 as u32), @@ -370,7 +370,7 @@ impl Server { } #[cfg(test)] - pub fn reset(&self, epoch: ServerEpoch) { + pub fn reset(&self, epoch: ServerId) { self.teardown(); *self.epoch.lock() = epoch; self.peer.reset(epoch.0 as u32); @@ -1156,7 +1156,7 @@ async fn join_project( .iter() .map(|collaborator| { let peer_id = proto::PeerId { - epoch: collaborator.connection_epoch as u32, + epoch: collaborator.connection_server_id.0 as u32, id: collaborator.connection_id as u32, }; proto::Collaborator { @@ -1412,7 +1412,7 @@ where .find(|collaborator| collaborator.is_host) .ok_or_else(|| anyhow!("host not found"))?; ConnectionId { - epoch: host.connection_epoch as u32, + epoch: host.connection_server_id.0 as u32, id: host.connection_id as u32, } }; @@ -1443,7 +1443,7 @@ async fn save_buffer( .find(|collaborator| collaborator.is_host) .ok_or_else(|| anyhow!("host not found"))?; ConnectionId { - epoch: host.connection_epoch as u32, + epoch: host.connection_server_id.0 as u32, id: host.connection_id as u32, } }; @@ -1459,13 +1459,13 @@ async fn save_buffer( .await?; collaborators.retain(|collaborator| { let collaborator_connection = ConnectionId { - epoch: collaborator.connection_epoch as u32, + epoch: collaborator.connection_server_id.0 as u32, id: collaborator.connection_id as u32, }; collaborator_connection != session.connection_id }); let project_connection_ids = collaborators.iter().map(|collaborator| ConnectionId { - epoch: collaborator.connection_epoch as u32, + epoch: collaborator.connection_server_id.0 as u32, id: collaborator.connection_id as u32, }); broadcast(host_connection_id, project_connection_ids, |conn_id| {