From 75fdaeb56f7e71fffa79b8652a0f80cf0440ffe3 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 10 Jan 2024 16:08:39 -0800 Subject: [PATCH] Detect when a track is unpublished due to reconnecting to livekit Co-authored-by: Julia --- crates/call/src/room.rs | 22 ++++++ .../Sources/LiveKitBridge/LiveKitBridge.swift | 50 ++++++++++++- crates/live_kit_client/src/live_kit_client.rs | 4 ++ crates/live_kit_client/src/prod.rs | 71 +++++++++++++++++++ crates/live_kit_client/src/test.rs | 65 +++++++++++++---- 5 files changed, 195 insertions(+), 17 deletions(-) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 877afceff3..04e883e686 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -1060,6 +1060,28 @@ impl Room { participant_id: participant.peer_id, }); } + + RoomUpdate::LocalAudioTrackUnpublished { publication } => { + log::info!("unpublished audio track {}", publication.sid()); + if let Some(room) = &mut self.live_kit { + room.microphone_track = LocalTrack::None; + } + } + + RoomUpdate::LocalVideoTrackUnpublished { publication } => { + log::info!("unpublished video track {}", publication.sid()); + if let Some(room) = &mut self.live_kit { + room.screen_track = LocalTrack::None; + } + } + + RoomUpdate::LocalAudioTrackPublished { publication } => { + log::info!("published audio track {}", publication.sid()); + } + + RoomUpdate::LocalVideoTrackPublished { publication } => { + log::info!("published video track {}", publication.sid()); + } } cx.notify(); diff --git a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift index 5f22acf581..db5da8e0e9 100644 --- a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift +++ b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift @@ -12,6 +12,8 @@ class LKRoomDelegate: RoomDelegate { var onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void var onDidUnsubscribeFromRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void + var onDidPublishOrUnpublishLocalAudioTrack: @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void + var onDidPublishOrUnpublishLocalVideoTrack: @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void init( data: UnsafeRawPointer, @@ -21,7 +23,10 @@ class LKRoomDelegate: RoomDelegate { onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void, onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, - onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void) + onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void, + onDidPublishOrUnpublishLocalAudioTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void, + onDidPublishOrUnpublishLocalVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void + ) { self.data = data self.onDidDisconnect = onDidDisconnect @@ -31,6 +36,8 @@ class LKRoomDelegate: RoomDelegate { self.onDidUnsubscribeFromRemoteVideoTrack = onDidUnsubscribeFromRemoteVideoTrack self.onMuteChangedFromRemoteAudioTrack = onMuteChangedFromRemoteAudioTrack self.onActiveSpeakersChanged = onActiveSpeakersChanged + self.onDidPublishOrUnpublishLocalAudioTrack = onDidPublishOrUnpublishLocalAudioTrack + self.onDidPublishOrUnpublishLocalVideoTrack = onDidPublishOrUnpublishLocalVideoTrack } func room(_ room: Room, didUpdate connectionState: ConnectionState, oldValue: ConnectionState) { @@ -65,6 +72,22 @@ class LKRoomDelegate: RoomDelegate { self.onDidUnsubscribeFromRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString) } } + + func room(_ room: Room, localParticipant: LocalParticipant, didPublish publication: LocalTrackPublication) { + if publication.kind == .video { + self.onDidPublishOrUnpublishLocalVideoTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), true) + } else if publication.kind == .audio { + self.onDidPublishOrUnpublishLocalAudioTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), true) + } + } + + func room(_ room: Room, localParticipant: LocalParticipant, didUnpublish publication: LocalTrackPublication) { + if publication.kind == .video { + self.onDidPublishOrUnpublishLocalVideoTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), false) + } else if publication.kind == .audio { + self.onDidPublishOrUnpublishLocalAudioTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), false) + } + } } class LKVideoRenderer: NSObject, VideoRenderer { @@ -109,7 +132,9 @@ public func LKRoomDelegateCreate( onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, onActiveSpeakerChanged: @escaping @convention(c) (UnsafeRawPointer, CFArray) -> Void, onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, - onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void + onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void, + onDidPublishOrUnpublishLocalAudioTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void, + onDidPublishOrUnpublishLocalVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void ) -> UnsafeMutableRawPointer { let delegate = LKRoomDelegate( data: data, @@ -119,7 +144,9 @@ public func LKRoomDelegateCreate( onMuteChangedFromRemoteAudioTrack: onMuteChangedFromRemoteAudioTrack, onActiveSpeakersChanged: onActiveSpeakerChanged, onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack, - onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack + onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack, + onDidPublishOrUnpublishLocalAudioTrack: onDidPublishOrUnpublishLocalAudioTrack, + onDidPublishOrUnpublishLocalVideoTrack: onDidPublishOrUnpublishLocalVideoTrack ) return Unmanaged.passRetained(delegate).toOpaque() } @@ -292,6 +319,14 @@ public func LKLocalTrackPublicationSetMute( } } +@_cdecl("LKLocalTrackPublicationIsMuted") +public func LKLocalTrackPublicationIsMuted( + publication: UnsafeRawPointer +) -> Bool { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + return publication.muted +} + @_cdecl("LKRemoteTrackPublicationSetEnabled") public func LKRemoteTrackPublicationSetEnabled( publication: UnsafeRawPointer, @@ -325,3 +360,12 @@ public func LKRemoteTrackPublicationGetSid( return publication.sid as CFString } + +@_cdecl("LKLocalTrackPublicationGetSid") +public func LKLocalTrackPublicationGetSid( + publication: UnsafeRawPointer +) -> CFString { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + + return publication.sid as CFString +} diff --git a/crates/live_kit_client/src/live_kit_client.rs b/crates/live_kit_client/src/live_kit_client.rs index 7052b107bc..abec27462e 100644 --- a/crates/live_kit_client/src/live_kit_client.rs +++ b/crates/live_kit_client/src/live_kit_client.rs @@ -28,4 +28,8 @@ pub enum RoomUpdate { SubscribedToRemoteAudioTrack(Arc, Arc), UnsubscribedFromRemoteVideoTrack { publisher_id: Sid, track_id: Sid }, UnsubscribedFromRemoteAudioTrack { publisher_id: Sid, track_id: Sid }, + LocalAudioTrackPublished { publication: LocalTrackPublication }, + LocalAudioTrackUnpublished { publication: LocalTrackPublication }, + LocalVideoTrackPublished { publication: LocalTrackPublication }, + LocalVideoTrackUnpublished { publication: LocalTrackPublication }, } diff --git a/crates/live_kit_client/src/prod.rs b/crates/live_kit_client/src/prod.rs index b9f5aa6aa8..0827c0cbb4 100644 --- a/crates/live_kit_client/src/prod.rs +++ b/crates/live_kit_client/src/prod.rs @@ -77,6 +77,16 @@ extern "C" { publisher_id: CFStringRef, track_id: CFStringRef, ), + on_did_publish_or_unpublish_local_audio_track: extern "C" fn( + callback_data: *mut c_void, + publication: swift::LocalTrackPublication, + is_published: bool, + ), + on_did_publish_or_unpublish_local_video_track: extern "C" fn( + callback_data: *mut c_void, + publication: swift::LocalTrackPublication, + is_published: bool, + ), ) -> swift::RoomDelegate; fn LKRoomCreate(delegate: swift::RoomDelegate) -> swift::Room; @@ -152,7 +162,9 @@ extern "C" { callback_data: *mut c_void, ); + fn LKLocalTrackPublicationIsMuted(publication: swift::LocalTrackPublication) -> bool; fn LKRemoteTrackPublicationIsMuted(publication: swift::RemoteTrackPublication) -> bool; + fn LKLocalTrackPublicationGetSid(publication: swift::LocalTrackPublication) -> CFStringRef; fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef; } @@ -511,6 +523,8 @@ impl RoomDelegate { Self::on_active_speakers_changed, Self::on_did_subscribe_to_remote_video_track, Self::on_did_unsubscribe_from_remote_video_track, + Self::on_did_publish_or_unpublish_local_audio_track, + Self::on_did_publish_or_unpublish_local_video_track, ) }; Self { @@ -624,6 +638,46 @@ impl RoomDelegate { } let _ = Weak::into_raw(room); } + + extern "C" fn on_did_publish_or_unpublish_local_audio_track( + room: *mut c_void, + publication: swift::LocalTrackPublication, + is_published: bool, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + if let Some(room) = room.upgrade() { + let publication = LocalTrackPublication::new(publication); + let update = if is_published { + RoomUpdate::LocalAudioTrackPublished { publication } + } else { + RoomUpdate::LocalAudioTrackUnpublished { publication } + }; + room.update_subscribers + .lock() + .retain(|tx| tx.unbounded_send(update.clone()).is_ok()); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_did_publish_or_unpublish_local_video_track( + room: *mut c_void, + publication: swift::LocalTrackPublication, + is_published: bool, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + if let Some(room) = room.upgrade() { + let publication = LocalTrackPublication::new(publication); + let update = if is_published { + RoomUpdate::LocalVideoTrackPublished { publication } + } else { + RoomUpdate::LocalVideoTrackUnpublished { publication } + }; + room.update_subscribers + .lock() + .retain(|tx| tx.unbounded_send(update.clone()).is_ok()); + } + let _ = Weak::into_raw(room); + } } impl Drop for RoomDelegate { @@ -673,6 +727,10 @@ impl LocalTrackPublication { Self(native_track_publication) } + pub fn sid(&self) -> String { + unsafe { CFString::wrap_under_get_rule(LKLocalTrackPublicationGetSid(self.0)).to_string() } + } + pub fn set_mute(&self, muted: bool) -> impl Future> { let (tx, rx) = futures::channel::oneshot::channel(); @@ -697,6 +755,19 @@ impl LocalTrackPublication { async move { rx.await.unwrap() } } + + pub fn is_muted(&self) -> bool { + unsafe { LKLocalTrackPublicationIsMuted(self.0) } + } +} + +impl Clone for LocalTrackPublication { + fn clone(&self) -> Self { + unsafe { + CFRetain(self.0 .0); + } + Self(self.0) + } } impl Drop for LocalTrackPublication { diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 9c1a5ec59a..0716042ff1 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -8,7 +8,14 @@ use live_kit_server::{proto, token}; use media::core_video::CVImageBuffer; use parking_lot::Mutex; use postage::watch; -use std::{future::Future, mem, sync::Arc}; +use std::{ + future::Future, + mem, + sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }, +}; static SERVERS: Mutex>> = Mutex::new(BTreeMap::new()); @@ -176,7 +183,11 @@ impl TestServer { } } - async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> { + async fn publish_video_track( + &self, + token: String, + local_track: LocalVideoTrack, + ) -> Result { self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); @@ -198,8 +209,9 @@ impl TestServer { return Err(anyhow!("user is not allowed to publish")); } + let sid = nanoid::nanoid!(17); let track = Arc::new(RemoteVideoTrack { - sid: nanoid::nanoid!(17), + sid: sid.clone(), publisher_id: identity.clone(), frames_rx: local_track.frames_rx.clone(), }); @@ -217,14 +229,14 @@ impl TestServer { } } - Ok(()) + Ok(sid) } async fn publish_audio_track( &self, token: String, _local_track: &LocalAudioTrack, - ) -> Result<()> { + ) -> Result { self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); @@ -246,8 +258,9 @@ impl TestServer { return Err(anyhow!("user is not allowed to publish")); } + let sid = nanoid::nanoid!(17); let track = Arc::new(RemoteAudioTrack { - sid: nanoid::nanoid!(17), + sid: sid.clone(), publisher_id: identity.clone(), }); @@ -269,7 +282,7 @@ impl TestServer { } } - Ok(()) + Ok(sid) } fn video_tracks(&self, token: String) -> Result>> { @@ -425,10 +438,14 @@ impl Room { let this = self.clone(); let track = track.clone(); async move { - this.test_server() + let sid = this + .test_server() .publish_video_track(this.token(), track) .await?; - Ok(LocalTrackPublication) + Ok(LocalTrackPublication { + muted: Default::default(), + sid, + }) } } pub fn publish_audio_track( @@ -438,10 +455,14 @@ impl Room { let this = self.clone(); let track = track.clone(); async move { - this.test_server() + let sid = this + .test_server() .publish_audio_track(this.token(), &track) .await?; - Ok(LocalTrackPublication) + Ok(LocalTrackPublication { + muted: Default::default(), + sid, + }) } } @@ -536,11 +557,27 @@ impl Drop for Room { } } -pub struct LocalTrackPublication; +#[derive(Clone)] +pub struct LocalTrackPublication { + sid: String, + muted: Arc, +} impl LocalTrackPublication { - pub fn set_mute(&self, _mute: bool) -> impl Future> { - async { Ok(()) } + pub fn set_mute(&self, mute: bool) -> impl Future> { + let muted = self.muted.clone(); + async move { + muted.store(mute, SeqCst); + Ok(()) + } + } + + pub fn is_muted(&self) -> bool { + self.muted.load(SeqCst) + } + + pub fn sid(&self) -> String { + self.sid.clone() } }