diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index dfe4f39e0e..f9edddf374 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -264,12 +264,13 @@ impl ActiveCall { Ok(()) } - pub fn hang_up(&mut self, cx: &mut ModelContext) -> Result<()> { + pub fn hang_up(&mut self, cx: &mut ModelContext) -> Task> { + cx.notify(); if let Some((room, _)) = self.room.take() { - room.update(cx, |room, cx| room.leave(cx))?; - cx.notify(); + room.update(cx, |room, cx| room.leave(cx)) + } else { + Task::ready(Ok(())) } - Ok(()) } pub fn share_project( diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 901ae08fc0..257ef52c19 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -17,7 +17,7 @@ use language::LanguageRegistry; use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate}; use postage::stream::Stream; use project::Project; -use std::{mem, sync::Arc, time::Duration}; +use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration}; use util::{post_inc, ResultExt, TryFutureExt}; pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30); @@ -64,10 +64,27 @@ pub struct Room { impl Entity for Room { type Event = Event; - fn release(&mut self, _: &mut MutableAppContext) { + fn release(&mut self, cx: &mut MutableAppContext) { if self.status.is_online() { - log::info!("room was released, sending leave message"); - let _ = self.client.send(proto::LeaveRoom {}); + self.leave_internal(cx).detach_and_log_err(cx); + } + } + + fn app_will_quit( + &mut self, + cx: &mut MutableAppContext, + ) -> Option>>> { + if self.status.is_online() { + let leave = self.leave_internal(cx); + Some( + cx.background() + .spawn(async move { + leave.await.log_err(); + }) + .boxed(), + ) + } else { + None } } } @@ -234,13 +251,17 @@ impl Room { && self.pending_call_count == 0 } - pub(crate) fn leave(&mut self, cx: &mut ModelContext) -> Result<()> { - if self.status.is_offline() { - return Err(anyhow!("room is offline")); - } - + pub(crate) fn leave(&mut self, cx: &mut ModelContext) -> Task> { cx.notify(); cx.emit(Event::Left); + self.leave_internal(cx) + } + + fn leave_internal(&mut self, cx: &mut MutableAppContext) -> Task> { + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } + log::info!("leaving room"); for project in self.shared_projects.drain() { @@ -266,8 +287,12 @@ impl Room { self.live_kit.take(); self.pending_room_update.take(); self.maintain_connection.take(); - self.client.send(proto::LeaveRoom {})?; - Ok(()) + + let leave_room = self.client.request(proto::LeaveRoom {}); + cx.background().spawn(async move { + leave_room.await?; + anyhow::Ok(()) + }) } async fn maintain_connection( @@ -758,10 +783,10 @@ impl Room { this.update(&mut cx, |this, cx| { this.pending_call_count -= 1; if this.should_leave() { - this.leave(cx)?; + this.leave(cx).detach_and_log_err(cx); } - result - })?; + }); + result?; Ok(()) }) } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 1deaefec1a..52b5e80413 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -186,7 +186,7 @@ impl Server { .add_request_handler(create_room) .add_request_handler(join_room) .add_request_handler(rejoin_room) - .add_message_handler(leave_room) + .add_request_handler(leave_room) .add_request_handler(call) .add_request_handler(cancel_call) .add_message_handler(decline_call) @@ -1102,8 +1102,14 @@ async fn rejoin_room( Ok(()) } -async fn leave_room(_message: proto::LeaveRoom, session: Session) -> Result<()> { - leave_room_for_session(&session).await +async fn leave_room( + _: proto::LeaveRoom, + response: Response, + session: Session, +) -> Result<()> { + leave_room_for_session(&session).await?; + response.send(proto::Ack {})?; + Ok(()) } async fn call( diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 44a2839f27..c48f6b1604 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -274,10 +274,14 @@ async fn test_basic_calls( } // User A leaves the room. - active_call_a.update(cx_a, |call, cx| { - call.hang_up(cx).unwrap(); - assert!(call.room().is_none()); - }); + active_call_a + .update(cx_a, |call, cx| { + let hang_up = call.hang_up(cx); + assert!(call.room().is_none()); + hang_up + }) + .await + .unwrap(); deterministic.run_until_parked(); assert_eq!( room_participants(&room_a, cx_a), @@ -557,6 +561,7 @@ async fn test_room_uniqueness( // Client C can successfully call client B after client B leaves the room. active_call_b .update(cx_b, |call, cx| call.hang_up(cx)) + .await .unwrap(); deterministic.run_until_parked(); active_call_c @@ -936,6 +941,7 @@ async fn test_server_restarts( // User D hangs up. active_call_d .update(cx_d, |call, cx| call.hang_up(cx)) + .await .unwrap(); deterministic.run_until_parked(); assert_eq!( @@ -1099,7 +1105,10 @@ async fn test_calls_on_multiple_connections( .unwrap(); // User B hangs up, and user A calls them again. - active_call_b2.update(cx_b2, |call, cx| call.hang_up(cx).unwrap()); + active_call_b2 + .update(cx_b2, |call, cx| call.hang_up(cx)) + .await + .unwrap(); deterministic.run_until_parked(); active_call_a .update(cx_a, |call, cx| { @@ -1134,7 +1143,10 @@ async fn test_calls_on_multiple_connections( assert!(incoming_call_b2.next().await.unwrap().is_some()); // User A hangs up, causing both connections to stop ringing. - active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap()); + active_call_a + .update(cx_a, |call, cx| call.hang_up(cx)) + .await + .unwrap(); deterministic.run_until_parked(); assert!(incoming_call_b1.next().await.unwrap().is_none()); assert!(incoming_call_b2.next().await.unwrap().is_none()); @@ -1371,7 +1383,10 @@ async fn test_unshare_project( .unwrap(); // When client B leaves the room, the project becomes read-only. - active_call_b.update(cx_b, |call, cx| call.hang_up(cx).unwrap()); + active_call_b + .update(cx_b, |call, cx| call.hang_up(cx)) + .await + .unwrap(); deterministic.run_until_parked(); assert!(project_b.read_with(cx_b, |project, _| project.is_read_only())); @@ -1400,7 +1415,10 @@ async fn test_unshare_project( .unwrap(); // When client A (the host) leaves the room, the project gets unshared and guests are notified. - active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap()); + active_call_a + .update(cx_a, |call, cx| call.hang_up(cx)) + .await + .unwrap(); deterministic.run_until_parked(); project_a.read_with(cx_a, |project, _| assert!(!project.is_shared())); project_c2.read_with(cx_c, |project, _| { @@ -5455,7 +5473,10 @@ async fn test_contacts( [("user_b".to_string(), "online", "busy")] ); - active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap()); + active_call_a + .update(cx_a, |call, cx| call.hang_up(cx)) + .await + .unwrap(); deterministic.run_until_parked(); assert_eq!( contacts(&client_a, cx_a), diff --git a/crates/collab/src/tests/randomized_integration_tests.rs b/crates/collab/src/tests/randomized_integration_tests.rs index 950f12d186..960f9bfc1d 100644 --- a/crates/collab/src/tests/randomized_integration_tests.rs +++ b/crates/collab/src/tests/randomized_integration_tests.rs @@ -641,7 +641,7 @@ async fn randomly_mutate_active_call( if can_hang_up && active_call.read_with(cx, |call, _| call.room().is_some()) => { log::info!("{}: hanging up", client.username); - active_call.update(cx, |call, cx| call.hang_up(cx))?; + active_call.update(cx, |call, cx| call.hang_up(cx)).await?; } _ => {} } diff --git a/crates/collab_ui/src/collab_titlebar_item.rs b/crates/collab_ui/src/collab_titlebar_item.rs index 28db4f6aa3..36128a907a 100644 --- a/crates/collab_ui/src/collab_titlebar_item.rs +++ b/crates/collab_ui/src/collab_titlebar_item.rs @@ -342,7 +342,7 @@ impl CollabTitlebarItem { fn leave_call(&mut self, _: &LeaveCall, cx: &mut ViewContext) { ActiveCall::global(cx) .update(cx, |call, cx| call.hang_up(cx)) - .log_err(); + .detach_and_log_err(cx); } fn render_toggle_contacts_button( diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index 51208f3930..b4d88a7270 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -588,17 +588,20 @@ impl MutableAppContext { pub fn quit(&mut self) { let mut futures = Vec::new(); - for model_id in self.cx.models.keys().copied().collect::>() { - let mut model = self.cx.models.remove(&model_id).unwrap(); - futures.extend(model.app_will_quit(self)); - self.cx.models.insert(model_id, model); - } - for view_id in self.cx.views.keys().copied().collect::>() { - let mut view = self.cx.views.remove(&view_id).unwrap(); - futures.extend(view.app_will_quit(self)); - self.cx.views.insert(view_id, view); - } + self.update(|cx| { + for model_id in cx.models.keys().copied().collect::>() { + let mut model = cx.cx.models.remove(&model_id).unwrap(); + futures.extend(model.app_will_quit(cx)); + cx.cx.models.insert(model_id, model); + } + + for view_id in cx.views.keys().copied().collect::>() { + let mut view = cx.cx.views.remove(&view_id).unwrap(); + futures.extend(view.app_will_quit(cx)); + cx.cx.views.insert(view_id, view); + } + }); self.remove_all_windows(); diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 1a56abc783..823ffa7a19 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -269,6 +269,7 @@ request_messages!( (JoinChannel, JoinChannelResponse), (JoinProject, JoinProjectResponse), (JoinRoom, JoinRoomResponse), + (LeaveRoom, Ack), (RejoinRoom, RejoinRoomResponse), (IncomingCall, Ack), (OpenBufferById, OpenBufferResponse), diff --git a/crates/rpc/src/rpc.rs b/crates/rpc/src/rpc.rs index 4f5f126516..bec518b707 100644 --- a/crates/rpc/src/rpc.rs +++ b/crates/rpc/src/rpc.rs @@ -6,4 +6,4 @@ pub use conn::Connection; pub use peer::*; mod macros; -pub const PROTOCOL_VERSION: u32 = 49; +pub const PROTOCOL_VERSION: u32 = 50; diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 43ae12b732..4a9cdea8a4 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -1064,7 +1064,10 @@ impl Workspace { if answer == Some(1) { return anyhow::Ok(false); } else { - active_call.update(&mut cx, |call, cx| call.hang_up(cx))?; + active_call + .update(&mut cx, |call, cx| call.hang_up(cx)) + .await + .log_err(); } } }