From 669406d5af5f29011939e5cdf31e7852296d523c Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 7 Oct 2022 11:56:50 +0200 Subject: [PATCH] Leave room when client is the only participant --- crates/call/src/call.rs | 37 ++++++++------- crates/call/src/room.rs | 66 ++++++++++++++++++++++---- crates/collab/src/integration_tests.rs | 2 + 3 files changed, 78 insertions(+), 27 deletions(-) diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index 91fee3ae87..2cfb155d11 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -109,31 +109,32 @@ impl ActiveCall { initial_project: Option>, cx: &mut ModelContext, ) -> Task> { - let room = self.room.as_ref().map(|(room, _)| room.clone()); let client = self.client.clone(); let user_store = self.user_store.clone(); cx.spawn(|this, mut cx| async move { - let room = if let Some(room) = room { - room - } else { - cx.update(|cx| Room::create(client, user_store, cx)).await? - }; + if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) { + let initial_project_id = if let Some(initial_project) = initial_project { + Some( + room.update(&mut cx, |room, cx| room.share_project(initial_project, cx)) + .await?, + ) + } else { + None + }; - let initial_project_id = if let Some(initial_project) = initial_project { - Some( - room.update(&mut cx, |room, cx| room.share_project(initial_project, cx)) - .await?, - ) + room.update(&mut cx, |room, cx| { + room.call(recipient_user_id, initial_project_id, cx) + }) + .await?; } else { - None + let room = cx + .update(|cx| { + Room::create(recipient_user_id, initial_project, client, user_store, cx) + }) + .await?; + this.update(&mut cx, |this, cx| this.set_room(Some(room), cx)); }; - this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx)); - room.update(&mut cx, |room, cx| { - room.call(recipient_user_id, initial_project_id, cx) - }) - .await?; - Ok(()) }) } diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index ba2c51b39e..29e3c04259 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -21,9 +21,10 @@ pub struct Room { status: RoomStatus, remote_participants: HashMap, pending_users: Vec>, + pending_call_count: usize, client: Arc, user_store: ModelHandle, - _subscriptions: Vec, + subscriptions: Vec, _pending_room_update: Option>, } @@ -62,7 +63,8 @@ impl Room { status: RoomStatus::Online, remote_participants: Default::default(), pending_users: Default::default(), - _subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)], + pending_call_count: 0, + subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)], _pending_room_update: None, client, user_store, @@ -70,13 +72,40 @@ impl Room { } pub(crate) fn create( + recipient_user_id: u64, + initial_project: Option>, client: Arc, user_store: ModelHandle, cx: &mut MutableAppContext, ) -> Task>> { cx.spawn(|mut cx| async move { - let room = client.request(proto::CreateRoom {}).await?; - Ok(cx.add_model(|cx| Self::new(room.id, client, user_store, cx))) + let response = client.request(proto::CreateRoom {}).await?; + let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx)); + let initial_project_id = if let Some(initial_project) = initial_project { + let initial_project_id = room + .update(&mut cx, |room, cx| { + room.share_project(initial_project.clone(), cx) + }) + .await?; + initial_project + .update(&mut cx, |project, cx| { + project.shared(initial_project_id, cx) + }) + .await?; + Some(initial_project_id) + } else { + None + }; + + match room + .update(&mut cx, |room, cx| { + room.call(recipient_user_id, initial_project_id, cx) + }) + .await + { + Ok(()) => Ok(room), + Err(_) => Err(anyhow!("call failed")), + } }) } @@ -96,6 +125,12 @@ impl Room { }) } + fn should_leave(&self) -> bool { + self.pending_users.is_empty() + && self.remote_participants.is_empty() + && self.pending_call_count == 0 + } + pub(crate) fn leave(&mut self, cx: &mut ModelContext) -> Result<()> { if self.status.is_offline() { return Err(anyhow!("room is offline")); @@ -104,6 +139,7 @@ impl Room { cx.notify(); self.status = RoomStatus::Offline; self.remote_participants.clear(); + self.subscriptions.clear(); self.client.send(proto::LeaveRoom { id: self.id })?; Ok(()) } @@ -134,8 +170,7 @@ impl Room { .payload .room .ok_or_else(|| anyhow!("invalid room"))?; - this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?; - Ok(()) + this.update(&mut cx, |this, cx| this.apply_room_update(room, cx)) } fn apply_room_update( @@ -209,6 +244,10 @@ impl Room { this.pending_users = pending_users; cx.notify(); } + + if this.should_leave() { + let _ = this.leave(cx); + } }); })); @@ -226,16 +265,25 @@ impl Room { return Task::ready(Err(anyhow!("room is offline"))); } + cx.notify(); let client = self.client.clone(); let room_id = self.id; - cx.foreground().spawn(async move { - client + self.pending_call_count += 1; + cx.spawn(|this, mut cx| async move { + let result = client .request(proto::Call { room_id, recipient_user_id, initial_project_id, }) - .await?; + .await; + this.update(&mut cx, |this, cx| { + this.pending_call_count -= 1; + if this.should_leave() { + this.leave(cx)?; + } + result + })?; Ok(()) }) } diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 55f1267ba2..ba344d0aab 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -383,9 +383,11 @@ async fn test_leaving_room_on_disconnection( } ); + // When user A disconnects, both client A and B clear their room on the active call. server.disconnect_client(client_a.current_user_id(cx_a)); cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT); active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none())); + active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none())); assert_eq!( room_participants(&room_a, cx_a), RoomParticipants {