Add and enhance tests for muting/deafening, fix exposed logic errors

This commit is contained in:
Max Brunsfeld 2024-01-15 14:03:38 -08:00
parent 51218811cf
commit e90794d3ec
4 changed files with 375 additions and 42 deletions

View file

@ -1,7 +1,7 @@
use crate::{ConnectionState, RoomUpdate, Sid};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use collections::{BTreeMap, HashMap};
use collections::{BTreeMap, HashMap, HashSet};
use futures::Stream;
use gpui::BackgroundExecutor;
use live_kit_server::{proto, token};
@ -13,7 +13,7 @@ use std::{
mem,
sync::{
atomic::{AtomicBool, Ordering::SeqCst},
Arc,
Arc, Weak,
},
};
@ -113,7 +113,25 @@ impl TestServer {
.0
.lock()
.updates_tx
.try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
.try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
RemoteVideoTrack {
server_track: track.clone(),
},
)))
.unwrap();
}
for track in &room.audio_tracks {
client_room
.0
.lock()
.updates_tx
.try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
Arc::new(RemoteAudioTrack {
server_track: track.clone(),
room: Arc::downgrade(&client_room),
}),
Arc::new(RemoteTrackPublication),
))
.unwrap();
}
room.client_rooms.insert(identity, client_room);
@ -210,7 +228,7 @@ impl TestServer {
}
let sid = nanoid::nanoid!(17);
let track = Arc::new(RemoteVideoTrack {
let track = Arc::new(TestServerVideoTrack {
sid: sid.clone(),
publisher_id: identity.clone(),
frames_rx: local_track.frames_rx.clone(),
@ -224,7 +242,11 @@ impl TestServer {
.0
.lock()
.updates_tx
.try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
.try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new(
RemoteVideoTrack {
server_track: track.clone(),
},
)))
.unwrap();
}
}
@ -259,10 +281,10 @@ impl TestServer {
}
let sid = nanoid::nanoid!(17);
let track = Arc::new(RemoteAudioTrack {
let track = Arc::new(TestServerAudioTrack {
sid: sid.clone(),
publisher_id: identity.clone(),
running: AtomicBool::new(true),
muted: AtomicBool::new(false),
});
let publication = Arc::new(RemoteTrackPublication);
@ -276,7 +298,10 @@ impl TestServer {
.lock()
.updates_tx
.try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
track.clone(),
Arc::new(RemoteAudioTrack {
server_track: track.clone(),
room: Arc::downgrade(&client_room),
}),
publication.clone(),
))
.unwrap();
@ -286,37 +311,123 @@ impl TestServer {
Ok(sid)
}
fn set_track_muted(&self, token: &str, track_sid: &str, muted: bool) -> Result<()> {
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let room_name = claims.video.room.unwrap();
let identity = claims.sub.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
if let Some(track) = room
.audio_tracks
.iter_mut()
.find(|track| track.sid == track_sid)
{
track.muted.store(muted, SeqCst);
for (id, client_room) in room.client_rooms.iter() {
if *id != identity {
client_room
.0
.lock()
.updates_tx
.try_broadcast(RoomUpdate::RemoteAudioTrackMuteChanged {
track_id: track_sid.to_string(),
muted,
})
.unwrap();
}
}
}
Ok(())
}
fn is_track_muted(&self, token: &str, track_sid: &str) -> Option<bool> {
let claims = live_kit_server::token::validate(&token, &self.secret_key).ok()?;
let room_name = claims.video.room.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms.get_mut(&*room_name)?;
room.audio_tracks.iter().find_map(|track| {
if track.sid == track_sid {
Some(track.muted.load(SeqCst))
} else {
None
}
})
}
fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let room_name = claims.video.room.unwrap();
let identity = claims.sub.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
Ok(room.video_tracks.clone())
room.client_rooms
.get(identity.as_ref())
.ok_or_else(|| anyhow!("not a participant in room"))?;
Ok(room
.video_tracks
.iter()
.map(|track| {
Arc::new(RemoteVideoTrack {
server_track: track.clone(),
})
})
.collect())
}
fn audio_tracks(&self, token: String) -> Result<Vec<Arc<RemoteAudioTrack>>> {
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let room_name = claims.video.room.unwrap();
let identity = claims.sub.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
Ok(room.audio_tracks.clone())
let client_room = room
.client_rooms
.get(identity.as_ref())
.ok_or_else(|| anyhow!("not a participant in room"))?;
Ok(room
.audio_tracks
.iter()
.map(|track| {
Arc::new(RemoteAudioTrack {
server_track: track.clone(),
room: Arc::downgrade(&client_room),
})
})
.collect())
}
}
#[derive(Default)]
struct TestServerRoom {
client_rooms: HashMap<Sid, Arc<Room>>,
video_tracks: Vec<Arc<RemoteVideoTrack>>,
audio_tracks: Vec<Arc<RemoteAudioTrack>>,
video_tracks: Vec<Arc<TestServerVideoTrack>>,
audio_tracks: Vec<Arc<TestServerAudioTrack>>,
participant_permissions: HashMap<Sid, proto::ParticipantPermission>,
}
#[derive(Debug)]
struct TestServerVideoTrack {
sid: Sid,
publisher_id: Sid,
frames_rx: async_broadcast::Receiver<Frame>,
}
#[derive(Debug)]
struct TestServerAudioTrack {
sid: Sid,
publisher_id: Sid,
muted: AtomicBool,
}
impl TestServerRoom {}
pub struct TestApiClient {
@ -387,6 +498,7 @@ struct RoomState {
watch::Receiver<ConnectionState>,
),
display_sources: Vec<MacOSDisplay>,
paused_audio_tracks: HashSet<Sid>,
updates_tx: async_broadcast::Sender<RoomUpdate>,
updates_rx: async_broadcast::Receiver<RoomUpdate>,
}
@ -399,6 +511,7 @@ impl Room {
Arc::new(Self(Mutex::new(RoomState {
connection: watch::channel_with(ConnectionState::Disconnected),
display_sources: Default::default(),
paused_audio_tracks: Default::default(),
updates_tx,
updates_rx,
})))
@ -444,11 +557,12 @@ impl Room {
.publish_video_track(this.token(), track)
.await?;
Ok(LocalTrackPublication {
muted: Default::default(),
room: Arc::downgrade(&this),
sid,
})
}
}
pub fn publish_audio_track(
self: &Arc<Self>,
track: LocalAudioTrack,
@ -461,7 +575,7 @@ impl Room {
.publish_audio_track(this.token(), &track)
.await?;
Ok(LocalTrackPublication {
muted: Default::default(),
room: Arc::downgrade(&this),
sid,
})
}
@ -561,20 +675,31 @@ impl Drop for Room {
#[derive(Clone)]
pub struct LocalTrackPublication {
sid: String,
muted: Arc<AtomicBool>,
room: Weak<Room>,
}
impl LocalTrackPublication {
pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
let muted = self.muted.clone();
let sid = self.sid.clone();
let room = self.room.clone();
async move {
muted.store(mute, SeqCst);
Ok(())
if let Some(room) = room.upgrade() {
room.test_server()
.set_track_muted(&room.token(), &sid, mute)
} else {
Err(anyhow!("no such room"))
}
}
}
pub fn is_muted(&self) -> bool {
self.muted.load(SeqCst)
if let Some(room) = self.room.upgrade() {
room.test_server()
.is_track_muted(&room.token(), &self.sid)
.unwrap_or(false)
} else {
false
}
}
pub fn sid(&self) -> String {
@ -622,47 +747,65 @@ impl LocalAudioTrack {
#[derive(Debug)]
pub struct RemoteVideoTrack {
sid: Sid,
publisher_id: Sid,
frames_rx: async_broadcast::Receiver<Frame>,
server_track: Arc<TestServerVideoTrack>,
}
impl RemoteVideoTrack {
pub fn sid(&self) -> &str {
&self.sid
&self.server_track.sid
}
pub fn publisher_id(&self) -> &str {
&self.publisher_id
&self.server_track.publisher_id
}
pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
self.frames_rx.clone()
self.server_track.frames_rx.clone()
}
}
#[derive(Debug)]
pub struct RemoteAudioTrack {
sid: Sid,
publisher_id: Sid,
running: AtomicBool,
server_track: Arc<TestServerAudioTrack>,
room: Weak<Room>,
}
impl RemoteAudioTrack {
pub fn sid(&self) -> &str {
&self.sid
&self.server_track.sid
}
pub fn publisher_id(&self) -> &str {
&self.publisher_id
&self.server_track.publisher_id
}
pub fn start(&self) {
self.running.store(true, SeqCst);
if let Some(room) = self.room.upgrade() {
room.0
.lock()
.paused_audio_tracks
.remove(&self.server_track.sid);
}
}
pub fn stop(&self) {
self.running.store(false, SeqCst);
if let Some(room) = self.room.upgrade() {
room.0
.lock()
.paused_audio_tracks
.insert(self.server_track.sid.clone());
}
}
pub fn is_playing(&self) -> bool {
!self
.room
.upgrade()
.unwrap()
.0
.lock()
.paused_audio_tracks
.contains(&self.server_track.sid)
}
}