diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index 596a0ec853..64584e6140 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use client::{proto, Client, TypedEnvelope, User, UserStore}; use collections::HashSet; +use futures::{future::Shared, FutureExt}; use postage::watch; use gpui::{ @@ -33,6 +34,7 @@ pub struct IncomingCall { /// Singleton global maintaining the user's participation in a room across workspaces. pub struct ActiveCall { room: Option<(ModelHandle, Vec)>, + pending_room_creation: Option, Arc>>>>, location: Option>, pending_invites: HashSet, incoming_call: ( @@ -56,6 +58,7 @@ impl ActiveCall { ) -> Self { Self { room: None, + pending_room_creation: None, location: None, pending_invites: Default::default(), incoming_call: watch::channel(), @@ -124,45 +127,74 @@ impl ActiveCall { initial_project: Option>, cx: &mut ModelContext, ) -> Task> { - let client = self.client.clone(); - let user_store = self.user_store.clone(); if !self.pending_invites.insert(called_user_id) { return Task::ready(Err(anyhow!("user was already invited"))); } - cx.notify(); - cx.spawn(|this, mut cx| async move { - let invite = async { - if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) { - let initial_project_id = if let Some(initial_project) = initial_project { - Some( - room.update(&mut cx, |room, cx| { - room.share_project(initial_project, cx) - }) + + let room = if let Some(room) = self.room().cloned() { + Some(Task::ready(Ok(room)).shared()) + } else { + self.pending_room_creation.clone() + }; + + let invite = if let Some(room) = room { + cx.spawn_weak(|_, mut cx| async move { + let room = room.await.map_err(|err| anyhow!("{:?}", err))?; + + let initial_project_id = if let Some(initial_project) = initial_project { + Some( + room.update(&mut cx, |room, cx| room.share_project(initial_project, cx)) .await?, - ) - } else { - None - }; - - room.update(&mut cx, |room, cx| { - room.call(called_user_id, initial_project_id, cx) - }) - .await?; + ) } else { - let room = cx - .update(|cx| { - Room::create(called_user_id, initial_project, client, user_store, cx) - }) - .await?; - - this.update(&mut cx, |this, cx| this.set_room(Some(room), cx)) - .await?; + None }; - Ok(()) - }; + room.update(&mut cx, |room, cx| { + room.call(called_user_id, initial_project_id, cx) + }) + .await?; + anyhow::Ok(()) + }) + } else { + let client = self.client.clone(); + let user_store = self.user_store.clone(); + let room = cx + .spawn(|this, mut cx| async move { + let create_room = async { + let room = cx + .update(|cx| { + Room::create( + called_user_id, + initial_project, + client, + user_store, + cx, + ) + }) + .await?; + + this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx)) + .await?; + + anyhow::Ok(room) + }; + + let room = create_room.await; + this.update(&mut cx, |this, _| this.pending_room_creation = None); + room.map_err(Arc::new) + }) + .shared(); + self.pending_room_creation = Some(room.clone()); + cx.foreground().spawn(async move { + room.await.map_err(|err| anyhow!("{:?}", err))?; + anyhow::Ok(()) + }) + }; + + cx.spawn(|this, mut cx| async move { let result = invite.await; this.update(&mut cx, |this, cx| { this.pending_invites.remove(&called_user_id); diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 55f888d11d..ff9872f31f 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -334,6 +334,134 @@ async fn test_basic_calls( ); } +#[gpui::test(iterations = 10)] +async fn test_calling_multiple_users_simultaneously( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, + cx_d: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; + let client_d = server.create_client(cx_d, "user_d").await; + server + .make_contacts(&mut [ + (&client_a, cx_a), + (&client_b, cx_b), + (&client_c, cx_c), + (&client_d, cx_d), + ]) + .await; + + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); + let active_call_c = cx_c.read(ActiveCall::global); + let active_call_d = cx_d.read(ActiveCall::global); + + // Simultaneously call user B and user C from client A. + let b_invite = active_call_a.update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) + }); + let c_invite = active_call_a.update(cx_a, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }); + b_invite.await.unwrap(); + c_invite.await.unwrap(); + + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: Default::default(), + pending: vec!["user_b".to_string(), "user_c".to_string()] + } + ); + + // Call client D from client A. + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_d.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: Default::default(), + pending: vec![ + "user_b".to_string(), + "user_c".to_string(), + "user_d".to_string() + ] + } + ); + + // Accept the call on all clients simultaneously. + let accept_b = active_call_b.update(cx_b, |call, cx| call.accept_incoming(cx)); + let accept_c = active_call_c.update(cx_c, |call, cx| call.accept_incoming(cx)); + let accept_d = active_call_d.update(cx_d, |call, cx| call.accept_incoming(cx)); + accept_b.await.unwrap(); + accept_c.await.unwrap(); + accept_d.await.unwrap(); + + deterministic.run_until_parked(); + + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); + let room_c = active_call_c.read_with(cx_c, |call, _| call.room().unwrap().clone()); + let room_d = active_call_d.read_with(cx_d, |call, _| call.room().unwrap().clone()); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec![ + "user_b".to_string(), + "user_c".to_string(), + "user_d".to_string(), + ], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_c".to_string(), + "user_d".to_string(), + ], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_b".to_string(), + "user_d".to_string(), + ], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_d, cx_d), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_b".to_string(), + "user_c".to_string(), + ], + pending: Default::default() + } + ); +} + #[gpui::test(iterations = 10)] async fn test_room_uniqueness( deterministic: Arc,