Leave room on quit

Co-Authored-By: Max Brunsfeld <max@zed.dev>
This commit is contained in:
Antonio Scandurra 2023-03-13 17:52:10 +01:00
parent f6b0c56a47
commit bca1acf6d3
10 changed files with 104 additions and 44 deletions

View file

@ -264,12 +264,13 @@ impl ActiveCall {
Ok(()) Ok(())
} }
pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Result<()> { pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
if let Some((room, _)) = self.room.take() {
room.update(cx, |room, cx| room.leave(cx))?;
cx.notify(); cx.notify();
if let Some((room, _)) = self.room.take() {
room.update(cx, |room, cx| room.leave(cx))
} else {
Task::ready(Ok(()))
} }
Ok(())
} }
pub fn share_project( pub fn share_project(

View file

@ -17,7 +17,7 @@ use language::LanguageRegistry;
use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate}; use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
use postage::stream::Stream; use postage::stream::Stream;
use project::Project; use project::Project;
use std::{mem, sync::Arc, time::Duration}; use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
use util::{post_inc, ResultExt, TryFutureExt}; use util::{post_inc, ResultExt, TryFutureExt};
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30); pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
@ -64,10 +64,27 @@ pub struct Room {
impl Entity for Room { impl Entity for Room {
type Event = Event; type Event = Event;
fn release(&mut self, _: &mut MutableAppContext) { fn release(&mut self, cx: &mut MutableAppContext) {
if self.status.is_online() { if self.status.is_online() {
log::info!("room was released, sending leave message"); self.leave_internal(cx).detach_and_log_err(cx);
let _ = self.client.send(proto::LeaveRoom {}); }
}
fn app_will_quit(
&mut self,
cx: &mut MutableAppContext,
) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
if self.status.is_online() {
let leave = self.leave_internal(cx);
Some(
cx.background()
.spawn(async move {
leave.await.log_err();
})
.boxed(),
)
} else {
None
} }
} }
} }
@ -234,13 +251,17 @@ impl Room {
&& self.pending_call_count == 0 && self.pending_call_count == 0
} }
pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> { pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
if self.status.is_offline() {
return Err(anyhow!("room is offline"));
}
cx.notify(); cx.notify();
cx.emit(Event::Left); cx.emit(Event::Left);
self.leave_internal(cx)
}
fn leave_internal(&mut self, cx: &mut MutableAppContext) -> Task<Result<()>> {
if self.status.is_offline() {
return Task::ready(Err(anyhow!("room is offline")));
}
log::info!("leaving room"); log::info!("leaving room");
for project in self.shared_projects.drain() { for project in self.shared_projects.drain() {
@ -266,8 +287,12 @@ impl Room {
self.live_kit.take(); self.live_kit.take();
self.pending_room_update.take(); self.pending_room_update.take();
self.maintain_connection.take(); self.maintain_connection.take();
self.client.send(proto::LeaveRoom {})?;
Ok(()) let leave_room = self.client.request(proto::LeaveRoom {});
cx.background().spawn(async move {
leave_room.await?;
anyhow::Ok(())
})
} }
async fn maintain_connection( async fn maintain_connection(
@ -758,10 +783,10 @@ impl Room {
this.update(&mut cx, |this, cx| { this.update(&mut cx, |this, cx| {
this.pending_call_count -= 1; this.pending_call_count -= 1;
if this.should_leave() { if this.should_leave() {
this.leave(cx)?; this.leave(cx).detach_and_log_err(cx);
} }
result });
})?; result?;
Ok(()) Ok(())
}) })
} }

View file

