From 40430cf01ba433a5f5a634b4779273f4c4f5c5da Mon Sep 17 00:00:00 2001 From: Mikayla Date: Tue, 10 Oct 2023 12:38:20 -0700 Subject: [PATCH] Update channel rooms to be ephemeral Remove redundant live kit initialization code Fix bug in recent channel links changes where channel rooms would have the incorrect release set co-authored-by: Conrad Irwin co-authored-by: Max --- .../20221109000000_test_schema.sql | 1 + ...0_add_unique_index_on_rooms_channel_id.sql | 1 + crates/collab/src/db/queries/channels.rs | 57 +++++++++-------- crates/collab/src/db/queries/rooms.rs | 5 +- crates/collab/src/db/tests/buffer_tests.rs | 6 +- crates/collab/src/db/tests/channel_tests.rs | 63 +++++++------------ crates/collab/src/db/tests/message_tests.rs | 20 ++---- crates/collab/src/rpc.rs | 17 ++--- crates/collab/src/tests/channel_tests.rs | 2 + .../src/tests/random_channel_buffer_tests.rs | 7 +-- crates/live_kit_client/src/test.rs | 5 +- 11 files changed, 73 insertions(+), 111 deletions(-) create mode 100644 crates/collab/migrations/20231010114600_add_unique_index_on_rooms_channel_id.sql diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index a9b8d9709d..e5104839b8 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -40,6 +40,7 @@ CREATE TABLE "rooms" ( "release_channel" VARCHAR, "channel_id" INTEGER REFERENCES channels (id) ON DELETE CASCADE ); +CREATE UNIQUE INDEX "index_rooms_on_channel_id" ON "rooms" ("channel_id"); CREATE TABLE "projects" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/crates/collab/migrations/20231010114600_add_unique_index_on_rooms_channel_id.sql b/crates/collab/migrations/20231010114600_add_unique_index_on_rooms_channel_id.sql new file mode 100644 index 0000000000..21ec4cfbb7 --- /dev/null +++ b/crates/collab/migrations/20231010114600_add_unique_index_on_rooms_channel_id.sql @@ -0,0 +1 @@ +CREATE UNIQUE INDEX "index_rooms_on_channel_id" ON "rooms" ("channel_id"); diff --git a/crates/collab/src/db/queries/channels.rs b/crates/collab/src/db/queries/channels.rs index ab31f59541..b2fa2eb9b5 100644 --- a/crates/collab/src/db/queries/channels.rs +++ b/crates/collab/src/db/queries/channels.rs @@ -19,21 +19,14 @@ impl Database { .await } - pub async fn create_root_channel( - &self, - name: &str, - live_kit_room: &str, - creator_id: UserId, - ) -> Result { - self.create_channel(name, None, live_kit_room, creator_id) - .await + pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result { + self.create_channel(name, None, creator_id).await } pub async fn create_channel( &self, name: &str, parent: Option, - live_kit_room: &str, creator_id: UserId, ) -> Result { let name = Self::sanitize_channel_name(name)?; @@ -90,14 +83,6 @@ impl Database { .insert(&*tx) .await?; - room::ActiveModel { - channel_id: ActiveValue::Set(Some(channel.id)), - live_kit_room: ActiveValue::Set(live_kit_room.to_string()), - ..Default::default() - } - .insert(&*tx) - .await?; - Ok(channel.id) }) .await @@ -797,18 +782,36 @@ impl Database { .await } - pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result { + pub async fn get_or_create_channel_room( + &self, + channel_id: ChannelId, + live_kit_room: &str, + enviroment: &str, + ) -> Result { self.transaction(|tx| async move { let tx = tx; - let room = channel::Model { - id: channel_id, - ..Default::default() - } - .find_related(room::Entity) - .one(&*tx) - .await? - .ok_or_else(|| anyhow!("invalid channel"))?; - Ok(room.id) + + let room = room::Entity::find() + .filter(room::Column::ChannelId.eq(channel_id)) + .one(&*tx) + .await?; + + let room_id = if let Some(room) = room { + room.id + } else { + let result = room::Entity::insert(room::ActiveModel { + channel_id: ActiveValue::Set(Some(channel_id)), + live_kit_room: ActiveValue::Set(live_kit_room.to_string()), + release_channel: ActiveValue::Set(Some(enviroment.to_string())), + ..Default::default() + }) + .exec(&*tx) + .await?; + + result.last_insert_id + }; + + Ok(room_id) }) .await } diff --git a/crates/collab/src/db/queries/rooms.rs b/crates/collab/src/db/queries/rooms.rs index 6589f23791..4763b3f5cd 100644 --- a/crates/collab/src/db/queries/rooms.rs +++ b/crates/collab/src/db/queries/rooms.rs @@ -832,10 +832,7 @@ impl Database { let (channel_id, room) = self.get_channel_room(room_id, &tx).await?; let deleted = if room.participants.is_empty() { - let result = room::Entity::delete_by_id(room_id) - .filter(room::Column::ChannelId.is_null()) - .exec(&*tx) - .await?; + let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?; result.rows_affected > 0 } else { false diff --git a/crates/collab/src/db/tests/buffer_tests.rs b/crates/collab/src/db/tests/buffer_tests.rs index f6e91b91f0..0ac41a8b0b 100644 --- a/crates/collab/src/db/tests/buffer_tests.rs +++ b/crates/collab/src/db/tests/buffer_tests.rs @@ -54,7 +54,7 @@ async fn test_channel_buffers(db: &Arc) { let owner_id = db.create_server("production").await.unwrap().0 as u32; - let zed_id = db.create_root_channel("zed", "1", a_id).await.unwrap(); + let zed_id = db.create_root_channel("zed", a_id).await.unwrap(); db.invite_channel_member(zed_id, b_id, a_id, false) .await @@ -141,7 +141,7 @@ async fn test_channel_buffers(db: &Arc) { assert_eq!(left_buffer.connections, &[connection_id_a],); - let cargo_id = db.create_root_channel("cargo", "2", a_id).await.unwrap(); + let cargo_id = db.create_root_channel("cargo", a_id).await.unwrap(); let _ = db .join_channel_buffer(cargo_id, a_id, connection_id_a) .await @@ -207,7 +207,7 @@ async fn test_channel_buffers_last_operations(db: &Database) { let mut text_buffers = Vec::new(); for i in 0..3 { let channel = db - .create_root_channel(&format!("channel-{i}"), &format!("room-{i}"), user_id) + .create_root_channel(&format!("channel-{i}"), user_id) .await .unwrap(); diff --git a/crates/collab/src/db/tests/channel_tests.rs b/crates/collab/src/db/tests/channel_tests.rs index 2631e0d191..7d2bc04a35 100644 --- a/crates/collab/src/db/tests/channel_tests.rs +++ b/crates/collab/src/db/tests/channel_tests.rs @@ -45,7 +45,7 @@ async fn test_channels(db: &Arc) { .unwrap() .user_id; - let zed_id = db.create_root_channel("zed", "1", a_id).await.unwrap(); + let zed_id = db.create_root_channel("zed", a_id).await.unwrap(); // Make sure that people cannot read channels they haven't been invited to assert!(db.get_channel(zed_id, b_id).await.unwrap().is_none()); @@ -58,16 +58,13 @@ async fn test_channels(db: &Arc) { .await .unwrap(); - let crdb_id = db - .create_channel("crdb", Some(zed_id), "2", a_id) - .await - .unwrap(); + let crdb_id = db.create_channel("crdb", Some(zed_id), a_id).await.unwrap(); let livestreaming_id = db - .create_channel("livestreaming", Some(zed_id), "3", a_id) + .create_channel("livestreaming", Some(zed_id), a_id) .await .unwrap(); let replace_id = db - .create_channel("replace", Some(zed_id), "4", a_id) + .create_channel("replace", Some(zed_id), a_id) .await .unwrap(); @@ -75,14 +72,14 @@ async fn test_channels(db: &Arc) { members.sort(); assert_eq!(members, &[a_id, b_id]); - let rust_id = db.create_root_channel("rust", "5", a_id).await.unwrap(); + let rust_id = db.create_root_channel("rust", a_id).await.unwrap(); let cargo_id = db - .create_channel("cargo", Some(rust_id), "6", a_id) + .create_channel("cargo", Some(rust_id), a_id) .await .unwrap(); let cargo_ra_id = db - .create_channel("cargo-ra", Some(cargo_id), "7", a_id) + .create_channel("cargo-ra", Some(cargo_id), a_id) .await .unwrap(); @@ -202,11 +199,11 @@ async fn test_joining_channels(db: &Arc) { .unwrap() .user_id; - let channel_1 = db - .create_root_channel("channel_1", "1", user_1) + let channel_1 = db.create_root_channel("channel_1", user_1).await.unwrap(); + let room_1 = db + .get_or_create_channel_room(channel_1, "1", TEST_RELEASE_CHANNEL) .await .unwrap(); - let room_1 = db.room_id_for_channel(channel_1).await.unwrap(); // can join a room with membership to its channel let joined_room = db @@ -283,15 +280,9 @@ async fn test_channel_invites(db: &Arc) { .unwrap() .user_id; - let channel_1_1 = db - .create_root_channel("channel_1", "1", user_1) - .await - .unwrap(); + let channel_1_1 = db.create_root_channel("channel_1", user_1).await.unwrap(); - let channel_1_2 = db - .create_root_channel("channel_2", "2", user_1) - .await - .unwrap(); + let channel_1_2 = db.create_root_channel("channel_2", user_1).await.unwrap(); db.invite_channel_member(channel_1_1, user_2, user_1, false) .await @@ -353,7 +344,7 @@ async fn test_channel_invites(db: &Arc) { .unwrap(); let channel_1_3 = db - .create_channel("channel_3", Some(channel_1_1), "1", user_1) + .create_channel("channel_3", Some(channel_1_1), user_1) .await .unwrap(); @@ -415,7 +406,7 @@ async fn test_channel_renames(db: &Arc) { .unwrap() .user_id; - let zed_id = db.create_root_channel("zed", "1", user_1).await.unwrap(); + let zed_id = db.create_root_channel("zed", user_1).await.unwrap(); db.rename_channel(zed_id, user_1, "#zed-archive") .await @@ -460,25 +451,22 @@ async fn test_db_channel_moving(db: &Arc) { .unwrap() .user_id; - let zed_id = db.create_root_channel("zed", "1", a_id).await.unwrap(); + let zed_id = db.create_root_channel("zed", a_id).await.unwrap(); - let crdb_id = db - .create_channel("crdb", Some(zed_id), "2", a_id) - .await - .unwrap(); + let crdb_id = db.create_channel("crdb", Some(zed_id), a_id).await.unwrap(); let gpui2_id = db - .create_channel("gpui2", Some(zed_id), "3", a_id) + .create_channel("gpui2", Some(zed_id), a_id) .await .unwrap(); let livestreaming_id = db - .create_channel("livestreaming", Some(crdb_id), "4", a_id) + .create_channel("livestreaming", Some(crdb_id), a_id) .await .unwrap(); let livestreaming_dag_id = db - .create_channel("livestreaming_dag", Some(livestreaming_id), "5", a_id) + .create_channel("livestreaming_dag", Some(livestreaming_id), a_id) .await .unwrap(); @@ -531,12 +519,7 @@ async fn test_db_channel_moving(db: &Arc) { // ======================================================================== // Create a new channel below a channel with multiple parents let livestreaming_dag_sub_id = db - .create_channel( - "livestreaming_dag_sub", - Some(livestreaming_dag_id), - "6", - a_id, - ) + .create_channel("livestreaming_dag_sub", Some(livestreaming_dag_id), a_id) .await .unwrap(); @@ -826,15 +809,15 @@ async fn test_db_channel_moving_bugs(db: &Arc) { .unwrap() .user_id; - let zed_id = db.create_root_channel("zed", "1", user_id).await.unwrap(); + let zed_id = db.create_root_channel("zed", user_id).await.unwrap(); let projects_id = db - .create_channel("projects", Some(zed_id), "2", user_id) + .create_channel("projects", Some(zed_id), user_id) .await .unwrap(); let livestreaming_id = db - .create_channel("livestreaming", Some(projects_id), "3", user_id) + .create_channel("livestreaming", Some(projects_id), user_id) .await .unwrap(); diff --git a/crates/collab/src/db/tests/message_tests.rs b/crates/collab/src/db/tests/message_tests.rs index 464aaba207..e758fcfb5d 100644 --- a/crates/collab/src/db/tests/message_tests.rs +++ b/crates/collab/src/db/tests/message_tests.rs @@ -25,10 +25,7 @@ async fn test_channel_message_retrieval(db: &Arc) { .await .unwrap() .user_id; - let channel = db - .create_channel("channel", None, "room", user) - .await - .unwrap(); + let channel = db.create_channel("channel", None, user).await.unwrap(); let owner_id = db.create_server("test").await.unwrap().0 as u32; db.join_channel_chat(channel, rpc::ConnectionId { owner_id, id: 0 }, user) @@ -90,10 +87,7 @@ async fn test_channel_message_nonces(db: &Arc) { .await .unwrap() .user_id; - let channel = db - .create_channel("channel", None, "room", user) - .await - .unwrap(); + let channel = db.create_channel("channel", None, user).await.unwrap(); let owner_id = db.create_server("test").await.unwrap().0 as u32; @@ -157,15 +151,9 @@ async fn test_channel_message_new_notification(db: &Arc) { .unwrap() .user_id; - let channel_1 = db - .create_channel("channel", None, "room", user) - .await - .unwrap(); + let channel_1 = db.create_channel("channel", None, user).await.unwrap(); - let channel_2 = db - .create_channel("channel-2", None, "room", user) - .await - .unwrap(); + let channel_2 = db.create_channel("channel-2", None, user).await.unwrap(); db.invite_channel_member(channel_1, observer, user, false) .await diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 268228077f..e5c6d94ce0 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -938,11 +938,6 @@ async fn create_room( util::async_iife!({ let live_kit = live_kit?; - live_kit - .create_room(live_kit_room.clone()) - .await - .trace_err()?; - let token = live_kit .room_token(&live_kit_room, &session.user_id.to_string()) .trace_err()?; @@ -2206,15 +2201,10 @@ async fn create_channel( session: Session, ) -> Result<()> { let db = session.db().await; - let live_kit_room = format!("channel-{}", nanoid::nanoid!(30)); - - if let Some(live_kit) = session.live_kit_client.as_ref() { - live_kit.create_room(live_kit_room.clone()).await?; - } let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id)); let id = db - .create_channel(&request.name, parent_id, &live_kit_room, session.user_id) + .create_channel(&request.name, parent_id, session.user_id) .await?; let channel = proto::Channel { @@ -2619,12 +2609,15 @@ async fn join_channel( session: Session, ) -> Result<()> { let channel_id = ChannelId::from_proto(request.channel_id); + let live_kit_room = format!("channel-{}", nanoid::nanoid!(30)); let joined_room = { leave_room_for_session(&session).await?; let db = session.db().await; - let room_id = db.room_id_for_channel(channel_id).await?; + let room_id = db + .get_or_create_channel_room(channel_id, &live_kit_room, &*RELEASE_CHANNEL_NAME) + .await?; let joined_room = db .join_room( diff --git a/crates/collab/src/tests/channel_tests.rs b/crates/collab/src/tests/channel_tests.rs index 6bdcee6af3..7cfcce832b 100644 --- a/crates/collab/src/tests/channel_tests.rs +++ b/crates/collab/src/tests/channel_tests.rs @@ -380,6 +380,8 @@ async fn test_channel_room( // Give everyone a chance to observe user A joining deterministic.run_until_parked(); + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + room_a.read_with(cx_a, |room, _| assert!(room.is_connected())); client_a.channel_store().read_with(cx_a, |channels, _| { assert_participants_eq( diff --git a/crates/collab/src/tests/random_channel_buffer_tests.rs b/crates/collab/src/tests/random_channel_buffer_tests.rs index ad0181602c..6e0bef225c 100644 --- a/crates/collab/src/tests/random_channel_buffer_tests.rs +++ b/crates/collab/src/tests/random_channel_buffer_tests.rs @@ -46,12 +46,7 @@ impl RandomizedTest for RandomChannelBufferTest { let db = &server.app_state.db; for ix in 0..CHANNEL_COUNT { let id = db - .create_channel( - &format!("channel-{ix}"), - None, - &format!("livekit-room-{ix}"), - users[0].user_id, - ) + .create_channel(&format!("channel-{ix}"), None, users[0].user_id) .await .unwrap(); for user in &users[1..] { diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 704760bab7..8df8ab4abb 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -91,9 +91,8 @@ impl TestServer { let identity = claims.sub.unwrap().to_string(); let room_name = claims.video.room.unwrap(); let mut server_rooms = self.rooms.lock(); - let room = server_rooms - .get_mut(&*room_name) - .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?; + let room = (*server_rooms).entry(room_name.to_string()).or_default(); + if room.client_rooms.contains_key(&identity) { Err(anyhow!( "{:?} attempted to join room {:?} twice",