From 244e8ce101cedb56d8c073c5acff99b0cf344b70 Mon Sep 17 00:00:00 2001 From: Mikayla Date: Tue, 31 Oct 2023 14:04:03 -0700 Subject: [PATCH 1/3] WIP - make livekit work in GPUI2 --- Cargo.lock | 35 +- crates/call2/Cargo.toml | 6 +- crates/call2/src/participant.rs | 4 +- crates/call2/src/room.rs | 225 +++-- .../LiveKitBridge/Package.resolved | 4 +- crates/live_kit_client2/.cargo/config.toml | 2 + crates/live_kit_client2/Cargo.toml | 71 ++ .../LiveKitBridge/Package.resolved | 52 + .../LiveKitBridge/Package.swift | 27 + .../live_kit_client2/LiveKitBridge/README.md | 3 + .../Sources/LiveKitBridge/LiveKitBridge.swift | 327 ++++++ crates/live_kit_client2/build.rs | 172 ++++ crates/live_kit_client2/examples/test_app.rs | 175 ++++ .../live_kit_client2/src/live_kit_client2.rs | 11 + crates/live_kit_client2/src/prod.rs | 943 ++++++++++++++++++ crates/live_kit_client2/src/test.rs | 647 ++++++++++++ 16 files changed, 2586 insertions(+), 118 deletions(-) create mode 100644 crates/live_kit_client2/.cargo/config.toml create mode 100644 crates/live_kit_client2/Cargo.toml create mode 100644 crates/live_kit_client2/LiveKitBridge/Package.resolved create mode 100644 crates/live_kit_client2/LiveKitBridge/Package.swift create mode 100644 crates/live_kit_client2/LiveKitBridge/README.md create mode 100644 crates/live_kit_client2/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift create mode 100644 crates/live_kit_client2/build.rs create mode 100644 crates/live_kit_client2/examples/test_app.rs create mode 100644 crates/live_kit_client2/src/live_kit_client2.rs create mode 100644 crates/live_kit_client2/src/prod.rs create mode 100644 crates/live_kit_client2/src/test.rs diff --git a/Cargo.lock b/Cargo.lock index 3aca27106c..bb72f5d6ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1169,7 +1169,7 @@ dependencies = [ "futures 0.3.28", "gpui2", "language2", - "live_kit_client", + "live_kit_client2", "log", "media", "postage", @@ -4589,6 +4589,39 @@ dependencies = [ "simplelog", ] +[[package]] +name = "live_kit_client2" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-broadcast", + "async-trait", + "block", + "byteorder", + "bytes 1.5.0", + "cocoa", + "collections", + "core-foundation", + "core-graphics", + "foreign-types", + "futures 0.3.28", + "gpui2", + "hmac 0.12.1", + "jwt", + "live_kit_server", + "log", + "media", + "nanoid", + "objc", + "parking_lot 0.11.2", + "postage", + "serde", + "serde_derive", + "serde_json", + "sha2 0.10.7", + "simplelog", +] + [[package]] name = "live_kit_server" version = "0.1.0" diff --git a/crates/call2/Cargo.toml b/crates/call2/Cargo.toml index f0e47832ed..e918ada3e8 100644 --- a/crates/call2/Cargo.toml +++ b/crates/call2/Cargo.toml @@ -13,7 +13,7 @@ test-support = [ "client2/test-support", "collections/test-support", "gpui2/test-support", - "live_kit_client/test-support", + "live_kit_client2/test-support", "project2/test-support", "util/test-support" ] @@ -24,7 +24,7 @@ client2 = { path = "../client2" } collections = { path = "../collections" } gpui2 = { path = "../gpui2" } log.workspace = true -live_kit_client = { path = "../live_kit_client" } +live_kit_client2 = { path = "../live_kit_client2" } fs2 = { path = "../fs2" } language2 = { path = "../language2" } media = { path = "../media" } @@ -47,6 +47,6 @@ fs2 = { path = "../fs2", features = ["test-support"] } language2 = { path = "../language2", features = ["test-support"] } collections = { path = "../collections", features = ["test-support"] } gpui2 = { path = "../gpui2", features = ["test-support"] } -live_kit_client = { path = "../live_kit_client", features = ["test-support"] } +live_kit_client2 = { path = "../live_kit_client2", features = ["test-support"] } project2 = { path = "../project2", features = ["test-support"] } util = { path = "../util", features = ["test-support"] } diff --git a/crates/call2/src/participant.rs b/crates/call2/src/participant.rs index 7f3e91dbba..a1837e3ad0 100644 --- a/crates/call2/src/participant.rs +++ b/crates/call2/src/participant.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Result}; use client2::ParticipantIndex; use client2::{proto, User}; use gpui2::WeakModel; -pub use live_kit_client::Frame; +pub use live_kit_client2::Frame; use project2::Project; use std::{fmt, sync::Arc}; @@ -51,7 +51,7 @@ pub struct RemoteParticipant { #[derive(Clone)] pub struct RemoteVideoTrack { - pub(crate) live_kit_track: Arc, + pub(crate) live_kit_track: Arc, } unsafe impl Send for RemoteVideoTrack {} diff --git a/crates/call2/src/room.rs b/crates/call2/src/room.rs index b7bac52a8b..f0e0b8de17 100644 --- a/crates/call2/src/room.rs +++ b/crates/call2/src/room.rs @@ -19,7 +19,7 @@ use gpui2::{ AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel, }; use language2::LanguageRegistry; -use live_kit_client::{LocalTrackPublication, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate}; +use live_kit_client2::{LocalTrackPublication, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate}; use postage::{sink::Sink, stream::Stream, watch}; use project2::Project; use settings2::Settings; @@ -59,7 +59,7 @@ pub enum Event { pub struct Room { id: u64, channel_id: Option, - // live_kit: Option, + live_kit: Option, status: RoomStatus, shared_projects: HashSet>, joined_projects: HashSet>, @@ -114,125 +114,130 @@ impl Room { user_store: Model, cx: &mut ModelContext, ) -> Self { - todo!() - // let _live_kit_room = if let Some(connection_info) = live_kit_connection_info { - // let room = live_kit_client::Room::new(); - // let mut status = room.status(); - // // Consume the initial status of the room. - // let _ = status.try_recv(); - // let _maintain_room = cx.spawn(|this, mut cx| async move { - // while let Some(status) = status.next().await { - // let this = if let Some(this) = this.upgrade() { - // this - // } else { - // break; - // }; + let live_kit_room = if let Some(connection_info) = live_kit_connection_info { + let room = live_kit_client2::Room::new(); + let mut status = room.status(); + // Consume the initial status of the room. + let _ = status.try_recv(); + let _maintain_room = cx.spawn(|this, mut cx| async move { + while let Some(status) = status.next().await { + let this = if let Some(this) = this.upgrade() { + this + } else { + break; + }; - // if status == live_kit_client::ConnectionState::Disconnected { - // this.update(&mut cx, |this, cx| this.leave(cx).log_err()) - // .ok(); - // break; - // } - // } - // }); + if status == live_kit_client2::ConnectionState::Disconnected { + this.update(&mut cx, |this, cx| this.leave(cx).log_err()) + .ok(); + break; + } + } + }); - // let mut track_video_changes = room.remote_video_track_updates(); - // let _maintain_video_tracks = cx.spawn(|this, mut cx| async move { - // while let Some(track_change) = track_video_changes.next().await { - // let this = if let Some(this) = this.upgrade() { - // this - // } else { - // break; - // }; + let _maintain_video_tracks = cx.spawn_on_main({ + let room = room.clone(); + move |this, mut cx| async move { + let mut track_video_changes = room.remote_video_track_updates(); + while let Some(track_change) = track_video_changes.next().await { + let this = if let Some(this) = this.upgrade() { + this + } else { + break; + }; - // this.update(&mut cx, |this, cx| { - // this.remote_video_track_updated(track_change, cx).log_err() - // }) - // .ok(); - // } - // }); + this.update(&mut cx, |this, cx| { + this.remote_video_track_updated(track_change, cx).log_err() + }) + .ok(); + } + } + }); - // let mut track_audio_changes = room.remote_audio_track_updates(); - // let _maintain_audio_tracks = cx.spawn(|this, mut cx| async move { - // while let Some(track_change) = track_audio_changes.next().await { - // let this = if let Some(this) = this.upgrade() { - // this - // } else { - // break; - // }; + let _maintain_audio_tracks = cx.spawn_on_main({ + let room = room.clone(); + |this, mut cx| async move { + let mut track_audio_changes = room.remote_audio_track_updates(); + while let Some(track_change) = track_audio_changes.next().await { + let this = if let Some(this) = this.upgrade() { + this + } else { + break; + }; - // this.update(&mut cx, |this, cx| { - // this.remote_audio_track_updated(track_change, cx).log_err() - // }) - // .ok(); - // } - // }); + this.update(&mut cx, |this, cx| { + this.remote_audio_track_updated(track_change, cx).log_err() + }) + .ok(); + } + } + }); - // let connect = room.connect(&connection_info.server_url, &connection_info.token); - // cx.spawn(|this, mut cx| async move { - // connect.await?; + let connect = room.connect(&connection_info.server_url, &connection_info.token); + cx.spawn(|this, mut cx| async move { + connect.await?; - // if !cx.update(|cx| Self::mute_on_join(cx))? { - // this.update(&mut cx, |this, cx| this.share_microphone(cx))? - // .await?; - // } + if !cx.update(|cx| Self::mute_on_join(cx))? { + this.update(&mut cx, |this, cx| this.share_microphone(cx))? + .await?; + } - // anyhow::Ok(()) - // }) - // .detach_and_log_err(cx); + anyhow::Ok(()) + }) + .detach_and_log_err(cx); - // Some(LiveKitRoom { - // room, - // screen_track: LocalTrack::None, - // microphone_track: LocalTrack::None, - // next_publish_id: 0, - // muted_by_user: false, - // deafened: false, - // speaking: false, - // _maintain_room, - // _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks], - // }) - // } else { - // None - // }; + Some(LiveKitRoom { + room, + screen_track: LocalTrack::None, + microphone_track: LocalTrack::None, + next_publish_id: 0, + muted_by_user: false, + deafened: false, + speaking: false, + _maintain_room, + _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks], + }) + } else { + None + }; - // let maintain_connection = cx.spawn({ - // let client = client.clone(); - // move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err() - // }); + let maintain_connection = cx.spawn({ + let client = client.clone(); + move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err() + }); - // Audio::play_sound(Sound::Joined, cx); + Audio::play_sound(Sound::Joined, cx); - // let (room_update_completed_tx, room_update_completed_rx) = watch::channel(); + let (room_update_completed_tx, room_update_completed_rx) = watch::channel(); - // Self { - // id, - // channel_id, - // // live_kit: live_kit_room, - // status: RoomStatus::Online, - // shared_projects: Default::default(), - // joined_projects: Default::default(), - // participant_user_ids: Default::default(), - // local_participant: Default::default(), - // remote_participants: Default::default(), - // pending_participants: Default::default(), - // pending_call_count: 0, - // client_subscriptions: vec![ - // client.add_message_handler(cx.weak_handle(), Self::handle_room_updated) - // ], - // _subscriptions: vec![ - // cx.on_release(Self::released), - // cx.on_app_quit(Self::app_will_quit), - // ], - // leave_when_empty: false, - // pending_room_update: None, - // client, - // user_store, - // follows_by_leader_id_project_id: Default::default(), - // maintain_connection: Some(maintain_connection), - // room_update_completed_tx, - // room_update_completed_rx, - // } + Self { + id, + channel_id, + live_kit: live_kit_room, + status: RoomStatus::Online, + shared_projects: Default::default(), + joined_projects: Default::default(), + participant_user_ids: Default::default(), + local_participant: Default::default(), + remote_participants: Default::default(), + pending_participants: Default::default(), + pending_call_count: 0, + client_subscriptions: vec![ + client.add_message_handler(cx.weak_model(), Self::handle_room_updated) + ], + _subscriptions: vec![ + cx.on_release(Self::released), + cx.on_app_quit(Self::app_will_quit), + ], + leave_when_empty: false, + pending_room_update: None, + client, + user_store, + follows_by_leader_id_project_id: Default::default(), + maintain_connection: Some(maintain_connection), + room_update_completed_tx, + room_update_completed_rx, + } } pub(crate) fn create( @@ -1518,7 +1523,7 @@ impl Room { } #[cfg(any(test, feature = "test-support"))] - pub fn set_display_sources(&self, sources: Vec) { + pub fn set_display_sources(&self, sources: Vec) { todo!() // self.live_kit // .as_ref() @@ -1529,7 +1534,7 @@ impl Room { } struct LiveKitRoom { - room: Arc, + room: Arc, screen_track: LocalTrack, microphone_track: LocalTrack, /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user. diff --git a/crates/live_kit_client/LiveKitBridge/Package.resolved b/crates/live_kit_client/LiveKitBridge/Package.resolved index b925bc8f0d..85ae088565 100644 --- a/crates/live_kit_client/LiveKitBridge/Package.resolved +++ b/crates/live_kit_client/LiveKitBridge/Package.resolved @@ -42,8 +42,8 @@ "repositoryURL": "https://github.com/apple/swift-protobuf.git", "state": { "branch": null, - "revision": "ce20dc083ee485524b802669890291c0d8090170", - "version": "1.22.1" + "revision": "0af9125c4eae12a4973fb66574c53a54962a9e1e", + "version": "1.21.0" } } ] diff --git a/crates/live_kit_client2/.cargo/config.toml b/crates/live_kit_client2/.cargo/config.toml new file mode 100644 index 0000000000..b33fe211bd --- /dev/null +++ b/crates/live_kit_client2/.cargo/config.toml @@ -0,0 +1,2 @@ +[live_kit_client_test] +rustflags = ["-C", "link-args=-ObjC"] diff --git a/crates/live_kit_client2/Cargo.toml b/crates/live_kit_client2/Cargo.toml new file mode 100644 index 0000000000..b5b45a8d45 --- /dev/null +++ b/crates/live_kit_client2/Cargo.toml @@ -0,0 +1,71 @@ +[package] +name = "live_kit_client2" +version = "0.1.0" +edition = "2021" +description = "Bindings to LiveKit Swift client SDK" +publish = false + +[lib] +path = "src/live_kit_client2.rs" +doctest = false + +[[example]] +name = "test_app" + +[features] +test-support = [ + "async-trait", + "collections/test-support", + "gpui2/test-support", + "live_kit_server", + "nanoid", +] + +[dependencies] +collections = { path = "../collections", optional = true } +gpui2 = { path = "../gpui2", optional = true } +live_kit_server = { path = "../live_kit_server", optional = true } +media = { path = "../media" } + +anyhow.workspace = true +async-broadcast = "0.4" +core-foundation = "0.9.3" +core-graphics = "0.22.3" +futures.workspace = true +log.workspace = true +parking_lot.workspace = true +postage.workspace = true + +async-trait = { workspace = true, optional = true } +nanoid = { version ="0.4", optional = true} + +[dev-dependencies] +collections = { path = "../collections", features = ["test-support"] } +gpui2 = { path = "../gpui2", features = ["test-support"] } +live_kit_server = { path = "../live_kit_server" } +media = { path = "../media" } +nanoid = "0.4" + +anyhow.workspace = true +async-trait.workspace = true +block = "0.1" +bytes = "1.2" +byteorder = "1.4" +cocoa = "0.24" +core-foundation = "0.9.3" +core-graphics = "0.22.3" +foreign-types = "0.3" +futures.workspace = true +hmac = "0.12" +jwt = "0.16" +objc = "0.2" +parking_lot.workspace = true +serde.workspace = true +serde_derive.workspace = true +sha2 = "0.10" +simplelog = "0.9" + +[build-dependencies] +serde.workspace = true +serde_derive.workspace = true +serde_json.workspace = true diff --git a/crates/live_kit_client2/LiveKitBridge/Package.resolved b/crates/live_kit_client2/LiveKitBridge/Package.resolved new file mode 100644 index 0000000000..b925bc8f0d --- /dev/null +++ b/crates/live_kit_client2/LiveKitBridge/Package.resolved @@ -0,0 +1,52 @@ +{ + "object": { + "pins": [ + { + "package": "LiveKit", + "repositoryURL": "https://github.com/livekit/client-sdk-swift.git", + "state": { + "branch": null, + "revision": "7331b813a5ab8a95cfb81fb2b4ed10519428b9ff", + "version": "1.0.12" + } + }, + { + "package": "Promises", + "repositoryURL": "https://github.com/google/promises.git", + "state": { + "branch": null, + "revision": "ec957ccddbcc710ccc64c9dcbd4c7006fcf8b73a", + "version": "2.2.0" + } + }, + { + "package": "WebRTC", + "repositoryURL": "https://github.com/webrtc-sdk/Specs.git", + "state": { + "branch": null, + "revision": "2f6bab30c8df0fe59ab3e58bc99097f757f85f65", + "version": "104.5112.17" + } + }, + { + "package": "swift-log", + "repositoryURL": "https://github.com/apple/swift-log.git", + "state": { + "branch": null, + "revision": "32e8d724467f8fe623624570367e3d50c5638e46", + "version": "1.5.2" + } + }, + { + "package": "SwiftProtobuf", + "repositoryURL": "https://github.com/apple/swift-protobuf.git", + "state": { + "branch": null, + "revision": "ce20dc083ee485524b802669890291c0d8090170", + "version": "1.22.1" + } + } + ] + }, + "version": 1 +} diff --git a/crates/live_kit_client2/LiveKitBridge/Package.swift b/crates/live_kit_client2/LiveKitBridge/Package.swift new file mode 100644 index 0000000000..d7b5c271b9 --- /dev/null +++ b/crates/live_kit_client2/LiveKitBridge/Package.swift @@ -0,0 +1,27 @@ +// swift-tools-version: 5.5 + +import PackageDescription + +let package = Package( + name: "LiveKitBridge", + platforms: [ + .macOS(.v10_15) + ], + products: [ + // Products define the executables and libraries a package produces, and make them visible to other packages. + .library( + name: "LiveKitBridge", + type: .static, + targets: ["LiveKitBridge"]), + ], + dependencies: [ + .package(url: "https://github.com/livekit/client-sdk-swift.git", .exact("1.0.12")), + ], + targets: [ + // Targets are the basic building blocks of a package. A target can define a module or a test suite. + // Targets can depend on other targets in this package, and on products in packages this package depends on. + .target( + name: "LiveKitBridge", + dependencies: [.product(name: "LiveKit", package: "client-sdk-swift")]), + ] +) diff --git a/crates/live_kit_client2/LiveKitBridge/README.md b/crates/live_kit_client2/LiveKitBridge/README.md new file mode 100644 index 0000000000..b982c67286 --- /dev/null +++ b/crates/live_kit_client2/LiveKitBridge/README.md @@ -0,0 +1,3 @@ +# LiveKitBridge + +A description of this package. diff --git a/crates/live_kit_client2/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift b/crates/live_kit_client2/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift new file mode 100644 index 0000000000..5f22acf581 --- /dev/null +++ b/crates/live_kit_client2/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift @@ -0,0 +1,327 @@ +import Foundation +import LiveKit +import WebRTC +import ScreenCaptureKit + +class LKRoomDelegate: RoomDelegate { + var data: UnsafeRawPointer + var onDidDisconnect: @convention(c) (UnsafeRawPointer) -> Void + var onDidSubscribeToRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void + var onDidUnsubscribeFromRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void + var onMuteChangedFromRemoteAudioTrack: @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void + var onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void + var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void + var onDidUnsubscribeFromRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void + + init( + data: UnsafeRawPointer, + onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void, + onDidSubscribeToRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void, + onDidUnsubscribeFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void, + onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, + onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void, + onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, + onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void) + { + self.data = data + self.onDidDisconnect = onDidDisconnect + self.onDidSubscribeToRemoteAudioTrack = onDidSubscribeToRemoteAudioTrack + self.onDidUnsubscribeFromRemoteAudioTrack = onDidUnsubscribeFromRemoteAudioTrack + self.onDidSubscribeToRemoteVideoTrack = onDidSubscribeToRemoteVideoTrack + self.onDidUnsubscribeFromRemoteVideoTrack = onDidUnsubscribeFromRemoteVideoTrack + self.onMuteChangedFromRemoteAudioTrack = onMuteChangedFromRemoteAudioTrack + self.onActiveSpeakersChanged = onActiveSpeakersChanged + } + + func room(_ room: Room, didUpdate connectionState: ConnectionState, oldValue: ConnectionState) { + if connectionState.isDisconnected { + self.onDidDisconnect(self.data) + } + } + + func room(_ room: Room, participant: RemoteParticipant, didSubscribe publication: RemoteTrackPublication, track: Track) { + if track.kind == .video { + self.onDidSubscribeToRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString, Unmanaged.passUnretained(track).toOpaque()) + } else if track.kind == .audio { + self.onDidSubscribeToRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString, Unmanaged.passUnretained(track).toOpaque(), Unmanaged.passUnretained(publication).toOpaque()) + } + } + + func room(_ room: Room, participant: Participant, didUpdate publication: TrackPublication, muted: Bool) { + if publication.kind == .audio { + self.onMuteChangedFromRemoteAudioTrack(self.data, publication.sid as CFString, muted) + } + } + + func room(_ room: Room, didUpdate speakers: [Participant]) { + guard let speaker_ids = speakers.compactMap({ $0.identity as CFString }) as CFArray? else { return } + self.onActiveSpeakersChanged(self.data, speaker_ids) + } + + func room(_ room: Room, participant: RemoteParticipant, didUnsubscribe publication: RemoteTrackPublication, track: Track) { + if track.kind == .video { + self.onDidUnsubscribeFromRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString) + } else if track.kind == .audio { + self.onDidUnsubscribeFromRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString) + } + } +} + +class LKVideoRenderer: NSObject, VideoRenderer { + var data: UnsafeRawPointer + var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool + var onDrop: @convention(c) (UnsafeRawPointer) -> Void + var adaptiveStreamIsEnabled: Bool = false + var adaptiveStreamSize: CGSize = .zero + weak var track: VideoTrack? + + init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) { + self.data = data + self.onFrame = onFrame + self.onDrop = onDrop + } + + deinit { + self.onDrop(self.data) + } + + func setSize(_ size: CGSize) { + } + + func renderFrame(_ frame: RTCVideoFrame?) { + let buffer = frame?.buffer as? RTCCVPixelBuffer + if let pixelBuffer = buffer?.pixelBuffer { + if !self.onFrame(self.data, pixelBuffer) { + DispatchQueue.main.async { + self.track?.remove(videoRenderer: self) + } + } + } + } +} + +@_cdecl("LKRoomDelegateCreate") +public func LKRoomDelegateCreate( + data: UnsafeRawPointer, + onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void, + onDidSubscribeToRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer, UnsafeRawPointer) -> Void, + onDidUnsubscribeFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void, + onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void, + onActiveSpeakerChanged: @escaping @convention(c) (UnsafeRawPointer, CFArray) -> Void, + onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void, + onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void +) -> UnsafeMutableRawPointer { + let delegate = LKRoomDelegate( + data: data, + onDidDisconnect: onDidDisconnect, + onDidSubscribeToRemoteAudioTrack: onDidSubscribeToRemoteAudioTrack, + onDidUnsubscribeFromRemoteAudioTrack: onDidUnsubscribeFromRemoteAudioTrack, + onMuteChangedFromRemoteAudioTrack: onMuteChangedFromRemoteAudioTrack, + onActiveSpeakersChanged: onActiveSpeakerChanged, + onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack, + onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack + ) + return Unmanaged.passRetained(delegate).toOpaque() +} + +@_cdecl("LKRoomCreate") +public func LKRoomCreate(delegate: UnsafeRawPointer) -> UnsafeMutableRawPointer { + let delegate = Unmanaged.fromOpaque(delegate).takeUnretainedValue() + return Unmanaged.passRetained(Room(delegate: delegate)).toOpaque() +} + +@_cdecl("LKRoomConnect") +public func LKRoomConnect(room: UnsafeRawPointer, url: CFString, token: CFString, callback: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer) { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + + room.connect(url as String, token as String).then { _ in + callback(callback_data, UnsafeRawPointer(nil) as! CFString?) + }.catch { error in + callback(callback_data, error.localizedDescription as CFString) + } +} + +@_cdecl("LKRoomDisconnect") +public func LKRoomDisconnect(room: UnsafeRawPointer) { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + room.disconnect() +} + +@_cdecl("LKRoomPublishVideoTrack") +public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() + room.localParticipant?.publishVideoTrack(track: track).then { publication in + callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil) + }.catch { error in + callback(callback_data, nil, error.localizedDescription as CFString) + } +} + +@_cdecl("LKRoomPublishAudioTrack") +public func LKRoomPublishAudioTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() + room.localParticipant?.publishAudioTrack(track: track).then { publication in + callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil) + }.catch { error in + callback(callback_data, nil, error.localizedDescription as CFString) + } +} + + +@_cdecl("LKRoomUnpublishTrack") +public func LKRoomUnpublishTrack(room: UnsafeRawPointer, publication: UnsafeRawPointer) { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + let _ = room.localParticipant?.unpublish(publication: publication) +} + +@_cdecl("LKRoomAudioTracksForRemoteParticipant") +public func LKRoomAudioTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + + for (_, participant) in room.remoteParticipants { + if participant.identity == participantId as String { + return participant.audioTracks.compactMap { $0.track as? RemoteAudioTrack } as CFArray? + } + } + + return nil; +} + +@_cdecl("LKRoomAudioTrackPublicationsForRemoteParticipant") +public func LKRoomAudioTrackPublicationsForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + + for (_, participant) in room.remoteParticipants { + if participant.identity == participantId as String { + return participant.audioTracks.compactMap { $0 as? RemoteTrackPublication } as CFArray? + } + } + + return nil; +} + +@_cdecl("LKRoomVideoTracksForRemoteParticipant") +public func LKRoomVideoTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + + for (_, participant) in room.remoteParticipants { + if participant.identity == participantId as String { + return participant.videoTracks.compactMap { $0.track as? RemoteVideoTrack } as CFArray? + } + } + + return nil; +} + +@_cdecl("LKLocalAudioTrackCreateTrack") +public func LKLocalAudioTrackCreateTrack() -> UnsafeMutableRawPointer { + let track = LocalAudioTrack.createTrack(options: AudioCaptureOptions( + echoCancellation: true, + noiseSuppression: true + )) + + return Unmanaged.passRetained(track).toOpaque() +} + + +@_cdecl("LKCreateScreenShareTrackForDisplay") +public func LKCreateScreenShareTrackForDisplay(display: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer { + let display = Unmanaged.fromOpaque(display).takeUnretainedValue() + let track = LocalVideoTrack.createMacOSScreenShareTrack(source: display, preferredMethod: .legacy) + return Unmanaged.passRetained(track).toOpaque() +} + +@_cdecl("LKVideoRendererCreate") +public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer { + Unmanaged.passRetained(LKVideoRenderer(data: data, onFrame: onFrame, onDrop: onDrop)).toOpaque() +} + +@_cdecl("LKVideoTrackAddRenderer") +public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRawPointer) { + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() as! VideoTrack + let renderer = Unmanaged.fromOpaque(renderer).takeRetainedValue() + renderer.track = track + track.add(videoRenderer: renderer) +} + +@_cdecl("LKRemoteVideoTrackGetSid") +public func LKRemoteVideoTrackGetSid(track: UnsafeRawPointer) -> CFString { + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() + return track.sid! as CFString +} + +@_cdecl("LKRemoteAudioTrackGetSid") +public func LKRemoteAudioTrackGetSid(track: UnsafeRawPointer) -> CFString { + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() + return track.sid! as CFString +} + +@_cdecl("LKDisplaySources") +public func LKDisplaySources(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFArray?, CFString?) -> Void) { + MacOSScreenCapturer.sources(for: .display, includeCurrentApplication: false, preferredMethod: .legacy).then { displaySources in + callback(data, displaySources as CFArray, nil) + }.catch { error in + callback(data, nil, error.localizedDescription as CFString) + } +} + +@_cdecl("LKLocalTrackPublicationSetMute") +public func LKLocalTrackPublicationSetMute( + publication: UnsafeRawPointer, + muted: Bool, + on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, + callback_data: UnsafeRawPointer +) { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + + if muted { + publication.mute().then { + on_complete(callback_data, nil) + }.catch { error in + on_complete(callback_data, error.localizedDescription as CFString) + } + } else { + publication.unmute().then { + on_complete(callback_data, nil) + }.catch { error in + on_complete(callback_data, error.localizedDescription as CFString) + } + } +} + +@_cdecl("LKRemoteTrackPublicationSetEnabled") +public func LKRemoteTrackPublicationSetEnabled( + publication: UnsafeRawPointer, + enabled: Bool, + on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, + callback_data: UnsafeRawPointer +) { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + + publication.set(enabled: enabled).then { + on_complete(callback_data, nil) + }.catch { error in + on_complete(callback_data, error.localizedDescription as CFString) + } +} + +@_cdecl("LKRemoteTrackPublicationIsMuted") +public func LKRemoteTrackPublicationIsMuted( + publication: UnsafeRawPointer +) -> Bool { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + + return publication.muted +} + +@_cdecl("LKRemoteTrackPublicationGetSid") +public func LKRemoteTrackPublicationGetSid( + publication: UnsafeRawPointer +) -> CFString { + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + + return publication.sid as CFString +} diff --git a/crates/live_kit_client2/build.rs b/crates/live_kit_client2/build.rs new file mode 100644 index 0000000000..1445704b46 --- /dev/null +++ b/crates/live_kit_client2/build.rs @@ -0,0 +1,172 @@ +use serde::Deserialize; +use std::{ + env, + path::{Path, PathBuf}, + process::Command, +}; + +const SWIFT_PACKAGE_NAME: &str = "LiveKitBridge"; + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SwiftTargetInfo { + pub triple: String, + pub unversioned_triple: String, + pub module_triple: String, + pub swift_runtime_compatibility_version: String, + #[serde(rename = "librariesRequireRPath")] + pub libraries_require_rpath: bool, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SwiftPaths { + pub runtime_library_paths: Vec, + pub runtime_library_import_paths: Vec, + pub runtime_resource_path: String, +} + +#[derive(Debug, Deserialize)] +pub struct SwiftTarget { + pub target: SwiftTargetInfo, + pub paths: SwiftPaths, +} + +const MACOS_TARGET_VERSION: &str = "10.15.7"; + +fn main() { + if cfg!(not(any(test, feature = "test-support"))) { + let swift_target = get_swift_target(); + + build_bridge(&swift_target); + link_swift_stdlib(&swift_target); + link_webrtc_framework(&swift_target); + + // Register exported Objective-C selectors, protocols, etc when building example binaries. + println!("cargo:rustc-link-arg=-Wl,-ObjC"); + } +} + +fn build_bridge(swift_target: &SwiftTarget) { + println!("cargo:rerun-if-env-changed=MACOSX_DEPLOYMENT_TARGET"); + println!("cargo:rerun-if-changed={}/Sources", SWIFT_PACKAGE_NAME); + println!( + "cargo:rerun-if-changed={}/Package.swift", + SWIFT_PACKAGE_NAME + ); + println!( + "cargo:rerun-if-changed={}/Package.resolved", + SWIFT_PACKAGE_NAME + ); + + let swift_package_root = swift_package_root(); + let swift_target_folder = swift_target_folder(); + if !Command::new("swift") + .arg("build") + .arg("--disable-automatic-resolution") + .args(["--configuration", &env::var("PROFILE").unwrap()]) + .args(["--triple", &swift_target.target.triple]) + .args(["--build-path".into(), swift_target_folder]) + .current_dir(&swift_package_root) + .status() + .unwrap() + .success() + { + panic!( + "Failed to compile swift package in {}", + swift_package_root.display() + ); + } + + println!( + "cargo:rustc-link-search=native={}", + swift_target.out_dir_path().display() + ); + println!("cargo:rustc-link-lib=static={}", SWIFT_PACKAGE_NAME); +} + +fn link_swift_stdlib(swift_target: &SwiftTarget) { + for path in &swift_target.paths.runtime_library_paths { + println!("cargo:rustc-link-search=native={}", path); + } +} + +fn link_webrtc_framework(swift_target: &SwiftTarget) { + let swift_out_dir_path = swift_target.out_dir_path(); + println!("cargo:rustc-link-lib=framework=WebRTC"); + println!( + "cargo:rustc-link-search=framework={}", + swift_out_dir_path.display() + ); + // Find WebRTC.framework as a sibling of the executable when running tests. + println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path"); + // Find WebRTC.framework in parent directory of the executable when running examples. + println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path/.."); + + let source_path = swift_out_dir_path.join("WebRTC.framework"); + let deps_dir_path = + PathBuf::from(env::var("OUT_DIR").unwrap()).join("../../../deps/WebRTC.framework"); + let target_dir_path = + PathBuf::from(env::var("OUT_DIR").unwrap()).join("../../../WebRTC.framework"); + copy_dir(&source_path, &deps_dir_path); + copy_dir(&source_path, &target_dir_path); +} + +fn get_swift_target() -> SwiftTarget { + let mut arch = env::var("CARGO_CFG_TARGET_ARCH").unwrap(); + if arch == "aarch64" { + arch = "arm64".into(); + } + let target = format!("{}-apple-macosx{}", arch, MACOS_TARGET_VERSION); + + let swift_target_info_str = Command::new("swift") + .args(["-target", &target, "-print-target-info"]) + .output() + .unwrap() + .stdout; + + serde_json::from_slice(&swift_target_info_str).unwrap() +} + +fn swift_package_root() -> PathBuf { + env::current_dir().unwrap().join(SWIFT_PACKAGE_NAME) +} + +fn swift_target_folder() -> PathBuf { + env::current_dir() + .unwrap() + .join(format!("../../target/{SWIFT_PACKAGE_NAME}")) +} + +fn copy_dir(source: &Path, destination: &Path) { + assert!( + Command::new("rm") + .arg("-rf") + .arg(destination) + .status() + .unwrap() + .success(), + "could not remove {:?} before copying", + destination + ); + + assert!( + Command::new("cp") + .arg("-R") + .args([source, destination]) + .status() + .unwrap() + .success(), + "could not copy {:?} to {:?}", + source, + destination + ); +} + +impl SwiftTarget { + fn out_dir_path(&self) -> PathBuf { + swift_target_folder() + .join(&self.target.unversioned_triple) + .join(env::var("PROFILE").unwrap()) + } +} diff --git a/crates/live_kit_client2/examples/test_app.rs b/crates/live_kit_client2/examples/test_app.rs new file mode 100644 index 0000000000..2147f6ab8c --- /dev/null +++ b/crates/live_kit_client2/examples/test_app.rs @@ -0,0 +1,175 @@ +// use std::time::Duration; +// todo!() + +// use futures::StreamExt; +// use gpui2::{actions, keymap_matcher::Binding, Menu, MenuItem}; +// use live_kit_client2::{ +// LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room, +// }; +// use live_kit_server::token::{self, VideoGrant}; +// use log::LevelFilter; +// use simplelog::SimpleLogger; + +// actions!(capture, [Quit]); + +fn main() { + // SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger"); + + // gpui2::App::new(()).unwrap().run(|cx| { + // #[cfg(any(test, feature = "test-support"))] + // println!("USING TEST LIVEKIT"); + + // #[cfg(not(any(test, feature = "test-support")))] + // println!("USING REAL LIVEKIT"); + + // cx.platform().activate(true); + // cx.add_global_action(quit); + + // cx.add_bindings([Binding::new("cmd-q", Quit, None)]); + // cx.set_menus(vec![Menu { + // name: "Zed", + // items: vec![MenuItem::Action { + // name: "Quit", + // action: Box::new(Quit), + // os_action: None, + // }], + // }]); + + // let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap_or("http://localhost:7880".into()); + // let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap_or("devkey".into()); + // let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap_or("secret".into()); + + // cx.spawn(|cx| async move { + // let user_a_token = token::create( + // &live_kit_key, + // &live_kit_secret, + // Some("test-participant-1"), + // VideoGrant::to_join("test-room"), + // ) + // .unwrap(); + // let room_a = Room::new(); + // room_a.connect(&live_kit_url, &user_a_token).await.unwrap(); + + // let user2_token = token::create( + // &live_kit_key, + // &live_kit_secret, + // Some("test-participant-2"), + // VideoGrant::to_join("test-room"), + // ) + // .unwrap(); + // let room_b = Room::new(); + // room_b.connect(&live_kit_url, &user2_token).await.unwrap(); + + // let mut audio_track_updates = room_b.remote_audio_track_updates(); + // let audio_track = LocalAudioTrack::create(); + // let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap(); + + // if let RemoteAudioTrackUpdate::Subscribed(track, _) = + // audio_track_updates.next().await.unwrap() + // { + // let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); + // assert_eq!(remote_tracks.len(), 1); + // assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1"); + // assert_eq!(track.publisher_id(), "test-participant-1"); + // } else { + // panic!("unexpected message"); + // } + + // audio_track_publication.set_mute(true).await.unwrap(); + + // println!("waiting for mute changed!"); + // if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = + // audio_track_updates.next().await.unwrap() + // { + // let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); + // assert_eq!(remote_tracks[0].sid(), track_id); + // assert_eq!(muted, true); + // } else { + // panic!("unexpected message"); + // } + + // audio_track_publication.set_mute(false).await.unwrap(); + + // if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = + // audio_track_updates.next().await.unwrap() + // { + // let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); + // assert_eq!(remote_tracks[0].sid(), track_id); + // assert_eq!(muted, false); + // } else { + // panic!("unexpected message"); + // } + + // println!("Pausing for 5 seconds to test audio, make some noise!"); + // let timer = cx.background().timer(Duration::from_secs(5)); + // timer.await; + // let remote_audio_track = room_b + // .remote_audio_tracks("test-participant-1") + // .pop() + // .unwrap(); + // room_a.unpublish_track(audio_track_publication); + + // // Clear out any active speakers changed messages + // let mut next = audio_track_updates.next().await.unwrap(); + // while let RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } = next { + // println!("Speakers changed: {:?}", speakers); + // next = audio_track_updates.next().await.unwrap(); + // } + + // if let RemoteAudioTrackUpdate::Unsubscribed { + // publisher_id, + // track_id, + // } = next + // { + // assert_eq!(publisher_id, "test-participant-1"); + // assert_eq!(remote_audio_track.sid(), track_id); + // assert_eq!(room_b.remote_audio_tracks("test-participant-1").len(), 0); + // } else { + // panic!("unexpected message"); + // } + + // let mut video_track_updates = room_b.remote_video_track_updates(); + // let displays = room_a.display_sources().await.unwrap(); + // let display = displays.into_iter().next().unwrap(); + + // let local_video_track = LocalVideoTrack::screen_share_for_display(&display); + // let local_video_track_publication = + // room_a.publish_video_track(local_video_track).await.unwrap(); + + // if let RemoteVideoTrackUpdate::Subscribed(track) = + // video_track_updates.next().await.unwrap() + // { + // let remote_video_tracks = room_b.remote_video_tracks("test-participant-1"); + // assert_eq!(remote_video_tracks.len(), 1); + // assert_eq!(remote_video_tracks[0].publisher_id(), "test-participant-1"); + // assert_eq!(track.publisher_id(), "test-participant-1"); + // } else { + // panic!("unexpected message"); + // } + + // let remote_video_track = room_b + // .remote_video_tracks("test-participant-1") + // .pop() + // .unwrap(); + // room_a.unpublish_track(local_video_track_publication); + // if let RemoteVideoTrackUpdate::Unsubscribed { + // publisher_id, + // track_id, + // } = video_track_updates.next().await.unwrap() + // { + // assert_eq!(publisher_id, "test-participant-1"); + // assert_eq!(remote_video_track.sid(), track_id); + // assert_eq!(room_b.remote_video_tracks("test-participant-1").len(), 0); + // } else { + // panic!("unexpected message"); + // } + + // cx.platform().quit(); + // }) + // .detach(); + // }); +} + +// fn quit(_: &Quit, cx: &mut gpui2::AppContext) { +// cx.platform().quit(); +// } diff --git a/crates/live_kit_client2/src/live_kit_client2.rs b/crates/live_kit_client2/src/live_kit_client2.rs new file mode 100644 index 0000000000..35682382e9 --- /dev/null +++ b/crates/live_kit_client2/src/live_kit_client2.rs @@ -0,0 +1,11 @@ +// #[cfg(not(any(test, feature = "test-support")))] +pub mod prod; + +// #[cfg(not(any(test, feature = "test-support")))] +pub use prod::*; + +// #[cfg(any(test, feature = "test-support"))] +// pub mod test; + +// #[cfg(any(test, feature = "test-support"))] +// pub use test::*; diff --git a/crates/live_kit_client2/src/prod.rs b/crates/live_kit_client2/src/prod.rs new file mode 100644 index 0000000000..65ed8b754f --- /dev/null +++ b/crates/live_kit_client2/src/prod.rs @@ -0,0 +1,943 @@ +use anyhow::{anyhow, Context, Result}; +use core_foundation::{ + array::{CFArray, CFArrayRef}, + base::{CFRelease, CFRetain, TCFType}, + string::{CFString, CFStringRef}, +}; +use futures::{ + channel::{mpsc, oneshot}, + Future, +}; +pub use media::core_video::CVImageBuffer; +use media::core_video::CVImageBufferRef; +use parking_lot::Mutex; +use postage::watch; +use std::{ + ffi::c_void, + sync::{Arc, Weak}, +}; + +// SAFETY: Most live kit types are threadsafe: +// https://github.com/livekit/client-sdk-swift#thread-safety +macro_rules! pointer_type { + ($pointer_name:ident) => { + #[repr(transparent)] + #[derive(Copy, Clone, Debug)] + pub struct $pointer_name(pub *const std::ffi::c_void); + unsafe impl Send for $pointer_name {} + }; +} + +mod swift { + pointer_type!(Room); + pointer_type!(LocalAudioTrack); + pointer_type!(RemoteAudioTrack); + pointer_type!(LocalVideoTrack); + pointer_type!(RemoteVideoTrack); + pointer_type!(LocalTrackPublication); + pointer_type!(RemoteTrackPublication); + pointer_type!(MacOSDisplay); + pointer_type!(RoomDelegate); +} + +extern "C" { + fn LKRoomDelegateCreate( + callback_data: *mut c_void, + on_did_disconnect: extern "C" fn(callback_data: *mut c_void), + on_did_subscribe_to_remote_audio_track: extern "C" fn( + callback_data: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + remote_track: swift::RemoteAudioTrack, + remote_publication: swift::RemoteTrackPublication, + ), + on_did_unsubscribe_from_remote_audio_track: extern "C" fn( + callback_data: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + ), + on_mute_changed_from_remote_audio_track: extern "C" fn( + callback_data: *mut c_void, + track_id: CFStringRef, + muted: bool, + ), + on_active_speakers_changed: extern "C" fn( + callback_data: *mut c_void, + participants: CFArrayRef, + ), + on_did_subscribe_to_remote_video_track: extern "C" fn( + callback_data: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + remote_track: swift::RemoteVideoTrack, + ), + on_did_unsubscribe_from_remote_video_track: extern "C" fn( + callback_data: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + ), + ) -> swift::RoomDelegate; + + fn LKRoomCreate(delegate: swift::RoomDelegate) -> swift::Room; + fn LKRoomConnect( + room: swift::Room, + url: CFStringRef, + token: CFStringRef, + callback: extern "C" fn(*mut c_void, CFStringRef), + callback_data: *mut c_void, + ); + fn LKRoomDisconnect(room: swift::Room); + fn LKRoomPublishVideoTrack( + room: swift::Room, + track: swift::LocalVideoTrack, + callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef), + callback_data: *mut c_void, + ); + fn LKRoomPublishAudioTrack( + room: swift::Room, + track: swift::LocalAudioTrack, + callback: extern "C" fn(*mut c_void, swift::LocalTrackPublication, CFStringRef), + callback_data: *mut c_void, + ); + fn LKRoomUnpublishTrack(room: swift::Room, publication: swift::LocalTrackPublication); + + fn LKRoomAudioTracksForRemoteParticipant( + room: swift::Room, + participant_id: CFStringRef, + ) -> CFArrayRef; + + fn LKRoomAudioTrackPublicationsForRemoteParticipant( + room: swift::Room, + participant_id: CFStringRef, + ) -> CFArrayRef; + + fn LKRoomVideoTracksForRemoteParticipant( + room: swift::Room, + participant_id: CFStringRef, + ) -> CFArrayRef; + + fn LKVideoRendererCreate( + callback_data: *mut c_void, + on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool, + on_drop: extern "C" fn(callback_data: *mut c_void), + ) -> *const c_void; + + fn LKRemoteAudioTrackGetSid(track: swift::RemoteAudioTrack) -> CFStringRef; + fn LKVideoTrackAddRenderer(track: swift::RemoteVideoTrack, renderer: *const c_void); + fn LKRemoteVideoTrackGetSid(track: swift::RemoteVideoTrack) -> CFStringRef; + + fn LKDisplaySources( + callback_data: *mut c_void, + callback: extern "C" fn( + callback_data: *mut c_void, + sources: CFArrayRef, + error: CFStringRef, + ), + ); + fn LKCreateScreenShareTrackForDisplay(display: swift::MacOSDisplay) -> swift::LocalVideoTrack; + fn LKLocalAudioTrackCreateTrack() -> swift::LocalAudioTrack; + + fn LKLocalTrackPublicationSetMute( + publication: swift::LocalTrackPublication, + muted: bool, + on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef), + callback_data: *mut c_void, + ); + + fn LKRemoteTrackPublicationSetEnabled( + publication: swift::RemoteTrackPublication, + enabled: bool, + on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef), + callback_data: *mut c_void, + ); + + fn LKRemoteTrackPublicationIsMuted(publication: swift::RemoteTrackPublication) -> bool; + fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef; +} + +pub type Sid = String; + +#[derive(Clone, Eq, PartialEq)] +pub enum ConnectionState { + Disconnected, + Connected { url: String, token: String }, +} + +pub struct Room { + native_room: Mutex, + connection: Mutex<( + watch::Sender, + watch::Receiver, + )>, + remote_audio_track_subscribers: Mutex>>, + remote_video_track_subscribers: Mutex>>, + _delegate: Mutex, +} + +trait AssertSendSync: Send {} +impl AssertSendSync for Room {} + +impl Room { + pub fn new() -> Arc { + Arc::new_cyclic(|weak_room| { + let delegate = RoomDelegate::new(weak_room.clone()); + Self { + native_room: Mutex::new(unsafe { LKRoomCreate(delegate.native_delegate) }), + connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)), + remote_audio_track_subscribers: Default::default(), + remote_video_track_subscribers: Default::default(), + _delegate: Mutex::new(delegate), + } + }) + } + + pub fn status(&self) -> watch::Receiver { + self.connection.lock().1.clone() + } + + pub fn connect(self: &Arc, url: &str, token: &str) -> impl Future> { + let url = CFString::new(url); + let token = CFString::new(token); + let (did_connect, tx, rx) = Self::build_done_callback(); + unsafe { + LKRoomConnect( + *self.native_room.lock(), + url.as_concrete_TypeRef(), + token.as_concrete_TypeRef(), + did_connect, + tx, + ) + } + + let this = self.clone(); + let url = url.to_string(); + let token = token.to_string(); + async move { + rx.await.unwrap().context("error connecting to room")?; + *this.connection.lock().0.borrow_mut() = ConnectionState::Connected { url, token }; + Ok(()) + } + } + + fn did_disconnect(&self) { + *self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected; + } + + pub fn display_sources(self: &Arc) -> impl Future>> { + extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) { + unsafe { + let tx = Box::from_raw(tx as *mut oneshot::Sender>>); + + if sources.is_null() { + let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error)))); + } else { + let sources = CFArray::wrap_under_get_rule(sources) + .into_iter() + .map(|source| MacOSDisplay::new(swift::MacOSDisplay(*source))) + .collect(); + + let _ = tx.send(Ok(sources)); + } + } + } + + let (tx, rx) = oneshot::channel(); + + unsafe { + LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback); + } + + async move { rx.await.unwrap() } + } + + pub fn publish_video_track( + self: &Arc, + track: LocalVideoTrack, + ) -> impl Future> { + let (tx, rx) = oneshot::channel::>(); + extern "C" fn callback( + tx: *mut c_void, + publication: swift::LocalTrackPublication, + error: CFStringRef, + ) { + let tx = + unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; + if error.is_null() { + let _ = tx.send(Ok(LocalTrackPublication::new(publication))); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + let _ = tx.send(Err(anyhow!(error))); + } + } + unsafe { + LKRoomPublishVideoTrack( + *self.native_room.lock(), + track.0, + callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + ); + } + async { rx.await.unwrap().context("error publishing video track") } + } + + pub fn publish_audio_track( + self: &Arc, + track: LocalAudioTrack, + ) -> impl Future> { + let (tx, rx) = oneshot::channel::>(); + extern "C" fn callback( + tx: *mut c_void, + publication: swift::LocalTrackPublication, + error: CFStringRef, + ) { + let tx = + unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; + if error.is_null() { + let _ = tx.send(Ok(LocalTrackPublication::new(publication))); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + let _ = tx.send(Err(anyhow!(error))); + } + } + unsafe { + LKRoomPublishAudioTrack( + *self.native_room.lock(), + track.0, + callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + ); + } + async { rx.await.unwrap().context("error publishing audio track") } + } + + pub fn unpublish_track(&self, publication: LocalTrackPublication) { + unsafe { + LKRoomUnpublishTrack(*self.native_room.lock(), publication.0); + } + } + + pub fn remote_video_tracks(&self, participant_id: &str) -> Vec> { + unsafe { + let tracks = LKRoomVideoTracksForRemoteParticipant( + *self.native_room.lock(), + CFString::new(participant_id).as_concrete_TypeRef(), + ); + + if tracks.is_null() { + Vec::new() + } else { + let tracks = CFArray::wrap_under_get_rule(tracks); + tracks + .into_iter() + .map(|native_track| { + let native_track = swift::RemoteVideoTrack(*native_track); + let id = + CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track)) + .to_string(); + Arc::new(RemoteVideoTrack::new( + native_track, + id, + participant_id.into(), + )) + }) + .collect() + } + } + } + + pub fn remote_audio_tracks(&self, participant_id: &str) -> Vec> { + unsafe { + let tracks = LKRoomAudioTracksForRemoteParticipant( + *self.native_room.lock(), + CFString::new(participant_id).as_concrete_TypeRef(), + ); + + if tracks.is_null() { + Vec::new() + } else { + let tracks = CFArray::wrap_under_get_rule(tracks); + tracks + .into_iter() + .map(|native_track| { + let native_track = swift::RemoteAudioTrack(*native_track); + let id = + CFString::wrap_under_get_rule(LKRemoteAudioTrackGetSid(native_track)) + .to_string(); + Arc::new(RemoteAudioTrack::new( + native_track, + id, + participant_id.into(), + )) + }) + .collect() + } + } + } + + pub fn remote_audio_track_publications( + &self, + participant_id: &str, + ) -> Vec> { + unsafe { + let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant( + *self.native_room.lock(), + CFString::new(participant_id).as_concrete_TypeRef(), + ); + + if tracks.is_null() { + Vec::new() + } else { + let tracks = CFArray::wrap_under_get_rule(tracks); + tracks + .into_iter() + .map(|native_track_publication| { + let native_track_publication = + swift::RemoteTrackPublication(*native_track_publication); + Arc::new(RemoteTrackPublication::new(native_track_publication)) + }) + .collect() + } + } + } + + pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded(); + self.remote_audio_track_subscribers.lock().push(tx); + rx + } + + pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded(); + self.remote_video_track_subscribers.lock().push(tx); + rx + } + + fn did_subscribe_to_remote_audio_track( + &self, + track: RemoteAudioTrack, + publication: RemoteTrackPublication, + ) { + let track = Arc::new(track); + let publication = Arc::new(publication); + self.remote_audio_track_subscribers.lock().retain(|tx| { + tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed( + track.clone(), + publication.clone(), + )) + .is_ok() + }); + } + + fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) { + self.remote_audio_track_subscribers.lock().retain(|tx| { + tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed { + publisher_id: publisher_id.clone(), + track_id: track_id.clone(), + }) + .is_ok() + }); + } + + fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) { + self.remote_audio_track_subscribers.lock().retain(|tx| { + tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged { + track_id: track_id.clone(), + muted, + }) + .is_ok() + }); + } + + // A vec of publisher IDs + fn active_speakers_changed(&self, speakers: Vec) { + self.remote_audio_track_subscribers + .lock() + .retain(move |tx| { + tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged { + speakers: speakers.clone(), + }) + .is_ok() + }); + } + + fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) { + let track = Arc::new(track); + self.remote_video_track_subscribers.lock().retain(|tx| { + tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone())) + .is_ok() + }); + } + + fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) { + self.remote_video_track_subscribers.lock().retain(|tx| { + tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed { + publisher_id: publisher_id.clone(), + track_id: track_id.clone(), + }) + .is_ok() + }); + } + + fn build_done_callback() -> ( + extern "C" fn(*mut c_void, CFStringRef), + *mut c_void, + oneshot::Receiver>, + ) { + let (tx, rx) = oneshot::channel(); + extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) { + let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; + if error.is_null() { + let _ = tx.send(Ok(())); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + let _ = tx.send(Err(anyhow!(error))); + } + } + ( + done_callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + rx, + ) + } +} + +impl Drop for Room { + fn drop(&mut self) { + unsafe { + let native_room = &*self.native_room.lock(); + LKRoomDisconnect(*native_room); + CFRelease(native_room.0); + } + } +} + +struct RoomDelegate { + native_delegate: swift::RoomDelegate, + _weak_room: Weak, +} + +impl RoomDelegate { + fn new(weak_room: Weak) -> Self { + let native_delegate = unsafe { + LKRoomDelegateCreate( + weak_room.as_ptr() as *mut c_void, + Self::on_did_disconnect, + Self::on_did_subscribe_to_remote_audio_track, + Self::on_did_unsubscribe_from_remote_audio_track, + Self::on_mute_change_from_remote_audio_track, + Self::on_active_speakers_changed, + Self::on_did_subscribe_to_remote_video_track, + Self::on_did_unsubscribe_from_remote_video_track, + ) + }; + Self { + native_delegate, + _weak_room: weak_room, + } + } + + extern "C" fn on_did_disconnect(room: *mut c_void) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + if let Some(room) = room.upgrade() { + room.did_disconnect(); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_did_subscribe_to_remote_audio_track( + room: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + track: swift::RemoteAudioTrack, + publication: swift::RemoteTrackPublication, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; + let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; + let track = RemoteAudioTrack::new(track, track_id, publisher_id); + let publication = RemoteTrackPublication::new(publication); + if let Some(room) = room.upgrade() { + room.did_subscribe_to_remote_audio_track(track, publication); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_did_unsubscribe_from_remote_audio_track( + room: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; + let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; + if let Some(room) = room.upgrade() { + room.did_unsubscribe_from_remote_audio_track(publisher_id, track_id); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_mute_change_from_remote_audio_track( + room: *mut c_void, + track_id: CFStringRef, + muted: bool, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; + if let Some(room) = room.upgrade() { + room.mute_changed_from_remote_audio_track(track_id, muted); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_active_speakers_changed(room: *mut c_void, participants: CFArrayRef) { + if participants.is_null() { + return; + } + + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let speakers = unsafe { + CFArray::wrap_under_get_rule(participants) + .into_iter() + .map( + |speaker: core_foundation::base::ItemRef<'_, *const c_void>| { + CFString::wrap_under_get_rule(*speaker as CFStringRef).to_string() + }, + ) + .collect() + }; + + if let Some(room) = room.upgrade() { + room.active_speakers_changed(speakers); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_did_subscribe_to_remote_video_track( + room: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + track: swift::RemoteVideoTrack, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; + let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; + let track = RemoteVideoTrack::new(track, track_id, publisher_id); + if let Some(room) = room.upgrade() { + room.did_subscribe_to_remote_video_track(track); + } + let _ = Weak::into_raw(room); + } + + extern "C" fn on_did_unsubscribe_from_remote_video_track( + room: *mut c_void, + publisher_id: CFStringRef, + track_id: CFStringRef, + ) { + let room = unsafe { Weak::from_raw(room as *mut Room) }; + let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() }; + let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() }; + if let Some(room) = room.upgrade() { + room.did_unsubscribe_from_remote_video_track(publisher_id, track_id); + } + let _ = Weak::into_raw(room); + } +} + +impl Drop for RoomDelegate { + fn drop(&mut self) { + unsafe { + CFRelease(self.native_delegate.0); + } + } +} + +pub struct LocalAudioTrack(swift::LocalAudioTrack); + +impl LocalAudioTrack { + pub fn create() -> Self { + Self(unsafe { LKLocalAudioTrackCreateTrack() }) + } +} + +impl Drop for LocalAudioTrack { + fn drop(&mut self) { + unsafe { CFRelease(self.0 .0) } + } +} + +pub struct LocalVideoTrack(swift::LocalVideoTrack); + +impl LocalVideoTrack { + pub fn screen_share_for_display(display: &MacOSDisplay) -> Self { + Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) }) + } +} + +impl Drop for LocalVideoTrack { + fn drop(&mut self) { + unsafe { CFRelease(self.0 .0) } + } +} + +pub struct LocalTrackPublication(swift::LocalTrackPublication); + +impl LocalTrackPublication { + pub fn new(native_track_publication: swift::LocalTrackPublication) -> Self { + unsafe { + CFRetain(native_track_publication.0); + } + Self(native_track_publication) + } + + pub fn set_mute(&self, muted: bool) -> impl Future> { + let (tx, rx) = futures::channel::oneshot::channel(); + + extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) { + let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender>) }; + if error.is_null() { + tx.send(Ok(())).ok(); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + tx.send(Err(anyhow!(error))).ok(); + } + } + + unsafe { + LKLocalTrackPublicationSetMute( + self.0, + muted, + complete_callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + ) + } + + async move { rx.await.unwrap() } + } +} + +impl Drop for LocalTrackPublication { + fn drop(&mut self) { + unsafe { CFRelease(self.0 .0) } + } +} + +pub struct RemoteTrackPublication { + native_publication: Mutex, +} + +impl RemoteTrackPublication { + pub fn new(native_track_publication: swift::RemoteTrackPublication) -> Self { + unsafe { + CFRetain(native_track_publication.0); + } + Self { + native_publication: Mutex::new(native_track_publication), + } + } + + pub fn sid(&self) -> String { + unsafe { + CFString::wrap_under_get_rule(LKRemoteTrackPublicationGetSid( + *self.native_publication.lock(), + )) + .to_string() + } + } + + pub fn is_muted(&self) -> bool { + unsafe { LKRemoteTrackPublicationIsMuted(*self.native_publication.lock()) } + } + + pub fn set_enabled(&self, enabled: bool) -> impl Future> { + let (tx, rx) = futures::channel::oneshot::channel(); + + extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) { + let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender>) }; + if error.is_null() { + tx.send(Ok(())).ok(); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + tx.send(Err(anyhow!(error))).ok(); + } + } + + unsafe { + LKRemoteTrackPublicationSetEnabled( + *self.native_publication.lock(), + enabled, + complete_callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + ) + } + + async move { rx.await.unwrap() } + } +} + +impl Drop for RemoteTrackPublication { + fn drop(&mut self) { + unsafe { CFRelease((*self.native_publication.lock()).0) } + } +} + +#[derive(Debug)] +pub struct RemoteAudioTrack { + native_track: Mutex, + sid: Sid, + publisher_id: String, +} + +impl RemoteAudioTrack { + fn new(native_track: swift::RemoteAudioTrack, sid: Sid, publisher_id: String) -> Self { + unsafe { + CFRetain(native_track.0); + } + Self { + native_track: Mutex::new(native_track), + sid, + publisher_id, + } + } + + pub fn sid(&self) -> &str { + &self.sid + } + + pub fn publisher_id(&self) -> &str { + &self.publisher_id + } + + pub fn enable(&self) -> impl Future> { + async { Ok(()) } + } + + pub fn disable(&self) -> impl Future> { + async { Ok(()) } + } +} + +impl Drop for RemoteAudioTrack { + fn drop(&mut self) { + unsafe { CFRelease(self.native_track.lock().0) } + } +} + +#[derive(Debug)] +pub struct RemoteVideoTrack { + native_track: Mutex, + sid: Sid, + publisher_id: String, +} + +impl RemoteVideoTrack { + fn new(native_track: swift::RemoteVideoTrack, sid: Sid, publisher_id: String) -> Self { + unsafe { + CFRetain(native_track.0); + } + Self { + native_track: Mutex::new(native_track), + sid, + publisher_id, + } + } + + pub fn sid(&self) -> &str { + &self.sid + } + + pub fn publisher_id(&self) -> &str { + &self.publisher_id + } + + pub fn frames(&self) -> async_broadcast::Receiver { + extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool { + unsafe { + let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender); + let buffer = CVImageBuffer::wrap_under_get_rule(frame); + let result = tx.try_broadcast(Frame(buffer)); + let _ = Box::into_raw(tx); + match result { + Ok(_) => true, + Err(async_broadcast::TrySendError::Closed(_)) + | Err(async_broadcast::TrySendError::Inactive(_)) => { + log::warn!("no active receiver for frame"); + false + } + Err(async_broadcast::TrySendError::Full(_)) => { + log::warn!("skipping frame as receiver is not keeping up"); + true + } + } + } + } + + extern "C" fn on_drop(callback_data: *mut c_void) { + unsafe { + let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender); + } + } + + let (tx, rx) = async_broadcast::broadcast(64); + unsafe { + let renderer = LKVideoRendererCreate( + Box::into_raw(Box::new(tx)) as *mut c_void, + on_frame, + on_drop, + ); + LKVideoTrackAddRenderer(*self.native_track.lock(), renderer); + rx + } + } +} + +impl Drop for RemoteVideoTrack { + fn drop(&mut self) { + unsafe { CFRelease(self.native_track.lock().0) } + } +} + +pub enum RemoteVideoTrackUpdate { + Subscribed(Arc), + Unsubscribed { publisher_id: Sid, track_id: Sid }, +} + +pub enum RemoteAudioTrackUpdate { + ActiveSpeakersChanged { speakers: Vec }, + MuteChanged { track_id: Sid, muted: bool }, + Subscribed(Arc, Arc), + Unsubscribed { publisher_id: Sid, track_id: Sid }, +} + +pub struct MacOSDisplay(swift::MacOSDisplay); + +impl MacOSDisplay { + fn new(ptr: swift::MacOSDisplay) -> Self { + unsafe { + CFRetain(ptr.0); + } + Self(ptr) + } +} + +impl Drop for MacOSDisplay { + fn drop(&mut self) { + unsafe { CFRelease(self.0 .0) } + } +} + +#[derive(Clone)] +pub struct Frame(CVImageBuffer); + +impl Frame { + pub fn width(&self) -> usize { + self.0.width() + } + + pub fn height(&self) -> usize { + self.0.height() + } + + pub fn image(&self) -> CVImageBuffer { + self.0.clone() + } +} diff --git a/crates/live_kit_client2/src/test.rs b/crates/live_kit_client2/src/test.rs new file mode 100644 index 0000000000..7185c11fa8 --- /dev/null +++ b/crates/live_kit_client2/src/test.rs @@ -0,0 +1,647 @@ +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use collections::{BTreeMap, HashMap}; +use futures::Stream; +use gpui2::Executor; +use live_kit_server::token; +use media::core_video::CVImageBuffer; +use parking_lot::Mutex; +use postage::watch; +use std::{future::Future, mem, sync::Arc}; + +static SERVERS: Mutex>> = Mutex::new(BTreeMap::new()); + +pub struct TestServer { + pub url: String, + pub api_key: String, + pub secret_key: String, + rooms: Mutex>, + executor: Arc, +} + +impl TestServer { + pub fn create( + url: String, + api_key: String, + secret_key: String, + executor: Arc, + ) -> Result> { + let mut servers = SERVERS.lock(); + if servers.contains_key(&url) { + Err(anyhow!("a server with url {:?} already exists", url)) + } else { + let server = Arc::new(TestServer { + url: url.clone(), + api_key, + secret_key, + rooms: Default::default(), + executor, + }); + servers.insert(url, server.clone()); + Ok(server) + } + } + + fn get(url: &str) -> Result> { + Ok(SERVERS + .lock() + .get(url) + .ok_or_else(|| anyhow!("no server found for url"))? + .clone()) + } + + pub fn teardown(&self) -> Result<()> { + SERVERS + .lock() + .remove(&self.url) + .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?; + Ok(()) + } + + pub fn create_api_client(&self) -> TestApiClient { + TestApiClient { + url: self.url.clone(), + } + } + + pub async fn create_room(&self, room: String) -> Result<()> { + self.executor.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + if server_rooms.contains_key(&room) { + Err(anyhow!("room {:?} already exists", room)) + } else { + server_rooms.insert(room, Default::default()); + Ok(()) + } + } + + async fn delete_room(&self, room: String) -> Result<()> { + // TODO: clear state associated with all `Room`s. + self.executor.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + server_rooms + .remove(&room) + .ok_or_else(|| anyhow!("room {:?} does not exist", room))?; + Ok(()) + } + + async fn join_room(&self, token: String, client_room: Arc) -> Result<()> { + self.executor.simulate_random_delay().await; + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let identity = claims.sub.unwrap().to_string(); + let room_name = claims.video.room.unwrap(); + let mut server_rooms = self.rooms.lock(); + let room = (*server_rooms).entry(room_name.to_string()).or_default(); + + if room.client_rooms.contains_key(&identity) { + Err(anyhow!( + "{:?} attempted to join room {:?} twice", + identity, + room_name + )) + } else { + for track in &room.video_tracks { + client_room + .0 + .lock() + .video_track_updates + .0 + .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) + .unwrap(); + } + room.client_rooms.insert(identity, client_room); + Ok(()) + } + } + + async fn leave_room(&self, token: String) -> Result<()> { + self.executor.simulate_random_delay().await; + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let identity = claims.sub.unwrap().to_string(); + let room_name = claims.video.room.unwrap(); + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + room.client_rooms.remove(&identity).ok_or_else(|| { + anyhow!( + "{:?} attempted to leave room {:?} before joining it", + identity, + room_name + ) + })?; + Ok(()) + } + + async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> { + // TODO: clear state associated with the `Room`. + + self.executor.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + room.client_rooms.remove(&identity).ok_or_else(|| { + anyhow!( + "participant {:?} did not join room {:?}", + identity, + room_name + ) + })?; + Ok(()) + } + + pub async fn disconnect_client(&self, client_identity: String) { + self.executor.simulate_random_delay().await; + let mut server_rooms = self.rooms.lock(); + for room in server_rooms.values_mut() { + if let Some(room) = room.client_rooms.remove(&client_identity) { + *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected; + } + } + } + + async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> { + self.executor.simulate_random_delay().await; + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let identity = claims.sub.unwrap().to_string(); + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + + let track = Arc::new(RemoteVideoTrack { + sid: nanoid::nanoid!(17), + publisher_id: identity.clone(), + frames_rx: local_track.frames_rx.clone(), + }); + + room.video_tracks.push(track.clone()); + + for (id, client_room) in &room.client_rooms { + if *id != identity { + let _ = client_room + .0 + .lock() + .video_track_updates + .0 + .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) + .unwrap(); + } + } + + Ok(()) + } + + async fn publish_audio_track( + &self, + token: String, + _local_track: &LocalAudioTrack, + ) -> Result<()> { + self.executor.simulate_random_delay().await; + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let identity = claims.sub.unwrap().to_string(); + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + + let track = Arc::new(RemoteAudioTrack { + sid: nanoid::nanoid!(17), + publisher_id: identity.clone(), + }); + + let publication = Arc::new(RemoteTrackPublication); + + room.audio_tracks.push(track.clone()); + + for (id, client_room) in &room.client_rooms { + if *id != identity { + let _ = client_room + .0 + .lock() + .audio_track_updates + .0 + .try_broadcast(RemoteAudioTrackUpdate::Subscribed( + track.clone(), + publication.clone(), + )) + .unwrap(); + } + } + + Ok(()) + } + + fn video_tracks(&self, token: String) -> Result>> { + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + Ok(room.video_tracks.clone()) + } + + fn audio_tracks(&self, token: String) -> Result>> { + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + Ok(room.audio_tracks.clone()) + } +} + +#[derive(Default)] +struct TestServerRoom { + client_rooms: HashMap>, + video_tracks: Vec>, + audio_tracks: Vec>, +} + +impl TestServerRoom {} + +pub struct TestApiClient { + url: String, +} + +#[async_trait] +impl live_kit_server::api::Client for TestApiClient { + fn url(&self) -> &str { + &self.url + } + + async fn create_room(&self, name: String) -> Result<()> { + let server = TestServer::get(&self.url)?; + server.create_room(name).await?; + Ok(()) + } + + async fn delete_room(&self, name: String) -> Result<()> { + let server = TestServer::get(&self.url)?; + server.delete_room(name).await?; + Ok(()) + } + + async fn remove_participant(&self, room: String, identity: String) -> Result<()> { + let server = TestServer::get(&self.url)?; + server.remove_participant(room, identity).await?; + Ok(()) + } + + fn room_token(&self, room: &str, identity: &str) -> Result { + let server = TestServer::get(&self.url)?; + token::create( + &server.api_key, + &server.secret_key, + Some(identity), + token::VideoGrant::to_join(room), + ) + } + + fn guest_token(&self, room: &str, identity: &str) -> Result { + let server = TestServer::get(&self.url)?; + token::create( + &server.api_key, + &server.secret_key, + Some(identity), + token::VideoGrant::for_guest(room), + ) + } +} + +pub type Sid = String; + +struct RoomState { + connection: ( + watch::Sender, + watch::Receiver, + ), + display_sources: Vec, + audio_track_updates: ( + async_broadcast::Sender, + async_broadcast::Receiver, + ), + video_track_updates: ( + async_broadcast::Sender, + async_broadcast::Receiver, + ), +} + +#[derive(Clone, Eq, PartialEq)] +pub enum ConnectionState { + Disconnected, + Connected { url: String, token: String }, +} + +pub struct Room(Mutex); + +impl Room { + pub fn new() -> Arc { + Arc::new(Self(Mutex::new(RoomState { + connection: watch::channel_with(ConnectionState::Disconnected), + display_sources: Default::default(), + video_track_updates: async_broadcast::broadcast(128), + audio_track_updates: async_broadcast::broadcast(128), + }))) + } + + pub fn status(&self) -> watch::Receiver { + self.0.lock().connection.1.clone() + } + + pub fn connect(self: &Arc, url: &str, token: &str) -> impl Future> { + let this = self.clone(); + let url = url.to_string(); + let token = token.to_string(); + async move { + let server = TestServer::get(&url)?; + server.join_room(token.clone(), this.clone()).await?; + *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token }; + Ok(()) + } + } + + pub fn display_sources(self: &Arc) -> impl Future>> { + let this = self.clone(); + async move { + let server = this.test_server(); + server.executor.simulate_random_delay().await; + Ok(this.0.lock().display_sources.clone()) + } + } + + pub fn publish_video_track( + self: &Arc, + track: LocalVideoTrack, + ) -> impl Future> { + let this = self.clone(); + let track = track.clone(); + async move { + this.test_server() + .publish_video_track(this.token(), track) + .await?; + Ok(LocalTrackPublication) + } + } + pub fn publish_audio_track( + self: &Arc, + track: LocalAudioTrack, + ) -> impl Future> { + let this = self.clone(); + let track = track.clone(); + async move { + this.test_server() + .publish_audio_track(this.token(), &track) + .await?; + Ok(LocalTrackPublication) + } + } + + pub fn unpublish_track(&self, _publication: LocalTrackPublication) {} + + pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec> { + if !self.is_connected() { + return Vec::new(); + } + + self.test_server() + .audio_tracks(self.token()) + .unwrap() + .into_iter() + .filter(|track| track.publisher_id() == publisher_id) + .collect() + } + + pub fn remote_audio_track_publications( + &self, + publisher_id: &str, + ) -> Vec> { + if !self.is_connected() { + return Vec::new(); + } + + self.test_server() + .audio_tracks(self.token()) + .unwrap() + .into_iter() + .filter(|track| track.publisher_id() == publisher_id) + .map(|_track| Arc::new(RemoteTrackPublication {})) + .collect() + } + + pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec> { + if !self.is_connected() { + return Vec::new(); + } + + self.test_server() + .video_tracks(self.token()) + .unwrap() + .into_iter() + .filter(|track| track.publisher_id() == publisher_id) + .collect() + } + + pub fn remote_audio_track_updates(&self) -> impl Stream { + self.0.lock().audio_track_updates.1.clone() + } + + pub fn remote_video_track_updates(&self) -> impl Stream { + self.0.lock().video_track_updates.1.clone() + } + + pub fn set_display_sources(&self, sources: Vec) { + self.0.lock().display_sources = sources; + } + + fn test_server(&self) -> Arc { + match self.0.lock().connection.1.borrow().clone() { + ConnectionState::Disconnected => panic!("must be connected to call this method"), + ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(), + } + } + + fn token(&self) -> String { + match self.0.lock().connection.1.borrow().clone() { + ConnectionState::Disconnected => panic!("must be connected to call this method"), + ConnectionState::Connected { token, .. } => token, + } + } + + fn is_connected(&self) -> bool { + match *self.0.lock().connection.1.borrow() { + ConnectionState::Disconnected => false, + ConnectionState::Connected { .. } => true, + } + } +} + +impl Drop for Room { + fn drop(&mut self) { + if let ConnectionState::Connected { token, .. } = mem::replace( + &mut *self.0.lock().connection.0.borrow_mut(), + ConnectionState::Disconnected, + ) { + if let Ok(server) = TestServer::get(&token) { + let executor = server.executor.clone(); + executor + .spawn(async move { server.leave_room(token).await.unwrap() }) + .detach(); + } + } + } +} + +pub struct LocalTrackPublication; + +impl LocalTrackPublication { + pub fn set_mute(&self, _mute: bool) -> impl Future> { + async { Ok(()) } + } +} + +pub struct RemoteTrackPublication; + +impl RemoteTrackPublication { + pub fn set_enabled(&self, _enabled: bool) -> impl Future> { + async { Ok(()) } + } + + pub fn is_muted(&self) -> bool { + false + } + + pub fn sid(&self) -> String { + "".to_string() + } +} + +#[derive(Clone)] +pub struct LocalVideoTrack { + frames_rx: async_broadcast::Receiver, +} + +impl LocalVideoTrack { + pub fn screen_share_for_display(display: &MacOSDisplay) -> Self { + Self { + frames_rx: display.frames.1.clone(), + } + } +} + +#[derive(Clone)] +pub struct LocalAudioTrack; + +impl LocalAudioTrack { + pub fn create() -> Self { + Self + } +} + +pub struct RemoteVideoTrack { + sid: Sid, + publisher_id: Sid, + frames_rx: async_broadcast::Receiver, +} + +impl RemoteVideoTrack { + pub fn sid(&self) -> &str { + &self.sid + } + + pub fn publisher_id(&self) -> &str { + &self.publisher_id + } + + pub fn frames(&self) -> async_broadcast::Receiver { + self.frames_rx.clone() + } +} + +#[derive(Debug)] +pub struct RemoteAudioTrack { + sid: Sid, + publisher_id: Sid, +} + +impl RemoteAudioTrack { + pub fn sid(&self) -> &str { + &self.sid + } + + pub fn publisher_id(&self) -> &str { + &self.publisher_id + } + + pub fn enable(&self) -> impl Future> { + async { Ok(()) } + } + + pub fn disable(&self) -> impl Future> { + async { Ok(()) } + } +} + +#[derive(Clone)] +pub enum RemoteVideoTrackUpdate { + Subscribed(Arc), + Unsubscribed { publisher_id: Sid, track_id: Sid }, +} + +#[derive(Clone)] +pub enum RemoteAudioTrackUpdate { + ActiveSpeakersChanged { speakers: Vec }, + MuteChanged { track_id: Sid, muted: bool }, + Subscribed(Arc, Arc), + Unsubscribed { publisher_id: Sid, track_id: Sid }, +} + +#[derive(Clone)] +pub struct MacOSDisplay { + frames: ( + async_broadcast::Sender, + async_broadcast::Receiver, + ), +} + +impl MacOSDisplay { + pub fn new() -> Self { + Self { + frames: async_broadcast::broadcast(128), + } + } + + pub fn send_frame(&self, frame: Frame) { + self.frames.0.try_broadcast(frame).unwrap(); + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Frame { + pub label: String, + pub width: usize, + pub height: usize, +} + +impl Frame { + pub fn width(&self) -> usize { + self.width + } + + pub fn height(&self) -> usize { + self.height + } + + pub fn image(&self) -> CVImageBuffer { + unimplemented!("you can't call this in test mode") + } +} From 51fa80ef066d28816f47d095e76e9c5ef0aede19 Mon Sep 17 00:00:00 2001 From: Mikayla Date: Wed, 1 Nov 2023 09:19:32 -0700 Subject: [PATCH 2/3] ported example app, live_kit_client2 is done --- crates/live_kit_client2/examples/test_app.rs | 299 +++++++++--------- .../live_kit_client2/src/live_kit_client2.rs | 12 +- crates/live_kit_client2/src/test.rs | 4 +- 3 files changed, 159 insertions(+), 156 deletions(-) diff --git a/crates/live_kit_client2/examples/test_app.rs b/crates/live_kit_client2/examples/test_app.rs index 2147f6ab8c..ad10a4c95d 100644 --- a/crates/live_kit_client2/examples/test_app.rs +++ b/crates/live_kit_client2/examples/test_app.rs @@ -1,175 +1,178 @@ -// use std::time::Duration; -// todo!() +use std::{sync::Arc, time::Duration}; -// use futures::StreamExt; -// use gpui2::{actions, keymap_matcher::Binding, Menu, MenuItem}; -// use live_kit_client2::{ -// LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room, -// }; -// use live_kit_server::token::{self, VideoGrant}; -// use log::LevelFilter; -// use simplelog::SimpleLogger; +use futures::StreamExt; +use gpui2::KeyBinding; +use live_kit_client2::{ + LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room, +}; +use live_kit_server::token::{self, VideoGrant}; +use log::LevelFilter; +use serde_derive::Deserialize; +use simplelog::SimpleLogger; -// actions!(capture, [Quit]); +#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)] +struct Quit; fn main() { - // SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger"); + SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger"); - // gpui2::App::new(()).unwrap().run(|cx| { - // #[cfg(any(test, feature = "test-support"))] - // println!("USING TEST LIVEKIT"); + gpui2::App::production(Arc::new(())).run(|cx| { + #[cfg(any(test, feature = "test-support"))] + println!("USING TEST LIVEKIT"); - // #[cfg(not(any(test, feature = "test-support")))] - // println!("USING REAL LIVEKIT"); + #[cfg(not(any(test, feature = "test-support")))] + println!("USING REAL LIVEKIT"); - // cx.platform().activate(true); - // cx.add_global_action(quit); + cx.activate(true); - // cx.add_bindings([Binding::new("cmd-q", Quit, None)]); - // cx.set_menus(vec![Menu { - // name: "Zed", - // items: vec![MenuItem::Action { - // name: "Quit", - // action: Box::new(Quit), - // os_action: None, - // }], - // }]); + cx.on_action(quit); + cx.bind_keys([KeyBinding::new("cmd-q", Quit, None)]); - // let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap_or("http://localhost:7880".into()); - // let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap_or("devkey".into()); - // let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap_or("secret".into()); + // todo!() + // cx.set_menus(vec![Menu { + // name: "Zed", + // items: vec![MenuItem::Action { + // name: "Quit", + // action: Box::new(Quit), + // os_action: None, + // }], + // }]); - // cx.spawn(|cx| async move { - // let user_a_token = token::create( - // &live_kit_key, - // &live_kit_secret, - // Some("test-participant-1"), - // VideoGrant::to_join("test-room"), - // ) - // .unwrap(); - // let room_a = Room::new(); - // room_a.connect(&live_kit_url, &user_a_token).await.unwrap(); + let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap_or("http://localhost:7880".into()); + let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap_or("devkey".into()); + let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap_or("secret".into()); - // let user2_token = token::create( - // &live_kit_key, - // &live_kit_secret, - // Some("test-participant-2"), - // VideoGrant::to_join("test-room"), - // ) - // .unwrap(); - // let room_b = Room::new(); - // room_b.connect(&live_kit_url, &user2_token).await.unwrap(); + cx.spawn_on_main(|cx| async move { + let user_a_token = token::create( + &live_kit_key, + &live_kit_secret, + Some("test-participant-1"), + VideoGrant::to_join("test-room"), + ) + .unwrap(); + let room_a = Room::new(); + room_a.connect(&live_kit_url, &user_a_token).await.unwrap(); - // let mut audio_track_updates = room_b.remote_audio_track_updates(); - // let audio_track = LocalAudioTrack::create(); - // let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap(); + let user2_token = token::create( + &live_kit_key, + &live_kit_secret, + Some("test-participant-2"), + VideoGrant::to_join("test-room"), + ) + .unwrap(); + let room_b = Room::new(); + room_b.connect(&live_kit_url, &user2_token).await.unwrap(); - // if let RemoteAudioTrackUpdate::Subscribed(track, _) = - // audio_track_updates.next().await.unwrap() - // { - // let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); - // assert_eq!(remote_tracks.len(), 1); - // assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1"); - // assert_eq!(track.publisher_id(), "test-participant-1"); - // } else { - // panic!("unexpected message"); - // } + let mut audio_track_updates = room_b.remote_audio_track_updates(); + let audio_track = LocalAudioTrack::create(); + let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap(); - // audio_track_publication.set_mute(true).await.unwrap(); + if let RemoteAudioTrackUpdate::Subscribed(track, _) = + audio_track_updates.next().await.unwrap() + { + let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); + assert_eq!(remote_tracks.len(), 1); + assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1"); + assert_eq!(track.publisher_id(), "test-participant-1"); + } else { + panic!("unexpected message"); + } - // println!("waiting for mute changed!"); - // if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = - // audio_track_updates.next().await.unwrap() - // { - // let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); - // assert_eq!(remote_tracks[0].sid(), track_id); - // assert_eq!(muted, true); - // } else { - // panic!("unexpected message"); - // } + audio_track_publication.set_mute(true).await.unwrap(); - // audio_track_publication.set_mute(false).await.unwrap(); + println!("waiting for mute changed!"); + if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = + audio_track_updates.next().await.unwrap() + { + let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); + assert_eq!(remote_tracks[0].sid(), track_id); + assert_eq!(muted, true); + } else { + panic!("unexpected message"); + } - // if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = - // audio_track_updates.next().await.unwrap() - // { - // let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); - // assert_eq!(remote_tracks[0].sid(), track_id); - // assert_eq!(muted, false); - // } else { - // panic!("unexpected message"); - // } + audio_track_publication.set_mute(false).await.unwrap(); - // println!("Pausing for 5 seconds to test audio, make some noise!"); - // let timer = cx.background().timer(Duration::from_secs(5)); - // timer.await; - // let remote_audio_track = room_b - // .remote_audio_tracks("test-participant-1") - // .pop() - // .unwrap(); - // room_a.unpublish_track(audio_track_publication); + if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = + audio_track_updates.next().await.unwrap() + { + let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); + assert_eq!(remote_tracks[0].sid(), track_id); + assert_eq!(muted, false); + } else { + panic!("unexpected message"); + } - // // Clear out any active speakers changed messages - // let mut next = audio_track_updates.next().await.unwrap(); - // while let RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } = next { - // println!("Speakers changed: {:?}", speakers); - // next = audio_track_updates.next().await.unwrap(); - // } + println!("Pausing for 5 seconds to test audio, make some noise!"); + let timer = cx.executor().timer(Duration::from_secs(5)); + timer.await; + let remote_audio_track = room_b + .remote_audio_tracks("test-participant-1") + .pop() + .unwrap(); + room_a.unpublish_track(audio_track_publication); - // if let RemoteAudioTrackUpdate::Unsubscribed { - // publisher_id, - // track_id, - // } = next - // { - // assert_eq!(publisher_id, "test-participant-1"); - // assert_eq!(remote_audio_track.sid(), track_id); - // assert_eq!(room_b.remote_audio_tracks("test-participant-1").len(), 0); - // } else { - // panic!("unexpected message"); - // } + // Clear out any active speakers changed messages + let mut next = audio_track_updates.next().await.unwrap(); + while let RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } = next { + println!("Speakers changed: {:?}", speakers); + next = audio_track_updates.next().await.unwrap(); + } - // let mut video_track_updates = room_b.remote_video_track_updates(); - // let displays = room_a.display_sources().await.unwrap(); - // let display = displays.into_iter().next().unwrap(); + if let RemoteAudioTrackUpdate::Unsubscribed { + publisher_id, + track_id, + } = next + { + assert_eq!(publisher_id, "test-participant-1"); + assert_eq!(remote_audio_track.sid(), track_id); + assert_eq!(room_b.remote_audio_tracks("test-participant-1").len(), 0); + } else { + panic!("unexpected message"); + } - // let local_video_track = LocalVideoTrack::screen_share_for_display(&display); - // let local_video_track_publication = - // room_a.publish_video_track(local_video_track).await.unwrap(); + let mut video_track_updates = room_b.remote_video_track_updates(); + let displays = room_a.display_sources().await.unwrap(); + let display = displays.into_iter().next().unwrap(); - // if let RemoteVideoTrackUpdate::Subscribed(track) = - // video_track_updates.next().await.unwrap() - // { - // let remote_video_tracks = room_b.remote_video_tracks("test-participant-1"); - // assert_eq!(remote_video_tracks.len(), 1); - // assert_eq!(remote_video_tracks[0].publisher_id(), "test-participant-1"); - // assert_eq!(track.publisher_id(), "test-participant-1"); - // } else { - // panic!("unexpected message"); - // } + let local_video_track = LocalVideoTrack::screen_share_for_display(&display); + let local_video_track_publication = + room_a.publish_video_track(local_video_track).await.unwrap(); - // let remote_video_track = room_b - // .remote_video_tracks("test-participant-1") - // .pop() - // .unwrap(); - // room_a.unpublish_track(local_video_track_publication); - // if let RemoteVideoTrackUpdate::Unsubscribed { - // publisher_id, - // track_id, - // } = video_track_updates.next().await.unwrap() - // { - // assert_eq!(publisher_id, "test-participant-1"); - // assert_eq!(remote_video_track.sid(), track_id); - // assert_eq!(room_b.remote_video_tracks("test-participant-1").len(), 0); - // } else { - // panic!("unexpected message"); - // } + if let RemoteVideoTrackUpdate::Subscribed(track) = + video_track_updates.next().await.unwrap() + { + let remote_video_tracks = room_b.remote_video_tracks("test-participant-1"); + assert_eq!(remote_video_tracks.len(), 1); + assert_eq!(remote_video_tracks[0].publisher_id(), "test-participant-1"); + assert_eq!(track.publisher_id(), "test-participant-1"); + } else { + panic!("unexpected message"); + } - // cx.platform().quit(); - // }) - // .detach(); - // }); + let remote_video_track = room_b + .remote_video_tracks("test-participant-1") + .pop() + .unwrap(); + room_a.unpublish_track(local_video_track_publication); + if let RemoteVideoTrackUpdate::Unsubscribed { + publisher_id, + track_id, + } = video_track_updates.next().await.unwrap() + { + assert_eq!(publisher_id, "test-participant-1"); + assert_eq!(remote_video_track.sid(), track_id); + assert_eq!(room_b.remote_video_tracks("test-participant-1").len(), 0); + } else { + panic!("unexpected message"); + } + + cx.update(|cx| cx.quit()).ok(); + }) + .detach(); + }); } -// fn quit(_: &Quit, cx: &mut gpui2::AppContext) { -// cx.platform().quit(); -// } +fn quit(_: &Quit, cx: &mut gpui2::AppContext) { + cx.quit(); +} diff --git a/crates/live_kit_client2/src/live_kit_client2.rs b/crates/live_kit_client2/src/live_kit_client2.rs index 35682382e9..47cc3873ff 100644 --- a/crates/live_kit_client2/src/live_kit_client2.rs +++ b/crates/live_kit_client2/src/live_kit_client2.rs @@ -1,11 +1,11 @@ -// #[cfg(not(any(test, feature = "test-support")))] +#[cfg(not(any(test, feature = "test-support")))] pub mod prod; -// #[cfg(not(any(test, feature = "test-support")))] +#[cfg(not(any(test, feature = "test-support")))] pub use prod::*; -// #[cfg(any(test, feature = "test-support"))] -// pub mod test; +#[cfg(any(test, feature = "test-support"))] +pub mod test; -// #[cfg(any(test, feature = "test-support"))] -// pub use test::*; +#[cfg(any(test, feature = "test-support"))] +pub use test::*; diff --git a/crates/live_kit_client2/src/test.rs b/crates/live_kit_client2/src/test.rs index 7185c11fa8..535ab20afb 100644 --- a/crates/live_kit_client2/src/test.rs +++ b/crates/live_kit_client2/src/test.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Result, Context}; use async_trait::async_trait; use collections::{BTreeMap, HashMap}; use futures::Stream; @@ -364,7 +364,7 @@ impl Room { let token = token.to_string(); async move { let server = TestServer::get(&url)?; - server.join_room(token.clone(), this.clone()).await?; + server.join_room(token.clone(), this.clone()).await.context("room join")?; *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token }; Ok(()) } From 1568ecbe1ee6c64a54597f3d02a31dc591f05624 Mon Sep 17 00:00:00 2001 From: Mikayla Date: Wed, 1 Nov 2023 09:29:54 -0700 Subject: [PATCH 3/3] Add back room code to call2 --- crates/call2/src/participant.rs | 29 +- crates/call2/src/room.rs | 692 ++++++++++++++-------------- crates/live_kit_client2/src/prod.rs | 4 + crates/live_kit_client2/src/test.rs | 8 +- 4 files changed, 350 insertions(+), 383 deletions(-) diff --git a/crates/call2/src/participant.rs b/crates/call2/src/participant.rs index a1837e3ad0..9fe212e776 100644 --- a/crates/call2/src/participant.rs +++ b/crates/call2/src/participant.rs @@ -1,10 +1,12 @@ use anyhow::{anyhow, Result}; use client2::ParticipantIndex; use client2::{proto, User}; +use collections::HashMap; use gpui2::WeakModel; pub use live_kit_client2::Frame; +use live_kit_client2::{RemoteAudioTrack, RemoteVideoTrack}; use project2::Project; -use std::{fmt, sync::Arc}; +use std::sync::Arc; #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum ParticipantLocation { @@ -45,27 +47,6 @@ pub struct RemoteParticipant { pub participant_index: ParticipantIndex, pub muted: bool, pub speaking: bool, - // pub video_tracks: HashMap>, - // pub audio_tracks: HashMap>, -} - -#[derive(Clone)] -pub struct RemoteVideoTrack { - pub(crate) live_kit_track: Arc, -} - -unsafe impl Send for RemoteVideoTrack {} -// todo!("remove this sync because it's not legit") -unsafe impl Sync for RemoteVideoTrack {} - -impl fmt::Debug for RemoteVideoTrack { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RemoteVideoTrack").finish() - } -} - -impl RemoteVideoTrack { - pub fn frames(&self) -> async_broadcast::Receiver { - self.live_kit_track.frames() - } + pub video_tracks: HashMap>, + pub audio_tracks: HashMap>, } diff --git a/crates/call2/src/room.rs b/crates/call2/src/room.rs index f0e0b8de17..cf98db015b 100644 --- a/crates/call2/src/room.rs +++ b/crates/call2/src/room.rs @@ -1,9 +1,6 @@ -#![allow(dead_code, unused)] -// todo!() - use crate::{ call_settings::CallSettings, - participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack}, + participant::{LocalParticipant, ParticipantLocation, RemoteParticipant}, IncomingCall, }; use anyhow::{anyhow, Result}; @@ -19,12 +16,15 @@ use gpui2::{ AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel, }; use language2::LanguageRegistry; -use live_kit_client2::{LocalTrackPublication, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate}; +use live_kit_client2::{ + LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate, + RemoteVideoTrackUpdate, +}; use postage::{sink::Sink, stream::Stream, watch}; use project2::Project; use settings2::Settings; -use std::{future::Future, sync::Arc, time::Duration}; -use util::{ResultExt, TryFutureExt}; +use std::{future::Future, mem, sync::Arc, time::Duration}; +use util::{post_inc, ResultExt, TryFutureExt}; pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30); @@ -95,15 +95,14 @@ impl Room { #[cfg(any(test, feature = "test-support"))] pub fn is_connected(&self) -> bool { - false - // if let Some(live_kit) = self.live_kit.as_ref() { - // matches!( - // *live_kit.room.status().borrow(), - // live_kit_client::ConnectionState::Connected { .. } - // ) - // } else { - // false - // } + if let Some(live_kit) = self.live_kit.as_ref() { + matches!( + *live_kit.room.status().borrow(), + live_kit_client2::ConnectionState::Connected { .. } + ) + } else { + false + } } fn new( @@ -423,7 +422,7 @@ impl Room { self.pending_participants.clear(); self.participant_user_ids.clear(); self.client_subscriptions.clear(); - // self.live_kit.take(); + self.live_kit.take(); self.pending_room_update.take(); self.maintain_connection.take(); } @@ -799,43 +798,43 @@ impl Room { location, muted: true, speaking: false, - // video_tracks: Default::default(), - // audio_tracks: Default::default(), + video_tracks: Default::default(), + audio_tracks: Default::default(), }, ); Audio::play_sound(Sound::Joined, cx); - // if let Some(live_kit) = this.live_kit.as_ref() { - // let video_tracks = - // live_kit.room.remote_video_tracks(&user.id.to_string()); - // let audio_tracks = - // live_kit.room.remote_audio_tracks(&user.id.to_string()); - // let publications = live_kit - // .room - // .remote_audio_track_publications(&user.id.to_string()); + if let Some(live_kit) = this.live_kit.as_ref() { + let video_tracks = + live_kit.room.remote_video_tracks(&user.id.to_string()); + let audio_tracks = + live_kit.room.remote_audio_tracks(&user.id.to_string()); + let publications = live_kit + .room + .remote_audio_track_publications(&user.id.to_string()); - // for track in video_tracks { - // this.remote_video_track_updated( - // RemoteVideoTrackUpdate::Subscribed(track), - // cx, - // ) - // .log_err(); - // } + for track in video_tracks { + this.remote_video_track_updated( + RemoteVideoTrackUpdate::Subscribed(track), + cx, + ) + .log_err(); + } - // for (track, publication) in - // audio_tracks.iter().zip(publications.iter()) - // { - // this.remote_audio_track_updated( - // RemoteAudioTrackUpdate::Subscribed( - // track.clone(), - // publication.clone(), - // ), - // cx, - // ) - // .log_err(); - // } - // } + for (track, publication) in + audio_tracks.iter().zip(publications.iter()) + { + this.remote_audio_track_updated( + RemoteAudioTrackUpdate::Subscribed( + track.clone(), + publication.clone(), + ), + cx, + ) + .log_err(); + } + } } } @@ -923,7 +922,6 @@ impl Room { change: RemoteVideoTrackUpdate, cx: &mut ModelContext, ) -> Result<()> { - todo!(); match change { RemoteVideoTrackUpdate::Subscribed(track) => { let user_id = track.publisher_id().parse()?; @@ -932,12 +930,7 @@ impl Room { .remote_participants .get_mut(&user_id) .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?; - // participant.video_tracks.insert( - // track_id.clone(), - // Arc::new(RemoteVideoTrack { - // live_kit_track: track, - // }), - // ); + participant.video_tracks.insert(track_id.clone(), track); cx.emit(Event::RemoteVideoTracksChanged { participant_id: participant.peer_id, }); @@ -951,7 +944,7 @@ impl Room { .remote_participants .get_mut(&user_id) .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?; - // participant.video_tracks.remove(&track_id); + participant.video_tracks.remove(&track_id); cx.emit(Event::RemoteVideoTracksChanged { participant_id: participant.peer_id, }); @@ -981,65 +974,61 @@ impl Room { participant.speaking = false; } } - // todo!() - // if let Some(id) = self.client.user_id() { - // if let Some(room) = &mut self.live_kit { - // if let Ok(_) = speaker_ids.binary_search(&id) { - // room.speaking = true; - // } else { - // room.speaking = false; - // } - // } - // } + if let Some(id) = self.client.user_id() { + if let Some(room) = &mut self.live_kit { + if let Ok(_) = speaker_ids.binary_search(&id) { + room.speaking = true; + } else { + room.speaking = false; + } + } + } cx.notify(); } RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => { - // todo!() - // let mut found = false; - // for participant in &mut self.remote_participants.values_mut() { - // for track in participant.audio_tracks.values() { - // if track.sid() == track_id { - // found = true; - // break; - // } - // } - // if found { - // participant.muted = muted; - // break; - // } - // } + let mut found = false; + for participant in &mut self.remote_participants.values_mut() { + for track in participant.audio_tracks.values() { + if track.sid() == track_id { + found = true; + break; + } + } + if found { + participant.muted = muted; + break; + } + } cx.notify(); } RemoteAudioTrackUpdate::Subscribed(track, publication) => { - // todo!() - // let user_id = track.publisher_id().parse()?; - // let track_id = track.sid().to_string(); - // let participant = self - // .remote_participants - // .get_mut(&user_id) - // .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?; - // // participant.audio_tracks.insert(track_id.clone(), track); - // participant.muted = publication.is_muted(); + let user_id = track.publisher_id().parse()?; + let track_id = track.sid().to_string(); + let participant = self + .remote_participants + .get_mut(&user_id) + .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?; + participant.audio_tracks.insert(track_id.clone(), track); + participant.muted = publication.is_muted(); - // cx.emit(Event::RemoteAudioTracksChanged { - // participant_id: participant.peer_id, - // }); + cx.emit(Event::RemoteAudioTracksChanged { + participant_id: participant.peer_id, + }); } RemoteAudioTrackUpdate::Unsubscribed { publisher_id, track_id, } => { - // todo!() - // let user_id = publisher_id.parse()?; - // let participant = self - // .remote_participants - // .get_mut(&user_id) - // .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?; - // participant.audio_tracks.remove(&track_id); - // cx.emit(Event::RemoteAudioTracksChanged { - // participant_id: participant.peer_id, - // }); + let user_id = publisher_id.parse()?; + let participant = self + .remote_participants + .get_mut(&user_id) + .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?; + participant.audio_tracks.remove(&track_id); + cx.emit(Event::RemoteAudioTracksChanged { + participant_id: participant.peer_id, + }); } } @@ -1220,278 +1209,269 @@ impl Room { } pub fn is_screen_sharing(&self) -> bool { - todo!() - // self.live_kit.as_ref().map_or(false, |live_kit| { - // !matches!(live_kit.screen_track, LocalTrack::None) - // }) + self.live_kit.as_ref().map_or(false, |live_kit| { + !matches!(live_kit.screen_track, LocalTrack::None) + }) } pub fn is_sharing_mic(&self) -> bool { - todo!() - // self.live_kit.as_ref().map_or(false, |live_kit| { - // !matches!(live_kit.microphone_track, LocalTrack::None) - // }) + self.live_kit.as_ref().map_or(false, |live_kit| { + !matches!(live_kit.microphone_track, LocalTrack::None) + }) } pub fn is_muted(&self, cx: &AppContext) -> bool { - todo!() - // self.live_kit - // .as_ref() - // .and_then(|live_kit| match &live_kit.microphone_track { - // LocalTrack::None => Some(Self::mute_on_join(cx)), - // LocalTrack::Pending { muted, .. } => Some(*muted), - // LocalTrack::Published { muted, .. } => Some(*muted), - // }) - // .unwrap_or(false) + self.live_kit + .as_ref() + .and_then(|live_kit| match &live_kit.microphone_track { + LocalTrack::None => Some(Self::mute_on_join(cx)), + LocalTrack::Pending { muted, .. } => Some(*muted), + LocalTrack::Published { muted, .. } => Some(*muted), + }) + .unwrap_or(false) } pub fn is_speaking(&self) -> bool { - todo!() - // self.live_kit - // .as_ref() - // .map_or(false, |live_kit| live_kit.speaking) + self.live_kit + .as_ref() + .map_or(false, |live_kit| live_kit.speaking) } pub fn is_deafened(&self) -> Option { - // self.live_kit.as_ref().map(|live_kit| live_kit.deafened) - todo!() + self.live_kit.as_ref().map(|live_kit| live_kit.deafened) } #[track_caller] pub fn share_microphone(&mut self, cx: &mut ModelContext) -> Task> { - todo!() - // if self.status.is_offline() { - // return Task::ready(Err(anyhow!("room is offline"))); - // } else if self.is_sharing_mic() { - // return Task::ready(Err(anyhow!("microphone was already shared"))); - // } + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } else if self.is_sharing_mic() { + return Task::ready(Err(anyhow!("microphone was already shared"))); + } - // let publish_id = if let Some(live_kit) = self.live_kit.as_mut() { - // let publish_id = post_inc(&mut live_kit.next_publish_id); - // live_kit.microphone_track = LocalTrack::Pending { - // publish_id, - // muted: false, - // }; - // cx.notify(); - // publish_id - // } else { - // return Task::ready(Err(anyhow!("live-kit was not initialized"))); - // }; + let publish_id = if let Some(live_kit) = self.live_kit.as_mut() { + let publish_id = post_inc(&mut live_kit.next_publish_id); + live_kit.microphone_track = LocalTrack::Pending { + publish_id, + muted: false, + }; + cx.notify(); + publish_id + } else { + return Task::ready(Err(anyhow!("live-kit was not initialized"))); + }; - // cx.spawn(move |this, mut cx| async move { - // let publish_track = async { - // let track = LocalAudioTrack::create(); - // this.upgrade() - // .ok_or_else(|| anyhow!("room was dropped"))? - // .update(&mut cx, |this, _| { - // this.live_kit - // .as_ref() - // .map(|live_kit| live_kit.room.publish_audio_track(track)) - // })? - // .ok_or_else(|| anyhow!("live-kit was not initialized"))? - // .await - // }; + cx.spawn(move |this, mut cx| async move { + let publish_track = async { + let track = LocalAudioTrack::create(); + this.upgrade() + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, _| { + this.live_kit + .as_ref() + .map(|live_kit| live_kit.room.publish_audio_track(track)) + })? + .ok_or_else(|| anyhow!("live-kit was not initialized"))? + .await + }; - // let publication = publish_track.await; - // this.upgrade() - // .ok_or_else(|| anyhow!("room was dropped"))? - // .update(&mut cx, |this, cx| { - // let live_kit = this - // .live_kit - // .as_mut() - // .ok_or_else(|| anyhow!("live-kit was not initialized"))?; + let publication = publish_track.await; + this.upgrade() + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, cx| { + let live_kit = this + .live_kit + .as_mut() + .ok_or_else(|| anyhow!("live-kit was not initialized"))?; - // let (canceled, muted) = if let LocalTrack::Pending { - // publish_id: cur_publish_id, - // muted, - // } = &live_kit.microphone_track - // { - // (*cur_publish_id != publish_id, *muted) - // } else { - // (true, false) - // }; + let (canceled, muted) = if let LocalTrack::Pending { + publish_id: cur_publish_id, + muted, + } = &live_kit.microphone_track + { + (*cur_publish_id != publish_id, *muted) + } else { + (true, false) + }; - // match publication { - // Ok(publication) => { - // if canceled { - // live_kit.room.unpublish_track(publication); - // } else { - // if muted { - // cx.executor().spawn(publication.set_mute(muted)).detach(); - // } - // live_kit.microphone_track = LocalTrack::Published { - // track_publication: publication, - // muted, - // }; - // cx.notify(); - // } - // Ok(()) - // } - // Err(error) => { - // if canceled { - // Ok(()) - // } else { - // live_kit.microphone_track = LocalTrack::None; - // cx.notify(); - // Err(error) - // } - // } - // } - // })? - // }) + match publication { + Ok(publication) => { + if canceled { + live_kit.room.unpublish_track(publication); + } else { + if muted { + cx.executor().spawn(publication.set_mute(muted)).detach(); + } + live_kit.microphone_track = LocalTrack::Published { + track_publication: publication, + muted, + }; + cx.notify(); + } + Ok(()) + } + Err(error) => { + if canceled { + Ok(()) + } else { + live_kit.microphone_track = LocalTrack::None; + cx.notify(); + Err(error) + } + } + } + })? + }) } pub fn share_screen(&mut self, cx: &mut ModelContext) -> Task> { - todo!() - // if self.status.is_offline() { - // return Task::ready(Err(anyhow!("room is offline"))); - // } else if self.is_screen_sharing() { - // return Task::ready(Err(anyhow!("screen was already shared"))); - // } + if self.status.is_offline() { + return Task::ready(Err(anyhow!("room is offline"))); + } else if self.is_screen_sharing() { + return Task::ready(Err(anyhow!("screen was already shared"))); + } - // let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() { - // let publish_id = post_inc(&mut live_kit.next_publish_id); - // live_kit.screen_track = LocalTrack::Pending { - // publish_id, - // muted: false, - // }; - // cx.notify(); - // (live_kit.room.display_sources(), publish_id) - // } else { - // return Task::ready(Err(anyhow!("live-kit was not initialized"))); - // }; + let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() { + let publish_id = post_inc(&mut live_kit.next_publish_id); + live_kit.screen_track = LocalTrack::Pending { + publish_id, + muted: false, + }; + cx.notify(); + (live_kit.room.display_sources(), publish_id) + } else { + return Task::ready(Err(anyhow!("live-kit was not initialized"))); + }; - // cx.spawn(move |this, mut cx| async move { - // let publish_track = async { - // let displays = displays.await?; - // let display = displays - // .first() - // .ok_or_else(|| anyhow!("no display found"))?; - // let track = LocalVideoTrack::screen_share_for_display(&display); - // this.upgrade() - // .ok_or_else(|| anyhow!("room was dropped"))? - // .update(&mut cx, |this, _| { - // this.live_kit - // .as_ref() - // .map(|live_kit| live_kit.room.publish_video_track(track)) - // })? - // .ok_or_else(|| anyhow!("live-kit was not initialized"))? - // .await - // }; + cx.spawn_on_main(move |this, mut cx| async move { + let publish_track = async { + let displays = displays.await?; + let display = displays + .first() + .ok_or_else(|| anyhow!("no display found"))?; + let track = LocalVideoTrack::screen_share_for_display(&display); + this.upgrade() + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, _| { + this.live_kit + .as_ref() + .map(|live_kit| live_kit.room.publish_video_track(track)) + })? + .ok_or_else(|| anyhow!("live-kit was not initialized"))? + .await + }; - // let publication = publish_track.await; - // this.upgrade() - // .ok_or_else(|| anyhow!("room was dropped"))? - // .update(&mut cx, |this, cx| { - // let live_kit = this - // .live_kit - // .as_mut() - // .ok_or_else(|| anyhow!("live-kit was not initialized"))?; + let publication = publish_track.await; + this.upgrade() + .ok_or_else(|| anyhow!("room was dropped"))? + .update(&mut cx, |this, cx| { + let live_kit = this + .live_kit + .as_mut() + .ok_or_else(|| anyhow!("live-kit was not initialized"))?; - // let (canceled, muted) = if let LocalTrack::Pending { - // publish_id: cur_publish_id, - // muted, - // } = &live_kit.screen_track - // { - // (*cur_publish_id != publish_id, *muted) - // } else { - // (true, false) - // }; + let (canceled, muted) = if let LocalTrack::Pending { + publish_id: cur_publish_id, + muted, + } = &live_kit.screen_track + { + (*cur_publish_id != publish_id, *muted) + } else { + (true, false) + }; - // match publication { - // Ok(publication) => { - // if canceled { - // live_kit.room.unpublish_track(publication); - // } else { - // if muted { - // cx.executor().spawn(publication.set_mute(muted)).detach(); - // } - // live_kit.screen_track = LocalTrack::Published { - // track_publication: publication, - // muted, - // }; - // cx.notify(); - // } + match publication { + Ok(publication) => { + if canceled { + live_kit.room.unpublish_track(publication); + } else { + if muted { + cx.executor().spawn(publication.set_mute(muted)).detach(); + } + live_kit.screen_track = LocalTrack::Published { + track_publication: publication, + muted, + }; + cx.notify(); + } - // Audio::play_sound(Sound::StartScreenshare, cx); + Audio::play_sound(Sound::StartScreenshare, cx); - // Ok(()) - // } - // Err(error) => { - // if canceled { - // Ok(()) - // } else { - // live_kit.screen_track = LocalTrack::None; - // cx.notify(); - // Err(error) - // } - // } - // } - // })? - // }) + Ok(()) + } + Err(error) => { + if canceled { + Ok(()) + } else { + live_kit.screen_track = LocalTrack::None; + cx.notify(); + Err(error) + } + } + } + })? + }) } pub fn toggle_mute(&mut self, cx: &mut ModelContext) -> Result>> { - todo!() - // let should_mute = !self.is_muted(cx); - // if let Some(live_kit) = self.live_kit.as_mut() { - // if matches!(live_kit.microphone_track, LocalTrack::None) { - // return Ok(self.share_microphone(cx)); - // } + let should_mute = !self.is_muted(cx); + if let Some(live_kit) = self.live_kit.as_mut() { + if matches!(live_kit.microphone_track, LocalTrack::None) { + return Ok(self.share_microphone(cx)); + } - // let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?; - // live_kit.muted_by_user = should_mute; + let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?; + live_kit.muted_by_user = should_mute; - // if old_muted == true && live_kit.deafened == true { - // if let Some(task) = self.toggle_deafen(cx).ok() { - // task.detach(); - // } - // } + if old_muted == true && live_kit.deafened == true { + if let Some(task) = self.toggle_deafen(cx).ok() { + task.detach(); + } + } - // Ok(ret_task) - // } else { - // Err(anyhow!("LiveKit not started")) - // } + Ok(ret_task) + } else { + Err(anyhow!("LiveKit not started")) + } } pub fn toggle_deafen(&mut self, cx: &mut ModelContext) -> Result>> { - todo!() - // if let Some(live_kit) = self.live_kit.as_mut() { - // (*live_kit).deafened = !live_kit.deafened; + if let Some(live_kit) = self.live_kit.as_mut() { + (*live_kit).deafened = !live_kit.deafened; - // let mut tasks = Vec::with_capacity(self.remote_participants.len()); - // // Context notification is sent within set_mute itself. - // let mut mute_task = None; - // // When deafening, mute user's mic as well. - // // When undeafening, unmute user's mic unless it was manually muted prior to deafening. - // if live_kit.deafened || !live_kit.muted_by_user { - // mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0); - // }; - // for participant in self.remote_participants.values() { - // for track in live_kit - // .room - // .remote_audio_track_publications(&participant.user.id.to_string()) - // { - // let deafened = live_kit.deafened; - // tasks.push( - // cx.executor() - // .spawn_on_main(move || track.set_enabled(!deafened)), - // ); - // } - // } + let mut tasks = Vec::with_capacity(self.remote_participants.len()); + // Context notification is sent within set_mute itself. + let mut mute_task = None; + // When deafening, mute user's mic as well. + // When undeafening, unmute user's mic unless it was manually muted prior to deafening. + if live_kit.deafened || !live_kit.muted_by_user { + mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0); + }; + for participant in self.remote_participants.values() { + for track in live_kit + .room + .remote_audio_track_publications(&participant.user.id.to_string()) + { + let deafened = live_kit.deafened; + tasks.push( + cx.executor() + .spawn_on_main(move || track.set_enabled(!deafened)), + ); + } + } - // Ok(cx.executor().spawn_on_main(|| async { - // if let Some(mute_task) = mute_task { - // mute_task.await?; - // } - // for task in tasks { - // task.await?; - // } - // Ok(()) - // })) - // } else { - // Err(anyhow!("LiveKit not started")) - // } + Ok(cx.executor().spawn_on_main(|| async { + if let Some(mute_task) = mute_task { + mute_task.await?; + } + for task in tasks { + task.await?; + } + Ok(()) + })) + } else { + Err(anyhow!("LiveKit not started")) + } } pub fn unshare_screen(&mut self, cx: &mut ModelContext) -> Result<()> { @@ -1499,37 +1479,35 @@ impl Room { return Err(anyhow!("room is offline")); } - todo!() - // let live_kit = self - // .live_kit - // .as_mut() - // .ok_or_else(|| anyhow!("live-kit was not initialized"))?; - // match mem::take(&mut live_kit.screen_track) { - // LocalTrack::None => Err(anyhow!("screen was not shared")), - // LocalTrack::Pending { .. } => { - // cx.notify(); - // Ok(()) - // } - // LocalTrack::Published { - // track_publication, .. - // } => { - // live_kit.room.unpublish_track(track_publication); - // cx.notify(); + let live_kit = self + .live_kit + .as_mut() + .ok_or_else(|| anyhow!("live-kit was not initialized"))?; + match mem::take(&mut live_kit.screen_track) { + LocalTrack::None => Err(anyhow!("screen was not shared")), + LocalTrack::Pending { .. } => { + cx.notify(); + Ok(()) + } + LocalTrack::Published { + track_publication, .. + } => { + live_kit.room.unpublish_track(track_publication); + cx.notify(); - // Audio::play_sound(Sound::StopScreenshare, cx); - // Ok(()) - // } - // } + Audio::play_sound(Sound::StopScreenshare, cx); + Ok(()) + } + } } #[cfg(any(test, feature = "test-support"))] pub fn set_display_sources(&self, sources: Vec) { - todo!() - // self.live_kit - // .as_ref() - // .unwrap() - // .room - // .set_display_sources(sources); + self.live_kit + .as_ref() + .unwrap() + .room + .set_display_sources(sources); } } diff --git a/crates/live_kit_client2/src/prod.rs b/crates/live_kit_client2/src/prod.rs index 65ed8b754f..b2b83e95fc 100644 --- a/crates/live_kit_client2/src/prod.rs +++ b/crates/live_kit_client2/src/prod.rs @@ -499,6 +499,10 @@ impl Room { rx, ) } + + pub fn set_display_sources(&self, _: Vec) { + unreachable!("This is a test-only function") + } } impl Drop for Room { diff --git a/crates/live_kit_client2/src/test.rs b/crates/live_kit_client2/src/test.rs index 535ab20afb..f1c3d39b8e 100644 --- a/crates/live_kit_client2/src/test.rs +++ b/crates/live_kit_client2/src/test.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, Result, Context}; +use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use collections::{BTreeMap, HashMap}; use futures::Stream; @@ -364,7 +364,10 @@ impl Room { let token = token.to_string(); async move { let server = TestServer::get(&url)?; - server.join_room(token.clone(), this.clone()).await.context("room join")?; + server + .join_room(token.clone(), this.clone()) + .await + .context("room join")?; *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token }; Ok(()) } @@ -547,6 +550,7 @@ impl LocalAudioTrack { } } +#[derive(Debug)] pub struct RemoteVideoTrack { sid: Sid, publisher_id: Sid,