Handle reconnects to the livekit server in which local tracks are unpublished (#3992)

Release notes:

* Fixed a bug where network interruptions could cause audio and screen
sharing to stop working without indicating that they were stopped, and
there was no way to restart the audio stream.
This commit is contained in:
Max Brunsfeld 2024-01-10 16:34:11 -08:00 committed by GitHub
commit cb5d4edc4b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 314 additions and 202 deletions

View file

@ -15,10 +15,7 @@ use gpui::{
AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel, AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
}; };
use language::LanguageRegistry; use language::LanguageRegistry;
use live_kit_client::{ use live_kit_client::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
RemoteVideoTrackUpdate,
};
use postage::{sink::Sink, stream::Stream, watch}; use postage::{sink::Sink, stream::Stream, watch};
use project::Project; use project::Project;
use settings::Settings as _; use settings::Settings as _;
@ -131,11 +128,11 @@ impl Room {
} }
}); });
let _maintain_video_tracks = cx.spawn({ let _handle_updates = cx.spawn({
let room = room.clone(); let room = room.clone();
move |this, mut cx| async move { move |this, mut cx| async move {
let mut track_video_changes = room.remote_video_track_updates(); let mut updates = room.updates();
while let Some(track_change) = track_video_changes.next().await { while let Some(update) = updates.next().await {
let this = if let Some(this) = this.upgrade() { let this = if let Some(this) = this.upgrade() {
this this
} else { } else {
@ -143,26 +140,7 @@ impl Room {
}; };
this.update(&mut cx, |this, cx| { this.update(&mut cx, |this, cx| {
this.remote_video_track_updated(track_change, cx).log_err() this.live_kit_room_updated(update, cx).log_err()
})
.ok();
}
}
});
let _maintain_audio_tracks = cx.spawn({
let room = room.clone();
|this, mut cx| async move {
let mut track_audio_changes = room.remote_audio_track_updates();
while let Some(track_change) = track_audio_changes.next().await {
let this = if let Some(this) = this.upgrade() {
this
} else {
break;
};
this.update(&mut cx, |this, cx| {
this.remote_audio_track_updated(track_change, cx).log_err()
}) })
.ok(); .ok();
} }
@ -195,7 +173,7 @@ impl Room {
deafened: false, deafened: false,
speaking: false, speaking: false,
_maintain_room, _maintain_room,
_maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks], _handle_updates,
}) })
} else { } else {
None None
@ -877,8 +855,8 @@ impl Room {
.remote_audio_track_publications(&user.id.to_string()); .remote_audio_track_publications(&user.id.to_string());
for track in video_tracks { for track in video_tracks {
this.remote_video_track_updated( this.live_kit_room_updated(
RemoteVideoTrackUpdate::Subscribed(track), RoomUpdate::SubscribedToRemoteVideoTrack(track),
cx, cx,
) )
.log_err(); .log_err();
@ -887,8 +865,8 @@ impl Room {
for (track, publication) in for (track, publication) in
audio_tracks.iter().zip(publications.iter()) audio_tracks.iter().zip(publications.iter())
{ {
this.remote_audio_track_updated( this.live_kit_room_updated(
RemoteAudioTrackUpdate::Subscribed( RoomUpdate::SubscribedToRemoteAudioTrack(
track.clone(), track.clone(),
publication.clone(), publication.clone(),
), ),
@ -979,13 +957,13 @@ impl Room {
} }
} }
fn remote_video_track_updated( fn live_kit_room_updated(
&mut self, &mut self,
change: RemoteVideoTrackUpdate, update: RoomUpdate,
cx: &mut ModelContext<Self>, cx: &mut ModelContext<Self>,
) -> Result<()> { ) -> Result<()> {
match change { match update {
RemoteVideoTrackUpdate::Subscribed(track) => { RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
let user_id = track.publisher_id().parse()?; let user_id = track.publisher_id().parse()?;
let track_id = track.sid().to_string(); let track_id = track.sid().to_string();
let participant = self let participant = self
@ -997,7 +975,8 @@ impl Room {
participant_id: participant.peer_id, participant_id: participant.peer_id,
}); });
} }
RemoteVideoTrackUpdate::Unsubscribed {
RoomUpdate::UnsubscribedFromRemoteVideoTrack {
publisher_id, publisher_id,
track_id, track_id,
} => { } => {
@ -1011,19 +990,8 @@ impl Room {
participant_id: participant.peer_id, participant_id: participant.peer_id,
}); });
} }
}
cx.notify(); RoomUpdate::ActiveSpeakersChanged { speakers } => {
Ok(())
}
fn remote_audio_track_updated(
&mut self,
change: RemoteAudioTrackUpdate,
cx: &mut ModelContext<Self>,
) -> Result<()> {
match change {
RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
let mut speaker_ids = speakers let mut speaker_ids = speakers
.into_iter() .into_iter()
.filter_map(|speaker_sid| speaker_sid.parse().ok()) .filter_map(|speaker_sid| speaker_sid.parse().ok())
@ -1045,9 +1013,9 @@ impl Room {
} }
} }
} }
cx.notify();
} }
RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
let mut found = false; let mut found = false;
for participant in &mut self.remote_participants.values_mut() { for participant in &mut self.remote_participants.values_mut() {
for track in participant.audio_tracks.values() { for track in participant.audio_tracks.values() {
@ -1061,10 +1029,9 @@ impl Room {
break; break;
} }
} }
cx.notify();
} }
RemoteAudioTrackUpdate::Subscribed(track, publication) => {
RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
let user_id = track.publisher_id().parse()?; let user_id = track.publisher_id().parse()?;
let track_id = track.sid().to_string(); let track_id = track.sid().to_string();
let participant = self let participant = self
@ -1078,7 +1045,8 @@ impl Room {
participant_id: participant.peer_id, participant_id: participant.peer_id,
}); });
} }
RemoteAudioTrackUpdate::Unsubscribed {
RoomUpdate::UnsubscribedFromRemoteAudioTrack {
publisher_id, publisher_id,
track_id, track_id,
} => { } => {
@ -1092,6 +1060,28 @@ impl Room {
participant_id: participant.peer_id, participant_id: participant.peer_id,
}); });
} }
RoomUpdate::LocalAudioTrackUnpublished { publication } => {
log::info!("unpublished audio track {}", publication.sid());
if let Some(room) = &mut self.live_kit {
room.microphone_track = LocalTrack::None;
}
}
RoomUpdate::LocalVideoTrackUnpublished { publication } => {
log::info!("unpublished video track {}", publication.sid());
if let Some(room) = &mut self.live_kit {
room.screen_track = LocalTrack::None;
}
}
RoomUpdate::LocalAudioTrackPublished { publication } => {
log::info!("published audio track {}", publication.sid());
}
RoomUpdate::LocalVideoTrackPublished { publication } => {
log::info!("published video track {}", publication.sid());
}
} }
cx.notify(); cx.notify();
@ -1597,7 +1587,7 @@ struct LiveKitRoom {
speaking: bool, speaking: bool,
next_publish_id: usize, next_publish_id: usize,
_maintain_room: Task<()>, _maintain_room: Task<()>,
_maintain_tracks: [Task<()>; 2], _handle_updates: Task<()>,
} }
impl LiveKitRoom { impl LiveKitRoom {

View file

@ -12,6 +12,8 @@ class LKRoomDelegate: RoomDelegate {
var onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void var onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void
var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void
var onDidUnsubscribeFromRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void var onDidUnsubscribeFromRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void
var onDidPublishOrUnpublishLocalAudioTrack: @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void
var onDidPublishOrUnpublishLocalVideoTrack: @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void
init( init(
data: UnsafeRawPointer, data: UnsafeRawPointer,
@ -21,7 +23,10 @@ class LKRoomDelegate: RoomDelegate {
onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void,
onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void, onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void,
onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void,
onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void) onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void,
onDidPublishOrUnpublishLocalAudioTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void,
onDidPublishOrUnpublishLocalVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void
)
{ {
self.data = data self.data = data
self.onDidDisconnect = onDidDisconnect self.onDidDisconnect = onDidDisconnect
@ -31,6 +36,8 @@ class LKRoomDelegate: RoomDelegate {
self.onDidUnsubscribeFromRemoteVideoTrack = onDidUnsubscribeFromRemoteVideoTrack self.onDidUnsubscribeFromRemoteVideoTrack = onDidUnsubscribeFromRemoteVideoTrack
self.onMuteChangedFromRemoteAudioTrack = onMuteChangedFromRemoteAudioTrack self.onMuteChangedFromRemoteAudioTrack = onMuteChangedFromRemoteAudioTrack
self.onActiveSpeakersChanged = onActiveSpeakersChanged self.onActiveSpeakersChanged = onActiveSpeakersChanged
self.onDidPublishOrUnpublishLocalAudioTrack = onDidPublishOrUnpublishLocalAudioTrack
self.onDidPublishOrUnpublishLocalVideoTrack = onDidPublishOrUnpublishLocalVideoTrack
} }
func room(_ room: Room, didUpdate connectionState: ConnectionState, oldValue: ConnectionState) { func room(_ room: Room, didUpdate connectionState: ConnectionState, oldValue: ConnectionState) {
@ -65,6 +72,22 @@ class LKRoomDelegate: RoomDelegate {
self.onDidUnsubscribeFromRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString) self.onDidUnsubscribeFromRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString)
} }
} }
func room(_ room: Room, localParticipant: LocalParticipant, didPublish publication: LocalTrackPublication) {
if publication.kind == .video {
self.onDidPublishOrUnpublishLocalVideoTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), true)
} else if publication.kind == .audio {
self.onDidPublishOrUnpublishLocalAudioTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), true)
}
}
func room(_ room: Room, localParticipant: LocalParticipant, didUnpublish publication: LocalTrackPublication) {
if publication.kind == .video {
self.onDidPublishOrUnpublishLocalVideoTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), false)
} else if publication.kind == .audio {
self.onDidPublishOrUnpublishLocalAudioTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), false)
}
}
} }
class LKVideoRenderer: NSObject, VideoRenderer { class LKVideoRenderer: NSObject, VideoRenderer {
@ -109,7 +132,9 @@ public func LKRoomDelegateCreate(
onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void,
onActiveSpeakerChanged: @escaping @convention(c) (UnsafeRawPointer, CFArray) -> Void, onActiveSpeakerChanged: @escaping @convention(c) (UnsafeRawPointer, CFArray) -> Void,
onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void,
onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void,
onDidPublishOrUnpublishLocalAudioTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void,
onDidPublishOrUnpublishLocalVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void
) -> UnsafeMutableRawPointer { ) -> UnsafeMutableRawPointer {
let delegate = LKRoomDelegate( let delegate = LKRoomDelegate(
data: data, data: data,
@ -119,7 +144,9 @@ public func LKRoomDelegateCreate(
onMuteChangedFromRemoteAudioTrack: onMuteChangedFromRemoteAudioTrack, onMuteChangedFromRemoteAudioTrack: onMuteChangedFromRemoteAudioTrack,
onActiveSpeakersChanged: onActiveSpeakerChanged, onActiveSpeakersChanged: onActiveSpeakerChanged,
onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack, onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack,
onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack,
onDidPublishOrUnpublishLocalAudioTrack: onDidPublishOrUnpublishLocalAudioTrack,
onDidPublishOrUnpublishLocalVideoTrack: onDidPublishOrUnpublishLocalVideoTrack
) )
return Unmanaged.passRetained(delegate).toOpaque() return Unmanaged.passRetained(delegate).toOpaque()
} }
@ -292,6 +319,14 @@ public func LKLocalTrackPublicationSetMute(
} }
} }
@_cdecl("LKLocalTrackPublicationIsMuted")
public func LKLocalTrackPublicationIsMuted(
publication: UnsafeRawPointer
) -> Bool {
let publication = Unmanaged<LocalTrackPublication>.fromOpaque(publication).takeUnretainedValue()
return publication.muted
}
@_cdecl("LKRemoteTrackPublicationSetEnabled") @_cdecl("LKRemoteTrackPublicationSetEnabled")
public func LKRemoteTrackPublicationSetEnabled( public func LKRemoteTrackPublicationSetEnabled(
publication: UnsafeRawPointer, publication: UnsafeRawPointer,
@ -325,3 +360,12 @@ public func LKRemoteTrackPublicationGetSid(
return publication.sid as CFString return publication.sid as CFString
} }
@_cdecl("LKLocalTrackPublicationGetSid")
public func LKLocalTrackPublicationGetSid(
publication: UnsafeRawPointer
) -> CFString {
let publication = Unmanaged<LocalTrackPublication>.fromOpaque(publication).takeUnretainedValue()
return publication.sid as CFString
}

View file

@ -2,9 +2,7 @@ use std::{sync::Arc, time::Duration};
use futures::StreamExt; use futures::StreamExt;
use gpui::{actions, KeyBinding, Menu, MenuItem}; use gpui::{actions, KeyBinding, Menu, MenuItem};
use live_kit_client::{ use live_kit_client::{LocalAudioTrack, LocalVideoTrack, Room, RoomUpdate};
LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room,
};
use live_kit_server::token::{self, VideoGrant}; use live_kit_server::token::{self, VideoGrant};
use log::LevelFilter; use log::LevelFilter;
use simplelog::SimpleLogger; use simplelog::SimpleLogger;
@ -60,12 +58,12 @@ fn main() {
let room_b = Room::new(); let room_b = Room::new();
room_b.connect(&live_kit_url, &user2_token).await.unwrap(); room_b.connect(&live_kit_url, &user2_token).await.unwrap();
let mut audio_track_updates = room_b.remote_audio_track_updates(); let mut room_updates = room_b.updates();
let audio_track = LocalAudioTrack::create(); let audio_track = LocalAudioTrack::create();
let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap(); let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap();
if let RemoteAudioTrackUpdate::Subscribed(track, _) = if let RoomUpdate::SubscribedToRemoteAudioTrack(track, _) =
audio_track_updates.next().await.unwrap() room_updates.next().await.unwrap()
{ {
let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
assert_eq!(remote_tracks.len(), 1); assert_eq!(remote_tracks.len(), 1);
@ -78,8 +76,8 @@ fn main() {
audio_track_publication.set_mute(true).await.unwrap(); audio_track_publication.set_mute(true).await.unwrap();
println!("waiting for mute changed!"); println!("waiting for mute changed!");
if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = if let RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } =
audio_track_updates.next().await.unwrap() room_updates.next().await.unwrap()
{ {
let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
assert_eq!(remote_tracks[0].sid(), track_id); assert_eq!(remote_tracks[0].sid(), track_id);
@ -90,8 +88,8 @@ fn main() {
audio_track_publication.set_mute(false).await.unwrap(); audio_track_publication.set_mute(false).await.unwrap();
if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = if let RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } =
audio_track_updates.next().await.unwrap() room_updates.next().await.unwrap()
{ {
let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
assert_eq!(remote_tracks[0].sid(), track_id); assert_eq!(remote_tracks[0].sid(), track_id);
@ -110,13 +108,13 @@ fn main() {
room_a.unpublish_track(audio_track_publication); room_a.unpublish_track(audio_track_publication);
// Clear out any active speakers changed messages // Clear out any active speakers changed messages
let mut next = audio_track_updates.next().await.unwrap(); let mut next = room_updates.next().await.unwrap();
while let RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } = next { while let RoomUpdate::ActiveSpeakersChanged { speakers } = next {
println!("Speakers changed: {:?}", speakers); println!("Speakers changed: {:?}", speakers);
next = audio_track_updates.next().await.unwrap(); next = room_updates.next().await.unwrap();
} }
if let RemoteAudioTrackUpdate::Unsubscribed { if let RoomUpdate::UnsubscribedFromRemoteAudioTrack {
publisher_id, publisher_id,
track_id, track_id,
} = next } = next
@ -128,7 +126,6 @@ fn main() {
panic!("unexpected message"); panic!("unexpected message");
} }
let mut video_track_updates = room_b.remote_video_track_updates();
let displays = room_a.display_sources().await.unwrap(); let displays = room_a.display_sources().await.unwrap();
let display = displays.into_iter().next().unwrap(); let display = displays.into_iter().next().unwrap();
@ -136,8 +133,8 @@ fn main() {
let local_video_track_publication = let local_video_track_publication =
room_a.publish_video_track(local_video_track).await.unwrap(); room_a.publish_video_track(local_video_track).await.unwrap();
if let RemoteVideoTrackUpdate::Subscribed(track) = if let RoomUpdate::SubscribedToRemoteVideoTrack(track) =
video_track_updates.next().await.unwrap() room_updates.next().await.unwrap()
{ {
let remote_video_tracks = room_b.remote_video_tracks("test-participant-1"); let remote_video_tracks = room_b.remote_video_tracks("test-participant-1");
assert_eq!(remote_video_tracks.len(), 1); assert_eq!(remote_video_tracks.len(), 1);
@ -152,10 +149,10 @@ fn main() {
.pop() .pop()
.unwrap(); .unwrap();
room_a.unpublish_track(local_video_track_publication); room_a.unpublish_track(local_video_track_publication);
if let RemoteVideoTrackUpdate::Unsubscribed { if let RoomUpdate::UnsubscribedFromRemoteVideoTrack {
publisher_id, publisher_id,
track_id, track_id,
} = video_track_updates.next().await.unwrap() } = room_updates.next().await.unwrap()
{ {
assert_eq!(publisher_id, "test-participant-1"); assert_eq!(publisher_id, "test-participant-1");
assert_eq!(remote_video_track.sid(), track_id); assert_eq!(remote_video_track.sid(), track_id);

View file

@ -1,3 +1,5 @@
use std::sync::Arc;
#[cfg(not(any(test, feature = "test-support")))] #[cfg(not(any(test, feature = "test-support")))]
pub mod prod; pub mod prod;
@ -9,3 +11,25 @@ pub mod test;
#[cfg(any(test, feature = "test-support"))] #[cfg(any(test, feature = "test-support"))]
pub use test::*; pub use test::*;
pub type Sid = String;
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
Disconnected,
Connected { url: String, token: String },
}
#[derive(Clone)]
pub enum RoomUpdate {
ActiveSpeakersChanged { speakers: Vec<Sid> },
RemoteAudioTrackMuteChanged { track_id: Sid, muted: bool },
SubscribedToRemoteVideoTrack(Arc<RemoteVideoTrack>),
SubscribedToRemoteAudioTrack(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
UnsubscribedFromRemoteVideoTrack { publisher_id: Sid, track_id: Sid },
UnsubscribedFromRemoteAudioTrack { publisher_id: Sid, track_id: Sid },
LocalAudioTrackPublished { publication: LocalTrackPublication },
LocalAudioTrackUnpublished { publication: LocalTrackPublication },
LocalVideoTrackPublished { publication: LocalTrackPublication },
LocalVideoTrackUnpublished { publication: LocalTrackPublication },
}

View file

@ -1,3 +1,4 @@
use crate::{ConnectionState, RoomUpdate, Sid};
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use core_foundation::{ use core_foundation::{
array::{CFArray, CFArrayRef}, array::{CFArray, CFArrayRef},
@ -76,6 +77,16 @@ extern "C" {
publisher_id: CFStringRef, publisher_id: CFStringRef,
track_id: CFStringRef, track_id: CFStringRef,
), ),
on_did_publish_or_unpublish_local_audio_track: extern "C" fn(
callback_data: *mut c_void,
publication: swift::LocalTrackPublication,
is_published: bool,
),
on_did_publish_or_unpublish_local_video_track: extern "C" fn(
callback_data: *mut c_void,
publication: swift::LocalTrackPublication,
is_published: bool,
),
) -> swift::RoomDelegate; ) -> swift::RoomDelegate;
fn LKRoomCreate(delegate: swift::RoomDelegate) -> swift::Room; fn LKRoomCreate(delegate: swift::RoomDelegate) -> swift::Room;
@ -151,26 +162,19 @@ extern "C" {
callback_data: *mut c_void, callback_data: *mut c_void,
); );
fn LKLocalTrackPublicationIsMuted(publication: swift::LocalTrackPublication) -> bool;
fn LKRemoteTrackPublicationIsMuted(publication: swift::RemoteTrackPublication) -> bool; fn LKRemoteTrackPublicationIsMuted(publication: swift::RemoteTrackPublication) -> bool;
fn LKLocalTrackPublicationGetSid(publication: swift::LocalTrackPublication) -> CFStringRef;
fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef; fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef;
} }
pub type Sid = String;
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
Disconnected,
Connected { url: String, token: String },
}
pub struct Room { pub struct Room {
native_room: swift::Room, native_room: swift::Room,
connection: Mutex<( connection: Mutex<(
watch::Sender<ConnectionState>, watch::Sender<ConnectionState>,
watch::Receiver<ConnectionState>, watch::Receiver<ConnectionState>,
)>, )>,
remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>, update_subscribers: Mutex<Vec<mpsc::UnboundedSender<RoomUpdate>>>,
remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
_delegate: RoomDelegate, _delegate: RoomDelegate,
} }
@ -181,8 +185,7 @@ impl Room {
Self { Self {
native_room: unsafe { LKRoomCreate(delegate.native_delegate) }, native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)), connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
remote_audio_track_subscribers: Default::default(), update_subscribers: Default::default(),
remote_video_track_subscribers: Default::default(),
_delegate: delegate, _delegate: delegate,
} }
}) })
@ -397,15 +400,9 @@ impl Room {
} }
} }
pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> { pub fn updates(&self) -> mpsc::UnboundedReceiver<RoomUpdate> {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
self.remote_audio_track_subscribers.lock().push(tx); self.update_subscribers.lock().push(tx);
rx
}
pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
let (tx, rx) = mpsc::unbounded();
self.remote_video_track_subscribers.lock().push(tx);
rx rx
} }
@ -416,8 +413,8 @@ impl Room {
) { ) {
let track = Arc::new(track); let track = Arc::new(track);
let publication = Arc::new(publication); let publication = Arc::new(publication);
self.remote_audio_track_subscribers.lock().retain(|tx| { self.update_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed( tx.unbounded_send(RoomUpdate::SubscribedToRemoteAudioTrack(
track.clone(), track.clone(),
publication.clone(), publication.clone(),
)) ))
@ -426,8 +423,8 @@ impl Room {
} }
fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) { fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
self.remote_audio_track_subscribers.lock().retain(|tx| { self.update_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed { tx.unbounded_send(RoomUpdate::UnsubscribedFromRemoteAudioTrack {
publisher_id: publisher_id.clone(), publisher_id: publisher_id.clone(),
track_id: track_id.clone(), track_id: track_id.clone(),
}) })
@ -436,8 +433,8 @@ impl Room {
} }
fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) { fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) {
self.remote_audio_track_subscribers.lock().retain(|tx| { self.update_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged { tx.unbounded_send(RoomUpdate::RemoteAudioTrackMuteChanged {
track_id: track_id.clone(), track_id: track_id.clone(),
muted, muted,
}) })
@ -445,12 +442,9 @@ impl Room {
}); });
} }
// A vec of publisher IDs
fn active_speakers_changed(&self, speakers: Vec<String>) { fn active_speakers_changed(&self, speakers: Vec<String>) {
self.remote_audio_track_subscribers self.update_subscribers.lock().retain(move |tx| {
.lock() tx.unbounded_send(RoomUpdate::ActiveSpeakersChanged {
.retain(move |tx| {
tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged {
speakers: speakers.clone(), speakers: speakers.clone(),
}) })
.is_ok() .is_ok()
@ -459,15 +453,15 @@ impl Room {
fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) { fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
let track = Arc::new(track); let track = Arc::new(track);
self.remote_video_track_subscribers.lock().retain(|tx| { self.update_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone())) tx.unbounded_send(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
.is_ok() .is_ok()
}); });
} }
fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) { fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
self.remote_video_track_subscribers.lock().retain(|tx| { self.update_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed { tx.unbounded_send(RoomUpdate::UnsubscribedFromRemoteVideoTrack {
publisher_id: publisher_id.clone(), publisher_id: publisher_id.clone(),
track_id: track_id.clone(), track_id: track_id.clone(),
}) })
@ -529,6 +523,8 @@ impl RoomDelegate {
Self::on_active_speakers_changed, Self::on_active_speakers_changed,
Self::on_did_subscribe_to_remote_video_track, Self::on_did_subscribe_to_remote_video_track,
Self::on_did_unsubscribe_from_remote_video_track, Self::on_did_unsubscribe_from_remote_video_track,
Self::on_did_publish_or_unpublish_local_audio_track,
Self::on_did_publish_or_unpublish_local_video_track,
) )
}; };
Self { Self {
@ -642,6 +638,46 @@ impl RoomDelegate {
} }
let _ = Weak::into_raw(room); let _ = Weak::into_raw(room);
} }
extern "C" fn on_did_publish_or_unpublish_local_audio_track(
room: *mut c_void,
publication: swift::LocalTrackPublication,
is_published: bool,
) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
if let Some(room) = room.upgrade() {
let publication = LocalTrackPublication::new(publication);
let update = if is_published {
RoomUpdate::LocalAudioTrackPublished { publication }
} else {
RoomUpdate::LocalAudioTrackUnpublished { publication }
};
room.update_subscribers
.lock()
.retain(|tx| tx.unbounded_send(update.clone()).is_ok());
}
let _ = Weak::into_raw(room);
}
extern "C" fn on_did_publish_or_unpublish_local_video_track(
room: *mut c_void,
publication: swift::LocalTrackPublication,
is_published: bool,
) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
if let Some(room) = room.upgrade() {
let publication = LocalTrackPublication::new(publication);
let update = if is_published {
RoomUpdate::LocalVideoTrackPublished { publication }
} else {
RoomUpdate::LocalVideoTrackUnpublished { publication }
};
room.update_subscribers
.lock()
.retain(|tx| tx.unbounded_send(update.clone()).is_ok());
}
let _ = Weak::into_raw(room);
}
} }
impl Drop for RoomDelegate { impl Drop for RoomDelegate {
@ -691,6 +727,10 @@ impl LocalTrackPublication {
Self(native_track_publication) Self(native_track_publication)
} }
pub fn sid(&self) -> String {
unsafe { CFString::wrap_under_get_rule(LKLocalTrackPublicationGetSid(self.0)).to_string() }
}
pub fn set_mute(&self, muted: bool) -> impl Future<Output = Result<()>> { pub fn set_mute(&self, muted: bool) -> impl Future<Output = Result<()>> {
let (tx, rx) = futures::channel::oneshot::channel(); let (tx, rx) = futures::channel::oneshot::channel();
@ -715,6 +755,19 @@ impl LocalTrackPublication {
async move { rx.await.unwrap() } async move { rx.await.unwrap() }
} }
pub fn is_muted(&self) -> bool {
unsafe { LKLocalTrackPublicationIsMuted(self.0) }
}
}
impl Clone for LocalTrackPublication {
fn clone(&self) -> Self {
unsafe {
CFRetain(self.0 .0);
}
Self(self.0)
}
} }
impl Drop for LocalTrackPublication { impl Drop for LocalTrackPublication {
@ -889,18 +942,6 @@ impl Drop for RemoteVideoTrack {
} }
} }
pub enum RemoteVideoTrackUpdate {
Subscribed(Arc<RemoteVideoTrack>),
Unsubscribed { publisher_id: Sid, track_id: Sid },
}
pub enum RemoteAudioTrackUpdate {
ActiveSpeakersChanged { speakers: Vec<Sid> },
MuteChanged { track_id: Sid, muted: bool },
Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
Unsubscribed { publisher_id: Sid, track_id: Sid },
}
pub struct MacOSDisplay(swift::MacOSDisplay); pub struct MacOSDisplay(swift::MacOSDisplay);
impl MacOSDisplay { impl MacOSDisplay {

View file

@ -1,3 +1,4 @@
use crate::{ConnectionState, RoomUpdate, Sid};
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use collections::{BTreeMap, HashMap}; use collections::{BTreeMap, HashMap};
@ -7,7 +8,14 @@ use live_kit_server::{proto, token};
use media::core_video::CVImageBuffer; use media::core_video::CVImageBuffer;
use parking_lot::Mutex; use parking_lot::Mutex;
use postage::watch; use postage::watch;
use std::{future::Future, mem, sync::Arc}; use std::{
future::Future,
mem,
sync::{
atomic::{AtomicBool, Ordering::SeqCst},
Arc,
},
};
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new()); static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
@ -104,9 +112,8 @@ impl TestServer {
client_room client_room
.0 .0
.lock() .lock()
.video_track_updates .updates_tx
.0 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
.try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
.unwrap(); .unwrap();
} }
room.client_rooms.insert(identity, client_room); room.client_rooms.insert(identity, client_room);
@ -176,7 +183,11 @@ impl TestServer {
} }
} }
async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> { async fn publish_video_track(
&self,
token: String,
local_track: LocalVideoTrack,
) -> Result<Sid> {
self.executor.simulate_random_delay().await; self.executor.simulate_random_delay().await;
let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let identity = claims.sub.unwrap().to_string(); let identity = claims.sub.unwrap().to_string();
@ -198,8 +209,9 @@ impl TestServer {
return Err(anyhow!("user is not allowed to publish")); return Err(anyhow!("user is not allowed to publish"));
} }
let sid = nanoid::nanoid!(17);
let track = Arc::new(RemoteVideoTrack { let track = Arc::new(RemoteVideoTrack {
sid: nanoid::nanoid!(17), sid: sid.clone(),
publisher_id: identity.clone(), publisher_id: identity.clone(),
frames_rx: local_track.frames_rx.clone(), frames_rx: local_track.frames_rx.clone(),
}); });
@ -211,21 +223,20 @@ impl TestServer {
let _ = client_room let _ = client_room
.0 .0
.lock() .lock()
.video_track_updates .updates_tx
.0 .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
.try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
.unwrap(); .unwrap();
} }
} }
Ok(()) Ok(sid)
} }
async fn publish_audio_track( async fn publish_audio_track(
&self, &self,
token: String, token: String,
_local_track: &LocalAudioTrack, _local_track: &LocalAudioTrack,
) -> Result<()> { ) -> Result<Sid> {
self.executor.simulate_random_delay().await; self.executor.simulate_random_delay().await;
let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let identity = claims.sub.unwrap().to_string(); let identity = claims.sub.unwrap().to_string();
@ -247,8 +258,9 @@ impl TestServer {
return Err(anyhow!("user is not allowed to publish")); return Err(anyhow!("user is not allowed to publish"));
} }
let sid = nanoid::nanoid!(17);
let track = Arc::new(RemoteAudioTrack { let track = Arc::new(RemoteAudioTrack {
sid: nanoid::nanoid!(17), sid: sid.clone(),
publisher_id: identity.clone(), publisher_id: identity.clone(),
}); });
@ -261,9 +273,8 @@ impl TestServer {
let _ = client_room let _ = client_room
.0 .0
.lock() .lock()
.audio_track_updates .updates_tx
.0 .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
.try_broadcast(RemoteAudioTrackUpdate::Subscribed(
track.clone(), track.clone(),
publication.clone(), publication.clone(),
)) ))
@ -271,7 +282,7 @@ impl TestServer {
} }
} }
Ok(()) Ok(sid)
} }
fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> { fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
@ -369,39 +380,26 @@ impl live_kit_server::api::Client for TestApiClient {
} }
} }
pub type Sid = String;
struct RoomState { struct RoomState {
connection: ( connection: (
watch::Sender<ConnectionState>, watch::Sender<ConnectionState>,
watch::Receiver<ConnectionState>, watch::Receiver<ConnectionState>,
), ),
display_sources: Vec<MacOSDisplay>, display_sources: Vec<MacOSDisplay>,
audio_track_updates: ( updates_tx: async_broadcast::Sender<RoomUpdate>,
async_broadcast::Sender<RemoteAudioTrackUpdate>, updates_rx: async_broadcast::Receiver<RoomUpdate>,
async_broadcast::Receiver<RemoteAudioTrackUpdate>,
),
video_track_updates: (
async_broadcast::Sender<RemoteVideoTrackUpdate>,
async_broadcast::Receiver<RemoteVideoTrackUpdate>,
),
}
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
Disconnected,
Connected { url: String, token: String },
} }
pub struct Room(Mutex<RoomState>); pub struct Room(Mutex<RoomState>);
impl Room { impl Room {
pub fn new() -> Arc<Self> { pub fn new() -> Arc<Self> {
let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
Arc::new(Self(Mutex::new(RoomState { Arc::new(Self(Mutex::new(RoomState {
connection: watch::channel_with(ConnectionState::Disconnected), connection: watch::channel_with(ConnectionState::Disconnected),
display_sources: Default::default(), display_sources: Default::default(),
video_track_updates: async_broadcast::broadcast(128), updates_tx,
audio_track_updates: async_broadcast::broadcast(128), updates_rx,
}))) })))
} }
@ -440,10 +438,14 @@ impl Room {
let this = self.clone(); let this = self.clone();
let track = track.clone(); let track = track.clone();
async move { async move {
this.test_server() let sid = this
.test_server()
.publish_video_track(this.token(), track) .publish_video_track(this.token(), track)
.await?; .await?;
Ok(LocalTrackPublication) Ok(LocalTrackPublication {
muted: Default::default(),
sid,
})
} }
} }
pub fn publish_audio_track( pub fn publish_audio_track(
@ -453,10 +455,14 @@ impl Room {
let this = self.clone(); let this = self.clone();
let track = track.clone(); let track = track.clone();
async move { async move {
this.test_server() let sid = this
.test_server()
.publish_audio_track(this.token(), &track) .publish_audio_track(this.token(), &track)
.await?; .await?;
Ok(LocalTrackPublication) Ok(LocalTrackPublication {
muted: Default::default(),
sid,
})
} }
} }
@ -505,12 +511,8 @@ impl Room {
.collect() .collect()
} }
pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> { pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
self.0.lock().audio_track_updates.1.clone() self.0.lock().updates_rx.clone()
}
pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
self.0.lock().video_track_updates.1.clone()
} }
pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) { pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
@ -555,11 +557,27 @@ impl Drop for Room {
} }
} }
pub struct LocalTrackPublication; #[derive(Clone)]
pub struct LocalTrackPublication {
sid: String,
muted: Arc<AtomicBool>,
}
impl LocalTrackPublication { impl LocalTrackPublication {
pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> { pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
async { Ok(()) } let muted = self.muted.clone();
async move {
muted.store(mute, SeqCst);
Ok(())
}
}
pub fn is_muted(&self) -> bool {
self.muted.load(SeqCst)
}
pub fn sid(&self) -> String {
self.sid.clone()
} }
} }
@ -646,20 +664,6 @@ impl RemoteAudioTrack {
} }
} }
#[derive(Clone)]
pub enum RemoteVideoTrackUpdate {
Subscribed(Arc<RemoteVideoTrack>),
Unsubscribed { publisher_id: Sid, track_id: Sid },
}
#[derive(Clone)]
pub enum RemoteAudioTrackUpdate {
ActiveSpeakersChanged { speakers: Vec<Sid> },
MuteChanged { track_id: Sid, muted: bool },
Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
Unsubscribed { publisher_id: Sid, track_id: Sid },
}
#[derive(Clone)] #[derive(Clone)]
pub struct MacOSDisplay { pub struct MacOSDisplay {
frames: ( frames: (

View file

@ -4,20 +4,28 @@ const { spawn, execFileSync } = require("child_process");
const RESOLUTION_REGEX = /(\d+) x (\d+)/; const RESOLUTION_REGEX = /(\d+) x (\d+)/;
const DIGIT_FLAG_REGEX = /^--?(\d+)$/; const DIGIT_FLAG_REGEX = /^--?(\d+)$/;
const RELEASE_MODE = "--release";
const args = process.argv.slice(2);
// Parse the number of Zed instances to spawn. // Parse the number of Zed instances to spawn.
let instanceCount = 1; let instanceCount = 1;
const digitMatch = args[0]?.match(DIGIT_FLAG_REGEX); let isReleaseMode = false;
if (digitMatch) { let isTop = false;
const args = process.argv.slice(2);
for (const arg of args) {
const digitMatch = arg.match(DIGIT_FLAG_REGEX);
if (digitMatch) {
instanceCount = parseInt(digitMatch[1]); instanceCount = parseInt(digitMatch[1]);
args.shift(); continue;
} }
const isReleaseMode = args.some((arg) => arg === RELEASE_MODE);
if (instanceCount > 4) { if (arg == "--release") {
throw new Error("Cannot spawn more than 4 instances"); isReleaseMode = true;
continue;
}
if (arg == "--top") {
isTop = true;
}
} }
// Parse the resolution of the main screen // Parse the resolution of the main screen
@ -34,7 +42,11 @@ if (!mainDisplayResolution) {
throw new Error("Could not parse screen resolution"); throw new Error("Could not parse screen resolution");
} }
const screenWidth = parseInt(mainDisplayResolution[1]); const screenWidth = parseInt(mainDisplayResolution[1]);
const screenHeight = parseInt(mainDisplayResolution[2]); let screenHeight = parseInt(mainDisplayResolution[2]);
if (isTop) {
screenHeight = Math.floor(screenHeight / 2);
}
// Determine the window size for each instance // Determine the window size for each instance
let instanceWidth = screenWidth; let instanceWidth = screenWidth;