From 0c3c1e1f68f9ebda86c0b5ec65e9ce3500791a5d Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 18 Oct 2022 19:30:45 +0200 Subject: [PATCH] WIP --- crates/call/src/room.rs | 51 ++++++++++++------- .../Sources/LiveKitBridge/LiveKitBridge.swift | 8 +-- crates/live_kit_client/src/live_kit_client.rs | 34 +++++++++++-- 3 files changed, 68 insertions(+), 25 deletions(-) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 1bd86944bf..7670a0e239 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -7,7 +7,7 @@ use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore}; use collections::{BTreeMap, HashSet}; use futures::StreamExt; use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task}; -use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate}; +use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate}; use postage::watch; use project::Project; use std::sync::Arc; @@ -32,7 +32,7 @@ pub enum Event { pub struct Room { id: u64, - live_kit_room: Option<(Arc, Task<()>)>, + live_kit: Option, status: RoomStatus, local_participant: LocalParticipant, remote_participants: BTreeMap, @@ -82,7 +82,7 @@ impl Room { let live_kit_room = if let Some(connection_info) = live_kit_connection_info { let room = live_kit_client::Room::new(); let mut track_changes = room.remote_video_track_updates(); - let maintain_room = cx.spawn_weak(|this, mut cx| async move { + let _maintain_room = cx.spawn_weak(|this, mut cx| async move { while let Some(track_change) = track_changes.next().await { let this = if let Some(this) = this.upgrade(&cx) { this @@ -98,14 +98,18 @@ impl Room { cx.foreground() .spawn(room.connect(&connection_info.server_url, &connection_info.token)) .detach_and_log_err(cx); - Some((room, maintain_room)) + Some(LiveKitRoom { + room, + screen_track: None, + _maintain_room, + }) } else { None }; Self { id, - live_kit_room, + live_kit: live_kit_room, status: RoomStatus::Online, participant_user_ids: Default::default(), local_participant: Default::default(), @@ -212,7 +216,7 @@ impl Room { self.pending_participants.clear(); self.participant_user_ids.clear(); self.subscriptions.clear(); - self.live_kit_room.take(); + self.live_kit.take(); self.client.send(proto::LeaveRoom { id: self.id })?; Ok(()) } @@ -342,8 +346,9 @@ impl Room { }, ); - if let Some((room, _)) = this.live_kit_room.as_ref() { - let tracks = room.remote_video_tracks(&peer_id.0.to_string()); + if let Some(live_kit) = this.live_kit.as_ref() { + let tracks = + live_kit.room.remote_video_tracks(&peer_id.0.to_string()); for track in tracks { this.remote_video_track_updated( RemoteVideoTrackUpdate::Subscribed(track), @@ -605,24 +610,36 @@ impl Room { return Task::ready(Err(anyhow!("room is offline"))); } - let room = if let Some((room, _)) = self.live_kit_room.as_ref() { - room.clone() - } else { - return Task::ready(Err(anyhow!("not connected to LiveKit"))); - }; - - cx.foreground().spawn(async move { + cx.spawn_weak(|this, mut cx| async move { let displays = live_kit_client::display_sources().await?; let display = displays .first() .ok_or_else(|| anyhow!("no display found"))?; let track = LocalVideoTrack::screen_share_for_display(&display); - room.publish_video_track(&track).await?; - Ok(()) + + let publication = this + .upgrade(&cx)? + .read_with(&cx, |this, _| { + this.live_kit + .as_ref() + .map(|live_kit| live_kit.room.publish_video_track(&track)) + })? + .await?; + + this.upgrade(&cx)?.update(cx, |this, _| { + this.live_kit.as_mut()?.screen_track = Some(publication); + Some(()) + }) }) } } +struct LiveKitRoom { + room: Arc, + screen_track: Option, + _maintain_room: Task<()>, +} + #[derive(Copy, Clone, PartialEq, Eq)] pub enum RoomStatus { Online, diff --git a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift index c7ce2178ab..7488eb9444 100644 --- a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift +++ b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift @@ -85,13 +85,13 @@ public func LKRoomDisconnect(room: UnsafeRawPointer) { } @_cdecl("LKRoomPublishVideoTrack") -public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer) { +public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) { let room = Unmanaged.fromOpaque(room).takeUnretainedValue() let track = Unmanaged.fromOpaque(track).takeUnretainedValue() - room.localParticipant?.publishVideoTrack(track: track).then { _ in - callback(callback_data, UnsafeRawPointer(nil) as! CFString?) + room.localParticipant?.publishVideoTrack(track: track).then { publication in + callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil) }.catch { error in - callback(callback_data, error.localizedDescription as CFString) + callback(callback_data, nil, error.localizedDescription as CFString) } } diff --git a/crates/live_kit_client/src/live_kit_client.rs b/crates/live_kit_client/src/live_kit_client.rs index 8eaa9a6f6a..b3724e91db 100644 --- a/crates/live_kit_client/src/live_kit_client.rs +++ b/crates/live_kit_client/src/live_kit_client.rs @@ -45,7 +45,7 @@ extern "C" { fn LKRoomPublishVideoTrack( room: *const c_void, track: *const c_void, - callback: extern "C" fn(*mut c_void, CFStringRef), + callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef), callback_data: *mut c_void, ); fn LKRoomVideoTracksForRemoteParticipant( @@ -108,10 +108,28 @@ impl Room { async { rx.await.unwrap().context("error connecting to room") } } - pub fn publish_video_track(&self, track: &LocalVideoTrack) -> impl Future> { - let (did_publish, tx, rx) = Self::build_done_callback(); + pub fn publish_video_track( + &self, + track: &LocalVideoTrack, + ) -> impl Future> { + let (tx, rx) = oneshot::channel::>(); + extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) { + let tx = + unsafe { Box::from_raw(tx as *mut oneshot::Sender>) }; + if error.is_null() { + let _ = tx.send(Ok(LocalTrackPublication(publication))); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + let _ = tx.send(Err(anyhow!(error))); + } + } unsafe { - LKRoomPublishVideoTrack(self.native_room, track.0, did_publish, tx); + LKRoomPublishVideoTrack( + self.native_room, + track.0, + callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + ); } async { rx.await.unwrap().context("error publishing video track") } } @@ -275,6 +293,14 @@ impl Drop for LocalVideoTrack { } } +pub struct LocalTrackPublication(*const c_void); + +impl Drop for LocalTrackPublication { + fn drop(&mut self) { + unsafe { CFRelease(self.0) } + } +} + #[derive(Debug)] pub struct RemoteVideoTrack { native_track: *const c_void,