diff --git a/Cargo.lock b/Cargo.lock index 241448fc4a..425a2a0a4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1114,7 +1114,7 @@ dependencies = [ "gpui2", "image", "language", - "live_kit_client2", + "live_kit_client", "log", "media", "postage", @@ -1476,7 +1476,7 @@ dependencies = [ "language", "lazy_static", "lipsum", - "live_kit_client2", + "live_kit_client", "live_kit_server", "log", "lsp", @@ -4168,39 +4168,6 @@ dependencies = [ [[package]] name = "live_kit_client" version = "0.1.0" -dependencies = [ - "anyhow", - "async-broadcast", - "async-trait", - "block", - "byteorder", - "bytes 1.5.0", - "cocoa", - "collections", - "core-foundation", - "core-graphics", - "foreign-types", - "futures 0.3.28", - "gpui", - "hmac 0.12.1", - "jwt", - "live_kit_server", - "log", - "media", - "nanoid", - "objc", - "parking_lot 0.11.2", - "postage", - "serde", - "serde_derive", - "serde_json", - "sha2 0.10.7", - "simplelog", -] - -[[package]] -name = "live_kit_client2" -version = "0.1.0" dependencies = [ "anyhow", "async-broadcast", diff --git a/crates/call/Cargo.toml b/crates/call/Cargo.toml index 616bc1743e..2ddf7caf44 100644 --- a/crates/call/Cargo.toml +++ b/crates/call/Cargo.toml @@ -24,7 +24,7 @@ client = { path = "../client" } collections = { path = "../collections" } gpui = { package = "gpui2", path = "../gpui2" } log.workspace = true -live_kit_client = { package = "live_kit_client2", path = "../live_kit_client2" } +live_kit_client = { path = "../live_kit_client" } fs = { path = "../fs" } language = { path = "../language" } media = { path = "../media" } @@ -49,6 +49,6 @@ fs = { path = "../fs", features = ["test-support"] } language = { path = "../language", features = ["test-support"] } collections = { path = "../collections", features = ["test-support"] } gpui = { package = "gpui2", path = "../gpui2", features = ["test-support"] } -live_kit_client = { package = "live_kit_client2", path = "../live_kit_client2", features = ["test-support"] } +live_kit_client = { path = "../live_kit_client", features = ["test-support"] } project = { path = "../project", features = ["test-support"] } util = { path = "../util", features = ["test-support"] } diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml index 383b020545..2607331ef8 100644 --- a/crates/collab/Cargo.toml +++ b/crates/collab/Cargo.toml @@ -70,7 +70,7 @@ editor = { path = "../editor", features = ["test-support"] } language = { path = "../language", features = ["test-support"] } fs = { path = "../fs", features = ["test-support"] } git = { path = "../git", features = ["test-support"] } -live_kit_client = { package = "live_kit_client2", path = "../live_kit_client2", features = ["test-support"] } +live_kit_client = { path = "../live_kit_client", features = ["test-support"] } lsp = { path = "../lsp", features = ["test-support"] } node_runtime = { path = "../node_runtime" } notifications = { path = "../notifications", features = ["test-support"] } diff --git a/crates/collab2/Cargo.toml b/crates/collab2/Cargo.toml deleted file mode 100644 index 383b020545..0000000000 --- a/crates/collab2/Cargo.toml +++ /dev/null @@ -1,99 +0,0 @@ -[package] -authors = ["Nathan Sobo "] -default-run = "collab" -edition = "2021" -name = "collab" -version = "0.28.0" -publish = false - -[[bin]] -name = "collab" - -[[bin]] -name = "seed" -required-features = ["seed-support"] - -[dependencies] -clock = { path = "../clock" } -collections = { path = "../collections" } -live_kit_server = { path = "../live_kit_server" } -text = { path = "../text" } -rpc = { path = "../rpc" } -util = { path = "../util" } - -anyhow.workspace = true -async-tungstenite = "0.16" -axum = { version = "0.5", features = ["json", "headers", "ws"] } -axum-extra = { version = "0.3", features = ["erased-json"] } -base64 = "0.13" -clap = { version = "3.1", features = ["derive"], optional = true } -dashmap = "5.4" -envy = "0.4.2" -futures.workspace = true -hyper = "0.14" -lazy_static.workspace = true -lipsum = { version = "0.8", optional = true } -log.workspace = true -nanoid = "0.4" -parking_lot.workspace = true -prometheus = "0.13" -prost.workspace = true -rand.workspace = true -reqwest = { version = "0.11", features = ["json"], optional = true } -scrypt = "0.7" -smallvec.workspace = true -sea-orm = { version = "0.12.x", features = ["sqlx-postgres", "postgres-array", "runtime-tokio-rustls", "with-uuid"] } -serde.workspace = true -serde_derive.workspace = true -serde_json.workspace = true -sha-1 = "0.9" -sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "json", "time", "uuid", "any"] } -time.workspace = true -tokio = { version = "1", features = ["full"] } -tokio-tungstenite = "0.17" -tonic = "0.6" -tower = "0.4" -toml.workspace = true -tracing = "0.1.34" -tracing-log = "0.1.3" -tracing-subscriber = { version = "0.3.11", features = ["env-filter", "json"] } -uuid.workspace = true - -[dev-dependencies] -audio = { path = "../audio" } -collections = { path = "../collections", features = ["test-support"] } -gpui = { package = "gpui2", path = "../gpui2", features = ["test-support"] } -call = { path = "../call", features = ["test-support"] } -client = { path = "../client", features = ["test-support"] } -channel = { path = "../channel" } -editor = { path = "../editor", features = ["test-support"] } -language = { path = "../language", features = ["test-support"] } -fs = { path = "../fs", features = ["test-support"] } -git = { path = "../git", features = ["test-support"] } -live_kit_client = { package = "live_kit_client2", path = "../live_kit_client2", features = ["test-support"] } -lsp = { path = "../lsp", features = ["test-support"] } -node_runtime = { path = "../node_runtime" } -notifications = { path = "../notifications", features = ["test-support"] } - -project = { path = "../project", features = ["test-support"] } -rpc = { path = "../rpc", features = ["test-support"] } -settings = { path = "../settings", features = ["test-support"] } -theme = { path = "../theme" } -workspace = { path = "../workspace", features = ["test-support"] } - -collab_ui = { path = "../collab_ui", features = ["test-support"] } - -async-trait.workspace = true -pretty_assertions.workspace = true -ctor.workspace = true -env_logger.workspace = true -indoc.workspace = true -util = { path = "../util" } -lazy_static.workspace = true -sea-orm = { version = "0.12.x", features = ["sqlx-sqlite"] } -serde_json.workspace = true -sqlx = { version = "0.7", features = ["sqlite"] } -unindent.workspace = true - -[features] -seed-support = ["clap", "lipsum", "reqwest"] diff --git a/crates/live_kit_client/Cargo.toml b/crates/live_kit_client/Cargo.toml index 78f435906b..f96d7a3340 100644 --- a/crates/live_kit_client/Cargo.toml +++ b/crates/live_kit_client/Cargo.toml @@ -23,7 +23,7 @@ test-support = [ [dependencies] collections = { path = "../collections", optional = true } -gpui = { path = "../gpui", optional = true } +gpui = { package = "gpui2", path = "../gpui2", optional = true } live_kit_server = { path = "../live_kit_server", optional = true } media = { path = "../media" } @@ -41,7 +41,7 @@ nanoid = { version ="0.4", optional = true} [dev-dependencies] collections = { path = "../collections", features = ["test-support"] } -gpui = { path = "../gpui", features = ["test-support"] } +gpui = { package = "gpui2", path = "../gpui2", features = ["test-support"] } live_kit_server = { path = "../live_kit_server" } media = { path = "../media" } nanoid = "0.4" diff --git a/crates/live_kit_client/LiveKitBridge/Package.resolved b/crates/live_kit_client/LiveKitBridge/Package.resolved index b0ba567f2b..bf17ef24c5 100644 --- a/crates/live_kit_client/LiveKitBridge/Package.resolved +++ b/crates/live_kit_client/LiveKitBridge/Package.resolved @@ -42,8 +42,8 @@ "repositoryURL": "https://github.com/apple/swift-protobuf.git", "state": { "branch": null, - "revision": "0af9125c4eae12a4973fb66574c53a54962a9e1e", - "version": "1.21.0" + "revision": "ce20dc083ee485524b802669890291c0d8090170", + "version": "1.22.1" } } ] diff --git a/crates/live_kit_client/build.rs b/crates/live_kit_client/build.rs index 1445704b46..3f67bfc156 100644 --- a/crates/live_kit_client/build.rs +++ b/crates/live_kit_client/build.rs @@ -61,12 +61,14 @@ fn build_bridge(swift_target: &SwiftTarget) { let swift_package_root = swift_package_root(); let swift_target_folder = swift_target_folder(); + let swift_cache_folder = swift_cache_folder(); if !Command::new("swift") .arg("build") .arg("--disable-automatic-resolution") .args(["--configuration", &env::var("PROFILE").unwrap()]) .args(["--triple", &swift_target.target.triple]) .args(["--build-path".into(), swift_target_folder]) + .args(["--cache-path".into(), swift_cache_folder]) .current_dir(&swift_package_root) .status() .unwrap() @@ -133,9 +135,17 @@ fn swift_package_root() -> PathBuf { } fn swift_target_folder() -> PathBuf { + let target = env::var("TARGET").unwrap(); env::current_dir() .unwrap() - .join(format!("../../target/{SWIFT_PACKAGE_NAME}")) + .join(format!("../../target/{target}/{SWIFT_PACKAGE_NAME}_target")) +} + +fn swift_cache_folder() -> PathBuf { + let target = env::var("TARGET").unwrap(); + env::current_dir() + .unwrap() + .join(format!("../../target/{target}/{SWIFT_PACKAGE_NAME}_cache")) } fn copy_dir(source: &Path, destination: &Path) { diff --git a/crates/live_kit_client/examples/test_app.rs b/crates/live_kit_client/examples/test_app.rs index fa5bb1bc40..96407497ae 100644 --- a/crates/live_kit_client/examples/test_app.rs +++ b/crates/live_kit_client/examples/test_app.rs @@ -1,7 +1,7 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use futures::StreamExt; -use gpui::{actions, keymap_matcher::Binding, Menu, MenuItem}; +use gpui::{actions, KeyBinding}; use live_kit_client::{ LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room, }; @@ -9,30 +9,32 @@ use live_kit_server::token::{self, VideoGrant}; use log::LevelFilter; use simplelog::SimpleLogger; -actions!(capture, [Quit]); +actions!(live_kit_client, [Quit]); fn main() { SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger"); - gpui::App::new(()).unwrap().run(|cx| { + gpui::App::production(Arc::new(())).run(|cx| { #[cfg(any(test, feature = "test-support"))] println!("USING TEST LIVEKIT"); #[cfg(not(any(test, feature = "test-support")))] println!("USING REAL LIVEKIT"); - cx.platform().activate(true); - cx.add_global_action(quit); + cx.activate(true); - cx.add_bindings([Binding::new("cmd-q", Quit, None)]); - cx.set_menus(vec![Menu { - name: "Zed", - items: vec![MenuItem::Action { - name: "Quit", - action: Box::new(Quit), - os_action: None, - }], - }]); + cx.on_action(quit); + cx.bind_keys([KeyBinding::new("cmd-q", Quit, None)]); + + // todo!() + // cx.set_menus(vec![Menu { + // name: "Zed", + // items: vec![MenuItem::Action { + // name: "Quit", + // action: Box::new(Quit), + // os_action: None, + // }], + // }]); let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap_or("http://localhost:7880".into()); let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap_or("devkey".into()); @@ -100,7 +102,7 @@ fn main() { } println!("Pausing for 5 seconds to test audio, make some noise!"); - let timer = cx.background().timer(Duration::from_secs(5)); + let timer = cx.background_executor().timer(Duration::from_secs(5)); timer.await; let remote_audio_track = room_b .remote_audio_tracks("test-participant-1") @@ -163,12 +165,12 @@ fn main() { panic!("unexpected message"); } - cx.platform().quit(); + cx.update(|cx| cx.shutdown()).ok(); }) .detach(); }); } fn quit(_: &Quit, cx: &mut gpui::AppContext) { - cx.platform().quit(); + cx.quit(); } diff --git a/crates/live_kit_client/src/prod.rs b/crates/live_kit_client/src/prod.rs index 007c47ac63..b2b83e95fc 100644 --- a/crates/live_kit_client/src/prod.rs +++ b/crates/live_kit_client/src/prod.rs @@ -17,6 +17,29 @@ use std::{ sync::{Arc, Weak}, }; +// SAFETY: Most live kit types are threadsafe: +// https://github.com/livekit/client-sdk-swift#thread-safety +macro_rules! pointer_type { + ($pointer_name:ident) => { + #[repr(transparent)] + #[derive(Copy, Clone, Debug)] + pub struct $pointer_name(pub *const std::ffi::c_void); + unsafe impl Send for $pointer_name {} + }; +} + +mod swift { + pointer_type!(Room); + pointer_type!(LocalAudioTrack); + pointer_type!(RemoteAudioTrack); + pointer_type!(LocalVideoTrack); + pointer_type!(RemoteVideoTrack); + pointer_type!(LocalTrackPublication); + pointer_type!(RemoteTrackPublication); + pointer_type!(MacOSDisplay); + pointer_type!(RoomDelegate); +} + extern "C" { fn LKRoomDelegateCreate( callback_data: *mut c_void, @@ -25,8 +48,8 @@ extern "C" { callback_data: *mut c_void, publisher_id: CFStringRef, track_id: CFStringRef, - remote_track: *const c_void, - remote_publication: *const c_void, + remote_track: swift::RemoteAudioTrack, + remote_publication: swift::RemoteTrackPublication, ), on_did_unsubscribe_from_remote_audio_track: extern "C" fn( callback_data: *mut c_void, @@ -46,49 +69,50 @@ extern "C" { callback_data: *mut c_void, publisher_id: CFStringRef, track_id: CFStringRef, - remote_track: *const c_void, + remote_track: swift::RemoteVideoTrack, ), on_did_unsubscribe_from_remote_video_track: extern "C" fn( callback_data: *mut c_void, publisher_id: CFStringRef, track_id: CFStringRef, ), - ) -> *const c_void; + ) -> swift::RoomDelegate; - fn LKRoomCreate(delegate: *const c_void) -> *const c_void; + fn LKRoomCreate(delegate: swift::RoomDelegate) -> swift::Room; fn LKRoomConnect( - room: *const c_void, + room: swift::Room, url: CFStringRef, token: CFStringRef, callback: extern "C" fn(*mut c_void, CFStringRef), callback_data: *mut c_void, ); - fn LKRoomDisconnect(room: *const c_void); + fn LKRoomDisconnect(room: swift::Room); fn LKRoomPublishVideoTrack( - room: *const c_void, - track: *const c_void, - callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef), + room: swift::Room, + track: swift::LocalVideoTrack, + callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef), callback_data: *mut c_void, ); fn LKRoomPublishAudioTrack( - room: *const c_void, - track: *const c_void, - callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef), + room: swift::Room, + track: swift::LocalAudioTrack, + callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef), callback_data: *mut c_void, ); - fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void); + fn LKRoomUnpublishTrack(room: swift::Room, publication: swift::LocalTrackPublication); + fn LKRoomAudioTracksForRemoteParticipant( - room: *const c_void, + room: swift::Room, participant_id: CFStringRef, ) -> CFArrayRef; fn LKRoomAudioTrackPublicationsForRemoteParticipant( - room: *const c_void, + room: swift::Room, participant_id: CFStringRef, ) -> CFArrayRef; fn LKRoomVideoTracksForRemoteParticipant( - room: *const c_void, + room: swift::Room, participant_id: CFStringRef, ) -> CFArrayRef; @@ -98,9 +122,9 @@ extern "C" { on_drop: extern "C" fn(callback_data: *mut c_void), ) -> *const c_void; - fn LKRemoteAudioTrackGetSid(track: *const c_void) -> CFStringRef; - fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void); - fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef; + fn LKRemoteAudioTrackGetSid(track: swift::RemoteAudioTrack) -> CFStringRef; + fn LKVideoTrackAddRenderer(track: swift::RemoteVideoTrack, renderer: *const c_void); + fn LKRemoteVideoTrackGetSid(track: swift::RemoteVideoTrack) -> CFStringRef; fn LKDisplaySources( callback_data: *mut c_void, @@ -110,25 +134,25 @@ extern "C" { error: CFStringRef, ), ); - fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void; - fn LKLocalAudioTrackCreateTrack() -> *const c_void; + fn LKCreateScreenShareTrackForDisplay(display: swift::MacOSDisplay) -> swift::LocalVideoTrack; + fn LKLocalAudioTrackCreateTrack() -> swift::LocalAudioTrack; fn LKLocalTrackPublicationSetMute( - publication: *const c_void, + publication: swift::LocalTrackPublication, muted: bool, on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef), callback_data: *mut c_void, ); fn LKRemoteTrackPublicationSetEnabled( - publication: *const c_void, + publication: swift::RemoteTrackPublication, enabled: bool, on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef), callback_data: *mut c_void, ); - fn LKRemoteTrackPublicationIsMuted(publication: *const c_void) -> bool; - fn LKRemoteTrackPublicationGetSid(publication: *const c_void) -> CFStringRef; + fn LKRemoteTrackPublicationIsMuted(publication: swift::RemoteTrackPublication) -> bool; + fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef; } pub type Sid = String; @@ -140,30 +164,29 @@ pub enum ConnectionState { } pub struct Room { - native_room: *const c_void, + native_room: Mutex, connection: Mutex<( watch::Sender, watch::Receiver, )>, remote_audio_track_subscribers: Mutex>>, remote_video_track_subscribers: Mutex>>, - _delegate: RoomDelegate, + _delegate: Mutex, } -// SAFETY: LiveKit objects are thread-safe: https://github.com/livekit/client-sdk-swift#thread-safety -unsafe impl Send for Room {} -unsafe impl Sync for Room {} +trait AssertSendSync: Send {} +impl AssertSendSync for Room {} impl Room { pub fn new() -> Arc { Arc::new_cyclic(|weak_room| { let delegate = RoomDelegate::new(weak_room.clone()); Self { - native_room: unsafe { LKRoomCreate(delegate.native_delegate) }, + native_room: Mutex::new(unsafe { LKRoomCreate(delegate.native_delegate) }), connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)), remote_audio_track_subscribers: Default::default(), remote_video_track_subscribers: Default::default(), - _delegate: delegate, + _delegate: Mutex::new(delegate), } }) } @@ -178,7 +201,7 @@ impl Room { let (did_connect, tx, rx) = Self::build_done_callback(); unsafe { LKRoomConnect( - self.native_room, + *self.native_room.lock(), url.as_concrete_TypeRef(), token.as_concrete_TypeRef(), did_connect, @@ -210,7 +233,7 @@ impl Room { } else { let sources = CFArray::wrap_under_get_rule(sources) .into_iter() - .map(|source| MacOSDisplay::new(*source)) + .map(|source| MacOSDisplay::new(swift::MacOSDisplay(*source))) .collect(); let _ = tx.send(Ok(sources)); @@ -232,7 +255,11 @@ impl Room { track: LocalVideoTrack, ) -> impl Future> { let (tx, rx) = oneshot::channel::>(); - extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) { + extern "C" fn callback( + tx: *mut c_void, + publication: swift::LocalTrackPublication, + error: CFStringRef, + ) { let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; if error.is_null() { @@ -244,7 +271,7 @@ impl Room { } unsafe { LKRoomPublishVideoTrack( - self.native_room, + *self.native_room.lock(), track.0, callback, Box::into_raw(Box::new(tx)) as *mut c_void, @@ -258,7 +285,11 @@ impl Room { track: LocalAudioTrack, ) -> impl Future> { let (tx, rx) = oneshot::channel::>(); - extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) { + extern "C" fn callback( + tx: *mut c_void, + publication: swift::LocalTrackPublication, + error: CFStringRef, + ) { let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; if error.is_null() { @@ -270,7 +301,7 @@ impl Room { } unsafe { LKRoomPublishAudioTrack( - self.native_room, + *self.native_room.lock(), track.0, callback, Box::into_raw(Box::new(tx)) as *mut c_void, @@ -281,14 +312,14 @@ impl Room { pub fn unpublish_track(&self, publication: LocalTrackPublication) { unsafe { - LKRoomUnpublishTrack(self.native_room, publication.0); + LKRoomUnpublishTrack(*self.native_room.lock(), publication.0); } } pub fn remote_video_tracks(&self, participant_id: &str) -> Vec> { unsafe { let tracks = LKRoomVideoTracksForRemoteParticipant( - self.native_room, + *self.native_room.lock(), CFString::new(participant_id).as_concrete_TypeRef(), ); @@ -299,7 +330,7 @@ impl Room { tracks .into_iter() .map(|native_track| { - let native_track = *native_track; + let native_track = swift::RemoteVideoTrack(*native_track); let id = CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track)) .to_string(); @@ -317,7 +348,7 @@ impl Room { pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec> { unsafe { let tracks = LKRoomAudioTracksForRemoteParticipant( - self.native_room, + *self.native_room.lock(), CFString::new(participant_id).as_concrete_TypeRef(), ); @@ -328,7 +359,7 @@ impl Room { tracks .into_iter() .map(|native_track| { - let native_track = *native_track; + let native_track = swift::RemoteAudioTrack(*native_track); let id = CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track)) .to_string(); @@ -349,7 +380,7 @@ impl Room { ) -> Vec> { unsafe { let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant( - self.native_room, + *self.native_room.lock(), CFString::new(participant_id).as_concrete_TypeRef(), ); @@ -360,7 +391,8 @@ impl Room { tracks .into_iter() .map(|native_track_publication| { - let native_track_publication = *native_track_publication; + let native_track_publication = + swift::RemoteTrackPublication(*native_track_publication); Arc::new(RemoteTrackPublication::new(native_track_publication)) }) .collect() @@ -467,28 +499,32 @@ impl Room { rx, ) } + + pub fn set_display_sources(&self, _: Vec) { + unreachable!("This is a test-only function") + } } impl Drop for Room { fn drop(&mut self) { unsafe { - LKRoomDisconnect(self.native_room); - CFRelease(self.native_room); + let native_room = &*self.native_room.lock(); + LKRoomDisconnect(*native_room); + CFRelease(native_room.0); } } } struct RoomDelegate { - native_delegate: *const c_void, - weak_room: *const Room, + native_delegate: swift::RoomDelegate, + _weak_room: Weak, } impl RoomDelegate { fn new(weak_room: Weak) -> Self { - let weak_room = Weak::into_raw(weak_room); let native_delegate = unsafe { LKRoomDelegateCreate( - weak_room as *mut c_void, + weak_room.as_ptr() as *mut c_void, Self::on_did_disconnect, Self::on_did_subscribe_to_remote_audio_track, Self::on_did_unsubscribe_from_remote_audio_track, @@ -500,7 +536,7 @@ impl RoomDelegate { }; Self { native_delegate, - weak_room, + _weak_room: weak_room, } } @@ -516,8 +552,8 @@ impl RoomDelegate { room: *mut c_void, publisher_id: CFStringRef, track_id: CFStringRef, - track: *const c_void, - publication: *const c_void, + track: swift::RemoteAudioTrack, + publication: swift::RemoteTrackPublication, ) { let room = unsafe { Weak::from_raw(room as *mut Room) }; let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; @@ -584,7 +620,7 @@ impl RoomDelegate { room: *mut c_void, publisher_id: CFStringRef, track_id: CFStringRef, - track: *const c_void, + track: swift::RemoteVideoTrack, ) { let room = unsafe { Weak::from_raw(room as *mut Room) }; let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; @@ -614,14 +650,12 @@ impl RoomDelegate { impl Drop for RoomDelegate { fn drop(&mut self) { unsafe { - CFRelease(self.native_delegate); - let _ = Weak::from_raw(self.weak_room); + CFRelease(self.native_delegate.0); } } } -pub struct LocalAudioTrack(*const c_void); -unsafe impl Send for LocalAudioTrack {} +pub struct LocalAudioTrack(swift::LocalAudioTrack); impl LocalAudioTrack { pub fn create() -> Self { @@ -631,12 +665,11 @@ impl LocalAudioTrack { impl Drop for LocalAudioTrack { fn drop(&mut self) { - unsafe { CFRelease(self.0) } + unsafe { CFRelease(self.0 .0) } } } -pub struct LocalVideoTrack(*const c_void); -unsafe impl Send for LocalVideoTrack {} +pub struct LocalVideoTrack(swift::LocalVideoTrack); impl LocalVideoTrack { pub fn screen_share_for_display(display: &MacOSDisplay) -> Self { @@ -646,17 +679,16 @@ impl LocalVideoTrack { impl Drop for LocalVideoTrack { fn drop(&mut self) { - unsafe { CFRelease(self.0) } + unsafe { CFRelease(self.0 .0) } } } -pub struct LocalTrackPublication(*const c_void); -unsafe impl Send for LocalTrackPublication {} +pub struct LocalTrackPublication(swift::LocalTrackPublication); impl LocalTrackPublication { - pub fn new(native_track_publication: *const c_void) -> Self { + pub fn new(native_track_publication: swift::LocalTrackPublication) -> Self { unsafe { - CFRetain(native_track_publication); + CFRetain(native_track_publication.0); } Self(native_track_publication) } @@ -689,28 +721,35 @@ impl LocalTrackPublication { impl Drop for LocalTrackPublication { fn drop(&mut self) { - unsafe { CFRelease(self.0) } + unsafe { CFRelease(self.0 .0) } } } -pub struct RemoteTrackPublication(*const c_void); - -unsafe impl Send for RemoteTrackPublication {} +pub struct RemoteTrackPublication { + native_publication: Mutex, +} impl RemoteTrackPublication { - pub fn new(native_track_publication: *const c_void) -> Self { + pub fn new(native_track_publication: swift::RemoteTrackPublication) -> Self { unsafe { - CFRetain(native_track_publication); + CFRetain(native_track_publication.0); + } + Self { + native_publication: Mutex::new(native_track_publication), } - Self(native_track_publication) } pub fn sid(&self) -> String { - unsafe { CFString::wrap_under_get_rule(LKRemoteTrackPublicationGetSid(self.0)).to_string() } + unsafe { + CFString::wrap_under_get_rule(LKRemoteTrackPublicationGetSid( + *self.native_publication.lock(), + )) + .to_string() + } } pub fn is_muted(&self) -> bool { - unsafe { LKRemoteTrackPublicationIsMuted(self.0) } + unsafe { LKRemoteTrackPublicationIsMuted(*self.native_publication.lock()) } } pub fn set_enabled(&self, enabled: bool) -> impl Future> { @@ -728,7 +767,7 @@ impl RemoteTrackPublication { unsafe { LKRemoteTrackPublicationSetEnabled( - self.0, + *self.native_publication.lock(), enabled, complete_callback, Box::into_raw(Box::new(tx)) as *mut c_void, @@ -741,26 +780,24 @@ impl RemoteTrackPublication { impl Drop for RemoteTrackPublication { fn drop(&mut self) { - unsafe { CFRelease(self.0) } + unsafe { CFRelease((*self.native_publication.lock()).0) } } } #[derive(Debug)] pub struct RemoteAudioTrack { - _native_track: *const c_void, + native_track: Mutex, sid: Sid, publisher_id: String, } -unsafe impl Send for RemoteAudioTrack {} - impl RemoteAudioTrack { - fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self { + fn new(native_track: swift::RemoteAudioTrack, sid: Sid, publisher_id: String) -> Self { unsafe { - CFRetain(native_track); + CFRetain(native_track.0); } Self { - _native_track: native_track, + native_track: Mutex::new(native_track), sid, publisher_id, } @@ -783,22 +820,26 @@ impl RemoteAudioTrack { } } +impl Drop for RemoteAudioTrack { + fn drop(&mut self) { + unsafe { CFRelease(self.native_track.lock().0) } + } +} + #[derive(Debug)] pub struct RemoteVideoTrack { - native_track: *const c_void, + native_track: Mutex, sid: Sid, publisher_id: String, } -unsafe impl Send for RemoteVideoTrack {} - impl RemoteVideoTrack { - fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self { + fn new(native_track: swift::RemoteVideoTrack, sid: Sid, publisher_id: String) -> Self { unsafe { - CFRetain(native_track); + CFRetain(native_track.0); } Self { - native_track, + native_track: Mutex::new(native_track), sid, publisher_id, } @@ -847,7 +888,7 @@ impl RemoteVideoTrack { on_frame, on_drop, ); - LKVideoTrackAddRenderer(self.native_track, renderer); + LKVideoTrackAddRenderer(*self.native_track.lock(), renderer); rx } } @@ -855,7 +896,7 @@ impl RemoteVideoTrack { impl Drop for RemoteVideoTrack { fn drop(&mut self) { - unsafe { CFRelease(self.native_track) } + unsafe { CFRelease(self.native_track.lock().0) } } } @@ -871,14 +912,12 @@ pub enum RemoteAudioTrackUpdate { Unsubscribed { publisher_id: Sid, track_id: Sid }, } -pub struct MacOSDisplay(*const c_void); - -unsafe impl Send for MacOSDisplay {} +pub struct MacOSDisplay(swift::MacOSDisplay); impl MacOSDisplay { - fn new(ptr: *const c_void) -> Self { + fn new(ptr: swift::MacOSDisplay) -> Self { unsafe { - CFRetain(ptr); + CFRetain(ptr.0); } Self(ptr) } @@ -886,7 +925,7 @@ impl MacOSDisplay { impl Drop for MacOSDisplay { fn drop(&mut self) { - unsafe { CFRelease(self.0) } + unsafe { CFRelease(self.0 .0) } } } diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index a1d4f9a56e..1106e66f31 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -1,8 +1,8 @@ -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use collections::{BTreeMap, HashMap}; use futures::Stream; -use gpui::executor::Background; +use gpui::BackgroundExecutor; use live_kit_server::token; use media::core_video::CVImageBuffer; use parking_lot::Mutex; @@ -16,7 +16,7 @@ pub struct TestServer { pub api_key: String, pub secret_key: String, rooms: Mutex>, - background: Arc, + executor: BackgroundExecutor, } impl TestServer { @@ -24,7 +24,7 @@ impl TestServer { url: String, api_key: String, secret_key: String, - background: Arc, + executor: BackgroundExecutor, ) -> Result> { let mut servers = SERVERS.lock(); if servers.contains_key(&url) { @@ -35,7 +35,7 @@ impl TestServer { api_key, secret_key, rooms: Default::default(), - background, + executor, }); servers.insert(url, server.clone()); Ok(server) @@ -65,7 +65,7 @@ impl TestServer { } pub async fn create_room(&self, room: String) -> Result<()> { - self.background.simulate_random_delay().await; + self.executor.simulate_random_delay().await; let mut server_rooms = self.rooms.lock(); if server_rooms.contains_key(&room) { Err(anyhow!("room {:?} already exists", room)) @@ -77,7 +77,7 @@ impl TestServer { async fn delete_room(&self, room: String) -> Result<()> { // TODO: clear state associated with all `Room`s. - self.background.simulate_random_delay().await; + self.executor.simulate_random_delay().await; let mut server_rooms = self.rooms.lock(); server_rooms .remove(&room) @@ -86,7 +86,7 @@ impl TestServer { } async fn join_room(&self, token: String, client_room: Arc) -> Result<()> { - self.background.simulate_random_delay().await; + self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); let room_name = claims.video.room.unwrap(); @@ -115,7 +115,7 @@ impl TestServer { } async fn leave_room(&self, token: String) -> Result<()> { - self.background.simulate_random_delay().await; + self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); let room_name = claims.video.room.unwrap(); @@ -136,7 +136,7 @@ impl TestServer { async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> { // TODO: clear state associated with the `Room`. - self.background.simulate_random_delay().await; + self.executor.simulate_random_delay().await; let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&room_name) @@ -152,7 +152,7 @@ impl TestServer { } pub async fn disconnect_client(&self, client_identity: String) { - self.background.simulate_random_delay().await; + self.executor.simulate_random_delay().await; let mut server_rooms = self.rooms.lock(); for room in server_rooms.values_mut() { if let Some(room) = room.client_rooms.remove(&client_identity) { @@ -162,7 +162,7 @@ impl TestServer { } async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> { - self.background.simulate_random_delay().await; + self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); let room_name = claims.video.room.unwrap(); @@ -200,7 +200,7 @@ impl TestServer { token: String, _local_track: &LocalAudioTrack, ) -> Result<()> { - self.background.simulate_random_delay().await; + self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); let room_name = claims.video.room.unwrap(); @@ -364,7 +364,10 @@ impl Room { let token = token.to_string(); async move { let server = TestServer::get(&url)?; - server.join_room(token.clone(), this.clone()).await?; + server + .join_room(token.clone(), this.clone()) + .await + .context("room join")?; *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token }; Ok(()) } @@ -374,7 +377,7 @@ impl Room { let this = self.clone(); async move { let server = this.test_server(); - server.background.simulate_random_delay().await; + server.executor.simulate_random_delay().await; Ok(this.0.lock().display_sources.clone()) } } @@ -492,8 +495,8 @@ impl Drop for Room { ConnectionState::Disconnected, ) { if let Ok(server) = TestServer::get(&token) { - let background = server.background.clone(); - background + let executor = server.executor.clone(); + executor .spawn(async move { server.leave_room(token).await.unwrap() }) .detach(); } @@ -547,6 +550,7 @@ impl LocalAudioTrack { } } +#[derive(Debug)] pub struct RemoteVideoTrack { sid: Sid, publisher_id: Sid, diff --git a/crates/live_kit_client2/.cargo/config.toml b/crates/live_kit_client2/.cargo/config.toml deleted file mode 100644 index b33fe211bd..0000000000 --- a/crates/live_kit_client2/.cargo/config.toml +++ /dev/null @@ -1,2 +0,0 @@ -[live_kit_client_test] -rustflags = ["-C", "link-args=-ObjC"] diff --git a/crates/live_kit_client2/Cargo.toml b/crates/live_kit_client2/Cargo.toml deleted file mode 100644 index 073c0017b0..0000000000 --- a/crates/live_kit_client2/Cargo.toml +++ /dev/null @@ -1,71 +0,0 @@ -[package] -name = "live_kit_client2" -version = "0.1.0" -edition = "2021" -description = "Bindings to LiveKit Swift client SDK" -publish = false - -[lib] -path = "src/live_kit_client2.rs" -doctest = false - -[[example]] -name = "test_app2" - -[features] -test-support = [ - "async-trait", - "collections/test-support", - "gpui/test-support", - "live_kit_server", - "nanoid", -] - -[dependencies] -collections = { path = "../collections", optional = true } -gpui = { package = "gpui2", path = "../gpui2", optional = true } -live_kit_server = { path = "../live_kit_server", optional = true } -media = { path = "../media" } - -anyhow.workspace = true -async-broadcast = "0.4" -core-foundation = "0.9.3" -core-graphics = "0.22.3" -futures.workspace = true -log.workspace = true -parking_lot.workspace = true -postage.workspace = true - -async-trait = { workspace = true, optional = true } -nanoid = { version ="0.4", optional = true} - -[dev-dependencies] -collections = { path = "../collections", features = ["test-support"] } -gpui = { package = "gpui2", path = "../gpui2", features = ["test-support"] } -live_kit_server = { path = "../live_kit_server" } -media = { path = "../media" } -nanoid = "0.4" - -anyhow.workspace = true -async-trait.workspace = true -block = "0.1" -bytes = "1.2" -byteorder = "1.4" -cocoa = "0.24" -core-foundation = "0.9.3" -core-graphics = "0.22.3" -foreign-types = "0.3" -futures.workspace = true -hmac = "0.12" -jwt = "0.16" -objc = "0.2" -parking_lot.workspace = true -serde.workspace = true -serde_derive.workspace = true -sha2 = "0.10" -simplelog = "0.9" - -[build-dependencies] -serde.workspace = true -serde_derive.workspace = true -serde_json.workspace = true diff --git a/crates/live_kit_client2/LiveKitBridge2/Package.resolved b/crates/live_kit_client2/LiveKitBridge2/Package.resolved deleted file mode 100644 index bf17ef24c5..0000000000 --- a/crates/live_kit_client2/LiveKitBridge2/Package.resolved +++ /dev/null @@ -1,52 +0,0 @@ -{ - "object": { - "pins": [ - { - "package": "LiveKit", - "repositoryURL": "https://github.com/livekit/client-sdk-swift.git", - "state": { - "branch": null, - "revision": "8b9cefed8d1669ec8fce41376b56dce3036a5f50", - "version": "1.1.4" - } - }, - { - "package": "Promises", - "repositoryURL": "https://github.com/google/promises.git", - "state": { - "branch": null, - "revision": "ec957ccddbcc710ccc64c9dcbd4c7006fcf8b73a", - "version": "2.2.0" - } - }, - { - "package": "WebRTC", - "repositoryURL": "https://github.com/webrtc-sdk/Specs.git", - "state": { - "branch": null, - "revision": "4fa8d6d647fc759cdd0265fd413d2f28ea2e0e08", - "version": "114.5735.8" - } - }, - { - "package": "swift-log", - "repositoryURL": "https://github.com/apple/swift-log.git", - "state": { - "branch": null, - "revision": "32e8d724467f8fe623624570367e3d50c5638e46", - "version": "1.5.2" - } - }, - { - "package": "SwiftProtobuf", - "repositoryURL": "https://github.com/apple/swift-protobuf.git", - "state": { - "branch": null, - "revision": "ce20dc083ee485524b802669890291c0d8090170", - "version": "1.22.1" - } - } - ] - }, - "version": 1 -} diff --git a/crates/live_kit_client2/LiveKitBridge2/Package.swift b/crates/live_kit_client2/LiveKitBridge2/Package.swift deleted file mode 100644 index 29a5021e54..0000000000 --- a/crates/live_kit_client2/LiveKitBridge2/Package.swift +++ /dev/null @@ -1,27 +0,0 @@ -// swift-tools-version: 5.5 - -import PackageDescription - -let package = Package( - name: "LiveKitBridge2", - platforms: [ - .macOS(.v10_15) - ], - products: [ - // Products define the executables and libraries a package produces, and make them visible to other packages. - .library( - name: "LiveKitBridge2", - type: .static, - targets: ["LiveKitBridge2"]), - ], - dependencies: [ - .package(url: "https://github.com/livekit/client-sdk-swift.git", .exact("1.1.4")), - ], - targets: [ - // Targets are the basic building blocks of a package. A target can define a module or a test suite. - // Targets can depend on other targets in this package, and on products in packages this package depends on. - .target( - name: "LiveKitBridge2", - dependencies: [.product(name: "LiveKit", package: "client-sdk-swift")]), - ] -) diff --git a/crates/live_kit_client2/LiveKitBridge2/README.md b/crates/live_kit_client2/LiveKitBridge2/README.md deleted file mode 100644 index 1fceed8165..0000000000 --- a/crates/live_kit_client2/LiveKitBridge2/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# LiveKitBridge2 - -A description of this package. diff --git a/crates/live_kit_client2/LiveKitBridge2/Sources/LiveKitBridge2/LiveKitBridge2.swift b/crates/live_kit_client2/LiveKitBridge2/Sources/LiveKitBridge2/LiveKitBridge2.swift deleted file mode 100644 index 5f22acf581..0000000000 --- a/crates/live_kit_client2/LiveKitBridge2/Sources/LiveKitBridge2/LiveKitBridge2.swift +++ /dev/null @@ -1,327 +0,0 @@ -import Foundation -import LiveKit -import WebRTC -import ScreenCaptureKit - -class LKRoomDelegate: RoomDelegate { - var data: UnsafeRawPointer - var onDidDisconnect: @convention(c) (UnsafeRawPointer) -> Void - var onDidSubscribeToRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void - var onDidUnsubscribeFromRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void - var onMuteChangedFromRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void - var onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void - var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void - var onDidUnsubscribeFromRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void - - init( - data: UnsafeRawPointer, - onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void, - onDidSubscribeToRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void, - onDidUnsubscribeFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void, - onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, - onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void, - onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, - onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void) - { - self.data = data - self.onDidDisconnect = onDidDisconnect - self.onDidSubscribeToRemoteAudioTrack = onDidSubscribeToRemoteAudioTrack - self.onDidUnsubscribeFromRemoteAudioTrack = onDidUnsubscribeFromRemoteAudioTrack - self.onDidSubscribeToRemoteVideoTrack = onDidSubscribeToRemoteVideoTrack - self.onDidUnsubscribeFromRemoteVideoTrack = onDidUnsubscribeFromRemoteVideoTrack - self.onMuteChangedFromRemoteAudioTrack = onMuteChangedFromRemoteAudioTrack - self.onActiveSpeakersChanged = onActiveSpeakersChanged - } - - func room(_ room: Room, didUpdate connectionState: ConnectionState, oldValue: ConnectionState) { - if connectionState.isDisconnected { - self.onDidDisconnect(self.data) - } - } - - func room(_ room: Room, participant: RemoteParticipant, didSubscribe publication: RemoteTrackPublication, track: Track) { - if track.kind == .video { - self.onDidSubscribeToRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString, Unmanaged.passUnretained(track).toOpaque()) - } else if track.kind == .audio { - self.onDidSubscribeToRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString, Unmanaged.passUnretained(track).toOpaque(), Unmanaged.passUnretained(publication).toOpaque()) - } - } - - func room(_ room: Room, participant: Participant, didUpdate publication: TrackPublication, muted: Bool) { - if publication.kind == .audio { - self.onMuteChangedFromRemoteAudioTrack(self.data, publication.sid as CFString, muted) - } - } - - func room(_ room: Room, didUpdate speakers: [Participant]) { - guard let speaker_ids = speakers.compactMap({ $0.identity as CFString }) as CFArray? else { return } - self.onActiveSpeakersChanged(self.data, speaker_ids) - } - - func room(_ room: Room, participant: RemoteParticipant, didUnsubscribe publication: RemoteTrackPublication, track: Track) { - if track.kind == .video { - self.onDidUnsubscribeFromRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString) - } else if track.kind == .audio { - self.onDidUnsubscribeFromRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString) - } - } -} - -class LKVideoRenderer: NSObject, VideoRenderer { - var data: UnsafeRawPointer - var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool - var onDrop: @convention(c) (UnsafeRawPointer) -> Void - var adaptiveStreamIsEnabled: Bool = false - var adaptiveStreamSize: CGSize = .zero - weak var track: VideoTrack? - - init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) { - self.data = data - self.onFrame = onFrame - self.onDrop = onDrop - } - - deinit { - self.onDrop(self.data) - } - - func setSize(_ size: CGSize) { - } - - func renderFrame(_ frame: RTCVideoFrame?) { - let buffer = frame?.buffer as? RTCCVPixelBuffer - if let pixelBuffer = buffer?.pixelBuffer { - if !self.onFrame(self.data, pixelBuffer) { - DispatchQueue.main.async { - self.track?.remove(videoRenderer: self) - } - } - } - } -} - -@_cdecl("LKRoomDelegateCreate") -public func LKRoomDelegateCreate( - data: UnsafeRawPointer, - onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void, - onDidSubscribeToRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void, - onDidUnsubscribeFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void, - onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, - onActiveSpeakerChanged: @escaping @convention(c) (UnsafeRawPointer, CFArray) -> Void, - onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, - onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void -) -> UnsafeMutableRawPointer { - let delegate = LKRoomDelegate( - data: data, - onDidDisconnect: onDidDisconnect, - onDidSubscribeToRemoteAudioTrack: onDidSubscribeToRemoteAudioTrack, - onDidUnsubscribeFromRemoteAudioTrack: onDidUnsubscribeFromRemoteAudioTrack, - onMuteChangedFromRemoteAudioTrack: onMuteChangedFromRemoteAudioTrack, - onActiveSpeakersChanged: onActiveSpeakerChanged, - onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack, - onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack - ) - return Unmanaged.passRetained(delegate).toOpaque() -} - -@_cdecl("LKRoomCreate") -public func LKRoomCreate(delegate: UnsafeRawPointer) -> UnsafeMutableRawPointer { - let delegate = Unmanaged.fromOpaque(delegate).takeUnretainedValue() - return Unmanaged.passRetained(Room(delegate: delegate)).toOpaque() -} - -@_cdecl("LKRoomConnect") -public func LKRoomConnect(room: UnsafeRawPointer, url: CFString, token: CFString, callback: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer) { - let room = Unmanaged.fromOpaque(room).takeUnretainedValue() - - room.connect(url as String, token as String).then { _ in - callback(callback_data, UnsafeRawPointer(nil) as! CFString?) - }.catch { error in - callback(callback_data, error.localizedDescription as CFString) - } -} - -@_cdecl("LKRoomDisconnect") -public func LKRoomDisconnect(room: UnsafeRawPointer) { - let room = Unmanaged.fromOpaque(room).takeUnretainedValue() - room.disconnect() -} - -@_cdecl("LKRoomPublishVideoTrack") -public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) { - let room = Unmanaged.fromOpaque(room).takeUnretainedValue() - let track = Unmanaged.fromOpaque(track).takeUnretainedValue() - room.localParticipant?.publishVideoTrack(track: track).then { publication in - callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil) - }.catch { error in - callback(callback_data, nil, error.localizedDescription as CFString) - } -} - -@_cdecl("LKRoomPublishAudioTrack") -public func LKRoomPublishAudioTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) { - let room = Unmanaged.fromOpaque(room).takeUnretainedValue() - let track = Unmanaged.fromOpaque(track).takeUnretainedValue() - room.localParticipant?.publishAudioTrack(track: track).then { publication in - callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil) - }.catch { error in - callback(callback_data, nil, error.localizedDescription as CFString) - } -} - - -@_cdecl("LKRoomUnpublishTrack") -public func LKRoomUnpublishTrack(room: UnsafeRawPointer, publication: UnsafeRawPointer) { - let room = Unmanaged.fromOpaque(room).takeUnretainedValue() - let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() - let _ = room.localParticipant?.unpublish(publication: publication) -} - -@_cdecl("LKRoomAudioTracksForRemoteParticipant") -public func LKRoomAudioTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { - let room = Unmanaged.fromOpaque(room).takeUnretainedValue() - - for (_, participant) in room.remoteParticipants { - if participant.identity == participantId as String { - return participant.audioTracks.compactMap { $0.track as? RemoteAudioTrack } as CFArray? - } - } - - return nil; -} - -@_cdecl("LKRoomAudioTrackPublicationsForRemoteParticipant") -public func LKRoomAudioTrackPublicationsForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { - let room = Unmanaged.fromOpaque(room).takeUnretainedValue() - - for (_, participant) in room.remoteParticipants { - if participant.identity == participantId as String { - return participant.audioTracks.compactMap { $0 as? RemoteTrackPublication } as CFArray? - } - } - - return nil; -} - -@_cdecl("LKRoomVideoTracksForRemoteParticipant") -public func LKRoomVideoTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { - let room = Unmanaged.fromOpaque(room).takeUnretainedValue() - - for (_, participant) in room.remoteParticipants { - if participant.identity == participantId as String { - return participant.videoTracks.compactMap { $0.track as? RemoteVideoTrack } as CFArray? - } - } - - return nil; -} - -@_cdecl("LKLocalAudioTrackCreateTrack") -public func LKLocalAudioTrackCreateTrack() -> UnsafeMutableRawPointer { - let track = LocalAudioTrack.createTrack(options: AudioCaptureOptions( - echoCancellation: true, - noiseSuppression: true - )) - - return Unmanaged.passRetained(track).toOpaque() -} - - -@_cdecl("LKCreateScreenShareTrackForDisplay") -public func LKCreateScreenShareTrackForDisplay(display: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer { - let display = Unmanaged.fromOpaque(display).takeUnretainedValue() - let track = LocalVideoTrack.createMacOSScreenShareTrack(source: display, preferredMethod: .legacy) - return Unmanaged.passRetained(track).toOpaque() -} - -@_cdecl("LKVideoRendererCreate") -public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer { - Unmanaged.passRetained(LKVideoRenderer(data: data, onFrame: onFrame, onDrop: onDrop)).toOpaque() -} - -@_cdecl("LKVideoTrackAddRenderer") -public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRawPointer) { - let track = Unmanaged.fromOpaque(track).takeUnretainedValue() as! VideoTrack - let renderer = Unmanaged.fromOpaque(renderer).takeRetainedValue() - renderer.track = track - track.add(videoRenderer: renderer) -} - -@_cdecl("LKRemoteVideoTrackGetSid") -public func LKRemoteVideoTrackGetSid(track: UnsafeRawPointer) -> CFString { - let track = Unmanaged.fromOpaque(track).takeUnretainedValue() - return track.sid! as CFString -} - -@_cdecl("LKRemoteAudioTrackGetSid") -public func LKRemoteAudioTrackGetSid(track: UnsafeRawPointer) -> CFString { - let track = Unmanaged.fromOpaque(track).takeUnretainedValue() - return track.sid! as CFString -} - -@_cdecl("LKDisplaySources") -public func LKDisplaySources(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFArray?, CFString?) -> Void) { - MacOSScreenCapturer.sources(for: .display, includeCurrentApplication: false, preferredMethod: .legacy).then { displaySources in - callback(data, displaySources as CFArray, nil) - }.catch { error in - callback(data, nil, error.localizedDescription as CFString) - } -} - -@_cdecl("LKLocalTrackPublicationSetMute") -public func LKLocalTrackPublicationSetMute( - publication: UnsafeRawPointer, - muted: Bool, - on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, - callback_data: UnsafeRawPointer -) { - let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() - - if muted { - publication.mute().then { - on_complete(callback_data, nil) - }.catch { error in - on_complete(callback_data, error.localizedDescription as CFString) - } - } else { - publication.unmute().then { - on_complete(callback_data, nil) - }.catch { error in - on_complete(callback_data, error.localizedDescription as CFString) - } - } -} - -@_cdecl("LKRemoteTrackPublicationSetEnabled") -public func LKRemoteTrackPublicationSetEnabled( - publication: UnsafeRawPointer, - enabled: Bool, - on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, - callback_data: UnsafeRawPointer -) { - let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() - - publication.set(enabled: enabled).then { - on_complete(callback_data, nil) - }.catch { error in - on_complete(callback_data, error.localizedDescription as CFString) - } -} - -@_cdecl("LKRemoteTrackPublicationIsMuted") -public func LKRemoteTrackPublicationIsMuted( - publication: UnsafeRawPointer -) -> Bool { - let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() - - return publication.muted -} - -@_cdecl("LKRemoteTrackPublicationGetSid") -public func LKRemoteTrackPublicationGetSid( - publication: UnsafeRawPointer -) -> CFString { - let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() - - return publication.sid as CFString -} diff --git a/crates/live_kit_client2/build.rs b/crates/live_kit_client2/build.rs deleted file mode 100644 index a2b7ef866d..0000000000 --- a/crates/live_kit_client2/build.rs +++ /dev/null @@ -1,182 +0,0 @@ -use serde::Deserialize; -use std::{ - env, - path::{Path, PathBuf}, - process::Command, -}; - -const SWIFT_PACKAGE_NAME: &str = "LiveKitBridge2"; - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SwiftTargetInfo { - pub triple: String, - pub unversioned_triple: String, - pub module_triple: String, - pub swift_runtime_compatibility_version: String, - #[serde(rename = "librariesRequireRPath")] - pub libraries_require_rpath: bool, -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SwiftPaths { - pub runtime_library_paths: Vec, - pub runtime_library_import_paths: Vec, - pub runtime_resource_path: String, -} - -#[derive(Debug, Deserialize)] -pub struct SwiftTarget { - pub target: SwiftTargetInfo, - pub paths: SwiftPaths, -} - -const MACOS_TARGET_VERSION: &str = "10.15.7"; - -fn main() { - if cfg!(not(any(test, feature = "test-support"))) { - let swift_target = get_swift_target(); - - build_bridge(&swift_target); - link_swift_stdlib(&swift_target); - link_webrtc_framework(&swift_target); - - // Register exported Objective-C selectors, protocols, etc when building example binaries. - println!("cargo:rustc-link-arg=-Wl,-ObjC"); - } -} - -fn build_bridge(swift_target: &SwiftTarget) { - println!("cargo:rerun-if-env-changed=MACOSX_DEPLOYMENT_TARGET"); - println!("cargo:rerun-if-changed={}/Sources", SWIFT_PACKAGE_NAME); - println!( - "cargo:rerun-if-changed={}/Package.swift", - SWIFT_PACKAGE_NAME - ); - println!( - "cargo:rerun-if-changed={}/Package.resolved", - SWIFT_PACKAGE_NAME - ); - - let swift_package_root = swift_package_root(); - let swift_target_folder = swift_target_folder(); - let swift_cache_folder = swift_cache_folder(); - if !Command::new("swift") - .arg("build") - .arg("--disable-automatic-resolution") - .args(["--configuration", &env::var("PROFILE").unwrap()]) - .args(["--triple", &swift_target.target.triple]) - .args(["--build-path".into(), swift_target_folder]) - .args(["--cache-path".into(), swift_cache_folder]) - .current_dir(&swift_package_root) - .status() - .unwrap() - .success() - { - panic!( - "Failed to compile swift package in {}", - swift_package_root.display() - ); - } - - println!( - "cargo:rustc-link-search=native={}", - swift_target.out_dir_path().display() - ); - println!("cargo:rustc-link-lib=static={}", SWIFT_PACKAGE_NAME); -} - -fn link_swift_stdlib(swift_target: &SwiftTarget) { - for path in &swift_target.paths.runtime_library_paths { - println!("cargo:rustc-link-search=native={}", path); - } -} - -fn link_webrtc_framework(swift_target: &SwiftTarget) { - let swift_out_dir_path = swift_target.out_dir_path(); - println!("cargo:rustc-link-lib=framework=WebRTC"); - println!( - "cargo:rustc-link-search=framework={}", - swift_out_dir_path.display() - ); - // Find WebRTC.framework as a sibling of the executable when running tests. - println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path"); - // Find WebRTC.framework in parent directory of the executable when running examples. - println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path/.."); - - let source_path = swift_out_dir_path.join("WebRTC.framework"); - let deps_dir_path = - PathBuf::from(env::var("OUT_DIR").unwrap()).join("../../../deps/WebRTC.framework"); - let target_dir_path = - PathBuf::from(env::var("OUT_DIR").unwrap()).join("../../../WebRTC.framework"); - copy_dir(&source_path, &deps_dir_path); - copy_dir(&source_path, &target_dir_path); -} - -fn get_swift_target() -> SwiftTarget { - let mut arch = env::var("CARGO_CFG_TARGET_ARCH").unwrap(); - if arch == "aarch64" { - arch = "arm64".into(); - } - let target = format!("{}-apple-macosx{}", arch, MACOS_TARGET_VERSION); - - let swift_target_info_str = Command::new("swift") - .args(["-target", &target, "-print-target-info"]) - .output() - .unwrap() - .stdout; - - serde_json::from_slice(&swift_target_info_str).unwrap() -} - -fn swift_package_root() -> PathBuf { - env::current_dir().unwrap().join(SWIFT_PACKAGE_NAME) -} - -fn swift_target_folder() -> PathBuf { - let target = env::var("TARGET").unwrap(); - env::current_dir() - .unwrap() - .join(format!("../../target/{target}/{SWIFT_PACKAGE_NAME}_target")) -} - -fn swift_cache_folder() -> PathBuf { - let target = env::var("TARGET").unwrap(); - env::current_dir() - .unwrap() - .join(format!("../../target/{target}/{SWIFT_PACKAGE_NAME}_cache")) -} - -fn copy_dir(source: &Path, destination: &Path) { - assert!( - Command::new("rm") - .arg("-rf") - .arg(destination) - .status() - .unwrap() - .success(), - "could not remove {:?} before copying", - destination - ); - - assert!( - Command::new("cp") - .arg("-R") - .args([source, destination]) - .status() - .unwrap() - .success(), - "could not copy {:?} to {:?}", - source, - destination - ); -} - -impl SwiftTarget { - fn out_dir_path(&self) -> PathBuf { - swift_target_folder() - .join(&self.target.unversioned_triple) - .join(env::var("PROFILE").unwrap()) - } -} diff --git a/crates/live_kit_client2/examples/test_app2.rs b/crates/live_kit_client2/examples/test_app2.rs deleted file mode 100644 index fc2c8deb71..0000000000 --- a/crates/live_kit_client2/examples/test_app2.rs +++ /dev/null @@ -1,176 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use futures::StreamExt; -use gpui::{actions, KeyBinding}; -use live_kit_client2::{ - LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room, -}; -use live_kit_server::token::{self, VideoGrant}; -use log::LevelFilter; -use simplelog::SimpleLogger; - -actions!(live_kit_client, [Quit]); - -fn main() { - SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger"); - - gpui::App::production(Arc::new(())).run(|cx| { - #[cfg(any(test, feature = "test-support"))] - println!("USING TEST LIVEKIT"); - - #[cfg(not(any(test, feature = "test-support")))] - println!("USING REAL LIVEKIT"); - - cx.activate(true); - - cx.on_action(quit); - cx.bind_keys([KeyBinding::new("cmd-q", Quit, None)]); - - // todo!() - // cx.set_menus(vec![Menu { - // name: "Zed", - // items: vec![MenuItem::Action { - // name: "Quit", - // action: Box::new(Quit), - // os_action: None, - // }], - // }]); - - let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap_or("http://localhost:7880".into()); - let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap_or("devkey".into()); - let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap_or("secret".into()); - - cx.spawn(|cx| async move { - let user_a_token = token::create( - &live_kit_key, - &live_kit_secret, - Some("test-participant-1"), - VideoGrant::to_join("test-room"), - ) - .unwrap(); - let room_a = Room::new(); - room_a.connect(&live_kit_url, &user_a_token).await.unwrap(); - - let user2_token = token::create( - &live_kit_key, - &live_kit_secret, - Some("test-participant-2"), - VideoGrant::to_join("test-room"), - ) - .unwrap(); - let room_b = Room::new(); - room_b.connect(&live_kit_url, &user2_token).await.unwrap(); - - let mut audio_track_updates = room_b.remote_audio_track_updates(); - let audio_track = LocalAudioTrack::create(); - let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap(); - - if let RemoteAudioTrackUpdate::Subscribed(track, _) = - audio_track_updates.next().await.unwrap() - { - let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); - assert_eq!(remote_tracks.len(), 1); - assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1"); - assert_eq!(track.publisher_id(), "test-participant-1"); - } else { - panic!("unexpected message"); - } - - audio_track_publication.set_mute(true).await.unwrap(); - - println!("waiting for mute changed!"); - if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = - audio_track_updates.next().await.unwrap() - { - let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); - assert_eq!(remote_tracks[0].sid(), track_id); - assert_eq!(muted, true); - } else { - panic!("unexpected message"); - } - - audio_track_publication.set_mute(false).await.unwrap(); - - if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = - audio_track_updates.next().await.unwrap() - { - let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); - assert_eq!(remote_tracks[0].sid(), track_id); - assert_eq!(muted, false); - } else { - panic!("unexpected message"); - } - - println!("Pausing for 5 seconds to test audio, make some noise!"); - let timer = cx.background_executor().timer(Duration::from_secs(5)); - timer.await; - let remote_audio_track = room_b - .remote_audio_tracks("test-participant-1") - .pop() - .unwrap(); - room_a.unpublish_track(audio_track_publication); - - // Clear out any active speakers changed messages - let mut next = audio_track_updates.next().await.unwrap(); - while let RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } = next { - println!("Speakers changed: {:?}", speakers); - next = audio_track_updates.next().await.unwrap(); - } - - if let RemoteAudioTrackUpdate::Unsubscribed { - publisher_id, - track_id, - } = next - { - assert_eq!(publisher_id, "test-participant-1"); - assert_eq!(remote_audio_track.sid(), track_id); - assert_eq!(room_b.remote_audio_tracks("test-participant-1").len(), 0); - } else { - panic!("unexpected message"); - } - - let mut video_track_updates = room_b.remote_video_track_updates(); - let displays = room_a.display_sources().await.unwrap(); - let display = displays.into_iter().next().unwrap(); - - let local_video_track = LocalVideoTrack::screen_share_for_display(&display); - let local_video_track_publication = - room_a.publish_video_track(local_video_track).await.unwrap(); - - if let RemoteVideoTrackUpdate::Subscribed(track) = - video_track_updates.next().await.unwrap() - { - let remote_video_tracks = room_b.remote_video_tracks("test-participant-1"); - assert_eq!(remote_video_tracks.len(), 1); - assert_eq!(remote_video_tracks[0].publisher_id(), "test-participant-1"); - assert_eq!(track.publisher_id(), "test-participant-1"); - } else { - panic!("unexpected message"); - } - - let remote_video_track = room_b - .remote_video_tracks("test-participant-1") - .pop() - .unwrap(); - room_a.unpublish_track(local_video_track_publication); - if let RemoteVideoTrackUpdate::Unsubscribed { - publisher_id, - track_id, - } = video_track_updates.next().await.unwrap() - { - assert_eq!(publisher_id, "test-participant-1"); - assert_eq!(remote_video_track.sid(), track_id); - assert_eq!(room_b.remote_video_tracks("test-participant-1").len(), 0); - } else { - panic!("unexpected message"); - } - - cx.update(|cx| cx.shutdown()).ok(); - }) - .detach(); - }); -} - -fn quit(_: &Quit, cx: &mut gpui::AppContext) { - cx.quit(); -} diff --git a/crates/live_kit_client2/src/live_kit_client2.rs b/crates/live_kit_client2/src/live_kit_client2.rs deleted file mode 100644 index 47cc3873ff..0000000000 --- a/crates/live_kit_client2/src/live_kit_client2.rs +++ /dev/null @@ -1,11 +0,0 @@ -#[cfg(not(any(test, feature = "test-support")))] -pub mod prod; - -#[cfg(not(any(test, feature = "test-support")))] -pub use prod::*; - -#[cfg(any(test, feature = "test-support"))] -pub mod test; - -#[cfg(any(test, feature = "test-support"))] -pub use test::*; diff --git a/crates/live_kit_client2/src/prod.rs b/crates/live_kit_client2/src/prod.rs deleted file mode 100644 index b2b83e95fc..0000000000 --- a/crates/live_kit_client2/src/prod.rs +++ /dev/null @@ -1,947 +0,0 @@ -use anyhow::{anyhow, Context, Result}; -use core_foundation::{ - array::{CFArray, CFArrayRef}, - base::{CFRelease, CFRetain, TCFType}, - string::{CFString, CFStringRef}, -}; -use futures::{ - channel::{mpsc, oneshot}, - Future, -}; -pub use media::core_video::CVImageBuffer; -use media::core_video::CVImageBufferRef; -use parking_lot::Mutex; -use postage::watch; -use std::{ - ffi::c_void, - sync::{Arc, Weak}, -}; - -// SAFETY: Most live kit types are threadsafe: -// https://github.com/livekit/client-sdk-swift#thread-safety -macro_rules! pointer_type { - ($pointer_name:ident) => { - #[repr(transparent)] - #[derive(Copy, Clone, Debug)] - pub struct $pointer_name(pub *const std::ffi::c_void); - unsafe impl Send for $pointer_name {} - }; -} - -mod swift { - pointer_type!(Room); - pointer_type!(LocalAudioTrack); - pointer_type!(RemoteAudioTrack); - pointer_type!(LocalVideoTrack); - pointer_type!(RemoteVideoTrack); - pointer_type!(LocalTrackPublication); - pointer_type!(RemoteTrackPublication); - pointer_type!(MacOSDisplay); - pointer_type!(RoomDelegate); -} - -extern "C" { - fn LKRoomDelegateCreate( - callback_data: *mut c_void, - on_did_disconnect: extern "C" fn(callback_data: *mut c_void), - on_did_subscribe_to_remote_audio_track: extern "C" fn( - callback_data: *mut c_void, - publisher_id: CFStringRef, - track_id: CFStringRef, - remote_track: swift::RemoteAudioTrack, - remote_publication: swift::RemoteTrackPublication, - ), - on_did_unsubscribe_from_remote_audio_track: extern "C" fn( - callback_data: *mut c_void, - publisher_id: CFStringRef, - track_id: CFStringRef, - ), - on_mute_changed_from_remote_audio_track: extern "C" fn( - callback_data: *mut c_void, - track_id: CFStringRef, - muted: bool, - ), - on_active_speakers_changed: extern "C" fn( - callback_data: *mut c_void, - participants: CFArrayRef, - ), - on_did_subscribe_to_remote_video_track: extern "C" fn( - callback_data: *mut c_void, - publisher_id: CFStringRef, - track_id: CFStringRef, - remote_track: swift::RemoteVideoTrack, - ), - on_did_unsubscribe_from_remote_video_track: extern "C" fn( - callback_data: *mut c_void, - publisher_id: CFStringRef, - track_id: CFStringRef, - ), - ) -> swift::RoomDelegate; - - fn LKRoomCreate(delegate: swift::RoomDelegate) -> swift::Room; - fn LKRoomConnect( - room: swift::Room, - url: CFStringRef, - token: CFStringRef, - callback: extern "C" fn(*mut c_void, CFStringRef), - callback_data: *mut c_void, - ); - fn LKRoomDisconnect(room: swift::Room); - fn LKRoomPublishVideoTrack( - room: swift::Room, - track: swift::LocalVideoTrack, - callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef), - callback_data: *mut c_void, - ); - fn LKRoomPublishAudioTrack( - room: swift::Room, - track: swift::LocalAudioTrack, - callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef), - callback_data: *mut c_void, - ); - fn LKRoomUnpublishTrack(room: swift::Room, publication: swift::LocalTrackPublication); - - fn LKRoomAudioTracksForRemoteParticipant( - room: swift::Room, - participant_id: CFStringRef, - ) -> CFArrayRef; - - fn LKRoomAudioTrackPublicationsForRemoteParticipant( - room: swift::Room, - participant_id: CFStringRef, - ) -> CFArrayRef; - - fn LKRoomVideoTracksForRemoteParticipant( - room: swift::Room, - participant_id: CFStringRef, - ) -> CFArrayRef; - - fn LKVideoRendererCreate( - callback_data: *mut c_void, - on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool, - on_drop: extern "C" fn(callback_data: *mut c_void), - ) -> *const c_void; - - fn LKRemoteAudioTrackGetSid(track: swift::RemoteAudioTrack) -> CFStringRef; - fn LKVideoTrackAddRenderer(track: swift::RemoteVideoTrack, renderer: *const c_void); - fn LKRemoteVideoTrackGetSid(track: swift::RemoteVideoTrack) -> CFStringRef; - - fn LKDisplaySources( - callback_data: *mut c_void, - callback: extern "C" fn( - callback_data: *mut c_void, - sources: CFArrayRef, - error: CFStringRef, - ), - ); - fn LKCreateScreenShareTrackForDisplay(display: swift::MacOSDisplay) -> swift::LocalVideoTrack; - fn LKLocalAudioTrackCreateTrack() -> swift::LocalAudioTrack; - - fn LKLocalTrackPublicationSetMute( - publication: swift::LocalTrackPublication, - muted: bool, - on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef), - callback_data: *mut c_void, - ); - - fn LKRemoteTrackPublicationSetEnabled( - publication: swift::RemoteTrackPublication, - enabled: bool, - on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef), - callback_data: *mut c_void, - ); - - fn LKRemoteTrackPublicationIsMuted(publication: swift::RemoteTrackPublication) -> bool; - fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef; -} - -pub type Sid = String; - -#[derive(Clone, Eq, PartialEq)] -pub enum ConnectionState { - Disconnected, - Connected { url: String, token: String }, -} - -pub struct Room { - native_room: Mutex, - connection: Mutex<( - watch::Sender, - watch::Receiver, - )>, - remote_audio_track_subscribers: Mutex>>, - remote_video_track_subscribers: Mutex>>, - _delegate: Mutex, -} - -trait AssertSendSync: Send {} -impl AssertSendSync for Room {} - -impl Room { - pub fn new() -> Arc { - Arc::new_cyclic(|weak_room| { - let delegate = RoomDelegate::new(weak_room.clone()); - Self { - native_room: Mutex::new(unsafe { LKRoomCreate(delegate.native_delegate) }), - connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)), - remote_audio_track_subscribers: Default::default(), - remote_video_track_subscribers: Default::default(), - _delegate: Mutex::new(delegate), - } - }) - } - - pub fn status(&self) -> watch::Receiver { - self.connection.lock().1.clone() - } - - pub fn connect(self: &Arc, url: &str, token: &str) -> impl Future> { - let url = CFString::new(url); - let token = CFString::new(token); - let (did_connect, tx, rx) = Self::build_done_callback(); - unsafe { - LKRoomConnect( - *self.native_room.lock(), - url.as_concrete_TypeRef(), - token.as_concrete_TypeRef(), - did_connect, - tx, - ) - } - - let this = self.clone(); - let url = url.to_string(); - let token = token.to_string(); - async move { - rx.await.unwrap().context("error connecting to room")?; - *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token }; - Ok(()) - } - } - - fn did_disconnect(&self) { - *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected; - } - - pub fn display_sources(self: &Arc) -> impl Future>> { - extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) { - unsafe { - let tx = Box::from_raw(tx as *mut oneshot::Sender>>); - - if sources.is_null() { - let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error)))); - } else { - let sources = CFArray::wrap_under_get_rule(sources) - .into_iter() - .map(|source| MacOSDisplay::new(swift::MacOSDisplay(*source))) - .collect(); - - let _ = tx.send(Ok(sources)); - } - } - } - - let (tx, rx) = oneshot::channel(); - - unsafe { - LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback); - } - - async move { rx.await.unwrap() } - } - - pub fn publish_video_track( - self: &Arc, - track: LocalVideoTrack, - ) -> impl Future> { - let (tx, rx) = oneshot::channel::>(); - extern "C" fn callback( - tx: *mut c_void, - publication: swift::LocalTrackPublication, - error: CFStringRef, - ) { - let tx = - unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; - if error.is_null() { - let _ = tx.send(Ok(LocalTrackPublication::new(publication))); - } else { - let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; - let _ = tx.send(Err(anyhow!(error))); - } - } - unsafe { - LKRoomPublishVideoTrack( - *self.native_room.lock(), - track.0, - callback, - Box::into_raw(Box::new(tx)) as *mut c_void, - ); - } - async { rx.await.unwrap().context("error publishing video track") } - } - - pub fn publish_audio_track( - self: &Arc, - track: LocalAudioTrack, - ) -> impl Future> { - let (tx, rx) = oneshot::channel::>(); - extern "C" fn callback( - tx: *mut c_void, - publication: swift::LocalTrackPublication, - error: CFStringRef, - ) { - let tx = - unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; - if error.is_null() { - let _ = tx.send(Ok(LocalTrackPublication::new(publication))); - } else { - let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; - let _ = tx.send(Err(anyhow!(error))); - } - } - unsafe { - LKRoomPublishAudioTrack( - *self.native_room.lock(), - track.0, - callback, - Box::into_raw(Box::new(tx)) as *mut c_void, - ); - } - async { rx.await.unwrap().context("error publishing audio track") } - } - - pub fn unpublish_track(&self, publication: LocalTrackPublication) { - unsafe { - LKRoomUnpublishTrack(*self.native_room.lock(), publication.0); - } - } - - pub fn remote_video_tracks(&self, participant_id: &str) -> Vec> { - unsafe { - let tracks = LKRoomVideoTracksForRemoteParticipant( - *self.native_room.lock(), - CFString::new(participant_id).as_concrete_TypeRef(), - ); - - if tracks.is_null() { - Vec::new() - } else { - let tracks = CFArray::wrap_under_get_rule(tracks); - tracks - .into_iter() - .map(|native_track| { - let native_track = swift::RemoteVideoTrack(*native_track); - let id = - CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track)) - .to_string(); - Arc::new(RemoteVideoTrack::new( - native_track, - id, - participant_id.into(), - )) - }) - .collect() - } - } - } - - pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec> { - unsafe { - let tracks = LKRoomAudioTracksForRemoteParticipant( - *self.native_room.lock(), - CFString::new(participant_id).as_concrete_TypeRef(), - ); - - if tracks.is_null() { - Vec::new() - } else { - let tracks = CFArray::wrap_under_get_rule(tracks); - tracks - .into_iter() - .map(|native_track| { - let native_track = swift::RemoteAudioTrack(*native_track); - let id = - CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track)) - .to_string(); - Arc::new(RemoteAudioTrack::new( - native_track, - id, - participant_id.into(), - )) - }) - .collect() - } - } - } - - pub fn remote_audio_track_publications( - &self, - participant_id: &str, - ) -> Vec> { - unsafe { - let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant( - *self.native_room.lock(), - CFString::new(participant_id).as_concrete_TypeRef(), - ); - - if tracks.is_null() { - Vec::new() - } else { - let tracks = CFArray::wrap_under_get_rule(tracks); - tracks - .into_iter() - .map(|native_track_publication| { - let native_track_publication = - swift::RemoteTrackPublication(*native_track_publication); - Arc::new(RemoteTrackPublication::new(native_track_publication)) - }) - .collect() - } - } - } - - pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver { - let (tx, rx) = mpsc::unbounded(); - self.remote_audio_track_subscribers.lock().push(tx); - rx - } - - pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver { - let (tx, rx) = mpsc::unbounded(); - self.remote_video_track_subscribers.lock().push(tx); - rx - } - - fn did_subscribe_to_remote_audio_track( - &self, - track: RemoteAudioTrack, - publication: RemoteTrackPublication, - ) { - let track = Arc::new(track); - let publication = Arc::new(publication); - self.remote_audio_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed( - track.clone(), - publication.clone(), - )) - .is_ok() - }); - } - - fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) { - self.remote_audio_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed { - publisher_id: publisher_id.clone(), - track_id: track_id.clone(), - }) - .is_ok() - }); - } - - fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) { - self.remote_audio_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged { - track_id: track_id.clone(), - muted, - }) - .is_ok() - }); - } - - // A vec of publisher IDs - fn active_speakers_changed(&self, speakers: Vec) { - self.remote_audio_track_subscribers - .lock() - .retain(move |tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged { - speakers: speakers.clone(), - }) - .is_ok() - }); - } - - fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) { - let track = Arc::new(track); - self.remote_video_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone())) - .is_ok() - }); - } - - fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) { - self.remote_video_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed { - publisher_id: publisher_id.clone(), - track_id: track_id.clone(), - }) - .is_ok() - }); - } - - fn build_done_callback() -> ( - extern "C" fn(*mut c_void, CFStringRef), - *mut c_void, - oneshot::Receiver>, - ) { - let (tx, rx) = oneshot::channel(); - extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) { - let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; - if error.is_null() { - let _ = tx.send(Ok(())); - } else { - let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; - let _ = tx.send(Err(anyhow!(error))); - } - } - ( - done_callback, - Box::into_raw(Box::new(tx)) as *mut c_void, - rx, - ) - } - - pub fn set_display_sources(&self, _: Vec) { - unreachable!("This is a test-only function") - } -} - -impl Drop for Room { - fn drop(&mut self) { - unsafe { - let native_room = &*self.native_room.lock(); - LKRoomDisconnect(*native_room); - CFRelease(native_room.0); - } - } -} - -struct RoomDelegate { - native_delegate: swift::RoomDelegate, - _weak_room: Weak, -} - -impl RoomDelegate { - fn new(weak_room: Weak) -> Self { - let native_delegate = unsafe { - LKRoomDelegateCreate( - weak_room.as_ptr() as *mut c_void, - Self::on_did_disconnect, - Self::on_did_subscribe_to_remote_audio_track, - Self::on_did_unsubscribe_from_remote_audio_track, - Self::on_mute_change_from_remote_audio_track, - Self::on_active_speakers_changed, - Self::on_did_subscribe_to_remote_video_track, - Self::on_did_unsubscribe_from_remote_video_track, - ) - }; - Self { - native_delegate, - _weak_room: weak_room, - } - } - - extern "C" fn on_did_disconnect(room: *mut c_void) { - let room = unsafe { Weak::from_raw(room as *mut Room) }; - if let Some(room) = room.upgrade() { - room.did_disconnect(); - } - let _ = Weak::into_raw(room); - } - - extern "C" fn on_did_subscribe_to_remote_audio_track( - room: *mut c_void, - publisher_id: CFStringRef, - track_id: CFStringRef, - track: swift::RemoteAudioTrack, - publication: swift::RemoteTrackPublication, - ) { - let room = unsafe { Weak::from_raw(room as *mut Room) }; - let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; - let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; - let track = RemoteAudioTrack::new(track, track_id, publisher_id); - let publication = RemoteTrackPublication::new(publication); - if let Some(room) = room.upgrade() { - room.did_subscribe_to_remote_audio_track(track, publication); - } - let _ = Weak::into_raw(room); - } - - extern "C" fn on_did_unsubscribe_from_remote_audio_track( - room: *mut c_void, - publisher_id: CFStringRef, - track_id: CFStringRef, - ) { - let room = unsafe { Weak::from_raw(room as *mut Room) }; - let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; - let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; - if let Some(room) = room.upgrade() { - room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id); - } - let _ = Weak::into_raw(room); - } - - extern "C" fn on_mute_change_from_remote_audio_track( - room: *mut c_void, - track_id: CFStringRef, - muted: bool, - ) { - let room = unsafe { Weak::from_raw(room as *mut Room) }; - let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; - if let Some(room) = room.upgrade() { - room.mute_changed_from_remote_audio_track(track_id, muted); - } - let _ = Weak::into_raw(room); - } - - extern "C" fn on_active_speakers_changed(room: *mut c_void, participants: CFArrayRef) { - if participants.is_null() { - return; - } - - let room = unsafe { Weak::from_raw(room as *mut Room) }; - let speakers = unsafe { - CFArray::wrap_under_get_rule(participants) - .into_iter() - .map( - |speaker: core_foundation::base::ItemRef<'_, *const c_void>| { - CFString::wrap_under_get_rule(*speaker as CFStringRef).to_string() - }, - ) - .collect() - }; - - if let Some(room) = room.upgrade() { - room.active_speakers_changed(speakers); - } - let _ = Weak::into_raw(room); - } - - extern "C" fn on_did_subscribe_to_remote_video_track( - room: *mut c_void, - publisher_id: CFStringRef, - track_id: CFStringRef, - track: swift::RemoteVideoTrack, - ) { - let room = unsafe { Weak::from_raw(room as *mut Room) }; - let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; - let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; - let track = RemoteVideoTrack::new(track, track_id, publisher_id); - if let Some(room) = room.upgrade() { - room.did_subscribe_to_remote_video_track(track); - } - let _ = Weak::into_raw(room); - } - - extern "C" fn on_did_unsubscribe_from_remote_video_track( - room: *mut c_void, - publisher_id: CFStringRef, - track_id: CFStringRef, - ) { - let room = unsafe { Weak::from_raw(room as *mut Room) }; - let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; - let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; - if let Some(room) = room.upgrade() { - room.did_unsubscribe_from_remote_video_track(publisher_id, track_id); - } - let _ = Weak::into_raw(room); - } -} - -impl Drop for RoomDelegate { - fn drop(&mut self) { - unsafe { - CFRelease(self.native_delegate.0); - } - } -} - -pub struct LocalAudioTrack(swift::LocalAudioTrack); - -impl LocalAudioTrack { - pub fn create() -> Self { - Self(unsafe { LKLocalAudioTrackCreateTrack() }) - } -} - -impl Drop for LocalAudioTrack { - fn drop(&mut self) { - unsafe { CFRelease(self.0 .0) } - } -} - -pub struct LocalVideoTrack(swift::LocalVideoTrack); - -impl LocalVideoTrack { - pub fn screen_share_for_display(display: &MacOSDisplay) -> Self { - Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) }) - } -} - -impl Drop for LocalVideoTrack { - fn drop(&mut self) { - unsafe { CFRelease(self.0 .0) } - } -} - -pub struct LocalTrackPublication(swift::LocalTrackPublication); - -impl LocalTrackPublication { - pub fn new(native_track_publication: swift::LocalTrackPublication) -> Self { - unsafe { - CFRetain(native_track_publication.0); - } - Self(native_track_publication) - } - - pub fn set_mute(&self, muted: bool) -> impl Future> { - let (tx, rx) = futures::channel::oneshot::channel(); - - extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) { - let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender>) }; - if error.is_null() { - tx.send(Ok(())).ok(); - } else { - let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; - tx.send(Err(anyhow!(error))).ok(); - } - } - - unsafe { - LKLocalTrackPublicationSetMute( - self.0, - muted, - complete_callback, - Box::into_raw(Box::new(tx)) as *mut c_void, - ) - } - - async move { rx.await.unwrap() } - } -} - -impl Drop for LocalTrackPublication { - fn drop(&mut self) { - unsafe { CFRelease(self.0 .0) } - } -} - -pub struct RemoteTrackPublication { - native_publication: Mutex, -} - -impl RemoteTrackPublication { - pub fn new(native_track_publication: swift::RemoteTrackPublication) -> Self { - unsafe { - CFRetain(native_track_publication.0); - } - Self { - native_publication: Mutex::new(native_track_publication), - } - } - - pub fn sid(&self) -> String { - unsafe { - CFString::wrap_under_get_rule(LKRemoteTrackPublicationGetSid( - *self.native_publication.lock(), - )) - .to_string() - } - } - - pub fn is_muted(&self) -> bool { - unsafe { LKRemoteTrackPublicationIsMuted(*self.native_publication.lock()) } - } - - pub fn set_enabled(&self, enabled: bool) -> impl Future> { - let (tx, rx) = futures::channel::oneshot::channel(); - - extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) { - let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender>) }; - if error.is_null() { - tx.send(Ok(())).ok(); - } else { - let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; - tx.send(Err(anyhow!(error))).ok(); - } - } - - unsafe { - LKRemoteTrackPublicationSetEnabled( - *self.native_publication.lock(), - enabled, - complete_callback, - Box::into_raw(Box::new(tx)) as *mut c_void, - ) - } - - async move { rx.await.unwrap() } - } -} - -impl Drop for RemoteTrackPublication { - fn drop(&mut self) { - unsafe { CFRelease((*self.native_publication.lock()).0) } - } -} - -#[derive(Debug)] -pub struct RemoteAudioTrack { - native_track: Mutex, - sid: Sid, - publisher_id: String, -} - -impl RemoteAudioTrack { - fn new(native_track: swift::RemoteAudioTrack, sid: Sid, publisher_id: String) -> Self { - unsafe { - CFRetain(native_track.0); - } - Self { - native_track: Mutex::new(native_track), - sid, - publisher_id, - } - } - - pub fn sid(&self) -> &str { - &self.sid - } - - pub fn publisher_id(&self) -> &str { - &self.publisher_id - } - - pub fn enable(&self) -> impl Future> { - async { Ok(()) } - } - - pub fn disable(&self) -> impl Future> { - async { Ok(()) } - } -} - -impl Drop for RemoteAudioTrack { - fn drop(&mut self) { - unsafe { CFRelease(self.native_track.lock().0) } - } -} - -#[derive(Debug)] -pub struct RemoteVideoTrack { - native_track: Mutex, - sid: Sid, - publisher_id: String, -} - -impl RemoteVideoTrack { - fn new(native_track: swift::RemoteVideoTrack, sid: Sid, publisher_id: String) -> Self { - unsafe { - CFRetain(native_track.0); - } - Self { - native_track: Mutex::new(native_track), - sid, - publisher_id, - } - } - - pub fn sid(&self) -> &str { - &self.sid - } - - pub fn publisher_id(&self) -> &str { - &self.publisher_id - } - - pub fn frames(&self) -> async_broadcast::Receiver { - extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool { - unsafe { - let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender); - let buffer = CVImageBuffer::wrap_under_get_rule(frame); - let result = tx.try_broadcast(Frame(buffer)); - let _ = Box::into_raw(tx); - match result { - Ok(_) => true, - Err(async_broadcast::TrySendError::Closed(_)) - | Err(async_broadcast::TrySendError::Inactive(_)) => { - log::warn!("no active receiver for frame"); - false - } - Err(async_broadcast::TrySendError::Full(_)) => { - log::warn!("skipping frame as receiver is not keeping up"); - true - } - } - } - } - - extern "C" fn on_drop(callback_data: *mut c_void) { - unsafe { - let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender); - } - } - - let (tx, rx) = async_broadcast::broadcast(64); - unsafe { - let renderer = LKVideoRendererCreate( - Box::into_raw(Box::new(tx)) as *mut c_void, - on_frame, - on_drop, - ); - LKVideoTrackAddRenderer(*self.native_track.lock(), renderer); - rx - } - } -} - -impl Drop for RemoteVideoTrack { - fn drop(&mut self) { - unsafe { CFRelease(self.native_track.lock().0) } - } -} - -pub enum RemoteVideoTrackUpdate { - Subscribed(Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - -pub enum RemoteAudioTrackUpdate { - ActiveSpeakersChanged { speakers: Vec }, - MuteChanged { track_id: Sid, muted: bool }, - Subscribed(Arc, Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - -pub struct MacOSDisplay(swift::MacOSDisplay); - -impl MacOSDisplay { - fn new(ptr: swift::MacOSDisplay) -> Self { - unsafe { - CFRetain(ptr.0); - } - Self(ptr) - } -} - -impl Drop for MacOSDisplay { - fn drop(&mut self) { - unsafe { CFRelease(self.0 .0) } - } -} - -#[derive(Clone)] -pub struct Frame(CVImageBuffer); - -impl Frame { - pub fn width(&self) -> usize { - self.0.width() - } - - pub fn height(&self) -> usize { - self.0.height() - } - - pub fn image(&self) -> CVImageBuffer { - self.0.clone() - } -} diff --git a/crates/live_kit_client2/src/test.rs b/crates/live_kit_client2/src/test.rs deleted file mode 100644 index 1106e66f31..0000000000 --- a/crates/live_kit_client2/src/test.rs +++ /dev/null @@ -1,651 +0,0 @@ -use anyhow::{anyhow, Context, Result}; -use async_trait::async_trait; -use collections::{BTreeMap, HashMap}; -use futures::Stream; -use gpui::BackgroundExecutor; -use live_kit_server::token; -use media::core_video::CVImageBuffer; -use parking_lot::Mutex; -use postage::watch; -use std::{future::Future, mem, sync::Arc}; - -static SERVERS: Mutex>> = Mutex::new(BTreeMap::new()); - -pub struct TestServer { - pub url: String, - pub api_key: String, - pub secret_key: String, - rooms: Mutex>, - executor: BackgroundExecutor, -} - -impl TestServer { - pub fn create( - url: String, - api_key: String, - secret_key: String, - executor: BackgroundExecutor, - ) -> Result> { - let mut servers = SERVERS.lock(); - if servers.contains_key(&url) { - Err(anyhow!("a server with url {:?} already exists", url)) - } else { - let server = Arc::new(TestServer { - url: url.clone(), - api_key, - secret_key, - rooms: Default::default(), - executor, - }); - servers.insert(url, server.clone()); - Ok(server) - } - } - - fn get(url: &str) -> Result> { - Ok(SERVERS - .lock() - .get(url) - .ok_or_else(|| anyhow!("no server found for url"))? - .clone()) - } - - pub fn teardown(&self) -> Result<()> { - SERVERS - .lock() - .remove(&self.url) - .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?; - Ok(()) - } - - pub fn create_api_client(&self) -> TestApiClient { - TestApiClient { - url: self.url.clone(), - } - } - - pub async fn create_room(&self, room: String) -> Result<()> { - self.executor.simulate_random_delay().await; - let mut server_rooms = self.rooms.lock(); - if server_rooms.contains_key(&room) { - Err(anyhow!("room {:?} already exists", room)) - } else { - server_rooms.insert(room, Default::default()); - Ok(()) - } - } - - async fn delete_room(&self, room: String) -> Result<()> { - // TODO: clear state associated with all `Room`s. - self.executor.simulate_random_delay().await; - let mut server_rooms = self.rooms.lock(); - server_rooms - .remove(&room) - .ok_or_else(|| anyhow!("room {:?} does not exist", room))?; - Ok(()) - } - - async fn join_room(&self, token: String, client_room: Arc) -> Result<()> { - self.executor.simulate_random_delay().await; - let claims = live_kit_server::token::validate(&token, &self.secret_key)?; - let identity = claims.sub.unwrap().to_string(); - let room_name = claims.video.room.unwrap(); - let mut server_rooms = self.rooms.lock(); - let room = (*server_rooms).entry(room_name.to_string()).or_default(); - - if room.client_rooms.contains_key(&identity) { - Err(anyhow!( - "{:?} attempted to join room {:?} twice", - identity, - room_name - )) - } else { - for track in &room.video_tracks { - client_room - .0 - .lock() - .video_track_updates - .0 - .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) - .unwrap(); - } - room.client_rooms.insert(identity, client_room); - Ok(()) - } - } - - async fn leave_room(&self, token: String) -> Result<()> { - self.executor.simulate_random_delay().await; - let claims = live_kit_server::token::validate(&token, &self.secret_key)?; - let identity = claims.sub.unwrap().to_string(); - let room_name = claims.video.room.unwrap(); - let mut server_rooms = self.rooms.lock(); - let room = server_rooms - .get_mut(&*room_name) - .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - room.client_rooms.remove(&identity).ok_or_else(|| { - anyhow!( - "{:?} attempted to leave room {:?} before joining it", - identity, - room_name - ) - })?; - Ok(()) - } - - async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> { - // TODO: clear state associated with the `Room`. - - self.executor.simulate_random_delay().await; - let mut server_rooms = self.rooms.lock(); - let room = server_rooms - .get_mut(&room_name) - .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - room.client_rooms.remove(&identity).ok_or_else(|| { - anyhow!( - "participant {:?} did not join room {:?}", - identity, - room_name - ) - })?; - Ok(()) - } - - pub async fn disconnect_client(&self, client_identity: String) { - self.executor.simulate_random_delay().await; - let mut server_rooms = self.rooms.lock(); - for room in server_rooms.values_mut() { - if let Some(room) = room.client_rooms.remove(&client_identity) { - *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected; - } - } - } - - async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> { - self.executor.simulate_random_delay().await; - let claims = live_kit_server::token::validate(&token, &self.secret_key)?; - let identity = claims.sub.unwrap().to_string(); - let room_name = claims.video.room.unwrap(); - - let mut server_rooms = self.rooms.lock(); - let room = server_rooms - .get_mut(&*room_name) - .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - - let track = Arc::new(RemoteVideoTrack { - sid: nanoid::nanoid!(17), - publisher_id: identity.clone(), - frames_rx: local_track.frames_rx.clone(), - }); - - room.video_tracks.push(track.clone()); - - for (id, client_room) in &room.client_rooms { - if *id != identity { - let _ = client_room - .0 - .lock() - .video_track_updates - .0 - .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) - .unwrap(); - } - } - - Ok(()) - } - - async fn publish_audio_track( - &self, - token: String, - _local_track: &LocalAudioTrack, - ) -> Result<()> { - self.executor.simulate_random_delay().await; - let claims = live_kit_server::token::validate(&token, &self.secret_key)?; - let identity = claims.sub.unwrap().to_string(); - let room_name = claims.video.room.unwrap(); - - let mut server_rooms = self.rooms.lock(); - let room = server_rooms - .get_mut(&*room_name) - .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - - let track = Arc::new(RemoteAudioTrack { - sid: nanoid::nanoid!(17), - publisher_id: identity.clone(), - }); - - let publication = Arc::new(RemoteTrackPublication); - - room.audio_tracks.push(track.clone()); - - for (id, client_room) in &room.client_rooms { - if *id != identity { - let _ = client_room - .0 - .lock() - .audio_track_updates - .0 - .try_broadcast(RemoteAudioTrackUpdate::Subscribed( - track.clone(), - publication.clone(), - )) - .unwrap(); - } - } - - Ok(()) - } - - fn video_tracks(&self, token: String) -> Result>> { - let claims = live_kit_server::token::validate(&token, &self.secret_key)?; - let room_name = claims.video.room.unwrap(); - - let mut server_rooms = self.rooms.lock(); - let room = server_rooms - .get_mut(&*room_name) - .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - Ok(room.video_tracks.clone()) - } - - fn audio_tracks(&self, token: String) -> Result>> { - let claims = live_kit_server::token::validate(&token, &self.secret_key)?; - let room_name = claims.video.room.unwrap(); - - let mut server_rooms = self.rooms.lock(); - let room = server_rooms - .get_mut(&*room_name) - .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - Ok(room.audio_tracks.clone()) - } -} - -#[derive(Default)] -struct TestServerRoom { - client_rooms: HashMap>, - video_tracks: Vec>, - audio_tracks: Vec>, -} - -impl TestServerRoom {} - -pub struct TestApiClient { - url: String, -} - -#[async_trait] -impl live_kit_server::api::Client for TestApiClient { - fn url(&self) -> &str { - &self.url - } - - async fn create_room(&self, name: String) -> Result<()> { - let server = TestServer::get(&self.url)?; - server.create_room(name).await?; - Ok(()) - } - - async fn delete_room(&self, name: String) -> Result<()> { - let server = TestServer::get(&self.url)?; - server.delete_room(name).await?; - Ok(()) - } - - async fn remove_participant(&self, room: String, identity: String) -> Result<()> { - let server = TestServer::get(&self.url)?; - server.remove_participant(room, identity).await?; - Ok(()) - } - - fn room_token(&self, room: &str, identity: &str) -> Result { - let server = TestServer::get(&self.url)?; - token::create( - &server.api_key, - &server.secret_key, - Some(identity), - token::VideoGrant::to_join(room), - ) - } - - fn guest_token(&self, room: &str, identity: &str) -> Result { - let server = TestServer::get(&self.url)?; - token::create( - &server.api_key, - &server.secret_key, - Some(identity), - token::VideoGrant::for_guest(room), - ) - } -} - -pub type Sid = String; - -struct RoomState { - connection: ( - watch::Sender, - watch::Receiver, - ), - display_sources: Vec, - audio_track_updates: ( - async_broadcast::Sender, - async_broadcast::Receiver, - ), - video_track_updates: ( - async_broadcast::Sender, - async_broadcast::Receiver, - ), -} - -#[derive(Clone, Eq, PartialEq)] -pub enum ConnectionState { - Disconnected, - Connected { url: String, token: String }, -} - -pub struct Room(Mutex); - -impl Room { - pub fn new() -> Arc { - Arc::new(Self(Mutex::new(RoomState { - connection: watch::channel_with(ConnectionState::Disconnected), - display_sources: Default::default(), - video_track_updates: async_broadcast::broadcast(128), - audio_track_updates: async_broadcast::broadcast(128), - }))) - } - - pub fn status(&self) -> watch::Receiver { - self.0.lock().connection.1.clone() - } - - pub fn connect(self: &Arc, url: &str, token: &str) -> impl Future> { - let this = self.clone(); - let url = url.to_string(); - let token = token.to_string(); - async move { - let server = TestServer::get(&url)?; - server - .join_room(token.clone(), this.clone()) - .await - .context("room join")?; - *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token }; - Ok(()) - } - } - - pub fn display_sources(self: &Arc) -> impl Future>> { - let this = self.clone(); - async move { - let server = this.test_server(); - server.executor.simulate_random_delay().await; - Ok(this.0.lock().display_sources.clone()) - } - } - - pub fn publish_video_track( - self: &Arc, - track: LocalVideoTrack, - ) -> impl Future> { - let this = self.clone(); - let track = track.clone(); - async move { - this.test_server() - .publish_video_track(this.token(), track) - .await?; - Ok(LocalTrackPublication) - } - } - pub fn publish_audio_track( - self: &Arc, - track: LocalAudioTrack, - ) -> impl Future> { - let this = self.clone(); - let track = track.clone(); - async move { - this.test_server() - .publish_audio_track(this.token(), &track) - .await?; - Ok(LocalTrackPublication) - } - } - - pub fn unpublish_track(&self, _publication: LocalTrackPublication) {} - - pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec> { - if !self.is_connected() { - return Vec::new(); - } - - self.test_server() - .audio_tracks(self.token()) - .unwrap() - .into_iter() - .filter(|track| track.publisher_id() == publisher_id) - .collect() - } - - pub fn remote_audio_track_publications( - &self, - publisher_id: &str, - ) -> Vec> { - if !self.is_connected() { - return Vec::new(); - } - - self.test_server() - .audio_tracks(self.token()) - .unwrap() - .into_iter() - .filter(|track| track.publisher_id() == publisher_id) - .map(|_track| Arc::new(RemoteTrackPublication {})) - .collect() - } - - pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec> { - if !self.is_connected() { - return Vec::new(); - } - - self.test_server() - .video_tracks(self.token()) - .unwrap() - .into_iter() - .filter(|track| track.publisher_id() == publisher_id) - .collect() - } - - pub fn remote_audio_track_updates(&self) -> impl Stream { - self.0.lock().audio_track_updates.1.clone() - } - - pub fn remote_video_track_updates(&self) -> impl Stream { - self.0.lock().video_track_updates.1.clone() - } - - pub fn set_display_sources(&self, sources: Vec) { - self.0.lock().display_sources = sources; - } - - fn test_server(&self) -> Arc { - match self.0.lock().connection.1.borrow().clone() { - ConnectionState::Disconnected => panic!("must be connected to call this method"), - ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(), - } - } - - fn token(&self) -> String { - match self.0.lock().connection.1.borrow().clone() { - ConnectionState::Disconnected => panic!("must be connected to call this method"), - ConnectionState::Connected { token, .. } => token, - } - } - - fn is_connected(&self) -> bool { - match *self.0.lock().connection.1.borrow() { - ConnectionState::Disconnected => false, - ConnectionState::Connected { .. } => true, - } - } -} - -impl Drop for Room { - fn drop(&mut self) { - if let ConnectionState::Connected { token, .. } = mem::replace( - &mut *self.0.lock().connection.0.borrow_mut(), - ConnectionState::Disconnected, - ) { - if let Ok(server) = TestServer::get(&token) { - let executor = server.executor.clone(); - executor - .spawn(async move { server.leave_room(token).await.unwrap() }) - .detach(); - } - } - } -} - -pub struct LocalTrackPublication; - -impl LocalTrackPublication { - pub fn set_mute(&self, _mute: bool) -> impl Future> { - async { Ok(()) } - } -} - -pub struct RemoteTrackPublication; - -impl RemoteTrackPublication { - pub fn set_enabled(&self, _enabled: bool) -> impl Future> { - async { Ok(()) } - } - - pub fn is_muted(&self) -> bool { - false - } - - pub fn sid(&self) -> String { - "".to_string() - } -} - -#[derive(Clone)] -pub struct LocalVideoTrack { - frames_rx: async_broadcast::Receiver, -} - -impl LocalVideoTrack { - pub fn screen_share_for_display(display: &MacOSDisplay) -> Self { - Self { - frames_rx: display.frames.1.clone(), - } - } -} - -#[derive(Clone)] -pub struct LocalAudioTrack; - -impl LocalAudioTrack { - pub fn create() -> Self { - Self - } -} - -#[derive(Debug)] -pub struct RemoteVideoTrack { - sid: Sid, - publisher_id: Sid, - frames_rx: async_broadcast::Receiver, -} - -impl RemoteVideoTrack { - pub fn sid(&self) -> &str { - &self.sid - } - - pub fn publisher_id(&self) -> &str { - &self.publisher_id - } - - pub fn frames(&self) -> async_broadcast::Receiver { - self.frames_rx.clone() - } -} - -#[derive(Debug)] -pub struct RemoteAudioTrack { - sid: Sid, - publisher_id: Sid, -} - -impl RemoteAudioTrack { - pub fn sid(&self) -> &str { - &self.sid - } - - pub fn publisher_id(&self) -> &str { - &self.publisher_id - } - - pub fn enable(&self) -> impl Future> { - async { Ok(()) } - } - - pub fn disable(&self) -> impl Future> { - async { Ok(()) } - } -} - -#[derive(Clone)] -pub enum RemoteVideoTrackUpdate { - Subscribed(Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - -#[derive(Clone)] -pub enum RemoteAudioTrackUpdate { - ActiveSpeakersChanged { speakers: Vec }, - MuteChanged { track_id: Sid, muted: bool }, - Subscribed(Arc, Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - -#[derive(Clone)] -pub struct MacOSDisplay { - frames: ( - async_broadcast::Sender, - async_broadcast::Receiver, - ), -} - -impl MacOSDisplay { - pub fn new() -> Self { - Self { - frames: async_broadcast::broadcast(128), - } - } - - pub fn send_frame(&self, frame: Frame) { - self.frames.0.try_broadcast(frame).unwrap(); - } -} - -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct Frame { - pub label: String, - pub width: usize, - pub height: usize, -} - -impl Frame { - pub fn width(&self) -> usize { - self.width - } - - pub fn height(&self) -> usize { - self.height - } - - pub fn image(&self) -> CVImageBuffer { - unimplemented!("you can't call this in test mode") - } -}