diff --git a/Cargo.lock b/Cargo.lock index 08e183810d..4738e69852 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4459,6 +4459,7 @@ dependencies = [ "anyhow", "client", "collections", + "futures", "gpui", "project", ] diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 30513172ad..7834c3da7f 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -86,7 +86,7 @@ async fn test_basic_calls( .await .unwrap(); assert_eq!( - participants(&room_a, &client_a, cx_a).await, + room_participants(&room_a, &client_a, cx_a).await, RoomParticipants { remote: Default::default(), pending: Default::default() @@ -104,7 +104,7 @@ async fn test_basic_calls( deterministic.run_until_parked(); assert_eq!( - participants(&room_a, &client_a, cx_a).await, + room_participants(&room_a, &client_a, cx_a).await, RoomParticipants { remote: Default::default(), pending: vec!["user_b".to_string()] @@ -121,14 +121,14 @@ async fn test_basic_calls( deterministic.run_until_parked(); assert_eq!( - participants(&room_a, &client_a, cx_a).await, + room_participants(&room_a, &client_a, cx_a).await, RoomParticipants { remote: vec!["user_b".to_string()], pending: Default::default() } ); assert_eq!( - participants(&room_b, &client_b, cx_b).await, + room_participants(&room_b, &client_b, cx_b).await, RoomParticipants { remote: vec!["user_a".to_string()], pending: Default::default() @@ -146,14 +146,14 @@ async fn test_basic_calls( deterministic.run_until_parked(); assert_eq!( - participants(&room_a, &client_a, cx_a).await, + room_participants(&room_a, &client_a, cx_a).await, RoomParticipants { remote: vec!["user_b".to_string()], pending: vec!["user_c".to_string()] } ); assert_eq!( - participants(&room_b, &client_b, cx_b).await, + room_participants(&room_b, &client_b, cx_b).await, RoomParticipants { remote: vec!["user_a".to_string()], pending: vec!["user_c".to_string()] @@ -170,14 +170,14 @@ async fn test_basic_calls( deterministic.run_until_parked(); assert_eq!( - participants(&room_a, &client_a, cx_a).await, + room_participants(&room_a, &client_a, cx_a).await, RoomParticipants { remote: vec!["user_b".to_string()], pending: Default::default() } ); assert_eq!( - participants(&room_b, &client_b, cx_b).await, + room_participants(&room_b, &client_b, cx_b).await, RoomParticipants { remote: vec!["user_a".to_string()], pending: Default::default() @@ -185,61 +185,90 @@ async fn test_basic_calls( ); // User A leaves the room. - cx_a.update(|_| drop(room_a)); + room_a.update(cx_a, |room, cx| room.leave(cx)).unwrap(); deterministic.run_until_parked(); assert_eq!( - participants(&room_b, &client_b, cx_b).await, + room_participants(&room_a, &client_a, cx_a).await, RoomParticipants { remote: Default::default(), pending: Default::default() } ); - - #[derive(Debug, Eq, PartialEq)] - struct RoomParticipants { - remote: Vec, - pending: Vec, - } - - async fn participants( - room: &ModelHandle, - client: &TestClient, - cx: &mut TestAppContext, - ) -> RoomParticipants { - let remote_users = room.update(cx, |room, cx| { - room.remote_participants() - .values() - .map(|participant| { - client - .user_store - .update(cx, |users, cx| users.get_user(participant.user_id, cx)) - }) - .collect::>() - }); - let remote_users = futures::future::try_join_all(remote_users).await.unwrap(); - let pending_users = room.update(cx, |room, cx| { - room.pending_user_ids() - .iter() - .map(|user_id| { - client - .user_store - .update(cx, |users, cx| users.get_user(*user_id, cx)) - }) - .collect::>() - }); - let pending_users = futures::future::try_join_all(pending_users).await.unwrap(); - + assert_eq!( + room_participants(&room_b, &client_b, cx_b).await, RoomParticipants { - remote: remote_users - .into_iter() - .map(|user| user.github_login.clone()) - .collect(), - pending: pending_users - .into_iter() - .map(|user| user.github_login.clone()) - .collect(), + remote: Default::default(), + pending: Default::default() } - } + ); +} + +#[gpui::test(iterations = 10)] +async fn test_leaving_room_on_disconnection( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + server + .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)]) + .await; + + let room_a = cx_a + .update(|cx| Room::create(client_a.clone(), cx)) + .await + .unwrap(); + + // Call user B from client A. + let mut incoming_call_b = client_b + .user_store + .update(cx_b, |user, _| user.incoming_call()); + room_a + .update(cx_a, |room, cx| room.call(client_b.user_id().unwrap(), cx)) + .await + .unwrap(); + + // User B receives the call and joins the room. + let call_b = incoming_call_b.next().await.unwrap().unwrap(); + let room_b = cx_b + .update(|cx| Room::join(&call_b, client_b.clone(), cx)) + .await + .unwrap(); + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, &client_a, cx_a).await, + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_b, &client_b, cx_b).await, + RoomParticipants { + remote: vec!["user_a".to_string()], + pending: Default::default() + } + ); + + server.disconnect_client(client_a.current_user_id(cx_a)); + cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT); + assert_eq!( + room_participants(&room_a, &client_a, cx_a).await, + RoomParticipants { + remote: Default::default(), + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_b, &client_b, cx_b).await, + RoomParticipants { + remote: Default::default(), + pending: Default::default() + } + ); } #[gpui::test(iterations = 10)] @@ -6169,3 +6198,49 @@ fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> { }) .collect() } + +#[derive(Debug, Eq, PartialEq)] +struct RoomParticipants { + remote: Vec, + pending: Vec, +} + +async fn room_participants( + room: &ModelHandle, + client: &TestClient, + cx: &mut TestAppContext, +) -> RoomParticipants { + let remote_users = room.update(cx, |room, cx| { + room.remote_participants() + .values() + .map(|participant| { + client + .user_store + .update(cx, |users, cx| users.get_user(participant.user_id, cx)) + }) + .collect::>() + }); + let remote_users = futures::future::try_join_all(remote_users).await.unwrap(); + let pending_users = room.update(cx, |room, cx| { + room.pending_user_ids() + .iter() + .map(|user_id| { + client + .user_store + .update(cx, |users, cx| users.get_user(*user_id, cx)) + }) + .collect::>() + }); + let pending_users = futures::future::try_join_all(pending_users).await.unwrap(); + + RoomParticipants { + remote: remote_users + .into_iter() + .map(|user| user.github_login.clone()) + .collect(), + pending: pending_users + .into_iter() + .map(|user| user.github_login.clone()) + .collect(), + } +} diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 4b366387e4..04eaad4edb 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -528,6 +528,13 @@ impl Server { } } + if let Some(room) = removed_connection + .room_id + .and_then(|room_id| store.room(room_id)) + { + self.room_updated(room); + } + removed_user_id = removed_connection.user_id; }; diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index ba4e644f5c..f55da1763b 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -13,7 +13,7 @@ pub type RoomId = u64; #[derive(Default, Serialize)] pub struct Store { connections: BTreeMap, - connections_by_user_id: BTreeMap, + connected_users: BTreeMap, next_room_id: RoomId, rooms: BTreeMap, projects: BTreeMap, @@ -22,9 +22,9 @@ pub struct Store { } #[derive(Default, Serialize)] -struct UserConnectionState { +struct ConnectedUser { connection_ids: HashSet, - room: Option, + active_call: Option, } #[derive(Serialize)] @@ -37,9 +37,9 @@ struct ConnectionState { } #[derive(Copy, Clone, Eq, PartialEq, Serialize)] -enum RoomState { - Joined { room_id: RoomId }, - Calling { room_id: RoomId }, +struct CallState { + room_id: RoomId, + joined: bool, } #[derive(Serialize)] @@ -89,6 +89,7 @@ pub struct RemovedConnectionState { pub hosted_projects: HashMap, pub guest_project_ids: HashSet, pub contact_ids: HashSet, + pub room_id: Option, } pub struct LeftProject { @@ -156,7 +157,7 @@ impl Store { channels: Default::default(), }, ); - self.connections_by_user_id + self.connected_users .entry(user_id) .or_default() .connection_ids @@ -196,10 +197,32 @@ impl Store { } } - let user_connection_state = self.connections_by_user_id.get_mut(&user_id).unwrap(); - user_connection_state.connection_ids.remove(&connection_id); - if user_connection_state.connection_ids.is_empty() { - self.connections_by_user_id.remove(&user_id); + let connected_user = self.connected_users.get_mut(&user_id).unwrap(); + connected_user.connection_ids.remove(&connection_id); + if let Some(active_call) = connected_user.active_call.as_ref() { + if let Some(room) = self.rooms.get_mut(&active_call.room_id) { + let prev_participant_count = room.participants.len(); + room.participants + .retain(|participant| participant.peer_id != connection_id.0); + if prev_participant_count == room.participants.len() { + if connected_user.connection_ids.is_empty() { + room.pending_user_ids + .retain(|pending_user_id| *pending_user_id != user_id.to_proto()); + result.room_id = Some(active_call.room_id); + connected_user.active_call = None; + } + } else { + result.room_id = Some(active_call.room_id); + connected_user.active_call = None; + } + } else { + tracing::error!("disconnected user claims to be in a room that does not exist"); + connected_user.active_call = None; + } + } + + if connected_user.connection_ids.is_empty() { + self.connected_users.remove(&user_id); } self.connections.remove(&connection_id).unwrap(); @@ -247,7 +270,7 @@ impl Store { &self, user_id: UserId, ) -> impl Iterator + '_ { - self.connections_by_user_id + self.connected_users .get(&user_id) .into_iter() .map(|state| &state.connection_ids) @@ -257,7 +280,7 @@ impl Store { pub fn is_user_online(&self, user_id: UserId) -> bool { !self - .connections_by_user_id + .connected_users .get(&user_id) .unwrap_or(&Default::default()) .connection_ids @@ -308,7 +331,7 @@ impl Store { } pub fn project_metadata_for_user(&self, user_id: UserId) -> Vec { - let user_connection_state = self.connections_by_user_id.get(&user_id); + let user_connection_state = self.connected_users.get(&user_id); let project_ids = user_connection_state.iter().flat_map(|state| { state .connection_ids @@ -347,13 +370,13 @@ impl Store { .connections .get_mut(&creator_connection_id) .ok_or_else(|| anyhow!("no such connection"))?; - let user_connection_state = self - .connections_by_user_id + let connected_user = self + .connected_users .get_mut(&connection.user_id) .ok_or_else(|| anyhow!("no such connection"))?; anyhow::ensure!( - user_connection_state.room.is_none(), - "cannot participate in more than one room at once" + connected_user.active_call.is_none(), + "can't create a room with an active call" ); let mut room = proto::Room::default(); @@ -370,7 +393,10 @@ impl Store { let room_id = post_inc(&mut self.next_room_id); self.rooms.insert(room_id, room); - user_connection_state.room = Some(RoomState::Joined { room_id }); + connected_user.active_call = Some(CallState { + room_id, + joined: true, + }); Ok(room_id) } @@ -386,15 +412,15 @@ impl Store { let user_id = connection.user_id; let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::>(); - let mut user_connection_state = self - .connections_by_user_id + let mut connected_user = self + .connected_users .get_mut(&user_id) .ok_or_else(|| anyhow!("no such connection"))?; anyhow::ensure!( - user_connection_state - .room - .map_or(true, |room| room == RoomState::Calling { room_id }), - "cannot participate in more than one room at once" + connected_user + .active_call + .map_or(false, |call| call.room_id == room_id && !call.joined), + "not being called on this room" ); let room = self @@ -417,7 +443,10 @@ impl Store { )), }), }); - user_connection_state.room = Some(RoomState::Joined { room_id }); + connected_user.active_call = Some(CallState { + room_id, + joined: true, + }); Ok((room, recipient_connection_ids)) } @@ -433,14 +462,14 @@ impl Store { .ok_or_else(|| anyhow!("no such connection"))?; let user_id = connection.user_id; - let mut user_connection_state = self - .connections_by_user_id + let mut connected_user = self + .connected_users .get_mut(&user_id) .ok_or_else(|| anyhow!("no such connection"))?; anyhow::ensure!( - user_connection_state - .room - .map_or(false, |room| room == RoomState::Joined { room_id }), + connected_user + .active_call + .map_or(false, |call| call.room_id == room_id && call.joined), "cannot leave a room before joining it" ); @@ -450,26 +479,32 @@ impl Store { .ok_or_else(|| anyhow!("no such room"))?; room.participants .retain(|participant| participant.peer_id != connection_id.0); - user_connection_state.room = None; + connected_user.active_call = None; Ok(room) } + pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> { + self.rooms.get(&room_id) + } + pub fn call( &mut self, room_id: RoomId, from_connection_id: ConnectionId, - to_user_id: UserId, + recipient_id: UserId, ) -> Result<(UserId, Vec, &proto::Room)> { let from_user_id = self.user_id_for_connection(from_connection_id)?; - let to_connection_ids = self.connection_ids_for_user(to_user_id).collect::>(); - let mut to_user_connection_state = self - .connections_by_user_id - .get_mut(&to_user_id) + let recipient_connection_ids = self + .connection_ids_for_user(recipient_id) + .collect::>(); + let mut recipient = self + .connected_users + .get_mut(&recipient_id) .ok_or_else(|| anyhow!("no such connection"))?; anyhow::ensure!( - to_user_connection_state.room.is_none(), + recipient.active_call.is_none(), "recipient is already on another call" ); @@ -486,22 +521,27 @@ impl Store { anyhow::ensure!( room.pending_user_ids .iter() - .all(|user_id| UserId::from_proto(*user_id) != to_user_id), + .all(|user_id| UserId::from_proto(*user_id) != recipient_id), "cannot call the same user more than once" ); - room.pending_user_ids.push(to_user_id.to_proto()); - to_user_connection_state.room = Some(RoomState::Calling { room_id }); + room.pending_user_ids.push(recipient_id.to_proto()); + recipient.active_call = Some(CallState { + room_id, + joined: false, + }); - Ok((from_user_id, to_connection_ids, room)) + Ok((from_user_id, recipient_connection_ids, room)) } pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> { - let mut to_user_connection_state = self - .connections_by_user_id + let mut recipient = self + .connected_users .get_mut(&to_user_id) .ok_or_else(|| anyhow!("no such connection"))?; - anyhow::ensure!(to_user_connection_state.room == Some(RoomState::Calling { room_id })); - to_user_connection_state.room = None; + anyhow::ensure!(recipient + .active_call + .map_or(false, |call| call.room_id == room_id && !call.joined)); + recipient.active_call = None; let room = self .rooms .get_mut(&room_id) @@ -516,18 +556,17 @@ impl Store { recipient_connection_id: ConnectionId, ) -> Result<(&proto::Room, Vec)> { let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?; - let mut to_user_connection_state = self - .connections_by_user_id + let recipient = self + .connected_users .get_mut(&recipient_user_id) .ok_or_else(|| anyhow!("no such connection"))?; - if let Some(RoomState::Calling { room_id }) = to_user_connection_state.room { - to_user_connection_state.room = None; + if let Some(active_call) = recipient.active_call.take() { let recipient_connection_ids = self .connection_ids_for_user(recipient_user_id) .collect::>(); let room = self .rooms - .get_mut(&room_id) + .get_mut(&active_call.room_id) .ok_or_else(|| anyhow!("no such room"))?; room.pending_user_ids .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id); @@ -650,7 +689,7 @@ impl Store { for requester_user_id in project.join_requests.keys() { if let Some(requester_user_connection_state) = - self.connections_by_user_id.get_mut(requester_user_id) + self.connected_users.get_mut(requester_user_id) { for requester_connection_id in &requester_user_connection_state.connection_ids @@ -1007,14 +1046,14 @@ impl Store { assert!(channel.connection_ids.contains(connection_id)); } assert!(self - .connections_by_user_id + .connected_users .get(&connection.user_id) .unwrap() .connection_ids .contains(connection_id)); } - for (user_id, state) in &self.connections_by_user_id { + for (user_id, state) in &self.connected_users { for connection_id in &state.connection_ids { assert_eq!( self.connections.get(connection_id).unwrap().user_id, diff --git a/crates/room/Cargo.toml b/crates/room/Cargo.toml index f329d5ae87..33b6620b27 100644 --- a/crates/room/Cargo.toml +++ b/crates/room/Cargo.toml @@ -16,12 +16,14 @@ test-support = [ ] [dependencies] -anyhow = "1.0.38" client = { path = "../client" } collections = { path = "../collections" } gpui = { path = "../gpui" } project = { path = "../project" } +anyhow = "1.0.38" +futures = "0.3" + [dev-dependencies] client = { path = "../client", features = ["test-support"] } collections = { path = "../collections", features = ["test-support"] } diff --git a/crates/room/src/room.rs b/crates/room/src/room.rs index 2a9318f1d7..8d80b47508 100644 --- a/crates/room/src/room.rs +++ b/crates/room/src/room.rs @@ -3,6 +3,7 @@ mod participant; use anyhow::{anyhow, Result}; use client::{call::Call, proto, Client, PeerId, TypedEnvelope}; use collections::HashMap; +use futures::StreamExt; use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task}; use participant::{LocalParticipant, ParticipantLocation, RemoteParticipant}; use project::Project; @@ -12,13 +13,9 @@ pub enum Event { PeerChangedActiveProject, } -pub enum CallResponse { - Accepted, - Rejected, -} - pub struct Room { id: u64, + status: RoomStatus, local_participant: LocalParticipant, remote_participants: HashMap, pending_user_ids: Vec, @@ -32,8 +29,24 @@ impl Entity for Room { impl Room { fn new(id: u64, client: Arc, cx: &mut ModelContext) -> Self { + let mut client_status = client.status(); + cx.spawn_weak(|this, mut cx| async move { + let is_connected = client_status + .next() + .await + .map_or(false, |s| s.is_connected()); + // Even if we're initially connected, any future change of the status means we momentarily disconnected. + if !is_connected || client_status.next().await.is_some() { + if let Some(this) = this.upgrade(&cx) { + let _ = this.update(&mut cx, |this, cx| this.leave(cx)); + } + } + }) + .detach(); + Self { id, + status: RoomStatus::Online, local_participant: LocalParticipant { projects: Default::default(), }, @@ -69,6 +82,18 @@ impl Room { }) } + pub fn leave(&mut self, cx: &mut ModelContext) -> Result<()> { + if self.status.is_offline() { + return Err(anyhow!("room is offline")); + } + + self.status = RoomStatus::Offline; + self.remote_participants.clear(); + self.client.send(proto::LeaveRoom { id: self.id })?; + cx.notify(); + Ok(()) + } + pub fn remote_participants(&self) -> &HashMap { &self.remote_participants } @@ -112,6 +137,10 @@ impl Room { } pub fn call(&mut self, to_user_id: u64, cx: &mut ModelContext) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } + let client = self.client.clone(); let room_id = self.id; cx.foreground().spawn(async move { @@ -125,32 +154,58 @@ impl Room { }) } - pub async fn publish_project(&mut self, project: ModelHandle) -> Result<()> { + pub fn publish_project(&mut self, project: ModelHandle) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } + todo!() } - pub async fn unpublish_project(&mut self, project: ModelHandle) -> Result<()> { + pub fn unpublish_project(&mut self, project: ModelHandle) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } + todo!() } - pub async fn set_active_project( + pub fn set_active_project( &mut self, project: Option<&ModelHandle>, - ) -> Result<()> { + ) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } + todo!() } - pub async fn mute(&mut self) -> Result<()> { + pub fn mute(&mut self) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } + todo!() } - pub async fn unmute(&mut self) -> Result<()> { + pub fn unmute(&mut self) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } + todo!() } } -impl Drop for Room { - fn drop(&mut self) { - let _ = self.client.send(proto::LeaveRoom { id: self.id }); +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum RoomStatus { + Online, + Offline, +} + +impl RoomStatus { + fn is_offline(&self) -> bool { + matches!(self, RoomStatus::Offline) } }