diff --git a/Cargo.lock b/Cargo.lock index 3aca27106c..bb72f5d6ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1169,7 +1169,7 @@ dependencies = [ "futures 0.3.28", "gpui2", "language2", - "live_kit_client", + "live_kit_client2", "log", "media", "postage", @@ -4589,6 +4589,39 @@ dependencies = [ "simplelog", ] +[[package]] +name = "live_kit_client2" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-broadcast", + "async-trait", + "block", + "byteorder", + "bytes 1.5.0", + "cocoa", + "collections", + "core-foundation", + "core-graphics", + "foreign-types", + "futures 0.3.28", + "gpui2", + "hmac 0.12.1", + "jwt", + "live_kit_server", + "log", + "media", + "nanoid", + "objc", + "parking_lot 0.11.2", + "postage", + "serde", + "serde_derive", + "serde_json", + "sha2 0.10.7", + "simplelog", +] + [[package]] name = "live_kit_server" version = "0.1.0" diff --git a/crates/call2/Cargo.toml b/crates/call2/Cargo.toml index f0e47832ed..e918ada3e8 100644 --- a/crates/call2/Cargo.toml +++ b/crates/call2/Cargo.toml @@ -13,7 +13,7 @@ test-support = [ "client2/test-support", "collections/test-support", "gpui2/test-support", - "live_kit_client/test-support", + "live_kit_client2/test-support", "project2/test-support", "util/test-support" ] @@ -24,7 +24,7 @@ client2 = { path = "../client2" } collections = { path = "../collections" } gpui2 = { path = "../gpui2" } log.workspace = true -live_kit_client = { path = "../live_kit_client" } +live_kit_client2 = { path = "../live_kit_client2" } fs2 = { path = "../fs2" } language2 = { path = "../language2" } media = { path = "../media" } @@ -47,6 +47,6 @@ fs2 = { path = "../fs2", features = ["test-support"] } language2 = { path = "../language2", features = ["test-support"] } collections = { path = "../collections", features = ["test-support"] } gpui2 = { path = "../gpui2", features = ["test-support"] } -live_kit_client = { path = "../live_kit_client", features = ["test-support"] } +live_kit_client2 = { path = "../live_kit_client2", features = ["test-support"] } project2 = { path = "../project2", features = ["test-support"] } util = { path = "../util", features = ["test-support"] } diff --git a/crates/call2/src/participant.rs b/crates/call2/src/participant.rs index 7f3e91dbba..a1837e3ad0 100644 --- a/crates/call2/src/participant.rs +++ b/crates/call2/src/participant.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Result}; use client2::ParticipantIndex; use client2::{proto, User}; use gpui2::WeakModel; -pub use live_kit_client::Frame; +pub use live_kit_client2::Frame; use project2::Project; use std::{fmt, sync::Arc}; @@ -51,7 +51,7 @@ pub struct RemoteParticipant { #[derive(Clone)] pub struct RemoteVideoTrack { - pub(crate) live_kit_track: Arc, + pub(crate) live_kit_track: Arc, } unsafe impl Send for RemoteVideoTrack {} diff --git a/crates/call2/src/room.rs b/crates/call2/src/room.rs index b7bac52a8b..f0e0b8de17 100644 --- a/crates/call2/src/room.rs +++ b/crates/call2/src/room.rs @@ -19,7 +19,7 @@ use gpui2::{ AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel, }; use language2::LanguageRegistry; -use live_kit_client::{LocalTrackPublication, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate}; +use live_kit_client2::{LocalTrackPublication, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate}; use postage::{sink::Sink, stream::Stream, watch}; use project2::Project; use settings2::Settings; @@ -59,7 +59,7 @@ pub enum Event { pub struct Room { id: u64, channel_id: Option, - // live_kit: Option, + live_kit: Option, status: RoomStatus, shared_projects: HashSet>, joined_projects: HashSet>, @@ -114,125 +114,130 @@ impl Room { user_store: Model, cx: &mut ModelContext, ) -> Self { - todo!() - // let _live_kit_room = if let Some(connection_info) = live_kit_connection_info { - // let room = live_kit_client::Room::new(); - // let mut status = room.status(); - // // Consume the initial status of the room. - // let _ = status.try_recv(); - // let _maintain_room = cx.spawn(|this, mut cx| async move { - // while let Some(status) = status.next().await { - // let this = if let Some(this) = this.upgrade() { - // this - // } else { - // break; - // }; + let live_kit_room = if let Some(connection_info) = live_kit_connection_info { + let room = live_kit_client2::Room::new(); + let mut status = room.status(); + // Consume the initial status of the room. + let _ = status.try_recv(); + let _maintain_room = cx.spawn(|this, mut cx| async move { + while let Some(status) = status.next().await { + let this = if let Some(this) = this.upgrade() { + this + } else { + break; + }; - // if status == live_kit_client::ConnectionState::Disconnected { - // this.update(&mut cx, |this, cx| this.leave(cx).log_err()) - // .ok(); - // break; - // } - // } - // }); + if status == live_kit_client2::ConnectionState::Disconnected { + this.update(&mut cx, |this, cx| this.leave(cx).log_err()) + .ok(); + break; + } + } + }); - // let mut track_video_changes = room.remote_video_track_updates(); - // let _maintain_video_tracks = cx.spawn(|this, mut cx| async move { - // while let Some(track_change) = track_video_changes.next().await { - // let this = if let Some(this) = this.upgrade() { - // this - // } else { - // break; - // }; + let _maintain_video_tracks = cx.spawn_on_main({ + let room = room.clone(); + move |this, mut cx| async move { + let mut track_video_changes = room.remote_video_track_updates(); + while let Some(track_change) = track_video_changes.next().await { + let this = if let Some(this) = this.upgrade() { + this + } else { + break; + }; - // this.update(&mut cx, |this, cx| { - // this.remote_video_track_updated(track_change, cx).log_err() - // }) - // .ok(); - // } - // }); + this.update(&mut cx, |this, cx| { + this.remote_video_track_updated(track_change, cx).log_err() + }) + .ok(); + } + } + }); - // let mut track_audio_changes = room.remote_audio_track_updates(); - // let _maintain_audio_tracks = cx.spawn(|this, mut cx| async move { - // while let Some(track_change) = track_audio_changes.next().await { - // let this = if let Some(this) = this.upgrade() { - // this - // } else { - // break; - // }; + let _maintain_audio_tracks = cx.spawn_on_main({ + let room = room.clone(); + |this, mut cx| async move { + let mut track_audio_changes = room.remote_audio_track_updates(); + while let Some(track_change) = track_audio_changes.next().await { + let this = if let Some(this) = this.upgrade() { + this + } else { + break; + }; - // this.update(&mut cx, |this, cx| { - // this.remote_audio_track_updated(track_change, cx).log_err() - // }) - // .ok(); - // } - // }); + this.update(&mut cx, |this, cx| { + this.remote_audio_track_updated(track_change, cx).log_err() + }) + .ok(); + } + } + }); - // let connect = room.connect(&connection_info.server_url, &connection_info.token); - // cx.spawn(|this, mut cx| async move { - // connect.await?; + let connect = room.connect(&connection_info.server_url, &connection_info.token); + cx.spawn(|this, mut cx| async move { + connect.await?; - // if !cx.update(|cx| Self::mute_on_join(cx))? { - // this.update(&mut cx, |this, cx| this.share_microphone(cx))? - // .await?; - // } + if !cx.update(|cx| Self::mute_on_join(cx))? { + this.update(&mut cx, |this, cx| this.share_microphone(cx))? + .await?; + } - // anyhow::Ok(()) - // }) - // .detach_and_log_err(cx); + anyhow::Ok(()) + }) + .detach_and_log_err(cx); - // Some(LiveKitRoom { - // room, - // screen_track: LocalTrack::None, - // microphone_track: LocalTrack::None, - // next_publish_id: 0, - // muted_by_user: false, - // deafened: false, - // speaking: false, - // _maintain_room, - // _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks], - // }) - // } else { - // None - // }; + Some(LiveKitRoom { + room, + screen_track: LocalTrack::None, + microphone_track: LocalTrack::None, + next_publish_id: 0, + muted_by_user: false, + deafened: false, + speaking: false, + _maintain_room, + _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks], + }) + } else { + None + }; - // let maintain_connection = cx.spawn({ - // let client = client.clone(); - // move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err() - // }); + let maintain_connection = cx.spawn({ + let client = client.clone(); + move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err() + }); - // Audio::play_sound(Sound::Joined, cx); + Audio::play_sound(Sound::Joined, cx); - // let (room_update_completed_tx, room_update_completed_rx) = watch::channel(); + let (room_update_completed_tx, room_update_completed_rx) = watch::channel(); - // Self { - // id, - // channel_id, - // // live_kit: live_kit_room, - // status: RoomStatus::Online, - // shared_projects: Default::default(), - // joined_projects: Default::default(), - // participant_user_ids: Default::default(), - // local_participant: Default::default(), - // remote_participants: Default::default(), - // pending_participants: Default::default(), - // pending_call_count: 0, - // client_subscriptions: vec![ - // client.add_message_handler(cx.weak_handle(), Self::handle_room_updated) - // ], - // _subscriptions: vec![ - // cx.on_release(Self::released), - // cx.on_app_quit(Self::app_will_quit), - // ], - // leave_when_empty: false, - // pending_room_update: None, - // client, - // user_store, - // follows_by_leader_id_project_id: Default::default(), - // maintain_connection: Some(maintain_connection), - // room_update_completed_tx, - // room_update_completed_rx, - // } + Self { + id, + channel_id, + live_kit: live_kit_room, + status: RoomStatus::Online, + shared_projects: Default::default(), + joined_projects: Default::default(), + participant_user_ids: Default::default(), + local_participant: Default::default(), + remote_participants: Default::default(), + pending_participants: Default::default(), + pending_call_count: 0, + client_subscriptions: vec![ + client.add_message_handler(cx.weak_model(), Self::handle_room_updated) + ], + _subscriptions: vec![ + cx.on_release(Self::released), + cx.on_app_quit(Self::app_will_quit), + ], + leave_when_empty: false, + pending_room_update: None, + client, + user_store, + follows_by_leader_id_project_id: Default::default(), + maintain_connection: Some(maintain_connection), + room_update_completed_tx, + room_update_completed_rx, + } } pub(crate) fn create( @@ -1518,7 +1523,7 @@ impl Room { } #[cfg(any(test, feature = "test-support"))] - pub fn set_display_sources(&self, sources: Vec) { + pub fn set_display_sources(&self, sources: Vec) { todo!() // self.live_kit // .as_ref() @@ -1529,7 +1534,7 @@ impl Room { } struct LiveKitRoom { - room: Arc, + room: Arc, screen_track: LocalTrack, microphone_track: LocalTrack, /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user. diff --git a/crates/live_kit_client/LiveKitBridge/Package.resolved b/crates/live_kit_client/LiveKitBridge/Package.resolved index b925bc8f0d..85ae088565 100644 --- a/crates/live_kit_client/LiveKitBridge/Package.resolved +++ b/crates/live_kit_client/LiveKitBridge/Package.resolved @@ -42,8 +42,8 @@ "repositoryURL": "https://github.com/apple/swift-protobuf.git", "state": { "branch": null, - "revision": "ce20dc083ee485524b802669890291c0d8090170", - "version": "1.22.1" + "revision": "0af9125c4eae12a4973fb66574c53a54962a9e1e", + "version": "1.21.0" } } ] diff --git a/crates/live_kit_client2/.cargo/config.toml b/crates/live_kit_client2/.cargo/config.toml new file mode 100644 index 0000000000..b33fe211bd --- /dev/null +++ b/crates/live_kit_client2/.cargo/config.toml @@ -0,0 +1,2 @@ +[live_kit_client_test] +rustflags = ["-C", "link-args=-ObjC"] diff --git a/crates/live_kit_client2/Cargo.toml b/crates/live_kit_client2/Cargo.toml new file mode 100644 index 0000000000..b5b45a8d45 --- /dev/null +++ b/crates/live_kit_client2/Cargo.toml @@ -0,0 +1,71 @@ +[package] +name = "live_kit_client2" +version = "0.1.0" +edition = "2021" +description = "Bindings to LiveKit Swift client SDK" +publish = false + +[lib] +path = "src/live_kit_client2.rs" +doctest = false + +[[example]] +name = "test_app" + +[features] +test-support = [ + "async-trait", + "collections/test-support", + "gpui2/test-support", + "live_kit_server", + "nanoid", +] + +[dependencies] +collections = { path = "../collections", optional = true } +gpui2 = { path = "../gpui2", optional = true } +live_kit_server = { path = "../live_kit_server", optional = true } +media = { path = "../media" } + +anyhow.workspace = true +async-broadcast = "0.4" +core-foundation = "0.9.3" +core-graphics = "0.22.3" +futures.workspace = true +log.workspace = true +parking_lot.workspace = true +postage.workspace = true + +async-trait = { workspace = true, optional = true } +nanoid = { version ="0.4", optional = true} + +[dev-dependencies] +collections = { path = "../collections", features = ["test-support"] } +gpui2 = { path = "../gpui2", features = ["test-support"] } +live_kit_server = { path = "../live_kit_server" } +media = { path = "../media" } +nanoid = "0.4" + +anyhow.workspace = true +async-trait.workspace = true +block = "0.1" +bytes = "1.2" +byteorder = "1.4" +cocoa = "0.24" +core-foundation = "0.9.3" +core-graphics = "0.22.3" +foreign-types = "0.3" +futures.workspace = true +hmac = "0.12" +jwt = "0.16" +objc = "0.2" +parking_lot.workspace = true +serde.workspace = true +serde_derive.workspace = true +sha2 = "0.10" +simplelog = "0.9" + +[build-dependencies] +serde.workspace = true +serde_derive.workspace = true +serde_json.workspace = true diff --git a/crates/live_kit_client2/LiveKitBridge/Package.resolved b/crates/live_kit_client2/LiveKitBridge/Package.resolved new file mode 100644 index 0000000000..b925bc8f0d --- /dev/null +++ b/crates/live_kit_client2/LiveKitBridge/Package.resolved @@ -0,0 +1,52 @@ +{ + "object": { + "pins": [ + { + "package": "LiveKit", + "repositoryURL": "https://github.com/livekit/client-sdk-swift.git", + "state": { + "branch": null, + "revision": "7331b813a5ab8a95cfb81fb2b4ed10519428b9ff", + "version": "1.0.12" + } + }, + { + "package": "Promises", + "repositoryURL": "https://github.com/google/promises.git", + "state": { + "branch": null, + "revision": "ec957ccddbcc710ccc64c9dcbd4c7006fcf8b73a", + "version": "2.2.0" + } + }, + { + "package": "WebRTC", + "repositoryURL": "https://github.com/webrtc-sdk/Specs.git", + "state": { + "branch": null, + "revision": "2f6bab30c8df0fe59ab3e58bc99097f757f85f65", + "version": "104.5112.17" + } + }, + { + "package": "swift-log", + "repositoryURL": "https://github.com/apple/swift-log.git", + "state": { + "branch": null, + "revision": "32e8d724467f8fe623624570367e3d50c5638e46", + "version": "1.5.2" + } + }, + { + "package": "SwiftProtobuf", + "repositoryURL": "https://github.com/apple/swift-protobuf.git", + "state": { + "branch": null, + "revision": "ce20dc083ee485524b802669890291c0d8090170", + "version": "1.22.1" + } + } + ] + }, + "version": 1 +} diff --git a/crates/live_kit_client2/LiveKitBridge/Package.swift b/crates/live_kit_client2/LiveKitBridge/Package.swift new file mode 100644 index 0000000000..d7b5c271b9 --- /dev/null +++ b/crates/live_kit_client2/LiveKitBridge/Package.swift @@ -0,0 +1,27 @@ +// swift-tools-version: 5.5 + +import PackageDescription + +let package = Package( + name: "LiveKitBridge", + platforms: [ + .macOS(.v10_15) + ], + products: [ + // Products define the executables and libraries a package produces, and make them visible to other packages. + .library( + name: "LiveKitBridge", + type: .static, + targets: ["LiveKitBridge"]), + ], + dependencies: [ + .package(url: "https://github.com/livekit/client-sdk-swift.git", .exact("1.0.12")), + ], + targets: [ + // Targets are the basic building blocks of a package. A target can define a module or a test suite. + // Targets can depend on other targets in this package, and on products in packages this package depends on. + .target( + name: "LiveKitBridge", + dependencies: [.product(name: "LiveKit", package: "client-sdk-swift")]), + ] +) diff --git a/crates/live_kit_client2/LiveKitBridge/README.md b/crates/live_kit_client2/LiveKitBridge/README.md new file mode 100644 index 0000000000..b982c67286 --- /dev/null +++ b/crates/live_kit_client2/LiveKitBridge/README.md @@ -0,0 +1,3 @@ +# LiveKitBridge + +A description of this package. diff --git a/crates/live_kit_client2/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift b/crates/live_kit_client2/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift new file mode 100644 index 0000000000..5f22acf581 --- /dev/null +++ b/crates/live_kit_client2/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift @@ -0,0 +1,327 @@ +import Foundation +import LiveKit +import WebRTC +import ScreenCaptureKit + +class LKRoomDelegate: RoomDelegate { + var data: UnsafeRawPointer + var onDidDisconnect: @convention(c) (UnsafeRawPointer) -> Void + var onDidSubscribeToRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void + var onDidUnsubscribeFromRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void + var onMuteChangedFromRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void + var onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void + var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void + var onDidUnsubscribeFromRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void + + init( + data: UnsafeRawPointer, + onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void, + onDidSubscribeToRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void, + onDidUnsubscribeFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void, + onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, + onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void, + onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, + onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void) + { + self.data = data + self.onDidDisconnect = onDidDisconnect + self.onDidSubscribeToRemoteAudioTrack = onDidSubscribeToRemoteAudioTrack + self.onDidUnsubscribeFromRemoteAudioTrack = onDidUnsubscribeFromRemoteAudioTrack + self.onDidSubscribeToRemoteVideoTrack = onDidSubscribeToRemoteVideoTrack + self.onDidUnsubscribeFromRemoteVideoTrack = onDidUnsubscribeFromRemoteVideoTrack + self.onMuteChangedFromRemoteAudioTrack = onMuteChangedFromRemoteAudioTrack + self.onActiveSpeakersChanged = onActiveSpeakersChanged + } + + func room(_ room: Room, didUpdate connectionState: ConnectionState, oldValue: ConnectionState) { + if connectionState.isDisconnected { + self.onDidDisconnect(self.data) + } + } + + func room(_ room: Room, participant: RemoteParticipant, didSubscribe publication: RemoteTrackPublication, track: Track) { + if track.kind == .video { + self.onDidSubscribeToRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString, Unmanaged.passUnretained(track).toOpaque()) + } else if track.kind == .audio { + self.onDidSubscribeToRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString, Unmanaged.passUnretained(track).toOpaque(), Unmanaged.passUnretained(publication).toOpaque()) + } + } + + func room(_ room: Room, participant: Participant, didUpdate publication: TrackPublication, muted: Bool) { + if publication.kind == .audio { + self.onMuteChangedFromRemoteAudioTrack(self.data, publication.sid as CFString, muted) + } + } + + func room(_ room: Room, didUpdate speakers: [Participant]) { + guard let speaker_ids = speakers.compactMap({ $0.identity as CFString }) as CFArray? else { return } + self.onActiveSpeakersChanged(self.data, speaker_ids) + } + + func room(_ room: Room, participant: RemoteParticipant, didUnsubscribe publication: RemoteTrackPublication, track: Track) { + if track.kind == .video { + self.onDidUnsubscribeFromRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString) + } else if track.kind == .audio { + self.onDidUnsubscribeFromRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString) + } + } +} + +class LKVideoRenderer: NSObject, VideoRenderer { + var data: UnsafeRawPointer + var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool + var onDrop: @convention(c) (UnsafeRawPointer) -> Void + var adaptiveStreamIsEnabled: Bool = false + var adaptiveStreamSize: CGSize = .zero + weak var track: VideoTrack? + + init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) { + self.data = data + self.onFrame = onFrame + self.onDrop = onDrop + } + + deinit { + self.onDrop(self.data) + } + + func setSize(_ size: CGSize) { + } + + func renderFrame(_ frame: RTCVideoFrame?) { + let buffer = frame?.buffer as? RTCCVPixelBuffer + if let pixelBuffer = buffer?.pixelBuffer { + if !self.onFrame(self.data, pixelBuffer) { + DispatchQueue.main.async { + self.track?.remove(videoRenderer: self) + } + } + } + } +} + +@_cdecl("LKRoomDelegateCreate") +public func LKRoomDelegateCreate( + data: UnsafeRawPointer, + onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void, + onDidSubscribeToRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void, + onDidUnsubscribeFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void, + onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, + onActiveSpeakerChanged: @escaping @convention(c) (UnsafeRawPointer, CFArray) -> Void, + onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, + onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void +) -> UnsafeMutableRawPointer { + let delegate = LKRoomDelegate( + data: data, + onDidDisconnect: onDidDisconnect, + onDidSubscribeToRemoteAudioTrack: onDidSubscribeToRemoteAudioTrack, + onDidUnsubscribeFromRemoteAudioTrack: onDidUnsubscribeFromRemoteAudioTrack, + onMuteChangedFromRemoteAudioTrack: onMuteChangedFromRemoteAudioTrack, + onActiveSpeakersChanged: onActiveSpeakerChanged, + onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack, + onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack + ) + return Unmanaged.passRetained(delegate).toOpaque() +} + +@_cdecl("LKRoomCreate") +public func LKRoomCreate(delegate: UnsafeRawPointer) -> UnsafeMutableRawPointer { + let delegate = Unmanaged.fromOpaque(delegate).takeUnretainedValue() + return Unmanaged.passRetained(Room(delegate: delegate)).toOpaque() +} + +@_cdecl("LKRoomConnect") +public func LKRoomConnect(room: UnsafeRawPointer, url: CFString, token: CFString, callback: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer) { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + + room.connect(url as String, token as String).then { _ in + callback(callback_data, UnsafeRawPointer(nil) as! CFString?) + }.catch { error in + callback(callback_data, error.localizedDescription as CFString) + } +} + +@_cdecl("LKRoomDisconnect") +public func LKRoomDisconnect(room: UnsafeRawPointer) { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + room.disconnect() +} + +@_cdecl("LKRoomPublishVideoTrack") +public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() + room.localParticipant?.publishVideoTrack(track: track).then { publication in + callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil) + }.catch { error in + callback(callback_data, nil, error.localizedDescription as CFString) + } +} + +@_cdecl("LKRoomPublishAudioTrack") +public func LKRoomPublishAudioTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() + room.localParticipant?.publishAudioTrack(track: track).then { publication in + callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil) + }.catch { error in + callback(callback_data, nil, error.localizedDescription as CFString) + } +} + + +@_cdecl("LKRoomUnpublishTrack") +public func LKRoomUnpublishTrack(room: UnsafeRawPointer, publication: UnsafeRawPointer) { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + let _ = room.localParticipant?.unpublish(publication: publication) +} + +@_cdecl("LKRoomAudioTracksForRemoteParticipant") +public func LKRoomAudioTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + + for (_, participant) in room.remoteParticipants { + if participant.identity == participantId as String { + return participant.audioTracks.compactMap { $0.track as? RemoteAudioTrack } as CFArray? + } + } + + return nil; +} + +@_cdecl("LKRoomAudioTrackPublicationsForRemoteParticipant") +public func LKRoomAudioTrackPublicationsForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + + for (_, participant) in room.remoteParticipants { + if participant.identity == participantId as String { + return participant.audioTracks.compactMap { $0 as? RemoteTrackPublication } as CFArray? + } + } + + return nil; +} + +@_cdecl("LKRoomVideoTracksForRemoteParticipant") +public func LKRoomVideoTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + + for (_, participant) in room.remoteParticipants { + if participant.identity == participantId as String { + return participant.videoTracks.compactMap { $0.track as? RemoteVideoTrack } as CFArray? + } + } + + return nil; +} + +@_cdecl("LKLocalAudioTrackCreateTrack") +public func LKLocalAudioTrackCreateTrack() -> UnsafeMutableRawPointer { + let track = LocalAudioTrack.createTrack(options: AudioCaptureOptions( + echoCancellation: true, + noiseSuppression: true + )) + + return Unmanaged.passRetained(track).toOpaque() +} + + +@_cdecl("LKCreateScreenShareTrackForDisplay") +public func LKCreateScreenShareTrackForDisplay(display: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer { + let display = Unmanaged.fromOpaque(display).takeUnretainedValue() + let track = LocalVideoTrack.createMacOSScreenShareTrack(source: display, preferredMethod: .legacy) + return Unmanaged.passRetained(track).toOpaque() +} + +@_cdecl("LKVideoRendererCreate") +public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer { + Unmanaged.passRetained(LKVideoRenderer(data: data, onFrame: onFrame, onDrop: onDrop)).toOpaque() +} + +@_cdecl("LKVideoTrackAddRenderer") +public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRawPointer) { + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() as! VideoTrack + let renderer = Unmanaged.fromOpaque(renderer).takeRetainedValue() + renderer.track = track + track.add(videoRenderer: renderer) +} + +@_cdecl("LKRemoteVideoTrackGetSid") +public func LKRemoteVideoTrackGetSid(track: UnsafeRawPointer) -> CFString { + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() + return track.sid! as CFString +} + +@_cdecl("LKRemoteAudioTrackGetSid") +public func LKRemoteAudioTrackGetSid(track: UnsafeRawPointer) -> CFString { + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() + return track.sid! as CFString +} + +@_cdecl("LKDisplaySources") +public func LKDisplaySources(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFArray?, CFString?) -> Void) { + MacOSScreenCapturer.sources(for: .display, includeCurrentApplication: false, preferredMethod: .legacy).then { displaySources in + callback(data, displaySources as CFArray, nil) + }.catch { error in + callback(data, nil, error.localizedDescription as CFString) + } +} + +@_cdecl("LKLocalTrackPublicationSetMute") +public func LKLocalTrackPublicationSetMute( + publication: UnsafeRawPointer, + muted: Bool, + on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, + callback_data: UnsafeRawPointer +) { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + + if muted { + publication.mute().then { + on_complete(callback_data, nil) + }.catch { error in + on_complete(callback_data, error.localizedDescription as CFString) + } + } else { + publication.unmute().then { + on_complete(callback_data, nil) + }.catch { error in + on_complete(callback_data, error.localizedDescription as CFString) + } + } +} + +@_cdecl("LKRemoteTrackPublicationSetEnabled") +public func LKRemoteTrackPublicationSetEnabled( + publication: UnsafeRawPointer, + enabled: Bool, + on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, + callback_data: UnsafeRawPointer +) { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + + publication.set(enabled: enabled).then { + on_complete(callback_data, nil) + }.catch { error in + on_complete(callback_data, error.localizedDescription as CFString) + } +} + +@_cdecl("LKRemoteTrackPublicationIsMuted") +public func LKRemoteTrackPublicationIsMuted( + publication: UnsafeRawPointer +) -> Bool { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + + return publication.muted +} + +@_cdecl("LKRemoteTrackPublicationGetSid") +public func LKRemoteTrackPublicationGetSid( + publication: UnsafeRawPointer +) -> CFString { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + + return publication.sid as CFString +} diff --git a/crates/live_kit_client2/build.rs b/crates/live_kit_client2/build.rs new file mode 100644 index 0000000000..1445704b46 --- /dev/null +++ b/crates/live_kit_client2/build.rs @@ -0,0 +1,172 @@ +use serde::Deserialize; +use std::{ + env, + path::{Path, PathBuf}, + process::Command, +}; + +const SWIFT_PACKAGE_NAME: &str = "LiveKitBridge"; + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SwiftTargetInfo { + pub triple: String, + pub unversioned_triple: String, + pub module_triple: String, + pub swift_runtime_compatibility_version: String, + #[serde(rename = "librariesRequireRPath")] + pub libraries_require_rpath: bool, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SwiftPaths { + pub runtime_library_paths: Vec, + pub runtime_library_import_paths: Vec, + pub runtime_resource_path: String, +} + +#[derive(Debug, Deserialize)] +pub struct SwiftTarget { + pub target: SwiftTargetInfo, + pub paths: SwiftPaths, +} + +const MACOS_TARGET_VERSION: &str = "10.15.7"; + +fn main() { + if cfg!(not(any(test, feature = "test-support"))) { + let swift_target = get_swift_target(); + + build_bridge(&swift_target); + link_swift_stdlib(&swift_target); + link_webrtc_framework(&swift_target); + + // Register exported Objective-C selectors, protocols, etc when building example binaries. + println!("cargo:rustc-link-arg=-Wl,-ObjC"); + } +} + +fn build_bridge(swift_target: &SwiftTarget) { + println!("cargo:rerun-if-env-changed=MACOSX_DEPLOYMENT_TARGET"); + println!("cargo:rerun-if-changed={}/Sources", SWIFT_PACKAGE_NAME); + println!( + "cargo:rerun-if-changed={}/Package.swift", + SWIFT_PACKAGE_NAME + ); + println!( + "cargo:rerun-if-changed={}/Package.resolved", + SWIFT_PACKAGE_NAME + ); + + let swift_package_root = swift_package_root(); + let swift_target_folder = swift_target_folder(); + if !Command::new("swift") + .arg("build") + .arg("--disable-automatic-resolution") + .args(["--configuration", &env::var("PROFILE").unwrap()]) + .args(["--triple", &swift_target.target.triple]) + .args(["--build-path".into(), swift_target_folder]) + .current_dir(&swift_package_root) + .status() + .unwrap() + .success() + { + panic!( + "Failed to compile swift package in {}", + swift_package_root.display() + ); + } + + println!( + "cargo:rustc-link-search=native={}", + swift_target.out_dir_path().display() + ); + println!("cargo:rustc-link-lib=static={}", SWIFT_PACKAGE_NAME); +} + +fn link_swift_stdlib(swift_target: &SwiftTarget) { + for path in &swift_target.paths.runtime_library_paths { + println!("cargo:rustc-link-search=native={}", path); + } +} + +fn link_webrtc_framework(swift_target: &SwiftTarget) { + let swift_out_dir_path = swift_target.out_dir_path(); + println!("cargo:rustc-link-lib=framework=WebRTC"); + println!( + "cargo:rustc-link-search=framework={}", + swift_out_dir_path.display() + ); + // Find WebRTC.framework as a sibling of the executable when running tests. + println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path"); + // Find WebRTC.framework in parent directory of the executable when running examples. + println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path/.."); + + let source_path = swift_out_dir_path.join("WebRTC.framework"); + let deps_dir_path = + PathBuf::from(env::var("OUT_DIR").unwrap()).join("../../../deps/WebRTC.framework"); + let target_dir_path = + PathBuf::from(env::var("OUT_DIR").unwrap()).join("../../../WebRTC.framework"); + copy_dir(&source_path, &deps_dir_path); + copy_dir(&source_path, &target_dir_path); +} + +fn get_swift_target() -> SwiftTarget { + let mut arch = env::var("CARGO_CFG_TARGET_ARCH").unwrap(); + if arch == "aarch64" { + arch = "arm64".into(); + } + let target = format!("{}-apple-macosx{}", arch, MACOS_TARGET_VERSION); + + let swift_target_info_str = Command::new("swift") + .args(["-target", &target, "-print-target-info"]) + .output() + .unwrap() + .stdout; + + serde_json::from_slice(&swift_target_info_str).unwrap() +} + +fn swift_package_root() -> PathBuf { + env::current_dir().unwrap().join(SWIFT_PACKAGE_NAME) +} + +fn swift_target_folder() -> PathBuf { + env::current_dir() + .unwrap() + .join(format!("../../target/{SWIFT_PACKAGE_NAME}")) +} + +fn copy_dir(source: &Path, destination: &Path) { + assert!( + Command::new("rm") + .arg("-rf") + .arg(destination) + .status() + .unwrap() + .success(), + "could not remove {:?} before copying", + destination + ); + + assert!( + Command::new("cp") + .arg("-R") + .args([source, destination]) + .status() + .unwrap() + .success(), + "could not copy {:?} to {:?}", + source, + destination + ); +} + +impl SwiftTarget { + fn out_dir_path(&self) -> PathBuf { + swift_target_folder() + .join(&self.target.unversioned_triple) + .join(env::var("PROFILE").unwrap()) + } +} diff --git a/crates/live_kit_client2/examples/test_app.rs b/crates/live_kit_client2/examples/test_app.rs new file mode 100644 index 0000000000..2147f6ab8c --- /dev/null +++ b/crates/live_kit_client2/examples/test_app.rs @@ -0,0 +1,175 @@ +// use std::time::Duration; +// todo!() + +// use futures::StreamExt; +// use gpui2::{actions, keymap_matcher::Binding, Menu, MenuItem}; +// use live_kit_client2::{ +// LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room, +// }; +// use live_kit_server::token::{self, VideoGrant}; +// use log::LevelFilter; +// use simplelog::SimpleLogger; + +// actions!(capture, [Quit]); + +fn main() { + // SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger"); + + // gpui2::App::new(()).unwrap().run(|cx| { + // #[cfg(any(test, feature = "test-support"))] + // println!("USING TEST LIVEKIT"); + + // #[cfg(not(any(test, feature = "test-support")))] + // println!("USING REAL LIVEKIT"); + + // cx.platform().activate(true); + // cx.add_global_action(quit); + + // cx.add_bindings([Binding::new("cmd-q", Quit, None)]); + // cx.set_menus(vec![Menu { + // name: "Zed", + // items: vec![MenuItem::Action { + // name: "Quit", + // action: Box::new(Quit), + // os_action: None, + // }], + // }]); + + // let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap_or("http://localhost:7880".into()); + // let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap_or("devkey".into()); + // let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap_or("secret".into()); + + // cx.spawn(|cx| async move { + // let user_a_token = token::create( + // &live_kit_key, + // &live_kit_secret, + // Some("test-participant-1"), + // VideoGrant::to_join("test-room"), + // ) + // .unwrap(); + // let room_a = Room::new(); + // room_a.connect(&live_kit_url, &user_a_token).await.unwrap(); + + // let user2_token = token::create( + // &live_kit_key, + // &live_kit_secret, + // Some("test-participant-2"), + // VideoGrant::to_join("test-room"), + // ) + // .unwrap(); + // let room_b = Room::new(); + // room_b.connect(&live_kit_url, &user2_token).await.unwrap(); + + // let mut audio_track_updates = room_b.remote_audio_track_updates(); + // let audio_track = LocalAudioTrack::create(); + // let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap(); + + // if let RemoteAudioTrackUpdate::Subscribed(track, _) = + // audio_track_updates.next().await.unwrap() + // { + // let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); + // assert_eq!(remote_tracks.len(), 1); + // assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1"); + // assert_eq!(track.publisher_id(), "test-participant-1"); + // } else { + // panic!("unexpected message"); + // } + + // audio_track_publication.set_mute(true).await.unwrap(); + + // println!("waiting for mute changed!"); + // if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = + // audio_track_updates.next().await.unwrap() + // { + // let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); + // assert_eq!(remote_tracks[0].sid(), track_id); + // assert_eq!(muted, true); + // } else { + // panic!("unexpected message"); + // } + + // audio_track_publication.set_mute(false).await.unwrap(); + + // if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = + // audio_track_updates.next().await.unwrap() + // { + // let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); + // assert_eq!(remote_tracks[0].sid(), track_id); + // assert_eq!(muted, false); + // } else { + // panic!("unexpected message"); + // } + + // println!("Pausing for 5 seconds to test audio, make some noise!"); + // let timer = cx.background().timer(Duration::from_secs(5)); + // timer.await; + // let remote_audio_track = room_b + // .remote_audio_tracks("test-participant-1") + // .pop() + // .unwrap(); + // room_a.unpublish_track(audio_track_publication); + + // // Clear out any active speakers changed messages + // let mut next = audio_track_updates.next().await.unwrap(); + // while let RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } = next { + // println!("Speakers changed: {:?}", speakers); + // next = audio_track_updates.next().await.unwrap(); + // } + + // if let RemoteAudioTrackUpdate::Unsubscribed { + // publisher_id, + // track_id, + // } = next + // { + // assert_eq!(publisher_id, "test-participant-1"); + // assert_eq!(remote_audio_track.sid(), track_id); + // assert_eq!(room_b.remote_audio_tracks("test-participant-1").len(), 0); + // } else { + // panic!("unexpected message"); + // } + + // let mut video_track_updates = room_b.remote_video_track_updates(); + // let displays = room_a.display_sources().await.unwrap(); + // let display = displays.into_iter().next().unwrap(); + + // let local_video_track = LocalVideoTrack::screen_share_for_display(&display); + // let local_video_track_publication = + // room_a.publish_video_track(local_video_track).await.unwrap(); + + // if let RemoteVideoTrackUpdate::Subscribed(track) = + // video_track_updates.next().await.unwrap() + // { + // let remote_video_tracks = room_b.remote_video_tracks("test-participant-1"); + // assert_eq!(remote_video_tracks.len(), 1); + // assert_eq!(remote_video_tracks[0].publisher_id(), "test-participant-1"); + // assert_eq!(track.publisher_id(), "test-participant-1"); + // } else { + // panic!("unexpected message"); + // } + + // let remote_video_track = room_b + // .remote_video_tracks("test-participant-1") + // .pop() + // .unwrap(); + // room_a.unpublish_track(local_video_track_publication); + // if let RemoteVideoTrackUpdate::Unsubscribed { + // publisher_id, + // track_id, + // } = video_track_updates.next().await.unwrap() + // { + // assert_eq!(publisher_id, "test-participant-1"); + // assert_eq!(remote_video_track.sid(), track_id); + // assert_eq!(room_b.remote_video_tracks("test-participant-1").len(), 0); + // } else { + // panic!("unexpected message"); + // } + + // cx.platform().quit(); + // }) + // .detach(); + // }); +} + +// fn quit(_: &Quit, cx: &mut gpui2::AppContext) { +// cx.platform().quit(); +// } diff --git a/crates/live_kit_client2/src/live_kit_client2.rs b/crates/live_kit_client2/src/live_kit_client2.rs new file mode 100644 index 0000000000..35682382e9 --- /dev/null +++ b/crates/live_kit_client2/src/live_kit_client2.rs @@ -0,0 +1,11 @@ +// #[cfg(not(any(test, feature = "test-support")))] +pub mod prod; + +// #[cfg(not(any(test, feature = "test-support")))] +pub use prod::*; + +// #[cfg(any(test, feature = "test-support"))] +// pub mod test; + +// #[cfg(any(test, feature = "test-support"))] +// pub use test::*; diff --git a/crates/live_kit_client2/src/prod.rs b/crates/live_kit_client2/src/prod.rs new file mode 100644 index 0000000000..65ed8b754f --- /dev/null +++ b/crates/live_kit_client2/src/prod.rs @@ -0,0 +1,943 @@ +use anyhow::{anyhow, Context, Result}; +use core_foundation::{ + array::{CFArray, CFArrayRef}, + base::{CFRelease, CFRetain, TCFType}, + string::{CFString, CFStringRef}, +}; +use futures::{ + channel::{mpsc, oneshot}, + Future, +}; +pub use media::core_video::CVImageBuffer; +use media::core_video::CVImageBufferRef; +use parking_lot::Mutex; +use postage::watch; +use std::{ + ffi::c_void, + sync::{Arc, Weak}, +}; + +// SAFETY: Most live kit types are threadsafe: +// https://github.com/livekit/client-sdk-swift#thread-safety +macro_rules! pointer_type { + ($pointer_name:ident) => { + #[repr(transparent)] + #[derive(Copy, Clone, Debug)] + pub struct $pointer_name(pub *const std::ffi::c_void); + unsafe impl Send for $pointer_name {} + }; +} + +mod swift { + pointer_type!(Room); + pointer_type!(LocalAudioTrack); + pointer_type!(RemoteAudioTrack); + pointer_type!(LocalVideoTrack); + pointer_type!(RemoteVideoTrack); + pointer_type!(LocalTrackPublication); + pointer_type!(RemoteTrackPublication); + pointer_type!(MacOSDisplay); + pointer_type!(RoomDelegate); +} + +extern "C" { + fn LKRoomDelegateCreate( + callback_data: *mut c_void, + on_did_disconnect: extern "C" fn(callback_data: *mut c_void), + on_did_subscribe_to_remote_audio_track: extern "C" fn( + callback_data: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + remote_track: swift::RemoteAudioTrack, + remote_publication: swift::RemoteTrackPublication, + ), + on_did_unsubscribe_from_remote_audio_track: extern "C" fn( + callback_data: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + ), + on_mute_changed_from_remote_audio_track: extern "C" fn( + callback_data: *mut c_void, + track_id: CFStringRef, + muted: bool, + ), + on_active_speakers_changed: extern "C" fn( + callback_data: *mut c_void, + participants: CFArrayRef, + ), + on_did_subscribe_to_remote_video_track: extern "C" fn( + callback_data: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + remote_track: swift::RemoteVideoTrack, + ), + on_did_unsubscribe_from_remote_video_track: extern "C" fn( + callback_data: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + ), + ) -> swift::RoomDelegate; + + fn LKRoomCreate(delegate: swift::RoomDelegate) -> swift::Room; + fn LKRoomConnect( + room: swift::Room, + url: CFStringRef, + token: CFStringRef, + callback: extern "C" fn(*mut c_void, CFStringRef), + callback_data: *mut c_void, + ); + fn LKRoomDisconnect(room: swift::Room); + fn LKRoomPublishVideoTrack( + room: swift::Room, + track: swift::LocalVideoTrack, + callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef), + callback_data: *mut c_void, + ); + fn LKRoomPublishAudioTrack( + room: swift::Room, + track: swift::LocalAudioTrack, + callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef), + callback_data: *mut c_void, + ); + fn LKRoomUnpublishTrack(room: swift::Room, publication: swift::LocalTrackPublication); + + fn LKRoomAudioTracksForRemoteParticipant( + room: swift::Room, + participant_id: CFStringRef, + ) -> CFArrayRef; + + fn LKRoomAudioTrackPublicationsForRemoteParticipant( + room: swift::Room, + participant_id: CFStringRef, + ) -> CFArrayRef; + + fn LKRoomVideoTracksForRemoteParticipant( + room: swift::Room, + participant_id: CFStringRef, + ) -> CFArrayRef; + + fn LKVideoRendererCreate( + callback_data: *mut c_void, + on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool, + on_drop: extern "C" fn(callback_data: *mut c_void), + ) -> *const c_void; + + fn LKRemoteAudioTrackGetSid(track: swift::RemoteAudioTrack) -> CFStringRef; + fn LKVideoTrackAddRenderer(track: swift::RemoteVideoTrack, renderer: *const c_void); + fn LKRemoteVideoTrackGetSid(track: swift::RemoteVideoTrack) -> CFStringRef; + + fn LKDisplaySources( + callback_data: *mut c_void, + callback: extern "C" fn( + callback_data: *mut c_void, + sources: CFArrayRef, + error: CFStringRef, + ), + ); + fn LKCreateScreenShareTrackForDisplay(display: swift::MacOSDisplay) -> swift::LocalVideoTrack; + fn LKLocalAudioTrackCreateTrack() -> swift::LocalAudioTrack; + + fn LKLocalTrackPublicationSetMute( + publication: swift::LocalTrackPublication, + muted: bool, + on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef), + callback_data: *mut c_void, + ); + + fn LKRemoteTrackPublicationSetEnabled( + publication: swift::RemoteTrackPublication, + enabled: bool, + on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef), + callback_data: *mut c_void, + ); + + fn LKRemoteTrackPublicationIsMuted(publication: swift::RemoteTrackPublication) -> bool; + fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef; +} + +pub type Sid = String; + +#[derive(Clone, Eq, PartialEq)] +pub enum ConnectionState { + Disconnected, + Connected { url: String, token: String }, +} + +pub struct Room { + native_room: Mutex, + connection: Mutex<( + watch::Sender, + watch::Receiver, + )>, + remote_audio_track_subscribers: Mutex>>, + remote_video_track_subscribers: Mutex>>, + _delegate: Mutex, +} + +trait AssertSendSync: Send {} +impl AssertSendSync for Room {} + +impl Room { + pub fn new() -> Arc { + Arc::new_cyclic(|weak_room| { + let delegate = RoomDelegate::new(weak_room.clone()); + Self { + native_room: Mutex::new(unsafe { LKRoomCreate(delegate.native_delegate) }), + connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)), + remote_audio_track_subscribers: Default::default(), + remote_video_track_subscribers: Default::default(), + _delegate: Mutex::new(delegate), + } + }) + } + + pub fn status(&self) -> watch::Receiver { + self.connection.lock().1.clone() + } + + pub fn connect(self: &Arc, url: &str, token: &str) -> impl Future> { + let url = CFString::new(url); + let token = CFString::new(token); + let (did_connect, tx, rx) = Self::build_done_callback(); + unsafe { + LKRoomConnect( + *self.native_room.lock(), + url.as_concrete_TypeRef(), + token.as_concrete_TypeRef(), + did_connect, + tx, + ) + } + + let this = self.clone(); + let url = url.to_string(); + let token = token.to_string(); + async move { + rx.await.unwrap().context("error connecting to room")?; + *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token }; + Ok(()) + } + } + + fn did_disconnect(&self) { + *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected; + } + + pub fn display_sources(self: &Arc) -> impl Future>> { + extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) { + unsafe { + let tx = Box::from_raw(tx as *mut oneshot::Sender>>); + + if sources.is_null() { + let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error)))); + } else { + let sources = CFArray::wrap_under_get_rule(sources) + .into_iter() + .map(|source| MacOSDisplay::new(swift::MacOSDisplay(*source))) + .collect(); + + let _ = tx.send(Ok(sources)); + } + } + } + + let (tx, rx) = oneshot::channel(); + + unsafe { + LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback); + } + + async move { rx.await.unwrap() } + } + + pub fn publish_video_track( + self: &Arc, + track: LocalVideoTrack, + ) -> impl Future> { + let (tx, rx) = oneshot::channel::>(); + extern "C" fn callback( + tx: *mut c_void, + publication: swift::LocalTrackPublication, + error: CFStringRef, + ) { + let tx = + unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; + if error.is_null() { + let _ = tx.send(Ok(LocalTrackPublication::new(publication))); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + let _ = tx.send(Err(anyhow!(error))); + } + } + unsafe { + LKRoomPublishVideoTrack( + *self.native_room.lock(), + track.0, + callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + ); + } + async { rx.await.unwrap().context("error publishing video track") } + } + + pub fn publish_audio_track( + self: &Arc, + track: LocalAudioTrack, + ) -> impl Future> { + let (tx, rx) = oneshot::channel::>(); + extern "C" fn callback( + tx: *mut c_void, + publication: swift::LocalTrackPublication, + error: CFStringRef, + ) { + let tx = + unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; + if error.is_null() { + let _ = tx.send(Ok(LocalTrackPublication::new(publication))); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + let _ = tx.send(Err(anyhow!(error))); + } + } + unsafe { + LKRoomPublishAudioTrack( + *self.native_room.lock(), + track.0, + callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + ); + } + async { rx.await.unwrap().context("error publishing audio track") } + } + + pub fn unpublish_track(&self, publication: LocalTrackPublication) { + unsafe { + LKRoomUnpublishTrack(*self.native_room.lock(), publication.0); + } + } + + pub fn remote_video_tracks(&self, participant_id: &str) -> Vec> { + unsafe { + let tracks = LKRoomVideoTracksForRemoteParticipant( + *self.native_room.lock(), + CFString::new(participant_id).as_concrete_TypeRef(), + ); + + if tracks.is_null() { + Vec::new() + } else { + let tracks = CFArray::wrap_under_get_rule(tracks); + tracks + .into_iter() + .map(|native_track| { + let native_track = swift::RemoteVideoTrack(*native_track); + let id = + CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track)) + .to_string(); + Arc::new(RemoteVideoTrack::new( + native_track, + id, + participant_id.into(), + )) + }) + .collect() + } + } + } + + pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec> { + unsafe { + let tracks = LKRoomAudioTracksForRemoteParticipant( + *self.native_room.lock(), + CFString::new(participant_id).as_concrete_TypeRef(), + ); + + if tracks.is_null() { + Vec::new() + } else { + let tracks = CFArray::wrap_under_get_rule(tracks); + tracks + .into_iter() + .map(|native_track| { + let native_track = swift::RemoteAudioTrack(*native_track); + let id = + CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track)) + .to_string(); + Arc::new(RemoteAudioTrack::new( + native_track, + id, + participant_id.into(), + )) + }) + .collect() + } + } + } + + pub fn remote_audio_track_publications( + &self, + participant_id: &str, + ) -> Vec> { + unsafe { + let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant( + *self.native_room.lock(), + CFString::new(participant_id).as_concrete_TypeRef(), + ); + + if tracks.is_null() { + Vec::new() + } else { + let tracks = CFArray::wrap_under_get_rule(tracks); + tracks + .into_iter() + .map(|native_track_publication| { + let native_track_publication = + swift::RemoteTrackPublication(*native_track_publication); + Arc::new(RemoteTrackPublication::new(native_track_publication)) + }) + .collect() + } + } + } + + pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded(); + self.remote_audio_track_subscribers.lock().push(tx); + rx + } + + pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded(); + self.remote_video_track_subscribers.lock().push(tx); + rx + } + + fn did_subscribe_to_remote_audio_track( + &self, + track: RemoteAudioTrack, + publication: RemoteTrackPublication, + ) { + let track = Arc::new(track); + let publication = Arc::new(publication); + self.remote_audio_track_subscribers.lock().retain(|tx| { + tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed( + track.clone(), + publication.clone(), + )) + .is_ok() + }); + } + + fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) { + self.remote_audio_track_subscribers.lock().retain(|tx| { + tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed { + publisher_id: publisher_id.clone(), + track_id: track_id.clone(), + }) + .is_ok() + }); + } + + fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) { + self.remote_audio_track_subscribers.lock().retain(|tx| { + tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged { + track_id: track_id.clone(), + muted, + }) + .is_ok() + }); + } + + // A vec of publisher IDs + fn active_speakers_changed(&self, speakers: Vec) { + self.remote_audio_track_subscribers + .lock() + .retain(move |tx| { + tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged { + speakers: speakers.clone(), + }) + .is_ok() + }); + } + + fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) { + let track = Arc::new(track); + self.remote_video_track_subscribers.lock().retain(|tx| { + tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone())) + .is_ok() + }); + } + + fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) { + self.remote_video_track_subscribers.lock().retain(|tx| { + tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed { + publisher_id: publisher_id.clone(), + track_id: track_id.clone(), + }) + .is_ok() + }); + } + + fn build_done_callback() -> ( + extern "C" fn(*mut c_void, CFStringRef), + *mut c_void, + oneshot::Receiver>, + ) { + let (tx, rx) = oneshot::channel(); + extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) { + let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; + if error.is_null() { + let _ = tx.send(Ok(())); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + let _ = tx.send(Err(anyhow!(error))); + } + } + ( + done_callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + rx, + ) + } +} + +impl Drop for Room { + fn drop(&mut self) { + unsafe { + let native_room = &*self.native_room.lock(); + LKRoomDisconnect(*native_room); + CFRelease(native_room.0); + } + } +} + +struct RoomDelegate { + native_delegate: swift::RoomDelegate, + _weak_room: Weak, +} + +impl RoomDelegate { + fn new(weak_room: Weak) -> Self { + let native_delegate = unsafe { + LKRoomDelegateCreate( + weak_room.as_ptr() as *mut c_void, + Self::on_did_disconnect, + Self::on_did_subscribe_to_remote_audio_track, + Self::on_did_unsubscribe_from_remote_audio_track, + Self::on_mute_change_from_remote_audio_track, + Self::on_active_speakers_changed, + Self::on_did_subscribe_to_remote_video_track, + Self::on_did_unsubscribe_from_remote_video_track, + ) + }; + Self { + native_delegate, + _weak_room: weak_room, + } + } + + extern "C" fn on_did_disconnect(room: *mut c_void) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + if let Some(room) = room.upgrade() { + room.did_disconnect(); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_did_subscribe_to_remote_audio_track( + room: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + track: swift::RemoteAudioTrack, + publication: swift::RemoteTrackPublication, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; + let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; + let track = RemoteAudioTrack::new(track, track_id, publisher_id); + let publication = RemoteTrackPublication::new(publication); + if let Some(room) = room.upgrade() { + room.did_subscribe_to_remote_audio_track(track, publication); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_did_unsubscribe_from_remote_audio_track( + room: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; + let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; + if let Some(room) = room.upgrade() { + room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_mute_change_from_remote_audio_track( + room: *mut c_void, + track_id: CFStringRef, + muted: bool, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; + if let Some(room) = room.upgrade() { + room.mute_changed_from_remote_audio_track(track_id, muted); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_active_speakers_changed(room: *mut c_void, participants: CFArrayRef) { + if participants.is_null() { + return; + } + + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let speakers = unsafe { + CFArray::wrap_under_get_rule(participants) + .into_iter() + .map( + |speaker: core_foundation::base::ItemRef<'_, *const c_void>| { + CFString::wrap_under_get_rule(*speaker as CFStringRef).to_string() + }, + ) + .collect() + }; + + if let Some(room) = room.upgrade() { + room.active_speakers_changed(speakers); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_did_subscribe_to_remote_video_track( + room: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + track: swift::RemoteVideoTrack, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; + let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; + let track = RemoteVideoTrack::new(track, track_id, publisher_id); + if let Some(room) = room.upgrade() { + room.did_subscribe_to_remote_video_track(track); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_did_unsubscribe_from_remote_video_track( + room: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; + let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; + if let Some(room) = room.upgrade() { + room.did_unsubscribe_from_remote_video_track(publisher_id, track_id); + } + let _ = Weak::into_raw(room); + } +} + +impl Drop for RoomDelegate { + fn drop(&mut self) { + unsafe { + CFRelease(self.native_delegate.0); + } + } +} + +pub struct LocalAudioTrack(swift::LocalAudioTrack); + +impl LocalAudioTrack { + pub fn create() -> Self { + Self(unsafe { LKLocalAudioTrackCreateTrack() }) + } +} + +impl Drop for LocalAudioTrack { + fn drop(&mut self) { + unsafe { CFRelease(self.0 .0) } + } +} + +pub struct LocalVideoTrack(swift::LocalVideoTrack); + +impl LocalVideoTrack { + pub fn screen_share_for_display(display: &MacOSDisplay) -> Self { + Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) }) + } +} + +impl Drop for LocalVideoTrack { + fn drop(&mut self) { + unsafe { CFRelease(self.0 .0) } + } +} + +pub struct LocalTrackPublication(swift::LocalTrackPublication); + +impl LocalTrackPublication { + pub fn new(native_track_publication: swift::LocalTrackPublication) -> Self { + unsafe { + CFRetain(native_track_publication.0); + } + Self(native_track_publication) + } + + pub fn set_mute(&self, muted: bool) -> impl Future> { + let (tx, rx) = futures::channel::oneshot::channel(); + + extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) { + let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender>) }; + if error.is_null() { + tx.send(Ok(())).ok(); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + tx.send(Err(anyhow!(error))).ok(); + } + } + + unsafe { + LKLocalTrackPublicationSetMute( + self.0, + muted, + complete_callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + ) + } + + async move { rx.await.unwrap() } + } +} + +impl Drop for LocalTrackPublication { + fn drop(&mut self) { + unsafe { CFRelease(self.0 .0) } + } +} + +pub struct RemoteTrackPublication { + native_publication: Mutex, +} + +impl RemoteTrackPublication { + pub fn new(native_track_publication: swift::RemoteTrackPublication) -> Self { + unsafe { + CFRetain(native_track_publication.0); + } + Self { + native_publication: Mutex::new(native_track_publication), + } + } + + pub fn sid(&self) -> String { + unsafe { + CFString::wrap_under_get_rule(LKRemoteTrackPublicationGetSid( + *self.native_publication.lock(), + )) + .to_string() + } + } + + pub fn is_muted(&self) -> bool { + unsafe { LKRemoteTrackPublicationIsMuted(*self.native_publication.lock()) } + } + + pub fn set_enabled(&self, enabled: bool) -> impl Future> { + let (tx, rx) = futures::channel::oneshot::channel(); + + extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) { + let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender>) }; + if error.is_null() { + tx.send(Ok(())).ok(); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + tx.send(Err(anyhow!(error))).ok(); + } + } + + unsafe { + LKRemoteTrackPublicationSetEnabled( + *self.native_publication.lock(), + enabled, + complete_callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + ) + } + + async move { rx.await.unwrap() } + } +} + +impl Drop for RemoteTrackPublication { + fn drop(&mut self) { + unsafe { CFRelease((*self.native_publication.lock()).0) } + } +} + +#[derive(Debug)] +pub struct RemoteAudioTrack { + native_track: Mutex, + sid: Sid, + publisher_id: String, +} + +impl RemoteAudioTrack { + fn new(native_track: swift::RemoteAudioTrack, sid: Sid, publisher_id: String) -> Self { + unsafe { + CFRetain(native_track.0); + } + Self { + native_track: Mutex::new(native_track), + sid, + publisher_id, + } + } + + pub fn sid(&self) -> &str { + &self.sid + } + + pub fn publisher_id(&self) -> &str { + &self.publisher_id + } + + pub fn enable(&self) -> impl Future> { + async { Ok(()) } + } + + pub fn disable(&self) -> impl Future> { + async { Ok(()) } + } +} + +impl Drop for RemoteAudioTrack { + fn drop(&mut self) { + unsafe { CFRelease(self.native_track.lock().0) } + } +} + +#[derive(Debug)] +pub struct RemoteVideoTrack { + native_track: Mutex, + sid: Sid, + publisher_id: String, +} + +impl RemoteVideoTrack { + fn new(native_track: swift::RemoteVideoTrack, sid: Sid, publisher_id: String) -> Self { + unsafe { + CFRetain(native_track.0); + } + Self { + native_track: Mutex::new(native_track), + sid, + publisher_id, + } + } + + pub fn sid(&self) -> &str { + &self.sid + } + + pub fn publisher_id(&self) -> &str { + &self.publisher_id + } + + pub fn frames(&self) -> async_broadcast::Receiver { + extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool { + unsafe { + let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender); + let buffer = CVImageBuffer::wrap_under_get_rule(frame); + let result = tx.try_broadcast(Frame(buffer)); + let _ = Box::into_raw(tx); + match result { + Ok(_) => true, + Err(async_broadcast::TrySendError::Closed(_)) + | Err(async_broadcast::TrySendError::Inactive(_)) => { + log::warn!("no active receiver for frame"); + false + } + Err(async_broadcast::TrySendError::Full(_)) => { + log::warn!("skipping frame as receiver is not keeping up"); + true + } + } + } + } + + extern "C" fn on_drop(callback_data: *mut c_void) { + unsafe { + let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender); + } + } + + let (tx, rx) = async_broadcast::broadcast(64); + unsafe { + let renderer = LKVideoRendererCreate( + Box::into_raw(Box::new(tx)) as *mut c_void, + on_frame, + on_drop, + ); + LKVideoTrackAddRenderer(*self.native_track.lock(), renderer); + rx + } + } +} + +impl Drop for RemoteVideoTrack { + fn drop(&mut self) { + unsafe { CFRelease(self.native_track.lock().0) } + } +} + +pub enum RemoteVideoTrackUpdate { + Subscribed(Arc), + Unsubscribed { publisher_id: Sid, track_id: Sid }, +} + +pub enum RemoteAudioTrackUpdate { + ActiveSpeakersChanged { speakers: Vec }, + MuteChanged { track_id: Sid, muted: bool }, + Subscribed(Arc, Arc), + Unsubscribed { publisher_id: Sid, track_id: Sid }, +} + +pub struct MacOSDisplay(swift::MacOSDisplay); + +impl MacOSDisplay { + fn new(ptr: swift::MacOSDisplay) -> Self { + unsafe { + CFRetain(ptr.0); + } + Self(ptr) + } +} + +impl Drop for MacOSDisplay { + fn drop(&mut self) { + unsafe { CFRelease(self.0 .0) } + } +} + +#[derive(Clone)] +pub struct Frame(CVImageBuffer); + +impl Frame { + pub fn width(&self) -> usize { + self.0.width() + } + + pub fn height(&self) -> usize { + self.0.height() + } + + pub fn image(&self) -> CVImageBuffer { + self.0.clone() + } +} diff --git a/crates/live_kit_client2/src/test.rs b/crates/live_kit_client2/src/test.rs new file mode 100644 index 0000000000..7185c11fa8 --- /dev/null +++ b/crates/live_kit_client2/src/test.rs @@ -0,0 +1,647 @@ +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use collections::{BTreeMap, HashMap}; +use futures::Stream; +use gpui2::Executor; +use live_kit_server::token; +use media::core_video::CVImageBuffer; +use parking_lot::Mutex; +use postage::watch; +use std::{future::Future, mem, sync::Arc}; + +static SERVERS: Mutex>> = Mutex::new(BTreeMap::new()); + +pub struct TestServer { + pub url: String, + pub api_key: String, + pub secret_key: String, + rooms: Mutex>, + executor: Arc, +} + +impl TestServer { + pub fn create( + url: String, + api_key: String, + secret_key: String, + executor: Arc, + ) -> Result> { + let mut servers = SERVERS.lock(); + if servers.contains_key(&url) { + Err(anyhow!("a server with url {:?} already exists", url)) + } else { + let server = Arc::new(TestServer { + url: url.clone(), + api_key, + secret_key, + rooms: Default::default(), + executor, + }); + servers.insert(url, server.clone()); + Ok(server) + } + } + + fn get(url: &str) -> Result> { + Ok(SERVERS + .lock() + .get(url) + .ok_or_else(|| anyhow!("no server found for url"))? + .clone()) + } + + pub fn teardown(&self) -> Result<()> { + SERVERS + .lock() + .remove(&self.url) + .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?; + Ok(()) + } + + pub fn create_api_client(&self) -> TestApiClient { + TestApiClient { + url: self.url.clone(), + } + } + + pub async fn create_room(&self, room: String) -> Result<()> { + self.executor.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + if server_rooms.contains_key(&room) { + Err(anyhow!("room {:?} already exists", room)) + } else { + server_rooms.insert(room, Default::default()); + Ok(()) + } + } + + async fn delete_room(&self, room: String) -> Result<()> { + // TODO: clear state associated with all `Room`s. + self.executor.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + server_rooms + .remove(&room) + .ok_or_else(|| anyhow!("room {:?} does not exist", room))?; + Ok(()) + } + + async fn join_room(&self, token: String, client_room: Arc) -> Result<()> { + self.executor.simulate_random_delay().await; + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let identity = claims.sub.unwrap().to_string(); + let room_name = claims.video.room.unwrap(); + let mut server_rooms = self.rooms.lock(); + let room = (*server_rooms).entry(room_name.to_string()).or_default(); + + if room.client_rooms.contains_key(&identity) { + Err(anyhow!( + "{:?} attempted to join room {:?} twice", + identity, + room_name + )) + } else { + for track in &room.video_tracks { + client_room + .0 + .lock() + .video_track_updates + .0 + .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) + .unwrap(); + } + room.client_rooms.insert(identity, client_room); + Ok(()) + } + } + + async fn leave_room(&self, token: String) -> Result<()> { + self.executor.simulate_random_delay().await; + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let identity = claims.sub.unwrap().to_string(); + let room_name = claims.video.room.unwrap(); + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + room.client_rooms.remove(&identity).ok_or_else(|| { + anyhow!( + "{:?} attempted to leave room {:?} before joining it", + identity, + room_name + ) + })?; + Ok(()) + } + + async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> { + // TODO: clear state associated with the `Room`. + + self.executor.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + room.client_rooms.remove(&identity).ok_or_else(|| { + anyhow!( + "participant {:?} did not join room {:?}", + identity, + room_name + ) + })?; + Ok(()) + } + + pub async fn disconnect_client(&self, client_identity: String) { + self.executor.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + for room in server_rooms.values_mut() { + if let Some(room) = room.client_rooms.remove(&client_identity) { + *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected; + } + } + } + + async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> { + self.executor.simulate_random_delay().await; + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let identity = claims.sub.unwrap().to_string(); + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + + let track = Arc::new(RemoteVideoTrack { + sid: nanoid::nanoid!(17), + publisher_id: identity.clone(), + frames_rx: local_track.frames_rx.clone(), + }); + + room.video_tracks.push(track.clone()); + + for (id, client_room) in &room.client_rooms { + if *id != identity { + let _ = client_room + .0 + .lock() + .video_track_updates + .0 + .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) + .unwrap(); + } + } + + Ok(()) + } + + async fn publish_audio_track( + &self, + token: String, + _local_track: &LocalAudioTrack, + ) -> Result<()> { + self.executor.simulate_random_delay().await; + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let identity = claims.sub.unwrap().to_string(); + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + + let track = Arc::new(RemoteAudioTrack { + sid: nanoid::nanoid!(17), + publisher_id: identity.clone(), + }); + + let publication = Arc::new(RemoteTrackPublication); + + room.audio_tracks.push(track.clone()); + + for (id, client_room) in &room.client_rooms { + if *id != identity { + let _ = client_room + .0 + .lock() + .audio_track_updates + .0 + .try_broadcast(RemoteAudioTrackUpdate::Subscribed( + track.clone(), + publication.clone(), + )) + .unwrap(); + } + } + + Ok(()) + } + + fn video_tracks(&self, token: String) -> Result>> { + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + Ok(room.video_tracks.clone()) + } + + fn audio_tracks(&self, token: String) -> Result>> { + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + Ok(room.audio_tracks.clone()) + } +} + +#[derive(Default)] +struct TestServerRoom { + client_rooms: HashMap>, + video_tracks: Vec>, + audio_tracks: Vec>, +} + +impl TestServerRoom {} + +pub struct TestApiClient { + url: String, +} + +#[async_trait] +impl live_kit_server::api::Client for TestApiClient { + fn url(&self) -> &str { + &self.url + } + + async fn create_room(&self, name: String) -> Result<()> { + let server = TestServer::get(&self.url)?; + server.create_room(name).await?; + Ok(()) + } + + async fn delete_room(&self, name: String) -> Result<()> { + let server = TestServer::get(&self.url)?; + server.delete_room(name).await?; + Ok(()) + } + + async fn remove_participant(&self, room: String, identity: String) -> Result<()> { + let server = TestServer::get(&self.url)?; + server.remove_participant(room, identity).await?; + Ok(()) + } + + fn room_token(&self, room: &str, identity: &str) -> Result { + let server = TestServer::get(&self.url)?; + token::create( + &server.api_key, + &server.secret_key, + Some(identity), + token::VideoGrant::to_join(room), + ) + } + + fn guest_token(&self, room: &str, identity: &str) -> Result { + let server = TestServer::get(&self.url)?; + token::create( + &server.api_key, + &server.secret_key, + Some(identity), + token::VideoGrant::for_guest(room), + ) + } +} + +pub type Sid = String; + +struct RoomState { + connection: ( + watch::Sender, + watch::Receiver, + ), + display_sources: Vec, + audio_track_updates: ( + async_broadcast::Sender, + async_broadcast::Receiver, + ), + video_track_updates: ( + async_broadcast::Sender, + async_broadcast::Receiver, + ), +} + +#[derive(Clone, Eq, PartialEq)] +pub enum ConnectionState { + Disconnected, + Connected { url: String, token: String }, +} + +pub struct Room(Mutex); + +impl Room { + pub fn new() -> Arc { + Arc::new(Self(Mutex::new(RoomState { + connection: watch::channel_with(ConnectionState::Disconnected), + display_sources: Default::default(), + video_track_updates: async_broadcast::broadcast(128), + audio_track_updates: async_broadcast::broadcast(128), + }))) + } + + pub fn status(&self) -> watch::Receiver { + self.0.lock().connection.1.clone() + } + + pub fn connect(self: &Arc, url: &str, token: &str) -> impl Future> { + let this = self.clone(); + let url = url.to_string(); + let token = token.to_string(); + async move { + let server = TestServer::get(&url)?; + server.join_room(token.clone(), this.clone()).await?; + *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token }; + Ok(()) + } + } + + pub fn display_sources(self: &Arc) -> impl Future>> { + let this = self.clone(); + async move { + let server = this.test_server(); + server.executor.simulate_random_delay().await; + Ok(this.0.lock().display_sources.clone()) + } + } + + pub fn publish_video_track( + self: &Arc, + track: LocalVideoTrack, + ) -> impl Future> { + let this = self.clone(); + let track = track.clone(); + async move { + this.test_server() + .publish_video_track(this.token(), track) + .await?; + Ok(LocalTrackPublication) + } + } + pub fn publish_audio_track( + self: &Arc, + track: LocalAudioTrack, + ) -> impl Future> { + let this = self.clone(); + let track = track.clone(); + async move { + this.test_server() + .publish_audio_track(this.token(), &track) + .await?; + Ok(LocalTrackPublication) + } + } + + pub fn unpublish_track(&self, _publication: LocalTrackPublication) {} + + pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec> { + if !self.is_connected() { + return Vec::new(); + } + + self.test_server() + .audio_tracks(self.token()) + .unwrap() + .into_iter() + .filter(|track| track.publisher_id() == publisher_id) + .collect() + } + + pub fn remote_audio_track_publications( + &self, + publisher_id: &str, + ) -> Vec> { + if !self.is_connected() { + return Vec::new(); + } + + self.test_server() + .audio_tracks(self.token()) + .unwrap() + .into_iter() + .filter(|track| track.publisher_id() == publisher_id) + .map(|_track| Arc::new(RemoteTrackPublication {})) + .collect() + } + + pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec> { + if !self.is_connected() { + return Vec::new(); + } + + self.test_server() + .video_tracks(self.token()) + .unwrap() + .into_iter() + .filter(|track| track.publisher_id() == publisher_id) + .collect() + } + + pub fn remote_audio_track_updates(&self) -> impl Stream { + self.0.lock().audio_track_updates.1.clone() + } + + pub fn remote_video_track_updates(&self) -> impl Stream { + self.0.lock().video_track_updates.1.clone() + } + + pub fn set_display_sources(&self, sources: Vec) { + self.0.lock().display_sources = sources; + } + + fn test_server(&self) -> Arc { + match self.0.lock().connection.1.borrow().clone() { + ConnectionState::Disconnected => panic!("must be connected to call this method"), + ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(), + } + } + + fn token(&self) -> String { + match self.0.lock().connection.1.borrow().clone() { + ConnectionState::Disconnected => panic!("must be connected to call this method"), + ConnectionState::Connected { token, .. } => token, + } + } + + fn is_connected(&self) -> bool { + match *self.0.lock().connection.1.borrow() { + ConnectionState::Disconnected => false, + ConnectionState::Connected { .. } => true, + } + } +} + +impl Drop for Room { + fn drop(&mut self) { + if let ConnectionState::Connected { token, .. } = mem::replace( + &mut *self.0.lock().connection.0.borrow_mut(), + ConnectionState::Disconnected, + ) { + if let Ok(server) = TestServer::get(&token) { + let executor = server.executor.clone(); + executor + .spawn(async move { server.leave_room(token).await.unwrap() }) + .detach(); + } + } + } +} + +pub struct LocalTrackPublication; + +impl LocalTrackPublication { + pub fn set_mute(&self, _mute: bool) -> impl Future> { + async { Ok(()) } + } +} + +pub struct RemoteTrackPublication; + +impl RemoteTrackPublication { + pub fn set_enabled(&self, _enabled: bool) -> impl Future> { + async { Ok(()) } + } + + pub fn is_muted(&self) -> bool { + false + } + + pub fn sid(&self) -> String { + "".to_string() + } +} + +#[derive(Clone)] +pub struct LocalVideoTrack { + frames_rx: async_broadcast::Receiver, +} + +impl LocalVideoTrack { + pub fn screen_share_for_display(display: &MacOSDisplay) -> Self { + Self { + frames_rx: display.frames.1.clone(), + } + } +} + +#[derive(Clone)] +pub struct LocalAudioTrack; + +impl LocalAudioTrack { + pub fn create() -> Self { + Self + } +} + +pub struct RemoteVideoTrack { + sid: Sid, + publisher_id: Sid, + frames_rx: async_broadcast::Receiver, +} + +impl RemoteVideoTrack { + pub fn sid(&self) -> &str { + &self.sid + } + + pub fn publisher_id(&self) -> &str { + &self.publisher_id + } + + pub fn frames(&self) -> async_broadcast::Receiver { + self.frames_rx.clone() + } +} + +#[derive(Debug)] +pub struct RemoteAudioTrack { + sid: Sid, + publisher_id: Sid, +} + +impl RemoteAudioTrack { + pub fn sid(&self) -> &str { + &self.sid + } + + pub fn publisher_id(&self) -> &str { + &self.publisher_id + } + + pub fn enable(&self) -> impl Future> { + async { Ok(()) } + } + + pub fn disable(&self) -> impl Future> { + async { Ok(()) } + } +} + +#[derive(Clone)] +pub enum RemoteVideoTrackUpdate { + Subscribed(Arc), + Unsubscribed { publisher_id: Sid, track_id: Sid }, +} + +#[derive(Clone)] +pub enum RemoteAudioTrackUpdate { + ActiveSpeakersChanged { speakers: Vec }, + MuteChanged { track_id: Sid, muted: bool }, + Subscribed(Arc, Arc), + Unsubscribed { publisher_id: Sid, track_id: Sid }, +} + +#[derive(Clone)] +pub struct MacOSDisplay { + frames: ( + async_broadcast::Sender, + async_broadcast::Receiver, + ), +} + +impl MacOSDisplay { + pub fn new() -> Self { + Self { + frames: async_broadcast::broadcast(128), + } + } + + pub fn send_frame(&self, frame: Frame) { + self.frames.0.try_broadcast(frame).unwrap(); + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Frame { + pub label: String, + pub width: usize, + pub height: usize, +} + +impl Frame { + pub fn width(&self) -> usize { + self.width + } + + pub fn height(&self) -> usize { + self.height + } + + pub fn image(&self) -> CVImageBuffer { + unimplemented!("you can't call this in test mode") + } +}