@ -186,7 +186,7 @@ impl Server {
.add_request_handler(create_room) .add_request_handler(create_room)
.add_request_handler(join_room) .add_request_handler(join_room)
.add_request_handler(rejoin_room) .add_request_handler(rejoin_room)
.add_message_handler(leave_room) .add_request_handler(leave_room)
.add_request_handler(call) .add_request_handler(call)
.add_request_handler(cancel_call) .add_request_handler(cancel_call)
.add_message_handler(decline_call) .add_message_handler(decline_call)
@ -1102,8 +1102,14 @@ async fn rejoin_room(
Ok(()) Ok(())
} }
async fn leave_room(_message: proto::LeaveRoom, session: Session) -> Result<()> { async fn leave_room(
leave_room_for_session(&session).await _: proto::LeaveRoom,
response: Response<proto::LeaveRoom>,
session: Session,
) -> Result<()> {
leave_room_for_session(&session).await?;
response.send(proto::Ack {})?;
Ok(())
} }
async fn call( async fn call(

View file

@ -274,10 +274,14 @@ async fn test_basic_calls(
} }
// User A leaves the room. // User A leaves the room.
active_call_a.update(cx_a, |call, cx| { active_call_a
call.hang_up(cx).unwrap(); .update(cx_a, |call, cx| {
let hang_up = call.hang_up(cx);
assert!(call.room().is_none()); assert!(call.room().is_none());
}); hang_up
})
.await
.unwrap();
deterministic.run_until_parked(); deterministic.run_until_parked();
assert_eq!( assert_eq!(
room_participants(&room_a, cx_a), room_participants(&room_a, cx_a),
@ -557,6 +561,7 @@ async fn test_room_uniqueness(
// Client C can successfully call client B after client B leaves the room. // Client C can successfully call client B after client B leaves the room.
active_call_b active_call_b
.update(cx_b, |call, cx| call.hang_up(cx)) .update(cx_b, |call, cx| call.hang_up(cx))
.await
.unwrap(); .unwrap();
deterministic.run_until_parked(); deterministic.run_until_parked();
active_call_c active_call_c
@ -936,6 +941,7 @@ async fn test_server_restarts(
// User D hangs up. // User D hangs up.
active_call_d active_call_d
.update(cx_d, |call, cx| call.hang_up(cx)) .update(cx_d, |call, cx| call.hang_up(cx))
.await
.unwrap(); .unwrap();
deterministic.run_until_parked(); deterministic.run_until_parked();
assert_eq!( assert_eq!(
@ -1099,7 +1105,10 @@ async fn test_calls_on_multiple_connections(
.unwrap(); .unwrap();
// User B hangs up, and user A calls them again. // User B hangs up, and user A calls them again.
active_call_b2.update(cx_b2, |call, cx| call.hang_up(cx).unwrap()); active_call_b2
.update(cx_b2, |call, cx| call.hang_up(cx))
.await
.unwrap();
deterministic.run_until_parked(); deterministic.run_until_parked();
active_call_a active_call_a
.update(cx_a, |call, cx| { .update(cx_a, |call, cx| {
@ -1134,7 +1143,10 @@ async fn test_calls_on_multiple_connections(
assert!(incoming_call_b2.next().await.unwrap().is_some()); assert!(incoming_call_b2.next().await.unwrap().is_some());
// User A hangs up, causing both connections to stop ringing. // User A hangs up, causing both connections to stop ringing.
active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap()); active_call_a
.update(cx_a, |call, cx| call.hang_up(cx))
.await
.unwrap();
deterministic.run_until_parked(); deterministic.run_until_parked();
assert!(incoming_call_b1.next().await.unwrap().is_none()); assert!(incoming_call_b1.next().await.unwrap().is_none());
assert!(incoming_call_b2.next().await.unwrap().is_none()); assert!(incoming_call_b2.next().await.unwrap().is_none());
@ -1371,7 +1383,10 @@ async fn test_unshare_project(
.unwrap(); .unwrap();
// When client B leaves the room, the project becomes read-only. // When client B leaves the room, the project becomes read-only.
active_call_b.update(cx_b, |call, cx| call.hang_up(cx).unwrap()); active_call_b
.update(cx_b, |call, cx| call.hang_up(cx))
.await
.unwrap();
deterministic.run_until_parked(); deterministic.run_until_parked();
assert!(project_b.read_with(cx_b, |project, _| project.is_read_only())); assert!(project_b.read_with(cx_b, |project, _| project.is_read_only()));
@ -1400,7 +1415,10 @@ async fn test_unshare_project(
.unwrap(); .unwrap();
// When client A (the host) leaves the room, the project gets unshared and guests are notified. // When client A (the host) leaves the room, the project gets unshared and guests are notified.
active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap()); active_call_a
.update(cx_a, |call, cx| call.hang_up(cx))
.await
.unwrap();
deterministic.run_until_parked(); deterministic.run_until_parked();
project_a.read_with(cx_a, |project, _| assert!(!project.is_shared())); project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
project_c2.read_with(cx_c, |project, _| { project_c2.read_with(cx_c, |project, _| {
@ -5455,7 +5473,10 @@ async fn test_contacts(
[("user_b".to_string(), "online", "busy")] [("user_b".to_string(), "online", "busy")]
); );
active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap()); active_call_a
.update(cx_a, |call, cx| call.hang_up(cx))
.await
.unwrap();
deterministic.run_until_parked(); deterministic.run_until_parked();
assert_eq!( assert_eq!(
contacts(&client_a, cx_a), contacts(&client_a, cx_a),

View file

@ -641,7 +641,7 @@ async fn randomly_mutate_active_call(
if can_hang_up && active_call.read_with(cx, |call, _| call.room().is_some()) => if can_hang_up && active_call.read_with(cx, |call, _| call.room().is_some()) =>
{ {
log::info!("{}: hanging up", client.username); log::info!("{}: hanging up", client.username);
active_call.update(cx, |call, cx| call.hang_up(cx))?; active_call.update(cx, |call, cx| call.hang_up(cx)).await?;
} }
_ => {} _ => {}
} }

View file

@ -342,7 +342,7 @@ impl CollabTitlebarItem {
fn leave_call(&mut self, _: &LeaveCall, cx: &mut ViewContext<Self>) { fn leave_call(&mut self, _: &LeaveCall, cx: &mut ViewContext<Self>) {
ActiveCall::global(cx) ActiveCall::global(cx)
.update(cx, |call, cx| call.hang_up(cx)) .update(cx, |call, cx| call.hang_up(cx))
.log_err(); .detach_and_log_err(cx);
} }
fn render_toggle_contacts_button( fn render_toggle_contacts_button(

View file

@ -588,17 +588,20 @@ impl MutableAppContext {
pub fn quit(&mut self) { pub fn quit(&mut self) {
let mut futures = Vec::new(); let mut futures = Vec::new();
for model_id in self.cx.models.keys().copied().collect::<Vec<_>>() {
let mut model = self.cx.models.remove(&model_id).unwrap(); self.update(|cx| {
futures.extend(model.app_will_quit(self)); for model_id in cx.models.keys().copied().collect::<Vec<_>>() {
self.cx.models.insert(model_id, model); let mut model = cx.cx.models.remove(&model_id).unwrap();
futures.extend(model.app_will_quit(cx));
cx.cx.models.insert(model_id, model);
} }
for view_id in self.cx.views.keys().copied().collect::<Vec<_>>() { for view_id in cx.views.keys().copied().collect::<Vec<_>>() {
let mut view = self.cx.views.remove(&view_id).unwrap(); let mut view = cx.cx.views.remove(&view_id).unwrap();
futures.extend(view.app_will_quit(self)); futures.extend(view.app_will_quit(cx));
self.cx.views.insert(view_id, view); cx.cx.views.insert(view_id, view);
} }
});
self.remove_all_windows(); self.remove_all_windows();

View file

@ -269,6 +269,7 @@ request_messages!(
(JoinChannel, JoinChannelResponse), (JoinChannel, JoinChannelResponse),
(JoinProject, JoinProjectResponse), (JoinProject, JoinProjectResponse),
(JoinRoom, JoinRoomResponse), (JoinRoom, JoinRoomResponse),
(LeaveRoom, Ack),
(RejoinRoom, RejoinRoomResponse), (RejoinRoom, RejoinRoomResponse),
(IncomingCall, Ack), (IncomingCall, Ack),
(OpenBufferById, OpenBufferResponse), (OpenBufferById, OpenBufferResponse),

View file

@ -6,4 +6,4 @@ pub use conn::Connection;
pub use peer::*; pub use peer::*;
mod macros; mod macros;
pub const PROTOCOL_VERSION: u32 = 49; pub const PROTOCOL_VERSION: u32 = 50;

View file

@ -1064,7 +1064,10 @@ impl Workspace {
if answer == Some(1) { if answer == Some(1) {
return anyhow::Ok(false); return anyhow::Ok(false);
} else { } else {
active_call.update(&mut cx, |call, cx| call.hang_up(cx))?; active_call
.update(&mut cx, |call, cx| call.hang_up(cx))
.await
.log_err();
} }
} }
} }