diff --git a/Cargo.lock b/Cargo.lock index 2e70000673..a971570e2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -693,6 +693,7 @@ dependencies = [ "collections", "futures", "gpui", + "postage", "project", "util", ] diff --git a/crates/call/Cargo.toml b/crates/call/Cargo.toml index cf5e7d6702..e725c7cfe3 100644 --- a/crates/call/Cargo.toml +++ b/crates/call/Cargo.toml @@ -25,6 +25,7 @@ util = { path = "../util" } anyhow = "1.0.38" futures = "0.3" +postage = { version = "0.4.1", features = ["futures-traits"] } [dev-dependencies] client = { path = "../client", features = ["test-support"] } diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index 2f64115fb5..607931fdc4 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -2,22 +2,31 @@ mod participant; pub mod room; use anyhow::{anyhow, Result}; -use client::{incoming_call::IncomingCall, Client, UserStore}; -use gpui::{AppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Subscription, Task}; +use client::{incoming_call::IncomingCall, proto, Client, TypedEnvelope, UserStore}; +use gpui::{ + AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, + Subscription, Task, +}; pub use participant::ParticipantLocation; +use postage::watch; use project::Project; pub use room::Room; use std::sync::Arc; pub fn init(client: Arc, user_store: ModelHandle, cx: &mut MutableAppContext) { - let active_call = cx.add_model(|_| ActiveCall::new(client, user_store)); + let active_call = cx.add_model(|cx| ActiveCall::new(client, user_store, cx)); cx.set_global(active_call); } pub struct ActiveCall { room: Option<(ModelHandle, Vec)>, + incoming_call: ( + watch::Sender>, + watch::Receiver>, + ), client: Arc, user_store: ModelHandle, + _subscriptions: Vec, } impl Entity for ActiveCall { @@ -25,14 +34,63 @@ impl Entity for ActiveCall { } impl ActiveCall { - fn new(client: Arc, user_store: ModelHandle) -> Self { + fn new( + client: Arc, + user_store: ModelHandle, + cx: &mut ModelContext, + ) -> Self { Self { room: None, + incoming_call: watch::channel(), + _subscriptions: vec![ + client.add_request_handler(cx.handle(), Self::handle_incoming_call), + client.add_message_handler(cx.handle(), Self::handle_cancel_call), + ], client, user_store, } } + async fn handle_incoming_call( + this: ModelHandle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result { + let user_store = this.read_with(&cx, |this, _| this.user_store.clone()); + let call = IncomingCall { + room_id: envelope.payload.room_id, + participants: user_store + .update(&mut cx, |user_store, cx| { + user_store.get_users(envelope.payload.participant_user_ids, cx) + }) + .await?, + caller: user_store + .update(&mut cx, |user_store, cx| { + user_store.get_user(envelope.payload.caller_user_id, cx) + }) + .await?, + initial_project_id: envelope.payload.initial_project_id, + }; + this.update(&mut cx, |this, _| { + *this.incoming_call.0.borrow_mut() = Some(call); + }); + + Ok(proto::Ack {}) + } + + async fn handle_cancel_call( + this: ModelHandle, + _: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, _| { + *this.incoming_call.0.borrow_mut() = None; + }); + Ok(()) + } + pub fn global(cx: &AppContext) -> ModelHandle { cx.global::>().clone() } @@ -74,12 +132,22 @@ impl ActiveCall { }) } - pub fn join(&mut self, call: &IncomingCall, cx: &mut ModelContext) -> Task> { + pub fn incoming(&self) -> watch::Receiver> { + self.incoming_call.1.clone() + } + + pub fn accept_incoming(&mut self, cx: &mut ModelContext) -> Task> { if self.room.is_some() { return Task::ready(Err(anyhow!("cannot join while on another call"))); } - let join = Room::join(call, self.client.clone(), self.user_store.clone(), cx); + let call = if let Some(call) = self.incoming_call.1.borrow().clone() { + call + } else { + return Task::ready(Err(anyhow!("no incoming call"))); + }; + + let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx); cx.spawn(|this, mut cx| async move { let room = join.await?; this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx)); @@ -87,6 +155,19 @@ impl ActiveCall { }) } + pub fn decline_incoming(&mut self) -> Result<()> { + *self.incoming_call.0.borrow_mut() = None; + self.client.send(proto::DeclineCall {})?; + Ok(()) + } + + pub fn hang_up(&mut self, cx: &mut ModelContext) -> Result<()> { + if let Some((room, _)) = self.room.take() { + room.update(cx, |room, cx| room.leave(cx))?; + } + Ok(()) + } + fn set_room(&mut self, room: Option>, cx: &mut ModelContext) { if room.as_ref() != self.room.as_ref().map(|room| &room.0) { if let Some(room) = room { diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 0237972167..52f283dd03 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -66,7 +66,7 @@ impl Room { } } - pub fn create( + pub(crate) fn create( client: Arc, user_store: ModelHandle, cx: &mut MutableAppContext, @@ -77,7 +77,7 @@ impl Room { }) } - pub fn join( + pub(crate) fn join( call: &IncomingCall, client: Arc, user_store: ModelHandle, @@ -93,7 +93,7 @@ impl Room { }) } - pub fn leave(&mut self, cx: &mut ModelContext) -> Result<()> { + pub(crate) fn leave(&mut self, cx: &mut ModelContext) -> Result<()> { if self.status.is_offline() { return Err(anyhow!("room is offline")); } @@ -213,7 +213,7 @@ impl Room { Ok(()) } - pub fn call( + pub(crate) fn call( &mut self, recipient_user_id: u64, initial_project_id: Option, diff --git a/crates/client/src/user.rs b/crates/client/src/user.rs index 2d79b7be84..252fb4d455 100644 --- a/crates/client/src/user.rs +++ b/crates/client/src/user.rs @@ -1,5 +1,4 @@ use super::{http::HttpClient, proto, Client, Status, TypedEnvelope}; -use crate::incoming_call::IncomingCall; use anyhow::{anyhow, Context, Result}; use collections::{hash_map::Entry, HashMap, HashSet}; use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt}; @@ -59,10 +58,6 @@ pub struct UserStore { outgoing_contact_requests: Vec>, pending_contact_requests: HashMap, invite_info: Option, - incoming_call: ( - watch::Sender>, - watch::Receiver>, - ), client: Weak, http: Arc, _maintain_contacts: Task<()>, @@ -112,8 +107,6 @@ impl UserStore { client.add_message_handler(cx.handle(), Self::handle_update_contacts), client.add_message_handler(cx.handle(), Self::handle_update_invite_info), client.add_message_handler(cx.handle(), Self::handle_show_contacts), - client.add_request_handler(cx.handle(), Self::handle_incoming_call), - client.add_message_handler(cx.handle(), Self::handle_cancel_call), ]; Self { users: Default::default(), @@ -122,7 +115,6 @@ impl UserStore { incoming_contact_requests: Default::default(), outgoing_contact_requests: Default::default(), invite_info: None, - incoming_call: watch::channel(), client: Arc::downgrade(&client), update_contacts_tx, http, @@ -194,60 +186,10 @@ impl UserStore { Ok(()) } - async fn handle_incoming_call( - this: ModelHandle, - envelope: TypedEnvelope, - _: Arc, - mut cx: AsyncAppContext, - ) -> Result { - let call = IncomingCall { - room_id: envelope.payload.room_id, - participants: this - .update(&mut cx, |this, cx| { - this.get_users(envelope.payload.participant_user_ids, cx) - }) - .await?, - caller: this - .update(&mut cx, |this, cx| { - this.get_user(envelope.payload.caller_user_id, cx) - }) - .await?, - initial_project_id: envelope.payload.initial_project_id, - }; - this.update(&mut cx, |this, _| { - *this.incoming_call.0.borrow_mut() = Some(call); - }); - - Ok(proto::Ack {}) - } - - async fn handle_cancel_call( - this: ModelHandle, - _: TypedEnvelope, - _: Arc, - mut cx: AsyncAppContext, - ) -> Result<()> { - this.update(&mut cx, |this, _| { - *this.incoming_call.0.borrow_mut() = None; - }); - Ok(()) - } - pub fn invite_info(&self) -> Option<&InviteInfo> { self.invite_info.as_ref() } - pub fn incoming_call(&self) -> watch::Receiver> { - self.incoming_call.1.clone() - } - - pub fn decline_call(&mut self) -> Result<()> { - if let Some(client) = self.client.upgrade() { - client.send(proto::DeclineCall {})?; - } - Ok(()) - } - async fn handle_update_contacts( this: ModelHandle, message: TypedEnvelope, diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 79d167d013..1adac8b28e 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -5,7 +5,7 @@ use crate::{ }; use ::rpc::Peer; use anyhow::anyhow; -use call::{room, ParticipantLocation, Room}; +use call::{room, ActiveCall, ParticipantLocation, Room}; use client::{ self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Connection, Credentials, EstablishConnectionError, User, UserStore, RECEIVE_TIMEOUT, @@ -78,29 +78,18 @@ async fn test_basic_calls( .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) .await; - let room_a = cx_a - .update(|cx| Room::create(client_a.clone(), client_a.user_store.clone(), cx)) - .await - .unwrap(); - assert_eq!( - room_participants(&room_a, cx_a), - RoomParticipants { - remote: Default::default(), - pending: Default::default() - } - ); + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); + let active_call_c = cx_c.read(ActiveCall::global); // Call user B from client A. - let mut incoming_call_b = client_b - .user_store - .update(cx_b, |user, _| user.incoming_call()); - room_a - .update(cx_a, |room, cx| { - room.call(client_b.user_id().unwrap(), None, cx) + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) }) .await .unwrap(); - + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); deterministic.run_until_parked(); assert_eq!( room_participants(&room_a, cx_a), @@ -111,21 +100,24 @@ async fn test_basic_calls( ); // User B receives the call. + let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming()); let call_b = incoming_call_b.next().await.unwrap().unwrap(); + assert_eq!(call_b.caller.github_login, "user_a"); // User B connects via another client and also receives a ring on the newly-connected client. - let client_b2 = server.create_client(cx_b2, "user_b").await; - let mut incoming_call_b2 = client_b2 - .user_store - .update(cx_b2, |user, _| user.incoming_call()); + let _client_b2 = server.create_client(cx_b2, "user_b").await; + let active_call_b2 = cx_b2.read(ActiveCall::global); + let mut incoming_call_b2 = active_call_b2.read_with(cx_b2, |call, _| call.incoming()); deterministic.run_until_parked(); - let _call_b2 = incoming_call_b2.next().await.unwrap().unwrap(); + let call_b2 = incoming_call_b2.next().await.unwrap().unwrap(); + assert_eq!(call_b2.caller.github_login, "user_a"); // User B joins the room using the first client. - let room_b = cx_b - .update(|cx| Room::join(&call_b, client_b.clone(), client_b.user_store.clone(), cx)) + active_call_b + .update(cx_b, |call, cx| call.accept_incoming(cx)) .await .unwrap(); + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); assert!(incoming_call_b.next().await.unwrap().is_none()); deterministic.run_until_parked(); @@ -145,12 +137,10 @@ async fn test_basic_calls( ); // Call user C from client B. - let mut incoming_call_c = client_c - .user_store - .update(cx_c, |user, _| user.incoming_call()); - room_b - .update(cx_b, |room, cx| { - room.call(client_c.user_id().unwrap(), None, cx) + let mut incoming_call_c = active_call_c.read_with(cx_c, |call, _| call.incoming()); + active_call_b + .update(cx_b, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) }) .await .unwrap(); @@ -172,11 +162,9 @@ async fn test_basic_calls( ); // User C receives the call, but declines it. - let _call_c = incoming_call_c.next().await.unwrap().unwrap(); - client_c - .user_store - .update(cx_c, |user, _| user.decline_call()) - .unwrap(); + let call_c = incoming_call_c.next().await.unwrap().unwrap(); + assert_eq!(call_c.caller.github_login, "user_b"); + active_call_c.update(cx_c, |call, _| call.decline_incoming().unwrap()); assert!(incoming_call_c.next().await.unwrap().is_none()); deterministic.run_until_parked(); @@ -196,7 +184,10 @@ async fn test_basic_calls( ); // User A leaves the room. - room_a.update(cx_a, |room, cx| room.leave(cx)).unwrap(); + active_call_a.update(cx_a, |call, cx| { + call.hang_up(cx).unwrap(); + assert!(call.room().is_none()); + }); deterministic.run_until_parked(); assert_eq!( room_participants(&room_a, cx_a), @@ -218,108 +209,107 @@ async fn test_basic_calls( async fn test_room_uniqueness( deterministic: Arc, cx_a: &mut TestAppContext, + cx_a2: &mut TestAppContext, cx_b: &mut TestAppContext, + cx_b2: &mut TestAppContext, cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; + let _client_a2 = server.create_client(cx_a2, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; + let _client_b2 = server.create_client(cx_b2, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; server .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) .await; - let room_a = cx_a - .update(|cx| Room::create(client_a.clone(), client_a.user_store.clone(), cx)) - .await - .unwrap(); - // Ensure room can't be created given we've just created one. - cx_a.update(|cx| Room::create(client_a.clone(), client_a.user_store.clone(), cx)) - .await - .unwrap_err(); + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_a2 = cx_a2.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); + let active_call_b2 = cx_b2.read(ActiveCall::global); + let active_call_c = cx_c.read(ActiveCall::global); // Call user B from client A. - let mut incoming_call_b = client_b - .user_store - .update(cx_b, |user, _| user.incoming_call()); - room_a - .update(cx_a, |room, cx| { - room.call(client_b.user_id().unwrap(), None, cx) + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) }) .await .unwrap(); + + // Ensure a new room can't be created given user A just created one. + active_call_a2 + .update(cx_a2, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }) + .await + .unwrap_err(); + active_call_a2.read_with(cx_a2, |call, _| assert!(call.room().is_none())); + + // User B receives the call from user A. + let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming()); let call_b1 = incoming_call_b.next().await.unwrap().unwrap(); assert_eq!(call_b1.caller.github_login, "user_a"); // Ensure calling users A and B from client C fails. - let room_c = cx_c - .update(|cx| Room::create(client_c.clone(), client_c.user_store.clone(), cx)) - .await - .unwrap(); - room_c - .update(cx_c, |room, cx| { - room.call(client_a.user_id().unwrap(), None, cx) + active_call_c + .update(cx_c, |call, cx| { + call.invite(client_a.user_id().unwrap(), None, cx) }) .await .unwrap_err(); - room_c - .update(cx_c, |room, cx| { - room.call(client_b.user_id().unwrap(), None, cx) + active_call_c + .update(cx_c, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) }) .await .unwrap_err(); // Ensure User B can't create a room while they still have an incoming call. - cx_b.update(|cx| Room::create(client_b.clone(), client_b.user_store.clone(), cx)) - .await - .unwrap_err(); - - // User B joins the room and calling them after they've joined still fails. - let room_b = cx_b - .update(|cx| { - Room::join( - &call_b1, - client_b.client.clone(), - client_b.user_store.clone(), - cx, - ) + active_call_b2 + .update(cx_b2, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) }) .await + .unwrap_err(); + active_call_b2.read_with(cx_b2, |call, _| assert!(call.room().is_none())); + + // User B joins the room and calling them after they've joined still fails. + active_call_b + .update(cx_b, |call, cx| call.accept_incoming(cx)) + .await .unwrap(); - room_c - .update(cx_c, |room, cx| { - room.call(client_b.user_id().unwrap(), None, cx) + active_call_c + .update(cx_c, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) }) .await .unwrap_err(); // Ensure User B can't create a room while they belong to another room. - cx_b.update(|cx| Room::create(client_b.clone(), client_b.user_store.clone(), cx)) + active_call_b2 + .update(cx_b2, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }) .await .unwrap_err(); + active_call_b2.read_with(cx_b2, |call, _| assert!(call.room().is_none())); // Client C can successfully call client B after client B leaves the room. - cx_b.update(|_| drop(room_b)); + active_call_b + .update(cx_b, |call, cx| call.hang_up(cx)) + .unwrap(); deterministic.run_until_parked(); - room_c - .update(cx_c, |room, cx| { - room.call(client_b.user_id().unwrap(), None, cx) + active_call_c + .update(cx_c, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) }) .await .unwrap(); let call_b2 = incoming_call_b.next().await.unwrap().unwrap(); assert_eq!(call_b2.caller.github_login, "user_c"); - - // Client B can successfully create a room after declining the call from client C. - client_b - .user_store - .update(cx_b, |user_store, _| user_store.decline_call()) - .unwrap(); - deterministic.run_until_parked(); - cx_b.update(|cx| Room::create(client_b.clone(), client_b.user_store.clone(), cx)) - .await - .unwrap(); } #[gpui::test(iterations = 10)] @@ -336,28 +326,26 @@ async fn test_leaving_room_on_disconnection( .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; - let room_a = cx_a - .update(|cx| Room::create(client_a.clone(), client_a.user_store.clone(), cx)) - .await - .unwrap(); + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); // Call user B from client A. - let mut incoming_call_b = client_b - .user_store - .update(cx_b, |user, _| user.incoming_call()); - room_a - .update(cx_a, |room, cx| { - room.call(client_b.user_id().unwrap(), None, cx) + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) }) .await .unwrap(); + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); // User B receives the call and joins the room. - let call_b = incoming_call_b.next().await.unwrap().unwrap(); - let room_b = cx_b - .update(|cx| Room::join(&call_b, client_b.clone(), client_b.user_store.clone(), cx)) + let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming()); + incoming_call_b.next().await.unwrap().unwrap(); + active_call_b + .update(cx_b, |call, cx| call.accept_incoming(cx)) .await .unwrap(); + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); deterministic.run_until_parked(); assert_eq!( room_participants(&room_a, cx_a), @@ -398,13 +386,13 @@ async fn test_share_project( cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); + deterministic.forbid_parking(); let (_, window_b) = cx_b.add_window(|_| EmptyView); let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; client_a @@ -502,10 +490,13 @@ async fn test_unshare_project( let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; - let (room_id, mut rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) .await; + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); + client_a .fs .insert_tree( @@ -532,7 +523,7 @@ async fn test_unshare_project( .unwrap(); // When client B leaves the room, the project becomes read-only. - cx_b.update(|_| drop(rooms.remove(1))); + active_call_b.update(cx_b, |call, cx| call.hang_up(cx).unwrap()); deterministic.run_until_parked(); assert!(project_b.read_with(cx_b, |project, _| project.is_read_only())); @@ -560,7 +551,7 @@ async fn test_unshare_project( .unwrap(); // When client A (the host) leaves the room, the project gets unshared and guests are notified. - cx_a.update(|_| drop(rooms.remove(0))); + active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap()); deterministic.run_until_parked(); project_a.read_with(cx_a, |project, _| assert!(!project.is_shared())); project_c2.read_with(cx_c, |project, _| { @@ -582,8 +573,8 @@ async fn test_host_disconnect( let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) .await; client_a @@ -660,7 +651,7 @@ async fn test_host_disconnect( } #[gpui::test(iterations = 10)] -async fn test_room_events( +async fn test_active_call_events( deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, @@ -675,24 +666,21 @@ async fn test_room_events( let (project_a, _) = client_a.build_local_project("/a", cx_a).await; let (project_b, _) = client_b.build_local_project("/b", cx_b).await; - let (room_id, mut rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; - let room_a = rooms.remove(0); - let room_a_events = room_events(&room_a, cx_a); - - let room_b = rooms.remove(0); - let room_b_events = room_events(&room_b, cx_b); + let events_a = active_call_events(cx_a); + let events_b = active_call_events(cx_b); let project_a_id = project_a .update(cx_a, |project, cx| project.share(room_id, cx)) .await .unwrap(); deterministic.run_until_parked(); - assert_eq!(mem::take(&mut *room_a_events.borrow_mut()), vec![]); + assert_eq!(mem::take(&mut *events_a.borrow_mut()), vec![]); assert_eq!( - mem::take(&mut *room_b_events.borrow_mut()), + mem::take(&mut *events_b.borrow_mut()), vec![room::Event::RemoteProjectShared { owner: Arc::new(User { id: client_a.user_id().unwrap(), @@ -709,7 +697,7 @@ async fn test_room_events( .unwrap(); deterministic.run_until_parked(); assert_eq!( - mem::take(&mut *room_a_events.borrow_mut()), + mem::take(&mut *events_a.borrow_mut()), vec![room::Event::RemoteProjectShared { owner: Arc::new(User { id: client_b.user_id().unwrap(), @@ -719,17 +707,15 @@ async fn test_room_events( project_id: project_b_id, }] ); - assert_eq!(mem::take(&mut *room_b_events.borrow_mut()), vec![]); + assert_eq!(mem::take(&mut *events_b.borrow_mut()), vec![]); - fn room_events( - room: &ModelHandle, - cx: &mut TestAppContext, - ) -> Rc>> { + fn active_call_events(cx: &mut TestAppContext) -> Rc>> { let events = Rc::new(RefCell::new(Vec::new())); + let active_call = cx.read(ActiveCall::global); cx.update({ let events = events.clone(); |cx| { - cx.subscribe(room, move |_, event, _| { + cx.subscribe(&active_call, move |_, event, _| { events.borrow_mut().push(event.clone()) }) .detach() @@ -755,26 +741,28 @@ async fn test_room_location( let (project_a, _) = client_a.build_local_project("/a", cx_a).await; let (project_b, _) = client_b.build_local_project("/b", cx_b).await; - let (room_id, mut rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; - let room_a = rooms.remove(0); - let room_a_notified = Rc::new(Cell::new(false)); + let active_call_a = cx_a.read(ActiveCall::global); + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + let a_notified = Rc::new(Cell::new(false)); cx_a.update({ - let room_a_notified = room_a_notified.clone(); + let notified = a_notified.clone(); |cx| { - cx.observe(&room_a, move |_, _| room_a_notified.set(true)) + cx.observe(&active_call_a, move |_, _| notified.set(true)) .detach() } }); - let room_b = rooms.remove(0); - let room_b_notified = Rc::new(Cell::new(false)); + let active_call_b = cx_b.read(ActiveCall::global); + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); + let b_notified = Rc::new(Cell::new(false)); cx_b.update({ - let room_b_notified = room_b_notified.clone(); + let b_notified = b_notified.clone(); |cx| { - cx.observe(&room_b, move |_, _| room_b_notified.set(true)) + cx.observe(&active_call_b, move |_, _| b_notified.set(true)) .detach() } }); @@ -784,12 +772,12 @@ async fn test_room_location( .await .unwrap(); deterministic.run_until_parked(); - assert!(room_a_notified.take()); + assert!(a_notified.take()); assert_eq!( participant_locations(&room_a, cx_a), vec![("user_b".to_string(), ParticipantLocation::External)] ); - assert!(room_b_notified.take()); + assert!(b_notified.take()); assert_eq!( participant_locations(&room_b, cx_b), vec![("user_a".to_string(), ParticipantLocation::External)] @@ -800,12 +788,12 @@ async fn test_room_location( .await .unwrap(); deterministic.run_until_parked(); - assert!(room_a_notified.take()); + assert!(a_notified.take()); assert_eq!( participant_locations(&room_a, cx_a), vec![("user_b".to_string(), ParticipantLocation::External)] ); - assert!(room_b_notified.take()); + assert!(b_notified.take()); assert_eq!( participant_locations(&room_b, cx_b), vec![("user_a".to_string(), ParticipantLocation::External)] @@ -816,12 +804,12 @@ async fn test_room_location( .await .unwrap(); deterministic.run_until_parked(); - assert!(room_a_notified.take()); + assert!(a_notified.take()); assert_eq!( participant_locations(&room_a, cx_a), vec![("user_b".to_string(), ParticipantLocation::External)] ); - assert!(room_b_notified.take()); + assert!(b_notified.take()); assert_eq!( participant_locations(&room_b, cx_b), vec![( @@ -837,7 +825,7 @@ async fn test_room_location( .await .unwrap(); deterministic.run_until_parked(); - assert!(room_a_notified.take()); + assert!(a_notified.take()); assert_eq!( participant_locations(&room_a, cx_a), vec![( @@ -847,7 +835,7 @@ async fn test_room_location( } )] ); - assert!(room_b_notified.take()); + assert!(b_notified.take()); assert_eq!( participant_locations(&room_b, cx_b), vec![( @@ -863,12 +851,12 @@ async fn test_room_location( .await .unwrap(); deterministic.run_until_parked(); - assert!(room_a_notified.take()); + assert!(a_notified.take()); assert_eq!( participant_locations(&room_a, cx_a), vec![("user_b".to_string(), ParticipantLocation::External)] ); - assert!(room_b_notified.take()); + assert!(b_notified.take()); assert_eq!( participant_locations(&room_b, cx_b), vec![( @@ -908,8 +896,8 @@ async fn test_propagate_saves_and_fs_changes( let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) .await; client_a @@ -1056,8 +1044,8 @@ async fn test_fs_operations( let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; client_a @@ -1321,8 +1309,8 @@ async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut T let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; client_a @@ -1374,8 +1362,8 @@ async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppCont let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; client_a @@ -1432,8 +1420,8 @@ async fn test_editing_while_guest_opens_buffer( let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; client_a @@ -1478,8 +1466,8 @@ async fn test_leaving_worktree_while_opening_buffer( let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; client_a @@ -1522,8 +1510,8 @@ async fn test_canceling_buffer_opening( let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; client_a @@ -1573,8 +1561,8 @@ async fn test_leaving_project( let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) .await; client_a @@ -1663,8 +1651,8 @@ async fn test_collaborating_with_diagnostics( let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) .await; // Set up a fake language server. @@ -1900,8 +1888,8 @@ async fn test_collaborating_with_completion(cx_a: &mut TestAppContext, cx_b: &mu let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; // Set up a fake language server. @@ -2073,8 +2061,8 @@ async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut Te let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; client_a @@ -2165,8 +2153,8 @@ async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppCon let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; // Set up a fake language server. @@ -2265,8 +2253,8 @@ async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; // Set up a fake language server. @@ -2408,8 +2396,8 @@ async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; // Set up a fake language server. @@ -2508,8 +2496,8 @@ async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContex let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; client_a @@ -2586,8 +2574,8 @@ async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppC let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; client_a @@ -2687,8 +2675,8 @@ async fn test_lsp_hover(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; client_a @@ -2789,8 +2777,8 @@ async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppConte let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; // Set up a fake language server. @@ -2896,8 +2884,8 @@ async fn test_open_buffer_while_getting_definition_pointing_to_it( let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; // Set up a fake language server. @@ -2971,8 +2959,8 @@ async fn test_collaborating_with_code_actions( let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; // Set up a fake language server. @@ -3181,8 +3169,8 @@ async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut T let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; // Set up a fake language server. @@ -3372,8 +3360,8 @@ async fn test_language_server_statuses( let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; // Set up a fake language server. @@ -4145,8 +4133,8 @@ async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; cx_a.update(editor::init); cx_b.update(editor::init); @@ -4354,8 +4342,8 @@ async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut T let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; cx_a.update(editor::init); cx_b.update(editor::init); @@ -4522,8 +4510,8 @@ async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppCont let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; cx_a.update(editor::init); cx_b.update(editor::init); @@ -4685,8 +4673,8 @@ async fn test_peers_simultaneously_following_each_other( let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; - let (room_id, _rooms) = server - .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + let room_id = server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) .await; cx_a.update(editor::init); cx_b.update(editor::init); @@ -4790,12 +4778,8 @@ async fn test_random_collaboration( .unwrap(); } - let client = server.create_client(cx, "room-creator").await; - let room = cx - .update(|cx| Room::create(client.client.clone(), client.user_store.clone(), cx)) - .await - .unwrap(); - let room_id = room.read_with(cx, |room, _| room.id()); + let _room_creator = server.create_client(cx, "room-creator").await; + let active_call = cx.read(ActiveCall::global); let mut clients = Vec::new(); let mut user_ids = Vec::new(); @@ -4963,22 +4947,17 @@ async fn test_random_collaboration( host_language_registry.add(Arc::new(language)); let host_user_id = host.current_user_id(&host_cx); - room.update(cx, |room, cx| room.call(host_user_id.to_proto(), None, cx)) + active_call + .update(cx, |call, cx| { + call.invite(host_user_id.to_proto(), None, cx) + }) .await .unwrap(); + let room_id = active_call.read_with(cx, |call, cx| call.room().unwrap().read(cx).id()); deterministic.run_until_parked(); - let call = host - .user_store - .read_with(&host_cx, |user_store, _| user_store.incoming_call()); - let host_room = host_cx - .update(|cx| { - Room::join( - call.borrow().as_ref().unwrap(), - host.client.clone(), - host.user_store.clone(), - cx, - ) - }) + host_cx + .read(ActiveCall::global) + .update(&mut host_cx, |call, cx| call.accept_incoming(cx)) .await .unwrap(); @@ -4991,7 +4970,6 @@ async fn test_random_collaboration( user_ids.push(host_user_id); op_start_signals.push(op_start_signal.0); clients.push(host_cx.foreground().spawn(host.simulate_host( - host_room, host_project, op_start_signal.1, rng.clone(), @@ -5016,20 +4994,26 @@ async fn test_random_collaboration( deterministic.finish_waiting(); deterministic.run_until_parked(); - let (host, host_room, host_project, mut host_cx, host_err) = clients.remove(0); + let (host, host_project, mut host_cx, host_err) = clients.remove(0); if let Some(host_err) = host_err { log::error!("host error - {:?}", host_err); } host_project.read_with(&host_cx, |project, _| assert!(!project.is_shared())); - for (guest, guest_room, guest_project, mut guest_cx, guest_err) in clients { + for (guest, guest_project, mut guest_cx, guest_err) in clients { if let Some(guest_err) = guest_err { log::error!("{} error - {:?}", guest.username, guest_err); } guest_project.read_with(&guest_cx, |project, _| assert!(project.is_read_only())); - guest_cx.update(|_| drop((guest, guest_room, guest_project))); + guest_cx.update(|cx| { + cx.clear_globals(); + drop((guest, guest_project)); + }); } - host_cx.update(|_| drop((host, host_room, host_project))); + host_cx.update(|cx| { + cx.clear_globals(); + drop((host, host_project)); + }); return; } @@ -5055,23 +5039,16 @@ async fn test_random_collaboration( let guest = server.create_client(&mut guest_cx, &guest_username).await; let guest_user_id = guest.current_user_id(&guest_cx); - room.update(cx, |room, cx| room.call(guest_user_id.to_proto(), None, cx)) + active_call + .update(cx, |call, cx| { + call.invite(guest_user_id.to_proto(), None, cx) + }) .await .unwrap(); deterministic.run_until_parked(); - let call = guest - .user_store - .read_with(&guest_cx, |user_store, _| user_store.incoming_call()); - - let guest_room = guest_cx - .update(|cx| { - Room::join( - call.borrow().as_ref().unwrap(), - guest.client.clone(), - guest.user_store.clone(), - cx, - ) - }) + guest_cx + .read(ActiveCall::global) + .update(&mut guest_cx, |call, cx| call.accept_incoming(cx)) .await .unwrap(); @@ -5093,7 +5070,6 @@ async fn test_random_collaboration( op_start_signals.push(op_start_signal.0); clients.push(guest_cx.foreground().spawn(guest.simulate_guest( guest_username.clone(), - guest_room, guest_project, op_start_signal.1, rng.clone(), @@ -5114,7 +5090,7 @@ async fn test_random_collaboration( deterministic.advance_clock(RECEIVE_TIMEOUT); deterministic.start_waiting(); log::info!("Waiting for guest {} to exit...", removed_guest_id); - let (guest, guest_room, guest_project, mut guest_cx, guest_err) = guest.await; + let (guest, guest_project, mut guest_cx, guest_err) = guest.await; deterministic.finish_waiting(); server.allow_connections(); @@ -5142,7 +5118,10 @@ async fn test_random_collaboration( log::info!("{} removed", guest.username); available_guests.push(guest.username.clone()); - guest_cx.update(|_| drop((guest, guest_room, guest_project))); + guest_cx.update(|cx| { + cx.clear_globals(); + drop((guest, guest_project)); + }); operations += 1; } @@ -5169,7 +5148,7 @@ async fn test_random_collaboration( deterministic.finish_waiting(); deterministic.run_until_parked(); - let (host_client, host_room, host_project, mut host_cx, host_err) = clients.remove(0); + let (host_client, host_project, mut host_cx, host_err) = clients.remove(0); if let Some(host_err) = host_err { panic!("host error - {:?}", host_err); } @@ -5185,7 +5164,7 @@ async fn test_random_collaboration( host_project.read_with(&host_cx, |project, cx| project.check_invariants(cx)); - for (guest_client, guest_room, guest_project, mut guest_cx, guest_err) in clients.into_iter() { + for (guest_client, guest_project, mut guest_cx, guest_err) in clients.into_iter() { if let Some(guest_err) = guest_err { panic!("{} error - {:?}", guest_client.username, guest_err); } @@ -5257,10 +5236,16 @@ async fn test_random_collaboration( ); } - guest_cx.update(|_| drop((guest_room, guest_project, guest_client))); + guest_cx.update(|cx| { + cx.clear_globals(); + drop((guest_project, guest_client)); + }); } - host_cx.update(|_| drop((host_client, host_room, host_project))); + host_cx.update(|cx| { + cx.clear_globals(); + drop((host_client, host_project)) + }); } struct TestServer { @@ -5385,7 +5370,10 @@ impl TestServer { Channel::init(&client); Project::init(&client); - cx.update(|cx| workspace::init(app_state.clone(), cx)); + cx.update(|cx| { + workspace::init(app_state.clone(), cx); + call::init(client.clone(), user_store.clone(), cx); + }); client .authenticate_and_connect(false, &cx.to_async()) @@ -5447,50 +5435,29 @@ impl TestServer { } } - async fn create_rooms( - &self, - clients: &mut [(&TestClient, &mut TestAppContext)], - ) -> (u64, Vec>) { + async fn create_room(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) -> u64 { self.make_contacts(clients).await; - let mut rooms = Vec::new(); - let (left, right) = clients.split_at_mut(1); - let (client_a, cx_a) = &mut left[0]; - - let room_a = cx_a - .update(|cx| Room::create(client_a.client.clone(), client_a.user_store.clone(), cx)) - .await - .unwrap(); - let room_id = room_a.read_with(*cx_a, |room, _| room.id()); + let (_client_a, cx_a) = &mut left[0]; + let active_call_a = cx_a.read(ActiveCall::global); for (client_b, cx_b) in right { let user_id_b = client_b.current_user_id(*cx_b).to_proto(); - room_a - .update(*cx_a, |room, cx| room.call(user_id_b, None, cx)) + active_call_a + .update(*cx_a, |call, cx| call.invite(user_id_b, None, cx)) .await .unwrap(); cx_b.foreground().run_until_parked(); - let incoming_call = client_b - .user_store - .read_with(*cx_b, |user_store, _| user_store.incoming_call()); - let room_b = cx_b - .update(|cx| { - Room::join( - incoming_call.borrow().as_ref().unwrap(), - client_b.client.clone(), - client_b.user_store.clone(), - cx, - ) - }) + let active_call_b = cx_b.read(ActiveCall::global); + active_call_b + .update(*cx_b, |call, cx| call.accept_incoming(cx)) .await .unwrap(); - rooms.push(room_b); } - rooms.insert(0, room_a); - (room_id, rooms) + active_call_a.read_with(*cx_a, |call, cx| call.room().unwrap().read(cx).id()) } async fn build_app_state(test_db: &TestDb) -> Arc { @@ -5656,14 +5623,12 @@ impl TestClient { async fn simulate_host( mut self, - room: ModelHandle, project: ModelHandle, op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, rng: Arc>, mut cx: TestAppContext, ) -> ( Self, - ModelHandle, ModelHandle, TestAppContext, Option, @@ -5789,20 +5754,18 @@ impl TestClient { let result = simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx).await; log::info!("Host done"); - (self, room, project, cx, result.err()) + (self, project, cx, result.err()) } pub async fn simulate_guest( mut self, guest_username: String, - room: ModelHandle, project: ModelHandle, op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, rng: Arc>, mut cx: TestAppContext, ) -> ( Self, - ModelHandle, ModelHandle, TestAppContext, Option, @@ -6121,7 +6084,7 @@ impl TestClient { .await; log::info!("{}: done", guest_username); - (self, room, project, cx, result.err()) + (self, project, cx, result.err()) } } diff --git a/crates/collab_ui/src/collab_ui.rs b/crates/collab_ui/src/collab_ui.rs index 786d344df1..03d1bf6672 100644 --- a/crates/collab_ui/src/collab_ui.rs +++ b/crates/collab_ui/src/collab_ui.rs @@ -13,7 +13,7 @@ use workspace::{AppState, JoinProject, ToggleFollow, Workspace}; pub fn init(app_state: Arc, cx: &mut MutableAppContext) { contacts_popover::init(cx); collab_titlebar_item::init(cx); - incoming_call_notification::init(app_state.user_store.clone(), cx); + incoming_call_notification::init(cx); project_shared_notification::init(cx); cx.add_global_action(move |action: &JoinProject, cx| { diff --git a/crates/collab_ui/src/incoming_call_notification.rs b/crates/collab_ui/src/incoming_call_notification.rs index e46e69522f..ae1240d0d9 100644 --- a/crates/collab_ui/src/incoming_call_notification.rs +++ b/crates/collab_ui/src/incoming_call_notification.rs @@ -1,11 +1,11 @@ use call::ActiveCall; -use client::{incoming_call::IncomingCall, UserStore}; +use client::incoming_call::IncomingCall; use futures::StreamExt; use gpui::{ elements::*, geometry::{rect::RectF, vector::vec2f}, - impl_internal_actions, Entity, ModelHandle, MouseButton, MutableAppContext, RenderContext, - View, ViewContext, WindowBounds, WindowKind, WindowOptions, + impl_internal_actions, Entity, MouseButton, MutableAppContext, RenderContext, View, + ViewContext, WindowBounds, WindowKind, WindowOptions, }; use settings::Settings; use util::ResultExt; @@ -13,10 +13,10 @@ use workspace::JoinProject; impl_internal_actions!(incoming_call_notification, [RespondToCall]); -pub fn init(user_store: ModelHandle, cx: &mut MutableAppContext) { +pub fn init(cx: &mut MutableAppContext) { cx.add_action(IncomingCallNotification::respond_to_call); - let mut incoming_call = user_store.read(cx).incoming_call(); + let mut incoming_call = ActiveCall::global(cx).read(cx).incoming(); cx.spawn(|mut cx| async move { let mut notification_window = None; while let Some(incoming_call) = incoming_call.next().await { @@ -33,7 +33,7 @@ pub fn init(user_store: ModelHandle, cx: &mut MutableAppContext) { kind: WindowKind::PopUp, is_movable: false, }, - |_| IncomingCallNotification::new(incoming_call, user_store.clone()), + |_| IncomingCallNotification::new(incoming_call), ); notification_window = Some(window_id); } @@ -49,18 +49,17 @@ struct RespondToCall { pub struct IncomingCallNotification { call: IncomingCall, - user_store: ModelHandle, } impl IncomingCallNotification { - pub fn new(call: IncomingCall, user_store: ModelHandle) -> Self { - Self { call, user_store } + pub fn new(call: IncomingCall) -> Self { + Self { call } } fn respond_to_call(&mut self, action: &RespondToCall, cx: &mut ViewContext) { + let active_call = ActiveCall::global(cx); if action.accept { - let join = ActiveCall::global(cx) - .update(cx, |active_call, cx| active_call.join(&self.call, cx)); + let join = active_call.update(cx, |active_call, cx| active_call.accept_incoming(cx)); let caller_user_id = self.call.caller.id; let initial_project_id = self.call.initial_project_id; cx.spawn_weak(|_, mut cx| async move { @@ -77,12 +76,10 @@ impl IncomingCallNotification { }) .detach_and_log_err(cx); } else { - self.user_store - .update(cx, |user_store, _| user_store.decline_call().log_err()); + active_call.update(cx, |active_call, _| { + active_call.decline_incoming().log_err(); + }); } - - let window_id = cx.window_id(); - cx.remove_window(window_id); } fn render_caller(&self, cx: &mut RenderContext) -> ElementBox { diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index 002bd01377..668071d046 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -1906,6 +1906,10 @@ impl MutableAppContext { }) } + pub fn clear_globals(&mut self) { + self.cx.globals.clear(); + } + pub fn add_model(&mut self, build_model: F) -> ModelHandle where T: Entity, diff --git a/crates/gpui/src/test.rs b/crates/gpui/src/test.rs index 4122ad09b7..6cfb4cf2b6 100644 --- a/crates/gpui/src/test.rs +++ b/crates/gpui/src/test.rs @@ -91,7 +91,7 @@ pub fn run_test( cx.update(|cx| cx.remove_all_windows()); deterministic.run_until_parked(); - cx.update(|_| {}); // flush effects + cx.update(|cx| cx.clear_globals()); leak_detector.lock().detect(); if is_last_iteration { diff --git a/crates/gpui_macros/src/gpui_macros.rs b/crates/gpui_macros/src/gpui_macros.rs index a60d385e8f..32a821f4c8 100644 --- a/crates/gpui_macros/src/gpui_macros.rs +++ b/crates/gpui_macros/src/gpui_macros.rs @@ -122,7 +122,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream { cx_teardowns.extend(quote!( #cx_varname.update(|cx| cx.remove_all_windows()); deterministic.run_until_parked(); - #cx_varname.update(|_| {}); // flush effects + #cx_varname.update(|cx| cx.clear_globals()); )); inner_fn_args.extend(quote!(&mut #cx_varname,)); continue;