diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 0979ad8bb9..ed807ef8c5 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -151,11 +151,14 @@ impl Room { cx.spawn(|this, mut cx| async move { connect.await?; this.update(&mut cx, |this, cx| { - if this.read_only() || this.is_muted() { - Task::ready(Ok(())) - } else { - this.share_microphone(cx) + if !this.read_only() { + if let Some(live_kit) = &this.live_kit { + if !live_kit.muted_by_user && !live_kit.deafened { + return this.share_microphone(cx); + } + } } + Task::ready(Ok(())) })? .await }) @@ -1477,7 +1480,10 @@ impl Room { if let Some(live_kit) = self.live_kit.as_mut() { // When unmuting, undeafen if the user was deafened before. let was_deafened = live_kit.deafened; - if live_kit.muted_by_user || live_kit.deafened { + if live_kit.muted_by_user + || live_kit.deafened + || matches!(live_kit.microphone_track, LocalTrack::None) + { live_kit.muted_by_user = false; live_kit.deafened = false; } else { diff --git a/crates/collab/src/tests/channel_guest_tests.rs b/crates/collab/src/tests/channel_guest_tests.rs index d593323592..f3326cd692 100644 --- a/crates/collab/src/tests/channel_guest_tests.rs +++ b/crates/collab/src/tests/channel_guest_tests.rs @@ -57,7 +57,7 @@ async fn test_channel_guests( }) .await .is_err()); - assert!(room_b.read_with(cx_b, |room, _| !room.is_sharing_mic())); + assert!(room_b.read_with(cx_b, |room, _| room.is_muted())); } #[gpui::test] @@ -104,6 +104,7 @@ async fn test_channel_guest_promotion(cx_a: &mut TestAppContext, cx_b: &mut Test }); assert!(project_b.read_with(cx_b, |project, _| project.is_read_only())); assert!(editor_b.update(cx_b, |e, cx| e.read_only(cx))); + assert!(room_b.read_with(cx_b, |room, _| room.read_only())); assert!(room_b .update(cx_b, |room, cx| room.share_microphone(cx)) .await @@ -127,10 +128,13 @@ async fn test_channel_guest_promotion(cx_a: &mut TestAppContext, cx_b: &mut Test // project and buffers are now editable assert!(project_b.read_with(cx_b, |project, _| !project.is_read_only())); assert!(editor_b.update(cx_b, |editor, cx| !editor.read_only(cx))); - room_b - .update(cx_b, |room, cx| room.share_microphone(cx)) - .await - .unwrap(); + + // B sees themselves as muted, and can unmute. + assert!(room_b.read_with(cx_b, |room, _| !room.read_only())); + room_b.read_with(cx_b, |room, _| assert!(room.is_muted())); + room_b.update(cx_b, |room, cx| room.toggle_mute(cx)); + cx_a.run_until_parked(); + room_b.read_with(cx_b, |room, _| assert!(!room.is_muted())); // B is demoted active_call_a diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index cedc841527..e68fd10d8d 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -1876,6 +1876,186 @@ fn active_call_events(cx: &mut TestAppContext) -> Rc>> events } +#[gpui::test] +async fn test_mute_deafen( + executor: BackgroundExecutor, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, +) { + let mut server = TestServer::start(executor.clone()).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; + + server + .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) + .await; + + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); + let active_call_c = cx_c.read(ActiveCall::global); + + // User A calls user B, B answers. + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + executor.run_until_parked(); + active_call_b + .update(cx_b, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + executor.run_until_parked(); + + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); + + room_a.read_with(cx_a, |room, _| assert!(!room.is_muted())); + room_b.read_with(cx_b, |room, _| assert!(!room.is_muted())); + + // Users A and B are both muted. + assert_eq!( + participant_audio_state(&room_a, cx_a), + &[ParticipantAudioState { + user_id: client_b.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![true], + }] + ); + assert_eq!( + participant_audio_state(&room_b, cx_b), + &[ParticipantAudioState { + user_id: client_a.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![true], + }] + ); + + // User A mutes + room_a.update(cx_a, |room, cx| room.toggle_mute(cx)); + executor.run_until_parked(); + + // User A hears user B, but B doesn't hear A. + room_a.read_with(cx_a, |room, _| assert!(room.is_muted())); + room_b.read_with(cx_b, |room, _| assert!(!room.is_muted())); + assert_eq!( + participant_audio_state(&room_a, cx_a), + &[ParticipantAudioState { + user_id: client_b.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![true], + }] + ); + assert_eq!( + participant_audio_state(&room_b, cx_b), + &[ParticipantAudioState { + user_id: client_a.user_id().unwrap(), + is_muted: true, + audio_tracks_playing: vec![true], + }] + ); + + // User A deafens + room_a.update(cx_a, |room, cx| room.toggle_deafen(cx)); + executor.run_until_parked(); + + // User A does not hear user B. + room_a.read_with(cx_a, |room, _| assert!(room.is_muted())); + room_b.read_with(cx_b, |room, _| assert!(!room.is_muted())); + assert_eq!( + participant_audio_state(&room_a, cx_a), + &[ParticipantAudioState { + user_id: client_b.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![false], + }] + ); + assert_eq!( + participant_audio_state(&room_b, cx_b), + &[ParticipantAudioState { + user_id: client_a.user_id().unwrap(), + is_muted: true, + audio_tracks_playing: vec![true], + }] + ); + + // User B calls user C, C joins. + active_call_b + .update(cx_b, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + executor.run_until_parked(); + active_call_c + .update(cx_c, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + executor.run_until_parked(); + + // User A does not hear users B or C. + assert_eq!( + participant_audio_state(&room_a, cx_a), + &[ + ParticipantAudioState { + user_id: client_b.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![false], + }, + ParticipantAudioState { + user_id: client_c.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![false], + } + ] + ); + assert_eq!( + participant_audio_state(&room_b, cx_b), + &[ + ParticipantAudioState { + user_id: client_a.user_id().unwrap(), + is_muted: true, + audio_tracks_playing: vec![true], + }, + ParticipantAudioState { + user_id: client_c.user_id().unwrap(), + is_muted: false, + audio_tracks_playing: vec![true], + } + ] + ); + + #[derive(PartialEq, Eq, Debug)] + struct ParticipantAudioState { + user_id: u64, + is_muted: bool, + audio_tracks_playing: Vec, + } + + fn participant_audio_state( + room: &Model, + cx: &TestAppContext, + ) -> Vec { + room.read_with(cx, |room, _| { + room.remote_participants() + .iter() + .map(|(user_id, participant)| ParticipantAudioState { + user_id: *user_id, + is_muted: participant.muted, + audio_tracks_playing: participant + .audio_tracks + .values() + .map(|track| track.is_playing()) + .collect(), + }) + .collect::>() + }) + } +} + #[gpui::test(iterations = 10)] async fn test_room_location( executor: BackgroundExecutor, diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 1b7fd20bc2..96ca2b90dc 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -1,7 +1,7 @@ use crate::{ConnectionState, RoomUpdate, Sid}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; -use collections::{BTreeMap, HashMap}; +use collections::{BTreeMap, HashMap, HashSet}; use futures::Stream; use gpui::BackgroundExecutor; use live_kit_server::{proto, token}; @@ -13,7 +13,7 @@ use std::{ mem, sync::{ atomic::{AtomicBool, Ordering::SeqCst}, - Arc, + Arc, Weak, }, }; @@ -113,7 +113,25 @@ impl TestServer { .0 .lock() .updates_tx - .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone())) + .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new( + RemoteVideoTrack { + server_track: track.clone(), + }, + ))) + .unwrap(); + } + for track in &room.audio_tracks { + client_room + .0 + .lock() + .updates_tx + .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack( + Arc::new(RemoteAudioTrack { + server_track: track.clone(), + room: Arc::downgrade(&client_room), + }), + Arc::new(RemoteTrackPublication), + )) .unwrap(); } room.client_rooms.insert(identity, client_room); @@ -210,7 +228,7 @@ impl TestServer { } let sid = nanoid::nanoid!(17); - let track = Arc::new(RemoteVideoTrack { + let track = Arc::new(TestServerVideoTrack { sid: sid.clone(), publisher_id: identity.clone(), frames_rx: local_track.frames_rx.clone(), @@ -224,7 +242,11 @@ impl TestServer { .0 .lock() .updates_tx - .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone())) + .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new( + RemoteVideoTrack { + server_track: track.clone(), + }, + ))) .unwrap(); } } @@ -259,10 +281,10 @@ impl TestServer { } let sid = nanoid::nanoid!(17); - let track = Arc::new(RemoteAudioTrack { + let track = Arc::new(TestServerAudioTrack { sid: sid.clone(), publisher_id: identity.clone(), - running: AtomicBool::new(true), + muted: AtomicBool::new(false), }); let publication = Arc::new(RemoteTrackPublication); @@ -276,7 +298,10 @@ impl TestServer { .lock() .updates_tx .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack( - track.clone(), + Arc::new(RemoteAudioTrack { + server_track: track.clone(), + room: Arc::downgrade(&client_room), + }), publication.clone(), )) .unwrap(); @@ -286,37 +311,123 @@ impl TestServer { Ok(sid) } + fn set_track_muted(&self, token: &str, track_sid: &str, muted: bool) -> Result<()> { + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let room_name = claims.video.room.unwrap(); + let identity = claims.sub.unwrap(); + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + if let Some(track) = room + .audio_tracks + .iter_mut() + .find(|track| track.sid == track_sid) + { + track.muted.store(muted, SeqCst); + for (id, client_room) in room.client_rooms.iter() { + if *id != identity { + client_room + .0 + .lock() + .updates_tx + .try_broadcast(RoomUpdate::RemoteAudioTrackMuteChanged { + track_id: track_sid.to_string(), + muted, + }) + .unwrap(); + } + } + } + Ok(()) + } + + fn is_track_muted(&self, token: &str, track_sid: &str) -> Option { + let claims = live_kit_server::token::validate(&token, &self.secret_key).ok()?; + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms.get_mut(&*room_name)?; + room.audio_tracks.iter().find_map(|track| { + if track.sid == track_sid { + Some(track.muted.load(SeqCst)) + } else { + None + } + }) + } + fn video_tracks(&self, token: String) -> Result>> { let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let room_name = claims.video.room.unwrap(); + let identity = claims.sub.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - Ok(room.video_tracks.clone()) + room.client_rooms + .get(identity.as_ref()) + .ok_or_else(|| anyhow!("not a participant in room"))?; + Ok(room + .video_tracks + .iter() + .map(|track| { + Arc::new(RemoteVideoTrack { + server_track: track.clone(), + }) + }) + .collect()) } fn audio_tracks(&self, token: String) -> Result>> { let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let room_name = claims.video.room.unwrap(); + let identity = claims.sub.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - Ok(room.audio_tracks.clone()) + let client_room = room + .client_rooms + .get(identity.as_ref()) + .ok_or_else(|| anyhow!("not a participant in room"))?; + Ok(room + .audio_tracks + .iter() + .map(|track| { + Arc::new(RemoteAudioTrack { + server_track: track.clone(), + room: Arc::downgrade(&client_room), + }) + }) + .collect()) } } #[derive(Default)] struct TestServerRoom { client_rooms: HashMap>, - video_tracks: Vec>, - audio_tracks: Vec>, + video_tracks: Vec>, + audio_tracks: Vec>, participant_permissions: HashMap, } +#[derive(Debug)] +struct TestServerVideoTrack { + sid: Sid, + publisher_id: Sid, + frames_rx: async_broadcast::Receiver, +} + +#[derive(Debug)] +struct TestServerAudioTrack { + sid: Sid, + publisher_id: Sid, + muted: AtomicBool, +} + impl TestServerRoom {} pub struct TestApiClient { @@ -387,6 +498,7 @@ struct RoomState { watch::Receiver, ), display_sources: Vec, + paused_audio_tracks: HashSet, updates_tx: async_broadcast::Sender, updates_rx: async_broadcast::Receiver, } @@ -399,6 +511,7 @@ impl Room { Arc::new(Self(Mutex::new(RoomState { connection: watch::channel_with(ConnectionState::Disconnected), display_sources: Default::default(), + paused_audio_tracks: Default::default(), updates_tx, updates_rx, }))) @@ -444,11 +557,12 @@ impl Room { .publish_video_track(this.token(), track) .await?; Ok(LocalTrackPublication { - muted: Default::default(), + room: Arc::downgrade(&this), sid, }) } } + pub fn publish_audio_track( self: &Arc, track: LocalAudioTrack, @@ -461,7 +575,7 @@ impl Room { .publish_audio_track(this.token(), &track) .await?; Ok(LocalTrackPublication { - muted: Default::default(), + room: Arc::downgrade(&this), sid, }) } @@ -561,20 +675,31 @@ impl Drop for Room { #[derive(Clone)] pub struct LocalTrackPublication { sid: String, - muted: Arc, + room: Weak, } impl LocalTrackPublication { pub fn set_mute(&self, mute: bool) -> impl Future> { - let muted = self.muted.clone(); + let sid = self.sid.clone(); + let room = self.room.clone(); async move { - muted.store(mute, SeqCst); - Ok(()) + if let Some(room) = room.upgrade() { + room.test_server() + .set_track_muted(&room.token(), &sid, mute) + } else { + Err(anyhow!("no such room")) + } } } pub fn is_muted(&self) -> bool { - self.muted.load(SeqCst) + if let Some(room) = self.room.upgrade() { + room.test_server() + .is_track_muted(&room.token(), &self.sid) + .unwrap_or(false) + } else { + false + } } pub fn sid(&self) -> String { @@ -622,47 +747,65 @@ impl LocalAudioTrack { #[derive(Debug)] pub struct RemoteVideoTrack { - sid: Sid, - publisher_id: Sid, - frames_rx: async_broadcast::Receiver, + server_track: Arc, } impl RemoteVideoTrack { pub fn sid(&self) -> &str { - &self.sid + &self.server_track.sid } pub fn publisher_id(&self) -> &str { - &self.publisher_id + &self.server_track.publisher_id } pub fn frames(&self) -> async_broadcast::Receiver { - self.frames_rx.clone() + self.server_track.frames_rx.clone() } } #[derive(Debug)] pub struct RemoteAudioTrack { - sid: Sid, - publisher_id: Sid, - running: AtomicBool, + server_track: Arc, + room: Weak, } impl RemoteAudioTrack { pub fn sid(&self) -> &str { - &self.sid + &self.server_track.sid } pub fn publisher_id(&self) -> &str { - &self.publisher_id + &self.server_track.publisher_id } pub fn start(&self) { - self.running.store(true, SeqCst); + if let Some(room) = self.room.upgrade() { + room.0 + .lock() + .paused_audio_tracks + .remove(&self.server_track.sid); + } } pub fn stop(&self) { - self.running.store(false, SeqCst); + if let Some(room) = self.room.upgrade() { + room.0 + .lock() + .paused_audio_tracks + .insert(self.server_track.sid.clone()); + } + } + + pub fn is_playing(&self) -> bool { + !self + .room + .upgrade() + .unwrap() + .0 + .lock() + .paused_audio_tracks + .contains(&self.server_track.sid) } }