diff --git a/Cargo.lock b/Cargo.lock index c896fdf7ce..4814befcb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3166,13 +3166,27 @@ name = "live_kit_client" version = "0.1.0" dependencies = [ "anyhow", + "block", + "byteorder", + "bytes 1.2.1", + "cocoa", "core-foundation", "core-graphics", + "foreign-types", "futures 0.3.24", + "gpui", + "hmac 0.12.1", + "jwt", + "live_kit_server", + "log", "media", + "objc", "parking_lot 0.11.2", + "postage", "serde", "serde_json", + "sha2 0.10.6", + "simplelog", ] [[package]] diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index dd1e4598db..4434aa77e0 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -7,7 +7,7 @@ use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore}; use collections::{BTreeMap, HashSet}; use futures::StreamExt; use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task}; -use live_kit_client::{LocalVideoTrack, RemoteVideoTrackChange}; +use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate}; use postage::watch; use project::Project; use std::sync::Arc; @@ -75,9 +75,9 @@ impl Room { let live_kit_room = if let Some(connection_info) = live_kit_connection_info { let room = live_kit_client::Room::new(); - let mut tracks = room.remote_video_tracks(); + let mut track_changes = room.remote_video_track_updates(); let maintain_room = cx.spawn_weak(|this, mut cx| async move { - while let Some(track_change) = tracks.next().await { + while let Some(track_change) = track_changes.next().await { let this = if let Some(this) = this.upgrade(&cx) { this } else { @@ -85,7 +85,7 @@ impl Room { }; this.update(&mut cx, |this, cx| { - this.remote_video_track_changed(track_change, cx).log_err() + this.remote_video_track_updated(track_change, cx).log_err() }); } }); @@ -330,13 +330,16 @@ impl Room { ); if let Some((room, _)) = this.live_kit_room.as_ref() { - for track in - room.video_tracks_for_remote_participant(peer_id.0.to_string()) - { - this.remote_video_track_changed( - RemoteVideoTrackChange::Subscribed(track), + println!("getting video tracks for peer id {}", peer_id.0.to_string()); + let tracks = room.remote_video_tracks(&peer_id.0.to_string()); + dbg!(tracks.len()); + for track in tracks { + dbg!(track.id(), track.publisher_id()); + this.remote_video_track_updated( + RemoteVideoTrackUpdate::Subscribed(track), cx, - ); + ) + .log_err(); } } } @@ -376,13 +379,14 @@ impl Room { Ok(()) } - fn remote_video_track_changed( + fn remote_video_track_updated( &mut self, - change: RemoteVideoTrackChange, + change: RemoteVideoTrackUpdate, cx: &mut ModelContext, ) -> Result<()> { match change { - RemoteVideoTrackChange::Subscribed(track) => { + RemoteVideoTrackUpdate::Subscribed(track) => { + dbg!(track.publisher_id(), track.id()); let peer_id = PeerId(track.publisher_id().parse()?); let track_id = track.id().to_string(); let participant = self @@ -427,7 +431,7 @@ impl Room { }, ); } - RemoteVideoTrackChange::Unsubscribed { + RemoteVideoTrackUpdate::Unsubscribed { publisher_id, track_id, } => { @@ -596,11 +600,11 @@ impl Room { }; cx.foreground().spawn(async move { - let displays = live_kit_client::display_sources().await?; - let display = displays - .first() - .ok_or_else(|| anyhow!("no display found"))?; - let track = LocalVideoTrack::screen_share_for_display(display); + let display = live_kit_client::display_source().await?; + // let display = displays + // .first() + // .ok_or_else(|| anyhow!("no display found"))?; + let track = LocalVideoTrack::screen_share_for_display(&display); room.publish_video_track(&track).await?; Ok(()) }) diff --git a/crates/live_kit_client/.cargo/config.toml b/crates/live_kit_client/.cargo/config.toml new file mode 100644 index 0000000000..b33fe211bd --- /dev/null +++ b/crates/live_kit_client/.cargo/config.toml @@ -0,0 +1,2 @@ +[live_kit_client_test] +rustflags = ["-C", "link-args=-ObjC"] diff --git a/crates/live_kit_client/Cargo.toml b/crates/live_kit_client/Cargo.toml index 1344ecbcdb..af7ddb67c0 100644 --- a/crates/live_kit_client/Cargo.toml +++ b/crates/live_kit_client/Cargo.toml @@ -8,6 +8,9 @@ description = "Bindings to LiveKit Swift client SDK" path = "src/live_kit_client.rs" doctest = false +[[example]] +name = "test_app" + [dependencies] media = { path = "../media" } @@ -17,6 +20,30 @@ core-graphics = "0.22.3" futures = "0.3" parking_lot = "0.11.1" +[dev-dependencies] +gpui = { path = "../gpui" } +live_kit_server = { path = "../live_kit_server" } +media = { path = "../media" } + +anyhow = "1.0.38" +block = "0.1" +bytes = "1.2" +byteorder = "1.4" +cocoa = "0.24" +core-foundation = "0.9.3" +core-graphics = "0.22.3" +foreign-types = "0.3" +futures = "0.3" +hmac = "0.12" +jwt = "0.16" +log = { version = "0.4.16", features = ["kv_unstable_serde"] } +objc = "0.2" +parking_lot = "0.11.1" +postage = { version = "0.4.1", features = ["futures-traits"] } +serde = { version = "1.0", features = ["derive", "rc"] } +sha2 = "0.10" +simplelog = "0.9" + [build-dependencies] serde = { version = "1.0", features = ["derive", "rc"] } serde_json = { version = "1.0", features = ["preserve_order"] } diff --git a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift index b39f359737..4401b06b30 100644 --- a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift +++ b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift @@ -15,7 +15,7 @@ class LKRoomDelegate: RoomDelegate { func room(_ room: Room, participant: RemoteParticipant, didSubscribe publication: RemoteTrackPublication, track: Track) { if track.kind == .video { - self.onDidSubscribeToRemoteVideoTrack(self.data, participant.sid as CFString, track.id as CFString, Unmanaged.passRetained(track).toOpaque()) + self.onDidSubscribeToRemoteVideoTrack(self.data, participant.identity as CFString, track.id as CFString, Unmanaged.passRetained(track).toOpaque()) } } @@ -97,8 +97,22 @@ public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPoin @_cdecl("LKRoomVideoTracksForRemoteParticipant") public func LKRoomVideoTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { let room = Unmanaged.fromOpaque(room).takeUnretainedValue() - let tracks = room.remoteParticipants[participantId as Sid]?.videoTracks.compactMap { $0.track as? RemoteVideoTrack } - return tracks as CFArray? + + for (_, participant) in room.remoteParticipants { + if participant.identity == participantId as String { + var tracks = [UnsafeMutableRawPointer]() + for publication in participant.videoTracks { + let track = publication.track as? RemoteVideoTrack + if track != nil { + tracks.append(Unmanaged.passRetained(track!).toOpaque()) + } + + } + return tracks as CFArray? + } + } + + return nil; } @_cdecl("LKCreateScreenShareTrackForDisplay") @@ -120,10 +134,17 @@ public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRaw track.add(videoRenderer: renderer) } -@_cdecl("LKDisplaySources") -public func LKDisplaySources(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFArray?, CFString?) -> Void) { +@_cdecl("LKRemoteVideoTrackGetSid") +public func LKRemoteVideoTrackGetSid(track: UnsafeRawPointer) -> CFString { + let track = Unmanaged.fromOpaque(track).takeUnretainedValue() + return track.sid! as CFString +} + +@_cdecl("LKDisplaySource") +public func LKDisplaySource(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer?, CFString?) -> Void) { MacOSScreenCapturer.sources(for: .display, includeCurrentApplication: false, preferredMethod: .legacy).then { displaySources in - callback(data, displaySources as CFArray, nil) + let displaySource = displaySources.first.map { Unmanaged.passRetained($0).toOpaque() } + callback(data, displaySource, nil) }.catch { error in callback(data, nil, error.localizedDescription as CFString) } diff --git a/crates/live_kit_client/build.rs b/crates/live_kit_client/build.rs index 79d7d84cdd..872454a489 100644 --- a/crates/live_kit_client/build.rs +++ b/crates/live_kit_client/build.rs @@ -40,6 +40,9 @@ fn main() { build_bridge(&swift_target); link_swift_stdlib(&swift_target); link_webrtc_framework(&swift_target); + + // Register exported Objective-C selectors, protocols, etc when building example binaries. + println!("cargo:rustc-link-arg=-Wl,-ObjC"); } fn build_bridge(swift_target: &SwiftTarget) { @@ -94,6 +97,8 @@ fn link_webrtc_framework(swift_target: &SwiftTarget) { ); // Find WebRTC.framework as a sibling of the executable when running tests. println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path"); + // Find WebRTC.framework in parent directory of the executable when running examples. + println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path/.."); let source_path = swift_out_dir_path.join("WebRTC.framework"); let deps_dir_path = diff --git a/crates/live_kit_client/examples/test_app.rs b/crates/live_kit_client/examples/test_app.rs new file mode 100644 index 0000000000..370d20354f --- /dev/null +++ b/crates/live_kit_client/examples/test_app.rs @@ -0,0 +1,160 @@ +use core_foundation::base::CFRetain; +use futures::StreamExt; +use gpui::{ + actions, + elements::{Canvas, *}, + keymap::Binding, + platform::current::Surface, + Menu, MenuItem, ViewContext, +}; +use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate, Room}; +use live_kit_server::token::{self, VideoGrant}; +use log::LevelFilter; +use media::core_video::CVImageBuffer; +use postage::watch; +use simplelog::SimpleLogger; +use std::sync::Arc; + +actions!(capture, [Quit]); + +fn main() { + SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger"); + + gpui::App::new(()).unwrap().run(|cx| { + cx.platform().activate(true); + cx.add_global_action(quit); + + cx.add_bindings([Binding::new("cmd-q", Quit, None)]); + cx.set_menus(vec![Menu { + name: "Zed", + items: vec![MenuItem::Action { + name: "Quit", + action: Box::new(Quit), + }], + }]); + + let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap_or("http://localhost:7880".into()); + let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap_or("devkey".into()); + let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap_or("secret".into()); + + cx.spawn(|cx| async move { + let user_a_token = token::create( + &live_kit_key, + &live_kit_secret, + Some("test-participant-1"), + VideoGrant::to_join("test-room"), + ) + .unwrap(); + let room_a = Room::new(); + room_a.connect(&live_kit_url, &user_a_token).await.unwrap(); + + let user2_token = token::create( + &live_kit_key, + &live_kit_secret, + Some("test-participant-2"), + VideoGrant::to_join("test-room"), + ) + .unwrap(); + let room_b = Room::new(); + room_b.connect(&live_kit_url, &user2_token).await.unwrap(); + + let mut track_changes = room_b.remote_video_track_updates(); + + let display = live_kit_client::display_source().await.unwrap(); + + let track_a = LocalVideoTrack::screen_share_for_display(&display); + room_a.publish_video_track(&track_a).await.unwrap(); + + let next_update = track_changes.next().await.unwrap(); + + if let RemoteVideoTrackUpdate::Subscribed(track) = next_update { + println!("A !!!!!!!!!!!!"); + let remote_tracks = room_b.remote_video_tracks("test-participant-1"); + println!("B !!!!!!!!!!!!"); + assert_eq!(remote_tracks.len(), 1); + println!("C !!!!!!!!!!!!"); + assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1"); + println!("D !!!!!!!!!!!!"); + // dbg!(track.id()); + // assert_eq!(track.id(), "test-participant-1"); + } else { + panic!("unexpected message") + } + println!("E !!!!!!!!!!!!"); + + cx.platform().quit(); + }) + .detach(); + }); +} + +struct ScreenCaptureView { + image_buffer: Option, + _room: Arc, +} + +impl gpui::Entity for ScreenCaptureView { + type Event = (); +} + +impl ScreenCaptureView { + pub fn new(room: Arc, cx: &mut ViewContext) -> Self { + let mut remote_video_tracks = room.remote_video_track_updates(); + cx.spawn_weak(|this, mut cx| async move { + if let Some(video_track) = remote_video_tracks.next().await { + let (mut frames_tx, mut frames_rx) = watch::channel_with(None); + // video_track.add_renderer(move |frame| *frames_tx.borrow_mut() = Some(frame)); + + while let Some(frame) = frames_rx.next().await { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, cx| { + this.image_buffer = frame; + cx.notify(); + }); + } else { + break; + } + } + } + }) + .detach(); + + Self { + image_buffer: None, + _room: room, + } + } +} + +impl gpui::View for ScreenCaptureView { + fn ui_name() -> &'static str { + "View" + } + + fn render(&mut self, _: &mut gpui::RenderContext) -> gpui::ElementBox { + let image_buffer = self.image_buffer.clone(); + let canvas = Canvas::new(move |bounds, _, cx| { + if let Some(image_buffer) = image_buffer.clone() { + cx.scene.push_surface(Surface { + bounds, + image_buffer, + }); + } + }); + + if let Some(image_buffer) = self.image_buffer.as_ref() { + canvas + .constrained() + .with_width(image_buffer.width() as f32) + .with_height(image_buffer.height() as f32) + .aligned() + .boxed() + } else { + canvas.boxed() + } + } +} + +fn quit(_: &Quit, cx: &mut gpui::MutableAppContext) { + cx.platform().quit(); +} diff --git a/crates/live_kit_client/src/live_kit_client.rs b/crates/live_kit_client/src/live_kit_client.rs index 6bd5ce47e6..2d871d31b4 100644 --- a/crates/live_kit_client/src/live_kit_client.rs +++ b/crates/live_kit_client/src/live_kit_client.rs @@ -15,6 +15,10 @@ use std::{ sync::{Arc, Weak}, }; +pub type Sid = String; +#[allow(non_camel_case_types)] +pub type sid = str; + extern "C" { fn LKRelease(object: *const c_void); @@ -47,6 +51,10 @@ extern "C" { callback: extern "C" fn(*mut c_void, CFStringRef), callback_data: *mut c_void, ); + fn LKRoomVideoTracksForRemoteParticipant( + room: *const c_void, + participant_id: CFStringRef, + ) -> CFArrayRef; fn LKVideoRendererCreate( callback_data: *mut c_void, @@ -55,12 +63,13 @@ extern "C" { ) -> *const c_void; fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void); + fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef; - fn LKDisplaySources( + fn LKDisplaySource( callback_data: *mut c_void, callback: extern "C" fn( callback_data: *mut c_void, - sources: CFArrayRef, + source: *mut c_void, error: CFStringRef, ), ); @@ -69,7 +78,7 @@ extern "C" { pub struct Room { native_room: *const c_void, - remote_video_track_subscribers: Mutex>>, + remote_video_track_subscribers: Mutex>>, _delegate: RoomDelegate, } @@ -110,7 +119,40 @@ impl Room { async { rx.await.unwrap().context("error publishing video track") } } - pub fn remote_video_tracks(&self) -> mpsc::UnboundedReceiver { + pub fn remote_video_tracks(&self, participant_sid: &sid) -> Vec> { + unsafe { + let tracks = LKRoomVideoTracksForRemoteParticipant( + self.native_room, + CFString::new(participant_sid).as_concrete_TypeRef(), + ); + + if tracks.is_null() { + Vec::new() + } else { + println!("aaaa >>>>>>>>>>>>>>>"); + let tracks = CFArray::wrap_under_get_rule(tracks); + println!("bbbb >>>>>>>>>>>>>>>"); + tracks + .into_iter() + .map(|native_track| { + let native_track = *native_track; + println!("cccc >>>>>>>>>>>>>>>"); + let id = + CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track)) + .to_string(); + println!("dddd >>>>>>>>>>>>>>>"); + Arc::new(RemoteVideoTrack { + native_track, + publisher_id: participant_sid.into(), + id, + }) + }) + .collect() + } + } + } + + pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver { let (tx, rx) = mpsc::unbounded(); self.remote_video_track_subscribers.lock().push(tx); rx @@ -119,14 +161,14 @@ impl Room { fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) { let track = Arc::new(track); self.remote_video_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteVideoTrackChange::Subscribed(track.clone())) + tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone())) .is_ok() }); } fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) { self.remote_video_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteVideoTrackChange::Unsubscribed { + tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed { publisher_id: publisher_id.clone(), track_id: track_id.clone(), }) @@ -201,7 +243,7 @@ impl RoomDelegate { if let Some(room) = room.upgrade() { room.did_subscribe_to_remote_video_track(track); } - let _ = Weak::into_raw(room); + // let _ = Weak::into_raw(room); } extern "C" fn on_did_unsubscribe_from_remote_video_track( @@ -244,9 +286,9 @@ impl Drop for LocalVideoTrack { #[derive(Debug)] pub struct RemoteVideoTrack { - id: String, + id: Sid, native_track: *const c_void, - publisher_id: String, + publisher_id: Sid, } impl RemoteVideoTrack { @@ -294,12 +336,9 @@ impl Drop for RemoteVideoTrack { } } -pub enum RemoteVideoTrackChange { +pub enum RemoteVideoTrackUpdate { Subscribed(Arc), - Unsubscribed { - publisher_id: String, - track_id: String, - }, + Unsubscribed { publisher_id: Sid, track_id: Sid }, } pub struct MacOSDisplay(*const c_void); @@ -310,17 +349,16 @@ impl Drop for MacOSDisplay { } } -pub fn display_sources() -> impl Future>> { - extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) { +pub fn display_source() -> impl Future> { + extern "C" fn callback(tx: *mut c_void, source: *mut c_void, error: CFStringRef) { unsafe { - let tx = Box::from_raw(tx as *mut oneshot::Sender>>); + let tx = Box::from_raw(tx as *mut oneshot::Sender>); - if sources.is_null() { + if source.is_null() { let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error)))); } else { - let sources = CFArray::wrap_under_get_rule(sources); - let sources_vec = sources.iter().map(|source| MacOSDisplay(*source)).collect(); - let _ = tx.send(Ok(sources_vec)); + let source = MacOSDisplay(source); + let _ = tx.send(Ok(source)); } } } @@ -328,8 +366,14 @@ pub fn display_sources() -> impl Future>> { let (tx, rx) = oneshot::channel(); unsafe { - LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback); + LKDisplaySource(Box::into_raw(Box::new(tx)) as *mut _, callback); } async move { rx.await.unwrap() } } + +#[cfg(test)] +mod tests { + #[test] + fn test_client() {} +} diff --git a/crates/live_kit_server/src/token.rs b/crates/live_kit_server/src/token.rs index ae03cb3469..9956ad006c 100644 --- a/crates/live_kit_server/src/token.rs +++ b/crates/live_kit_server/src/token.rs @@ -38,6 +38,18 @@ pub struct VideoGrant<'a> { pub recorder: Option, } +impl<'a> VideoGrant<'a> { + pub fn to_join(room: &'a str) -> Self { + Self { + room: Some(room), + room_join: Some(true), + can_publish: Some(true), + can_subscribe: Some(true), + ..Default::default() + } + } +} + pub fn create( api_key: &str, secret_key: &str,