From cc58607c3b0d23d5907008d0f8eb1e9cfc0a8bab Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 11 Nov 2022 14:43:40 +0100 Subject: [PATCH] Move `Store::join_room` into `Db::join_room` --- crates/collab/src/db.rs | 85 ++++++++++++++++++++++++++++++++++ crates/collab/src/rpc.rs | 71 +++++++++++++++------------- crates/collab/src/rpc/store.rs | 51 -------------------- 3 files changed, 125 insertions(+), 82 deletions(-) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 506606274d..7cc0dc35fe 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1061,6 +1061,91 @@ where }) } + pub async fn join_room( + &self, + room_id: RoomId, + user_id: UserId, + connection_id: ConnectionId, + ) -> Result { + test_support!(self, { + let mut tx = self.pool.begin().await?; + sqlx::query( + " + UPDATE calls + SET answering_connection_id = $1 + WHERE room_id = $2 AND called_user_id = $3 + RETURNING 1 + ", + ) + .bind(connection_id.0 as i32) + .bind(room_id) + .bind(user_id) + .fetch_one(&mut tx) + .await?; + + sqlx::query( + " + UPDATE room_participants + SET connection_id = $1 + WHERE room_id = $2 AND user_id = $3 + RETURNING 1 + ", + ) + .bind(connection_id.0 as i32) + .bind(room_id) + .bind(user_id) + .fetch_one(&mut tx) + .await?; + + self.commit_room_transaction(room_id, tx).await + }) + + // let connection = self + // .connections + // .get_mut(&connection_id) + // .ok_or_else(|| anyhow!("no such connection"))?; + // let user_id = connection.user_id; + // let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::>(); + + // let connected_user = self + // .connected_users + // .get_mut(&user_id) + // .ok_or_else(|| anyhow!("no such connection"))?; + // let active_call = connected_user + // .active_call + // .as_mut() + // .ok_or_else(|| anyhow!("not being called"))?; + // anyhow::ensure!( + // active_call.room_id == room_id && active_call.connection_id.is_none(), + // "not being called on this room" + // ); + + // let room = self + // .rooms + // .get_mut(&room_id) + // .ok_or_else(|| anyhow!("no such room"))?; + // anyhow::ensure!( + // room.pending_participant_user_ids + // .contains(&user_id.to_proto()), + // anyhow!("no such room") + // ); + // room.pending_participant_user_ids + // .retain(|pending| *pending != user_id.to_proto()); + // room.participants.push(proto::Participant { + // user_id: user_id.to_proto(), + // peer_id: connection_id.0, + // projects: Default::default(), + // location: Some(proto::ParticipantLocation { + // variant: Some(proto::participant_location::Variant::External( + // proto::participant_location::External {}, + // )), + // }), + // }); + // active_call.connection_id = Some(connection_id); + + // Ok((room, recipient_connection_ids)) + } + pub async fn update_room_participant_location( &self, room_id: RoomId, diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 64affdb825..c7c222ee1c 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -607,42 +607,51 @@ impl Server { request: Message, response: Response, ) -> Result<()> { + let room = self + .app_state + .db + .join_room( + RoomId::from_proto(request.payload.id), + request.sender_user_id, + request.sender_connection_id, + ) + .await?; + for recipient_id in self + .store() + .await + .connection_ids_for_user(request.sender_user_id) { - let mut store = self.store().await; - let (room, recipient_connection_ids) = - store.join_room(request.payload.id, request.sender_connection_id)?; - for recipient_id in recipient_connection_ids { - self.peer - .send(recipient_id, proto::CallCanceled {}) - .trace_err(); - } + self.peer + .send(recipient_id, proto::CallCanceled {}) + .trace_err(); + } - let live_kit_connection_info = - if let Some(live_kit) = self.app_state.live_kit_client.as_ref() { - if let Some(token) = live_kit - .room_token( - &room.live_kit_room, - &request.sender_connection_id.to_string(), - ) - .trace_err() - { - Some(proto::LiveKitConnectionInfo { - server_url: live_kit.url().into(), - token, - }) - } else { - None - } + let live_kit_connection_info = + if let Some(live_kit) = self.app_state.live_kit_client.as_ref() { + if let Some(token) = live_kit + .room_token( + &room.live_kit_room, + &request.sender_connection_id.to_string(), + ) + .trace_err() + { + Some(proto::LiveKitConnectionInfo { + server_url: live_kit.url().into(), + token, + }) } else { None - }; + } + } else { + None + }; + + self.room_updated(&room); + response.send(proto::JoinRoomResponse { + room: Some(room), + live_kit_connection_info, + })?; - response.send(proto::JoinRoomResponse { - room: Some(room.clone()), - live_kit_connection_info, - })?; - self.room_updated(room); - } self.update_user_contacts(request.sender_user_id).await?; Ok(()) } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index f16910fac5..dfd534dbe9 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -257,57 +257,6 @@ impl Store { } } - pub fn join_room( - &mut self, - room_id: RoomId, - connection_id: ConnectionId, - ) -> Result<(&proto::Room, Vec)> { - let connection = self - .connections - .get_mut(&connection_id) - .ok_or_else(|| anyhow!("no such connection"))?; - let user_id = connection.user_id; - let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::>(); - - let connected_user = self - .connected_users - .get_mut(&user_id) - .ok_or_else(|| anyhow!("no such connection"))?; - let active_call = connected_user - .active_call - .as_mut() - .ok_or_else(|| anyhow!("not being called"))?; - anyhow::ensure!( - active_call.room_id == room_id && active_call.connection_id.is_none(), - "not being called on this room" - ); - - let room = self - .rooms - .get_mut(&room_id) - .ok_or_else(|| anyhow!("no such room"))?; - anyhow::ensure!( - room.pending_participant_user_ids - .contains(&user_id.to_proto()), - anyhow!("no such room") - ); - room.pending_participant_user_ids - .retain(|pending| *pending != user_id.to_proto()); - room.participants.push(proto::Participant { - user_id: user_id.to_proto(), - peer_id: connection_id.0, - projects: Default::default(), - location: Some(proto::ParticipantLocation { - variant: Some(proto::participant_location::Variant::External( - proto::participant_location::External {}, - )), - }), - }); - active_call.connection_id = Some(connection_id); - - Ok((room, recipient_connection_ids)) - } - pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result { let connection = self .connections