Use LiveKit's Rust SDK on Linux while continuing to use the Swift SDK on Mac (#21550)
Similar to #20826 but keeps the Swift implementation. There were quite a few changes in the `call` crate, and so that code now has two variants. Closes #13714 Release Notes: - Added preliminary Linux support for voice chat and viewing screenshares. --------- Co-authored-by: Kirill Bulatov <mail4score@gmail.com> Co-authored-by: Kirill Bulatov <kirill@zed.dev> Co-authored-by: Mikayla <mikayla@zed.dev>
This commit is contained in:
parent
0511768b22
commit
6a4cd53fd8
91 changed files with 7187 additions and 1028 deletions
2
crates/livekit_client/.cargo/config.toml
Normal file
2
crates/livekit_client/.cargo/config.toml
Normal file
|
@ -0,0 +1,2 @@
|
|||
[livekit_client_test]
|
||||
rustflags = ["-C", "link-args=-ObjC"]
|
65
crates/livekit_client/Cargo.toml
Normal file
65
crates/livekit_client/Cargo.toml
Normal file
|
@ -0,0 +1,65 @@
|
|||
[package]
|
||||
name = "livekit_client"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
description = "Logic for using LiveKit with GPUI"
|
||||
publish = false
|
||||
license = "GPL-3.0-or-later"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[lib]
|
||||
path = "src/livekit_client.rs"
|
||||
doctest = false
|
||||
|
||||
[[example]]
|
||||
name = "test_app"
|
||||
|
||||
[features]
|
||||
no-webrtc = []
|
||||
test-support = [
|
||||
"collections/test-support",
|
||||
"gpui/test-support",
|
||||
"nanoid",
|
||||
]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
collections.workspace = true
|
||||
cpal = "0.15"
|
||||
futures.workspace = true
|
||||
gpui.workspace = true
|
||||
http_2 = { package = "http", version = "0.2.1" }
|
||||
livekit_server.workspace = true
|
||||
log.workspace = true
|
||||
media.workspace = true
|
||||
nanoid = { workspace = true, optional = true}
|
||||
parking_lot.workspace = true
|
||||
postage.workspace = true
|
||||
util.workspace = true
|
||||
http_client.workspace = true
|
||||
smallvec.workspace = true
|
||||
image.workspace = true
|
||||
|
||||
[target.'cfg(not(target_os = "windows"))'.dependencies]
|
||||
livekit.workspace = true
|
||||
|
||||
[target.'cfg(target_os = "macos")'.dependencies]
|
||||
core-foundation.workspace = true
|
||||
coreaudio-rs = "0.12.1"
|
||||
|
||||
[dev-dependencies]
|
||||
collections = { workspace = true, features = ["test-support"] }
|
||||
gpui = { workspace = true, features = ["test-support"] }
|
||||
nanoid.workspace = true
|
||||
sha2.workspace = true
|
||||
simplelog.workspace = true
|
||||
|
||||
[build-dependencies]
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
[package.metadata.cargo-machete]
|
||||
ignored = ["serde_json"]
|
1
crates/livekit_client/LICENSE-GPL
Symbolic link
1
crates/livekit_client/LICENSE-GPL
Symbolic link
|
@ -0,0 +1 @@
|
|||
../../LICENSE-GPL
|
442
crates/livekit_client/examples/test_app.rs
Normal file
442
crates/livekit_client/examples/test_app.rs
Normal file
|
@ -0,0 +1,442 @@
|
|||
#![cfg_attr(windows, allow(unused))]
|
||||
// TODO: For some reason mac build complains about import of postage::stream::Stream, but removal of
|
||||
// it causes compile errors.
|
||||
#![cfg_attr(target_os = "macos", allow(unused_imports))]
|
||||
|
||||
use gpui::{
|
||||
actions, bounds, div, point,
|
||||
prelude::{FluentBuilder as _, IntoElement},
|
||||
px, rgb, size, AsyncAppContext, Bounds, InteractiveElement, KeyBinding, Menu, MenuItem,
|
||||
ParentElement, Pixels, Render, ScreenCaptureStream, SharedString,
|
||||
StatefulInteractiveElement as _, Styled, Task, View, ViewContext, VisualContext, WindowBounds,
|
||||
WindowHandle, WindowOptions,
|
||||
};
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
use livekit_client::{
|
||||
capture_local_audio_track, capture_local_video_track,
|
||||
id::ParticipantIdentity,
|
||||
options::{TrackPublishOptions, VideoCodec},
|
||||
participant::{Participant, RemoteParticipant},
|
||||
play_remote_audio_track,
|
||||
publication::{LocalTrackPublication, RemoteTrackPublication},
|
||||
track::{LocalTrack, RemoteTrack, RemoteVideoTrack, TrackSource},
|
||||
AudioStream, RemoteVideoTrackView, Room, RoomEvent, RoomOptions,
|
||||
};
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
use postage::stream::Stream;
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
use livekit_client::{
|
||||
participant::{Participant, RemoteParticipant},
|
||||
publication::{LocalTrackPublication, RemoteTrackPublication},
|
||||
track::{LocalTrack, RemoteTrack, RemoteVideoTrack},
|
||||
AudioStream, RemoteVideoTrackView, Room, RoomEvent,
|
||||
};
|
||||
|
||||
use livekit_server::token::{self, VideoGrant};
|
||||
use log::LevelFilter;
|
||||
use simplelog::SimpleLogger;
|
||||
|
||||
actions!(livekit_client, [Quit]);
|
||||
|
||||
#[cfg(windows)]
fn main() {}

// Test-app entry point: connects two participants to a LiveKit server and
// opens one window per participant, side by side.
#[cfg(not(windows))]
fn main() {
    SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger");

    gpui::App::new().run(|cx| {
        // Route livekit's scheduling and HTTP through gpui before any room use.
        livekit_client::init(
            cx.background_executor().dispatcher.clone(),
            cx.http_client(),
        );

        #[cfg(any(test, feature = "test-support"))]
        println!("USING TEST LIVEKIT");

        #[cfg(not(any(test, feature = "test-support")))]
        println!("USING REAL LIVEKIT");

        cx.activate(true);
        cx.on_action(quit);
        cx.bind_keys([KeyBinding::new("cmd-q", Quit, None)]);
        cx.set_menus(vec![Menu {
            name: "Zed".into(),
            items: vec![MenuItem::Action {
                name: "Quit".into(),
                action: Box::new(Quit),
                os_action: None,
            }],
        }]);

        // Server coordinates default to a local dev LiveKit instance.
        let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or("http://localhost:7880".into());
        let livekit_key = std::env::var("LIVEKIT_KEY").unwrap_or("devkey".into());
        let livekit_secret = std::env::var("LIVEKIT_SECRET").unwrap_or("secret".into());
        let height = px(800.);
        let width = px(800.);

        cx.spawn(|cx| async move {
            let mut windows = Vec::new();
            // Two participants joining the same room, each with its own token
            // and its own window (offset horizontally by window width).
            for i in 0..2 {
                let token = token::create(
                    &livekit_key,
                    &livekit_secret,
                    Some(&format!("test-participant-{i}")),
                    VideoGrant::to_join("test-room"),
                )
                .unwrap();

                let bounds = bounds(point(width * i, px(0.0)), size(width, height));
                let window =
                    LivekitWindow::new(livekit_url.as_str(), token.as_str(), bounds, cx.clone())
                        .await;
                windows.push(window);
            }
        })
        .detach();
    });
}
|
||||
|
||||
/// Handles the `Quit` action by shutting the application down.
fn quit(_action: &Quit, cx: &mut gpui::AppContext) {
    cx.quit();
}
|
||||
|
||||
/// Per-window state for one connected participant.
struct LivekitWindow {
    room: Room,
    // Local publications and the streams that keep them alive; `None` while
    // the corresponding track is unpublished.
    microphone_track: Option<LocalTrackPublication>,
    screen_share_track: Option<LocalTrackPublication>,
    microphone_stream: Option<AudioStream>,
    screen_share_stream: Option<Box<dyn ScreenCaptureStream>>,
    // Kept sorted by identity so `remote_participant` can binary-search.
    #[cfg(not(target_os = "windows"))]
    remote_participants: Vec<(ParticipantIdentity, ParticipantState)>,
    // Dropped with the window, cancelling the room-event pump.
    _events_task: Task<()>,
}
|
||||
|
||||
/// UI-relevant state tracked for each remote participant.
#[derive(Default)]
struct ParticipantState {
    // Subscribed audio publication plus the local stream playing it.
    audio_output_stream: Option<(RemoteTrackPublication, AudioStream)>,
    muted: bool,
    // Subscribed screen-share track and the view that renders it.
    screen_share_output_view: Option<(RemoteVideoTrack, View<RemoteVideoTrackView>)>,
    speaking: bool,
}
|
||||
|
||||
#[cfg(not(windows))]
impl LivekitWindow {
    /// Connects to the room at `url` with `token`, opens a window at `bounds`,
    /// and spawns a task forwarding room events to `handle_room_event`.
    async fn new(
        url: &str,
        token: &str,
        bounds: Bounds<Pixels>,
        cx: AsyncAppContext,
    ) -> WindowHandle<Self> {
        let (room, mut events) = Room::connect(url, token, RoomOptions::default())
            .await
            .unwrap();

        cx.update(|cx| {
            cx.open_window(
                WindowOptions {
                    window_bounds: Some(WindowBounds::Windowed(bounds)),
                    ..Default::default()
                },
                |cx| {
                    cx.new_view(|cx| {
                        // Pump room events into this view for as long as it
                        // lives; the task is dropped together with the view.
                        let _events_task = cx.spawn(|this, mut cx| async move {
                            while let Some(event) = events.recv().await {
                                this.update(&mut cx, |this: &mut LivekitWindow, cx| {
                                    this.handle_room_event(event, cx)
                                })
                                .ok();
                            }
                        });

                        Self {
                            room,
                            microphone_track: None,
                            microphone_stream: None,
                            screen_share_track: None,
                            screen_share_stream: None,
                            remote_participants: Vec::new(),
                            _events_task,
                        }
                    })
                },
            )
            .unwrap()
        })
        .unwrap()
    }

    /// Updates per-participant state in response to a room event and redraws.
    fn handle_room_event(&mut self, event: RoomEvent, cx: &mut ViewContext<Self>) {
        eprintln!("event: {event:?}");

        match event {
            // Drop the audio stream / video view whose publication went away.
            RoomEvent::TrackUnpublished {
                publication,
                participant,
            } => {
                let output = self.remote_participant(participant);
                let unpublish_sid = publication.sid();
                if output
                    .audio_output_stream
                    .as_ref()
                    .map_or(false, |(track, _)| track.sid() == unpublish_sid)
                {
                    output.audio_output_stream.take();
                }
                if output
                    .screen_share_output_view
                    .as_ref()
                    .map_or(false, |(track, _)| track.sid() == unpublish_sid)
                {
                    output.screen_share_output_view.take();
                }
                cx.notify();
            }

            // Start playing a newly subscribed audio track, or create a view
            // for a newly subscribed video (screen-share) track.
            RoomEvent::TrackSubscribed {
                publication,
                participant,
                track,
            } => {
                let output = self.remote_participant(participant);
                match track {
                    RemoteTrack::Audio(track) => {
                        output.audio_output_stream = Some((
                            publication.clone(),
                            play_remote_audio_track(&track, cx.background_executor()).unwrap(),
                        ));
                    }
                    RemoteTrack::Video(track) => {
                        output.screen_share_output_view = Some((
                            track.clone(),
                            cx.new_view(|cx| RemoteVideoTrackView::new(track, cx)),
                        ));
                    }
                }
                cx.notify();
            }

            RoomEvent::TrackMuted { participant, .. } => {
                if let Participant::Remote(participant) = participant {
                    self.remote_participant(participant).muted = true;
                    cx.notify();
                }
            }

            RoomEvent::TrackUnmuted { participant, .. } => {
                if let Participant::Remote(participant) = participant {
                    self.remote_participant(participant).muted = false;
                    cx.notify();
                }
            }

            // Recompute the `speaking` flag for every known participant.
            RoomEvent::ActiveSpeakersChanged { speakers } => {
                for (identity, output) in &mut self.remote_participants {
                    output.speaking = speakers.iter().any(|speaker| {
                        if let Participant::Remote(speaker) = speaker {
                            speaker.identity() == *identity
                        } else {
                            false
                        }
                    });
                }
                cx.notify();
            }

            _ => {}
        }

        // Note: notifies unconditionally for every event, in addition to the
        // per-arm notifies above.
        cx.notify();
    }

    /// Returns the state for `participant`, inserting a default entry (kept
    /// sorted by identity) if this is the first time we see them.
    fn remote_participant(&mut self, participant: RemoteParticipant) -> &mut ParticipantState {
        match self
            .remote_participants
            .binary_search_by_key(&&participant.identity(), |row| &row.0)
        {
            Ok(ix) => &mut self.remote_participants[ix].1,
            Err(ix) => {
                self.remote_participants
                    .insert(ix, (participant.identity(), ParticipantState::default()));
                &mut self.remote_participants[ix].1
            }
        }
    }

    /// Toggles mute if a microphone track is already published; otherwise
    /// captures the local microphone and publishes it.
    fn toggle_mute(&mut self, cx: &mut ViewContext<Self>) {
        if let Some(track) = &self.microphone_track {
            if track.is_muted() {
                track.unmute();
            } else {
                track.mute();
            }
            cx.notify();
        } else {
            let participant = self.room.local_participant();
            cx.spawn(|this, mut cx| async move {
                let (track, stream) = capture_local_audio_track(cx.background_executor())?.await;
                let publication = participant
                    .publish_track(
                        LocalTrack::Audio(track),
                        TrackPublishOptions {
                            source: TrackSource::Microphone,
                            ..Default::default()
                        },
                    )
                    .await
                    .unwrap();
                this.update(&mut cx, |this, cx| {
                    this.microphone_track = Some(publication);
                    this.microphone_stream = Some(stream);
                    cx.notify();
                })
            })
            .detach();
        }
    }

    /// Stops an active screen share, or starts one from the first available
    /// capture source.
    fn toggle_screen_share(&mut self, cx: &mut ViewContext<Self>) {
        if let Some(track) = self.screen_share_track.take() {
            // Dropping the stream stops capture; unpublish happens async.
            self.screen_share_stream.take();
            let participant = self.room.local_participant();
            cx.background_executor()
                .spawn(async move {
                    participant.unpublish_track(&track.sid()).await.unwrap();
                })
                .detach();
            cx.notify();
        } else {
            let participant = self.room.local_participant();
            let sources = cx.screen_capture_sources();
            cx.spawn(|this, mut cx| async move {
                let sources = sources.await.unwrap()?;
                // Test app only: just share the first screen found.
                let source = sources.into_iter().next().unwrap();
                let (track, stream) = capture_local_video_track(&*source).await?;
                let publication = participant
                    .publish_track(
                        LocalTrack::Video(track),
                        TrackPublishOptions {
                            source: TrackSource::Screenshare,
                            video_codec: VideoCodec::H264,
                            ..Default::default()
                        },
                    )
                    .await
                    .unwrap();
                this.update(&mut cx, |this, cx| {
                    this.screen_share_track = Some(publication);
                    this.screen_share_stream = Some(stream);
                    cx.notify();
                })
            })
            .detach();
        }
    }

    /// Flips the enabled state of a remote participant's audio publication
    /// ("deafen"). Returns `None` if the participant or stream is unknown.
    fn toggle_remote_audio_for_participant(
        &mut self,
        identity: &ParticipantIdentity,
        cx: &mut ViewContext<Self>,
    ) -> Option<()> {
        let participant = self.remote_participants.iter().find_map(|(id, state)| {
            if id == identity {
                Some(state)
            } else {
                None
            }
        })?;
        let publication = &participant.audio_output_stream.as_ref()?.0;
        publication.set_enabled(!publication.is_enabled());
        cx.notify();
        Some(())
    }
}
|
||||
|
||||
#[cfg(not(windows))]
impl Render for LivekitWindow {
    /// Renders a toolbar (mute / screen-share toggles) above a scrollable list
    /// of remote participants with per-participant deafen buttons and
    /// screen-share views.
    fn render(&mut self, cx: &mut ViewContext<Self>) -> impl IntoElement {
        // Shared styling for all buttons in this window.
        fn button() -> gpui::Div {
            div()
                .w(px(180.0))
                .h(px(30.0))
                .px_2()
                .m_2()
                .bg(rgb(0x8888ff))
        }

        div()
            .bg(rgb(0xffffff))
            .size_full()
            .flex()
            .flex_col()
            .child(
                // Toolbar: labels reflect the current publication state.
                div().bg(rgb(0xffd4a8)).flex().flex_row().children([
                    button()
                        .id("toggle-mute")
                        .child(if let Some(track) = &self.microphone_track {
                            if track.is_muted() {
                                "Unmute"
                            } else {
                                "Mute"
                            }
                        } else {
                            "Publish mic"
                        })
                        .on_click(cx.listener(|this, _, cx| this.toggle_mute(cx))),
                    button()
                        .id("toggle-screen-share")
                        .child(if self.screen_share_track.is_none() {
                            "Share screen"
                        } else {
                            "Unshare screen"
                        })
                        .on_click(cx.listener(|this, _, cx| this.toggle_screen_share(cx))),
                ]),
            )
            .child(
                // One panel per remote participant.
                div()
                    .id("remote-participants")
                    .overflow_y_scroll()
                    .flex()
                    .flex_col()
                    .flex_grow()
                    .children(self.remote_participants.iter().map(|(identity, state)| {
                        div()
                            .h(px(300.0))
                            .flex()
                            .flex_col()
                            .m_2()
                            .px_2()
                            .bg(rgb(0x8888ff))
                            // Header: identity, annotated with speaking/muted.
                            .child(SharedString::from(if state.speaking {
                                format!("{} (speaking)", &identity.0)
                            } else if state.muted {
                                format!("{} (muted)", &identity.0)
                            } else {
                                identity.0.clone()
                            }))
                            // Deafen toggle, only when an audio stream exists.
                            .when_some(state.audio_output_stream.as_ref(), |el, state| {
                                el.child(
                                    button()
                                        .id(SharedString::from(identity.0.clone()))
                                        .child(if state.0.is_enabled() {
                                            "Deafen"
                                        } else {
                                            "Undeafen"
                                        })
                                        .on_click(cx.listener({
                                            let identity = identity.clone();
                                            move |this, _, cx| {
                                                this.toggle_remote_audio_for_participant(
                                                    &identity, cx,
                                                );
                                            }
                                        })),
                                )
                            })
                            // Screen-share view, when one has been subscribed.
                            .children(state.screen_share_output_view.as_ref().map(|e| e.1.clone()))
                    })),
            )
    }
}
|
661
crates/livekit_client/src/livekit_client.rs
Normal file
661
crates/livekit_client/src/livekit_client.rs
Normal file
|
@ -0,0 +1,661 @@
|
|||
#![cfg_attr(target_os = "windows", allow(unused))]
|
||||
|
||||
mod remote_video_track_view;
|
||||
#[cfg(any(test, feature = "test-support", target_os = "windows"))]
|
||||
pub mod test;
|
||||
|
||||
use anyhow::{anyhow, Context as _, Result};
|
||||
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
|
||||
use futures::{io, Stream, StreamExt as _};
|
||||
use gpui::{
|
||||
BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
use std::{borrow::Cow, collections::VecDeque, future::Future, pin::Pin, sync::Arc, thread};
|
||||
use util::{debug_panic, ResultExt as _};
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
use webrtc::{
|
||||
audio_frame::AudioFrame,
|
||||
audio_source::{native::NativeAudioSource, AudioSourceOptions, RtcAudioSource},
|
||||
audio_stream::native::NativeAudioStream,
|
||||
video_frame::{VideoBuffer, VideoFrame, VideoRotation},
|
||||
video_source::{native::NativeVideoSource, RtcVideoSource, VideoResolution},
|
||||
video_stream::native::NativeVideoStream,
|
||||
};
|
||||
|
||||
#[cfg(all(not(any(test, feature = "test-support")), not(target_os = "windows")))]
|
||||
use livekit::track::RemoteAudioTrack;
|
||||
#[cfg(all(not(any(test, feature = "test-support")), not(target_os = "windows")))]
|
||||
pub use livekit::*;
|
||||
#[cfg(any(test, feature = "test-support", target_os = "windows"))]
|
||||
use test::track::RemoteAudioTrack;
|
||||
#[cfg(any(test, feature = "test-support", target_os = "windows"))]
|
||||
pub use test::*;
|
||||
|
||||
pub use remote_video_track_view::{RemoteVideoTrackView, RemoteVideoTrackViewEvent};
|
||||
|
||||
/// Keep-alive guard for an active audio pipeline; dropping it tears the
/// pipeline down.
pub enum AudioStream {
    /// Microphone capture: the sender's drop unblocks (and thus ends) the cpal
    /// capture thread, and the task forwards captured frames into livekit.
    Input {
        _thread_handle: std::sync::mpsc::Sender<()>,
        _transmit_task: Task<()>,
    },
    /// Remote-audio playback: the task drives the output stream lifecycle.
    Output {
        _task: Task<()>,
    },
}
|
||||
|
||||
/// Adapts gpui's platform dispatcher to the scheduler interface livekit expects.
struct Dispatcher(Arc<dyn gpui::PlatformDispatcher>);
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
impl livekit::dispatcher::Dispatcher for Dispatcher {
    // Run the runnable as soon as possible on gpui's dispatcher.
    fn dispatch(&self, runnable: livekit::dispatcher::Runnable) {
        self.0.dispatch(runnable, None);
    }

    // Run the runnable after `duration` has elapsed.
    fn dispatch_after(
        &self,
        duration: std::time::Duration,
        runnable: livekit::dispatcher::Runnable,
    ) {
        self.0.dispatch_after(duration, runnable);
    }
}
|
||||
|
||||
/// Adapts Zed's `http_client` to the `http` 0.2-based client interface that
/// livekit expects.
struct HttpClientAdapter(Arc<dyn http_client::HttpClient>);
|
||||
|
||||
/// Converts a status code from the `http` version used by `http_client` into
/// the `http` 0.2 status type expected by livekit.
fn http_2_status(status: http_client::http::StatusCode) -> http_2::StatusCode {
    let code = status.as_u16();
    http_2::StatusCode::from_u16(code)
        .expect("valid status code to status code conversion")
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
impl livekit::dispatcher::HttpClient for HttpClientAdapter {
    // Simple GET, delegated to Zed's HTTP client without following redirects.
    fn get(
        &self,
        url: &str,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let url = url.to_string();
        Box::pin(async move {
            let response = http_client
                .get(&url, http_client::AsyncBody::empty(), false)
                .await
                .map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }

    // Translates an `http` 0.2 request coming from livekit into Zed's HTTP
    // client types, sends it, and converts the response back.
    fn send_async(
        &self,
        request: http_2::Request<Vec<u8>>,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let mut builder = http_client::http::Request::builder()
            .method(request.method().as_str())
            .uri(request.uri().to_string());

        for (key, value) in request.headers().iter() {
            builder = builder.header(key.as_str(), value.as_bytes());
        }

        // Request extensions cannot be carried across http versions; flag any
        // use loudly in debug builds instead of silently dropping them.
        if !request.extensions().is_empty() {
            debug_panic!(
                "Livekit sent an HTTP request with a protocol extension that Zed doesn't support!"
            );
        }

        let request = builder
            .body(http_client::AsyncBody::from_bytes(
                request.into_body().into(),
            ))
            .unwrap();

        Box::pin(async move {
            let response = http_client.send(request).await.map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }
}
|
||||
|
||||
// No-op on Windows: the livekit SDK is not compiled in, so there is nothing to
// initialize. Parameters are kept so callers stay platform-independent.
#[cfg(target_os = "windows")]
pub fn init(
    dispatcher: Arc<dyn gpui::PlatformDispatcher>,
    http_client: Arc<dyn http_client::HttpClient>,
) {
}
|
||||
|
||||
/// Routes livekit's task scheduling and HTTP requests through gpui's
/// dispatcher and Zed's HTTP client. Call once before connecting to a room.
#[cfg(not(target_os = "windows"))]
pub fn init(
    dispatcher: Arc<dyn gpui::PlatformDispatcher>,
    http_client: Arc<dyn http_client::HttpClient>,
) {
    livekit::dispatcher::set_dispatcher(Dispatcher(dispatcher));
    livekit::dispatcher::set_http_client(HttpClientAdapter(http_client));
}
|
||||
|
||||
/// Bridges a gpui screen-capture source into a livekit local video track:
/// every captured frame is converted to a webrtc buffer and pushed into the
/// track's native video source. Returns the track plus the capture stream
/// that keeps the capture alive.
#[cfg(not(target_os = "windows"))]
pub async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
) -> Result<(track::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let resolution = capture_source.resolution()?;
    let track_source = NativeVideoSource::new(VideoResolution {
        width: resolution.width.0 as u32,
        height: resolution.height.0 as u32,
    });

    let capture_stream = capture_source
        .stream({
            let track_source = track_source.clone();
            Box::new(move |frame| {
                // Frames whose buffer can't be converted are silently skipped.
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        ),
        capture_stream,
    ))
}
|
||||
|
||||
/// Captures microphone audio via cpal and forwards it into a livekit audio
/// source. Returns a task resolving to the publishable track plus an
/// `AudioStream::Input` guard that keeps the capture thread and the
/// forwarding task alive.
#[cfg(not(target_os = "windows"))]
pub fn capture_local_audio_track(
    background_executor: &BackgroundExecutor,
) -> Result<Task<(track::LocalAudioTrack, AudioStream)>> {
    use util::maybe;

    let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
    // Dropping the sender half unblocks `thread_kill_rx.recv()`, ending the
    // capture thread below.
    let (thread_handle, thread_kill_rx) = std::sync::mpsc::channel::<()>();
    let sample_rate;
    let channels;

    if cfg!(any(test, feature = "test-support")) {
        // No real audio device in tests; use a tiny fake configuration.
        sample_rate = 2;
        channels = 1;
    } else {
        let (device, config) = default_device(true)?;
        sample_rate = config.sample_rate().0;
        channels = config.channels() as u32;
        // The cpal stream lives on its own thread, which also owns its
        // lifetime (see the blocking `recv` at the end).
        thread::spawn(move || {
            maybe!({
                if let Some(name) = device.name().ok() {
                    log::info!("Using microphone: {}", name)
                } else {
                    log::info!("Using microphone: <unknown>");
                }

                let stream = device
                    .build_input_stream_raw(
                        &config.config(),
                        cpal::SampleFormat::I16,
                        move |data, _: &_| {
                            // Forward each captured buffer as an owned frame;
                            // send errors (receiver gone) are ignored.
                            frame_tx
                                .unbounded_send(AudioFrame {
                                    data: Cow::Owned(data.as_slice::<i16>().unwrap().to_vec()),
                                    sample_rate,
                                    num_channels: channels,
                                    samples_per_channel: data.len() as u32 / channels,
                                })
                                .ok();
                        },
                        |err| log::error!("error capturing audio track: {:?}", err),
                        None,
                    )
                    .context("failed to build input stream")?;

                stream.play()?;
                // Keep the thread alive and holding onto the `stream`
                thread_kill_rx.recv().ok();
                anyhow::Ok(Some(()))
            })
            .log_err();
        });
    }

    Ok(background_executor.spawn({
        let background_executor = background_executor.clone();
        async move {
            let source = NativeAudioSource::new(
                AudioSourceOptions {
                    echo_cancellation: true,
                    noise_suppression: true,
                    auto_gain_control: true,
                },
                sample_rate,
                channels,
                100,
            );
            // Pump captured frames from the cpal thread into the source.
            let transmit_task = background_executor.spawn({
                let source = source.clone();
                async move {
                    while let Some(frame) = frame_rx.next().await {
                        source.capture_frame(&frame).await.log_err();
                    }
                }
            });

            let track = track::LocalAudioTrack::create_audio_track(
                "microphone",
                RtcAudioSource::Native(source),
            );

            (
                track,
                AudioStream::Input {
                    _thread_handle: thread_handle,
                    _transmit_task: transmit_task,
                },
            )
        }
    }))
}
|
||||
|
||||
/// Plays a remote participant's audio on the default output device,
/// re-creating the output stream whenever the default device changes.
#[cfg(not(target_os = "windows"))]
pub fn play_remote_audio_track(
    track: &RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> Result<AudioStream> {
    let track = track.clone();
    // We track device changes in our output because Livekit has a resampler built in,
    // and it's easy to create a new native audio stream when the device changes.
    if cfg!(any(test, feature = "test-support")) {
        // No audio output in tests; a trivially-finished task serves as guard.
        Ok(AudioStream::Output {
            _task: background_executor.spawn(async {}),
        })
    } else {
        let mut default_change_listener = DeviceChangeListener::new(false)?;
        let (output_device, output_config) = default_device(false)?;

        let _task = background_executor.spawn({
            let background_executor = background_executor.clone();
            async move {
                let (mut _receive_task, mut _thread) =
                    start_output_stream(output_config, output_device, &track, &background_executor);

                // On each default-device change, restart the output stream on
                // the new device; re-assignment drops the previous guards.
                while let Some(_) = default_change_listener.next().await {
                    let Some((output_device, output_config)) = get_default_output().log_err()
                    else {
                        continue;
                    };

                    if let Ok(name) = output_device.name() {
                        log::info!("Using speaker: {}", name)
                    } else {
                        log::info!("Using speaker: <unknown>")
                    }

                    (_receive_task, _thread) = start_output_stream(
                        output_config,
                        output_device,
                        &track,
                        &background_executor,
                    );
                }

                // Keep the guards alive even after the listener ends.
                futures::future::pending::<()>().await;
            }
        });

        Ok(AudioStream::Output { _task })
    }
}
|
||||
|
||||
fn default_device(input: bool) -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
|
||||
let device;
|
||||
let config;
|
||||
if input {
|
||||
device = cpal::default_host()
|
||||
.default_input_device()
|
||||
.ok_or_else(|| anyhow!("no audio input device available"))?;
|
||||
config = device
|
||||
.default_input_config()
|
||||
.context("failed to get default input config")?;
|
||||
} else {
|
||||
device = cpal::default_host()
|
||||
.default_output_device()
|
||||
.ok_or_else(|| anyhow!("no audio output device available"))?;
|
||||
config = device
|
||||
.default_output_config()
|
||||
.context("failed to get default output config")?;
|
||||
}
|
||||
Ok((device, config))
|
||||
}
|
||||
|
||||
/// Returns the current default output device and its default stream config.
#[cfg(not(target_os = "windows"))]
fn get_default_output() -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let host = cpal::default_host();
    let output_device = host
        .default_output_device()
        .context("failed to read default output device")?;
    let output_config = output_device.default_output_config()?;
    Ok((output_device, output_config))
}
|
||||
|
||||
/// Decodes `track` into a shared sample queue and plays it on `output_device`.
/// Returns the decode task and the sender whose drop stops the playback
/// thread; dropping either half tears the pipeline down.
#[cfg(not(target_os = "windows"))]
fn start_output_stream(
    output_config: cpal::SupportedStreamConfig,
    output_device: cpal::Device,
    track: &track::RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> (Task<()>, std::sync::mpsc::Sender<()>) {
    // Sample queue shared between the decode task and the cpal callback.
    let buffer = Arc::new(Mutex::new(VecDeque::<i16>::new()));
    let sample_rate = output_config.sample_rate();

    // livekit resamples to the rate/channel count requested here.
    let mut stream = NativeAudioStream::new(
        track.rtc_track(),
        sample_rate.0 as i32,
        output_config.channels() as i32,
    );

    let receive_task = background_executor.spawn({
        let buffer = buffer.clone();
        async move {
            // Cap the queue at ~100ms of audio, dropping the oldest samples
            // on overflow so playback latency stays bounded.
            const MS_OF_BUFFER: u32 = 100;
            const MS_IN_SEC: u32 = 1000;
            while let Some(frame) = stream.next().await {
                let frame_size = frame.samples_per_channel * frame.num_channels;
                debug_assert!(frame.data.len() == frame_size as usize);

                let buffer_size =
                    ((frame.sample_rate * frame.num_channels) / MS_IN_SEC * MS_OF_BUFFER) as usize;

                let mut buffer = buffer.lock();
                let new_size = buffer.len() + frame.data.len();
                if new_size > buffer_size {
                    let overflow = new_size - buffer_size;
                    buffer.drain(0..overflow);
                }

                buffer.extend(frame.data.iter());
            }
        }
    });

    // The _output_stream needs to be on it's own thread because it's !Send
    // and we experienced a deadlock when it's created on the main thread.
    let (thread, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
    thread::spawn(move || {
        if cfg!(any(test, feature = "test-support")) {
            // Can't play audio in tests
            return;
        }

        let output_stream = output_device.build_output_stream(
            &output_config.config(),
            {
                let buffer = buffer.clone();
                move |data, _info| {
                    let mut buffer = buffer.lock();
                    if buffer.len() < data.len() {
                        // Instead of partially filling a buffer, output silence. If a partial
                        // buffer was outputted then this could lead to a perpetual state of
                        // outputting partial buffers as it never gets filled enough for a full
                        // frame.
                        data.fill(0);
                    } else {
                        // SAFETY: We know that buffer has at least data.len() values in it.
                        // because we just checked
                        let mut drain = buffer.drain(..data.len());
                        data.fill_with(|| unsafe { drain.next().unwrap_unchecked() });
                    }
                }
            },
            |error| log::error!("error playing audio track: {:?}", error),
            None,
        );

        let Some(output_stream) = output_stream.log_err() else {
            return;
        };

        output_stream.play().log_err();
        // Block forever to keep the output stream alive
        end_on_drop_rx.recv().ok();
    });

    (receive_task, thread)
}
|
||||
|
||||
// Windows stub: the livekit SDK is not compiled in, so remote video yields no
// frames.
#[cfg(target_os = "windows")]
pub fn play_remote_video_track(
    track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
    futures::stream::empty()
}
|
||||
|
||||
/// Yields decoded frames from a remote video track, skipping any frame whose
/// buffer cannot be converted for rendering.
#[cfg(not(target_os = "windows"))]
pub fn play_remote_video_track(
    track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
    NativeVideoStream::new(track.rtc_track())
        .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
}
|
||||
|
||||
// On macOS, remote video frames are surfaced as CoreVideo image buffers,
// which can be rendered without a CPU-side copy.
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = media::core_video::CVImageBuffer;

#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType as _;
    use media::core_video::CVImageBuffer;

    // Only native (CoreVideo-backed) buffers can be wrapped; others yield None.
    let buffer = buffer.as_native()?;
    let pixel_buffer = buffer.get_cv_pixel_buffer();
    if pixel_buffer.is_null() {
        return None;
    }

    // SAFETY: pixel_buffer was null-checked above; wrap_under_get_rule retains
    // it, so the returned CVImageBuffer holds its own reference.
    unsafe { Some(CVImageBuffer::wrap_under_get_rule(pixel_buffer as _)) }
}
|
||||
|
||||
// On other platforms, remote frames are converted into CPU-side images that
// gpui uploads for rendering.
#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

// Converts a webrtc video buffer into a renderable image by copying the pixel
// data into a freshly-allocated, uninitialized byte buffer that `to_argb`
// fully overwrites.
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{alloc, Layout};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4; // 4 bytes per pixel
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let bgra_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB, // For some reason, this displays correctly while RGBA (the correct format) does not
            bgra_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        // SAFETY: start_ptr was allocated with exactly this layout, and
        // to_argb wrote all byte_len bytes above.
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(
            RgbaImage::from_raw(width, height, argb_image)
                .with_context(|| "Bug: not enough bytes allocated for image.")
                .log_err()?,
        ),
        1,
    ))))
}
|
||||
|
||||
/// macOS: wraps a captured screen frame's CVPixelBuffer as a webrtc
/// `NativeBuffer` without copying pixels.
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use core_foundation::base::TCFType as _;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Leak our CF reference on purpose: the retain count now belongs to the
    // NativeBuffer created below, so dropping `frame.0` here would over-release.
    std::mem::forget(frame.0);
    // SAFETY: `pixel_buffer` came from a live ScreenCaptureFrame and the
    // reference forgotten above is transferred to the webrtc buffer.
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}
|
||||
|
||||
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
|
||||
fn video_frame_buffer_to_webrtc(_frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
|
||||
None as Option<Box<dyn VideoBuffer>>
|
||||
}
|
||||
|
||||
/// A stream that emits `()` whenever the system's default audio device
/// changes. `input` selects whether the input (capture) or output (playback)
/// device is watched.
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}
|
||||
|
||||
/// CoreAudio-based default-device change notifications for macOS.
#[cfg(target_os = "macos")]
mod macos {

    use coreaudio::sys::{
        kAudioHardwarePropertyDefaultInputDevice, kAudioHardwarePropertyDefaultOutputDevice,
        kAudioObjectPropertyElementMaster, kAudioObjectPropertyScopeGlobal,
        kAudioObjectSystemObject, AudioObjectAddPropertyListener, AudioObjectID,
        AudioObjectPropertyAddress, AudioObjectRemovePropertyListener, OSStatus,
    };
    use futures::{channel::mpsc::UnboundedReceiver, StreamExt};

    use crate::DeviceChangeListenerApi;

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        // Receives a `()` each time CoreAudio fires the property listener.
        rx: UnboundedReceiver<()>,
        // Boxed so the callback has a stable address for the C callback's
        // user-data pointer; must outlive the registration (removed in Drop).
        callback: Box<PropertyListenerCallbackWrapper>,
        // Remembered so Drop can unregister the same property address.
        input: bool,
    }

    // Compile-time check that the listener can cross threads.
    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    /// C-ABI trampoline: CoreAudio hands back the user-data pointer, which we
    /// know points at a `PropertyListenerCallbackWrapper`.
    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        (*wrapper).0();
        0
    }

    impl DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> gpui::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                // Best-effort: if the receiver is gone we are being dropped.
                tx.unbounded_send(()).ok();
            })));

            // SAFETY: `callback` is heap-allocated and stored in `Self`, so
            // the pointer registered here stays valid until Drop unregisters it.
            unsafe {
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;
            }

            Ok(Self {
                rx,
                callback,
                input,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            // Unregister with the same property address and callback pointer
            // used at registration so CoreAudio never calls into freed memory.
            unsafe {
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            // Delegate to the channel; the CoreAudio callback is the sender.
            self.rx.poll_next_unpin(cx)
        }
    }
}
|
||||
|
||||
// On macOS, device changes are observed through CoreAudio property listeners.
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
|
||||
|
||||
/// Fallback for platforms without a device-change notification API: a stream
/// that never yields, so device changes are simply not reacted to.
#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use crate::DeviceChangeListenerApi;

    pub struct NoopOutputDeviceChangelistener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangelistener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangelistener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            // Intentionally pending forever without registering a waker: this
            // stream must never produce an item and is only ever polled inside
            // a select alongside streams that do wake the task.
            Poll::Pending
        }
    }
}
|
||||
|
||||
// Everywhere else there is no listener yet; the noop stream never fires.
#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;
|
99
crates/livekit_client/src/remote_video_track_view.rs
Normal file
99
crates/livekit_client/src/remote_video_track_view.rs
Normal file
|
@ -0,0 +1,99 @@
|
|||
use crate::track::RemoteVideoTrack;
|
||||
use anyhow::Result;
|
||||
use futures::StreamExt as _;
|
||||
use gpui::{Empty, EventEmitter, IntoElement, Render, Task, View, ViewContext, VisualContext as _};
|
||||
|
||||
/// A gpui view that renders the most recent frame of a remote video track
/// (e.g. a screenshare) and emits [`RemoteVideoTrackViewEvent::Close`] when
/// the stream ends.
pub struct RemoteVideoTrackView {
    track: RemoteVideoTrack,
    // Most recent frame received from the track; `None` until the first arrives.
    latest_frame: Option<crate::RemoteVideoFrame>,
    // On non-macOS platforms frames are gpui images that must be released via
    // `drop_image`; the last two rendered frames are tracked so a stale image
    // can be freed without dropping one that may still be referenced by the
    // in-flight render (see `Render::render`).
    #[cfg(not(target_os = "macos"))]
    current_rendered_frame: Option<crate::RemoteVideoFrame>,
    #[cfg(not(target_os = "macos"))]
    previous_rendered_frame: Option<crate::RemoteVideoFrame>,
    // Background task that pumps frames into `latest_frame`; kept only so it
    // is cancelled when the view drops.
    _maintain_frame: Task<Result<()>>,
}
|
||||
|
||||
/// Events emitted by [`RemoteVideoTrackView`].
#[derive(Debug)]
pub enum RemoteVideoTrackViewEvent {
    /// The remote video stream ended (no more frames will arrive).
    Close,
}
|
||||
|
||||
impl RemoteVideoTrackView {
    /// Creates a view for `track` and spawns a task that copies each decoded
    /// frame into `latest_frame`, notifying the view so it re-renders. When
    /// the frame stream ends, any retained gpui images are released and a
    /// `Close` event is emitted.
    pub fn new(track: RemoteVideoTrack, cx: &mut ViewContext<Self>) -> Self {
        cx.focus_handle();
        let frames = super::play_remote_video_track(&track);

        Self {
            track,
            latest_frame: None,
            _maintain_frame: cx.spawn(|this, mut cx| async move {
                futures::pin_mut!(frames);
                while let Some(frame) = frames.next().await {
                    // Stops (returns Err) once the view has been released.
                    this.update(&mut cx, |this, cx| {
                        this.latest_frame = Some(frame);
                        cx.notify();
                    })?;
                }
                this.update(&mut cx, |_this, cx| {
                    #[cfg(not(target_os = "macos"))]
                    {
                        use util::ResultExt as _;
                        if let Some(frame) = _this.previous_rendered_frame.take() {
                            cx.window_context().drop_image(frame).log_err();
                        }
                        // TODO(mgsloan): This might leak the last image of the screenshare if
                        // render is called after the screenshare ends.
                        if let Some(frame) = _this.current_rendered_frame.take() {
                            cx.window_context().drop_image(frame).log_err();
                        }
                    }
                    cx.emit(RemoteVideoTrackViewEvent::Close)
                })?;
                Ok(())
            }),
            #[cfg(not(target_os = "macos"))]
            current_rendered_frame: None,
            #[cfg(not(target_os = "macos"))]
            previous_rendered_frame: None,
        }
    }

    /// Creates an independent view of the same track (with its own frame pump).
    /// NOTE(review): this intentionally shadows `Clone::clone`-style naming but
    /// takes a context and returns a `View`, so it cannot be confused at call sites.
    pub fn clone(&self, cx: &mut ViewContext<Self>) -> View<Self> {
        cx.new_view(|cx| Self::new(self.track.clone(), cx))
    }
}
|
||||
|
||||
// Allows observers to subscribe to `Close` notifications from this view.
impl EventEmitter<RemoteVideoTrackViewEvent> for RemoteVideoTrackView {}
|
||||
|
||||
impl Render for RemoteVideoTrackView {
    /// Renders the latest frame, or `Empty` if none has arrived yet.
    ///
    /// macOS renders the CVImageBuffer directly as a surface. Other platforms
    /// render a gpui image and retire old images manually: the previously
    /// rendered frame is dropped only once a newer frame has replaced it,
    /// keeping a two-deep history so an image still referenced by the
    /// in-flight frame is never freed.
    fn render(&mut self, _cx: &mut ViewContext<Self>) -> impl IntoElement {
        #[cfg(target_os = "macos")]
        if let Some(latest_frame) = &self.latest_frame {
            use gpui::Styled as _;
            return gpui::surface(latest_frame.clone())
                .size_full()
                .into_any_element();
        }

        #[cfg(not(target_os = "macos"))]
        if let Some(latest_frame) = &self.latest_frame {
            use gpui::Styled as _;
            if let Some(current_rendered_frame) = self.current_rendered_frame.take() {
                if let Some(frame) = self.previous_rendered_frame.take() {
                    // Only drop the frame if it's not also the current frame.
                    if frame.id != current_rendered_frame.id {
                        use util::ResultExt as _;
                        _cx.window_context().drop_image(frame).log_err();
                    }
                }
                self.previous_rendered_frame = Some(current_rendered_frame)
            }
            self.current_rendered_frame = Some(latest_frame.clone());
            return gpui::img(latest_frame.clone())
                .size_full()
                .into_any_element();
        }

        Empty.into_any_element()
    }
}
|
825
crates/livekit_client/src/test.rs
Normal file
825
crates/livekit_client/src/test.rs
Normal file
|
@ -0,0 +1,825 @@
|
|||
pub mod participant;
|
||||
pub mod publication;
|
||||
pub mod track;
|
||||
|
||||
#[cfg(not(windows))]
|
||||
pub mod webrtc;
|
||||
|
||||
#[cfg(not(windows))]
|
||||
use self::id::*;
|
||||
use self::{participant::*, publication::*, track::*};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use collections::{btree_map::Entry as BTreeEntry, hash_map::Entry, BTreeMap, HashMap, HashSet};
|
||||
use gpui::BackgroundExecutor;
|
||||
#[cfg(not(windows))]
|
||||
use livekit::options::TrackPublishOptions;
|
||||
use livekit_server::{proto, token};
|
||||
use parking_lot::Mutex;
|
||||
use postage::{mpsc, sink::Sink};
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering::SeqCst},
|
||||
Arc, Weak,
|
||||
};
|
||||
|
||||
#[cfg(not(windows))]
|
||||
pub use livekit::{id, options, ConnectionState, DisconnectReason, RoomOptions};
|
||||
|
||||
/// Process-wide registry of in-memory test servers, keyed by server URL.
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
|
||||
|
||||
/// An in-process fake LiveKit server used by tests: tracks rooms, connected
/// client `Room`s, published tracks, and per-participant permissions, and
/// pushes `RoomEvent`s to clients the way a real server would.
pub struct TestServer {
    pub url: String,
    pub api_key: String,
    pub secret_key: String,
    #[cfg(not(target_os = "windows"))]
    rooms: Mutex<HashMap<String, TestServerRoom>>,
    // Used to inject random scheduling delays so tests exercise interleavings.
    executor: BackgroundExecutor,
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
impl TestServer {
    /// Registers a new fake server under `url` in the global registry.
    /// Fails if a server with the same URL already exists.
    pub fn create(
        url: String,
        api_key: String,
        secret_key: String,
        executor: BackgroundExecutor,
    ) -> Result<Arc<TestServer>> {
        let mut servers = SERVERS.lock();
        if let BTreeEntry::Vacant(e) = servers.entry(url.clone()) {
            let server = Arc::new(TestServer {
                url,
                api_key,
                secret_key,
                rooms: Default::default(),
                executor,
            });
            e.insert(server.clone());
            Ok(server)
        } else {
            Err(anyhow!("a server with url {:?} already exists", url))
        }
    }

    /// Looks up a previously created server by URL.
    fn get(url: &str) -> Result<Arc<TestServer>> {
        Ok(SERVERS
            .lock()
            .get(url)
            .ok_or_else(|| anyhow!("no server found for url"))?
            .clone())
    }

    /// Removes this server from the global registry at the end of a test.
    pub fn teardown(&self) -> Result<()> {
        SERVERS
            .lock()
            .remove(&self.url)
            .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
        Ok(())
    }

    /// Creates an API client that talks to this server through the registry.
    pub fn create_api_client(&self) -> TestApiClient {
        TestApiClient {
            url: self.url.clone(),
        }
    }

    /// Creates an empty room; fails if the room already exists.
    pub async fn create_room(&self, room: String) -> Result<()> {
        self.executor.simulate_random_delay().await;

        let mut server_rooms = self.rooms.lock();
        if let Entry::Vacant(e) = server_rooms.entry(room.clone()) {
            e.insert(Default::default());
            Ok(())
        } else {
            Err(anyhow!("room {:?} already exists", room))
        }
    }

    /// Deletes a room; fails if it does not exist.
    async fn delete_room(&self, room: String) -> Result<()> {
        self.executor.simulate_random_delay().await;

        let mut server_rooms = self.rooms.lock();
        server_rooms
            .remove(&room)
            .ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
        Ok(())
    }

    /// Joins the room named in `token`'s claims, replaying TrackSubscribed
    /// events for every already-published track to the joining client.
    /// The room is created on demand (`or_default`); joining twice fails.
    async fn join_room(&self, token: String, client_room: Room) -> Result<ParticipantIdentity> {
        self.executor.simulate_random_delay().await;

        let claims = livekit_server::token::validate(&token, &self.secret_key)?;
        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
        let room_name = claims.video.room.unwrap();
        let mut server_rooms = self.rooms.lock();
        let room = (*server_rooms).entry(room_name.to_string()).or_default();

        if let Entry::Vacant(e) = room.client_rooms.entry(identity.clone()) {
            for server_track in &room.video_tracks {
                let track = RemoteTrack::Video(RemoteVideoTrack {
                    server_track: server_track.clone(),
                    _room: client_room.downgrade(),
                });
                client_room
                    .0
                    .lock()
                    .updates_tx
                    .blocking_send(RoomEvent::TrackSubscribed {
                        track: track.clone(),
                        publication: RemoteTrackPublication {
                            sid: server_track.sid.clone(),
                            room: client_room.downgrade(),
                            track,
                        },
                        participant: RemoteParticipant {
                            room: client_room.downgrade(),
                            identity: server_track.publisher_id.clone(),
                        },
                    })
                    .unwrap();
            }
            for server_track in &room.audio_tracks {
                let track = RemoteTrack::Audio(RemoteAudioTrack {
                    server_track: server_track.clone(),
                    room: client_room.downgrade(),
                });
                client_room
                    .0
                    .lock()
                    .updates_tx
                    .blocking_send(RoomEvent::TrackSubscribed {
                        track: track.clone(),
                        publication: RemoteTrackPublication {
                            sid: server_track.sid.clone(),
                            room: client_room.downgrade(),
                            track,
                        },
                        participant: RemoteParticipant {
                            room: client_room.downgrade(),
                            identity: server_track.publisher_id.clone(),
                        },
                    })
                    .unwrap();
            }
            e.insert(client_room);
            Ok(identity)
        } else {
            Err(anyhow!(
                "{:?} attempted to join room {:?} twice",
                identity,
                room_name
            ))
        }
    }

    /// Removes the participant named in `token` from its room.
    async fn leave_room(&self, token: String) -> Result<()> {
        self.executor.simulate_random_delay().await;

        let claims = livekit_server::token::validate(&token, &self.secret_key)?;
        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
        let room_name = claims.video.room.unwrap();
        let mut server_rooms = self.rooms.lock();
        let room = server_rooms
            .get_mut(&*room_name)
            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
        room.client_rooms.remove(&identity).ok_or_else(|| {
            anyhow!(
                "{:?} attempted to leave room {:?} before joining it",
                identity,
                room_name
            )
        })?;
        Ok(())
    }

    /// Returns every other participant in the caller's room, keyed by identity.
    /// Returns an empty map if the room does not exist.
    fn remote_participants(
        &self,
        token: String,
    ) -> Result<HashMap<ParticipantIdentity, RemoteParticipant>> {
        let claims = livekit_server::token::validate(&token, &self.secret_key)?;
        let local_identity = ParticipantIdentity(claims.sub.unwrap().to_string());
        let room_name = claims.video.room.unwrap().to_string();

        if let Some(server_room) = self.rooms.lock().get(&room_name) {
            // NOTE(review): unwrap assumes the caller has already joined; a
            // token for an un-joined identity would panic here.
            let room = server_room
                .client_rooms
                .get(&local_identity)
                .unwrap()
                .downgrade();
            Ok(server_room
                .client_rooms
                .iter()
                .filter(|(identity, _)| *identity != &local_identity)
                .map(|(identity, _)| {
                    (
                        identity.clone(),
                        RemoteParticipant {
                            room: room.clone(),
                            identity: identity.clone(),
                        },
                    )
                })
                .collect())
        } else {
            Ok(Default::default())
        }
    }

    /// Forcibly removes `identity` from `room_name` (server-initiated kick).
    async fn remove_participant(
        &self,
        room_name: String,
        identity: ParticipantIdentity,
    ) -> Result<()> {
        self.executor.simulate_random_delay().await;

        let mut server_rooms = self.rooms.lock();
        let room = server_rooms
            .get_mut(&room_name)
            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
        room.client_rooms.remove(&identity).ok_or_else(|| {
            anyhow!(
                "participant {:?} did not join room {:?}",
                identity,
                room_name
            )
        })?;
        Ok(())
    }

    /// Overrides a participant's permissions, which take precedence over the
    /// permissions baked into their token (see `publish_*_track`).
    async fn update_participant(
        &self,
        room_name: String,
        identity: String,
        permission: proto::ParticipantPermission,
    ) -> Result<()> {
        self.executor.simulate_random_delay().await;

        let mut server_rooms = self.rooms.lock();
        let room = server_rooms
            .get_mut(&room_name)
            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
        room.participant_permissions
            .insert(ParticipantIdentity(identity), permission);
        Ok(())
    }

    /// Simulates an abrupt disconnect: removes the client from every room,
    /// marks it disconnected, and emits a `Disconnected` event.
    pub async fn disconnect_client(&self, client_identity: String) {
        let client_identity = ParticipantIdentity(client_identity);

        self.executor.simulate_random_delay().await;

        let mut server_rooms = self.rooms.lock();
        for room in server_rooms.values_mut() {
            if let Some(room) = room.client_rooms.remove(&client_identity) {
                let mut room = room.0.lock();
                room.connection_state = ConnectionState::Disconnected;
                room.updates_tx
                    .blocking_send(RoomEvent::Disconnected {
                        reason: DisconnectReason::SignalClose,
                    })
                    .ok();
            }
        }
    }

    /// Publishes a video track for the token's participant and notifies every
    /// other client with a `TrackSubscribed` event. Respects `can_publish`
    /// from server-side permission overrides, then token claims (default true).
    async fn publish_video_track(
        &self,
        token: String,
        _local_track: LocalVideoTrack,
    ) -> Result<TrackSid> {
        self.executor.simulate_random_delay().await;

        let claims = livekit_server::token::validate(&token, &self.secret_key)?;
        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
        let room_name = claims.video.room.unwrap();

        let mut server_rooms = self.rooms.lock();
        let room = server_rooms
            .get_mut(&*room_name)
            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;

        let can_publish = room
            .participant_permissions
            .get(&identity)
            .map(|permission| permission.can_publish)
            .or(claims.video.can_publish)
            .unwrap_or(true);

        if !can_publish {
            return Err(anyhow!("user is not allowed to publish"));
        }

        let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
        let server_track = Arc::new(TestServerVideoTrack {
            sid: sid.clone(),
            publisher_id: identity.clone(),
        });

        room.video_tracks.push(server_track.clone());

        for (room_identity, client_room) in &room.client_rooms {
            if *room_identity != identity {
                let track = RemoteTrack::Video(RemoteVideoTrack {
                    server_track: server_track.clone(),
                    _room: client_room.downgrade(),
                });
                let publication = RemoteTrackPublication {
                    sid: sid.clone(),
                    room: client_room.downgrade(),
                    track: track.clone(),
                };
                let participant = RemoteParticipant {
                    identity: identity.clone(),
                    room: client_room.downgrade(),
                };
                client_room
                    .0
                    .lock()
                    .updates_tx
                    .blocking_send(RoomEvent::TrackSubscribed {
                        track,
                        publication,
                        participant,
                    })
                    .unwrap();
            }
        }

        Ok(sid)
    }

    /// Audio counterpart of `publish_video_track`.
    /// NOTE(review): here a full receiver is tolerated (`.ok()`), whereas the
    /// video path panics (`.unwrap()`) — confirm which behavior is intended.
    async fn publish_audio_track(
        &self,
        token: String,
        _local_track: &LocalAudioTrack,
    ) -> Result<TrackSid> {
        self.executor.simulate_random_delay().await;

        let claims = livekit_server::token::validate(&token, &self.secret_key)?;
        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
        let room_name = claims.video.room.unwrap();

        let mut server_rooms = self.rooms.lock();
        let room = server_rooms
            .get_mut(&*room_name)
            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;

        let can_publish = room
            .participant_permissions
            .get(&identity)
            .map(|permission| permission.can_publish)
            .or(claims.video.can_publish)
            .unwrap_or(true);

        if !can_publish {
            return Err(anyhow!("user is not allowed to publish"));
        }

        let sid: TrackSid = format!("TR_{}", nanoid::nanoid!(17)).try_into().unwrap();
        let server_track = Arc::new(TestServerAudioTrack {
            sid: sid.clone(),
            publisher_id: identity.clone(),
            muted: AtomicBool::new(false),
        });

        room.audio_tracks.push(server_track.clone());

        for (room_identity, client_room) in &room.client_rooms {
            if *room_identity != identity {
                let track = RemoteTrack::Audio(RemoteAudioTrack {
                    server_track: server_track.clone(),
                    room: client_room.downgrade(),
                });
                let publication = RemoteTrackPublication {
                    sid: sid.clone(),
                    room: client_room.downgrade(),
                    track: track.clone(),
                };
                let participant = RemoteParticipant {
                    identity: identity.clone(),
                    room: client_room.downgrade(),
                };
                client_room
                    .0
                    .lock()
                    .updates_tx
                    .blocking_send(RoomEvent::TrackSubscribed {
                        track,
                        publication,
                        participant,
                    })
                    .ok();
            }
        }

        Ok(sid)
    }

    /// Unpublishing is a no-op in the fake server.
    async fn unpublish_track(&self, _token: String, _track: &TrackSid) -> Result<()> {
        Ok(())
    }

    /// Sets an audio track's muted flag and broadcasts TrackMuted/TrackUnmuted
    /// to every other participant in the room. No-op if the sid is unknown.
    fn set_track_muted(&self, token: &str, track_sid: &TrackSid, muted: bool) -> Result<()> {
        let claims = livekit_server::token::validate(&token, &self.secret_key)?;
        let room_name = claims.video.room.unwrap();
        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());
        let mut server_rooms = self.rooms.lock();
        let room = server_rooms
            .get_mut(&*room_name)
            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
        if let Some(track) = room
            .audio_tracks
            .iter_mut()
            .find(|track| track.sid == *track_sid)
        {
            track.muted.store(muted, SeqCst);
            for (id, client_room) in room.client_rooms.iter() {
                if *id != identity {
                    let participant = Participant::Remote(RemoteParticipant {
                        identity: identity.clone(),
                        room: client_room.downgrade(),
                    });
                    let track = RemoteTrack::Audio(RemoteAudioTrack {
                        server_track: track.clone(),
                        room: client_room.downgrade(),
                    });
                    let publication = TrackPublication::Remote(RemoteTrackPublication {
                        sid: track_sid.clone(),
                        room: client_room.downgrade(),
                        track,
                    });

                    let event = if muted {
                        RoomEvent::TrackMuted {
                            participant,
                            publication,
                        }
                    } else {
                        RoomEvent::TrackUnmuted {
                            participant,
                            publication,
                        }
                    };

                    client_room
                        .0
                        .lock()
                        .updates_tx
                        .blocking_send(event)
                        .unwrap();
                }
            }
        }
        Ok(())
    }

    /// Returns the muted state of an audio track, or `None` if the token is
    /// invalid or the room/track is unknown.
    fn is_track_muted(&self, token: &str, track_sid: &TrackSid) -> Option<bool> {
        let claims = livekit_server::token::validate(&token, &self.secret_key).ok()?;
        let room_name = claims.video.room.unwrap();

        let mut server_rooms = self.rooms.lock();
        let room = server_rooms.get_mut(&*room_name)?;
        room.audio_tracks.iter().find_map(|track| {
            if track.sid == *track_sid {
                Some(track.muted.load(SeqCst))
            } else {
                None
            }
        })
    }

    /// Returns all video tracks in the caller's room as remote tracks tied to
    /// the caller's client room.
    fn video_tracks(&self, token: String) -> Result<Vec<RemoteVideoTrack>> {
        let claims = livekit_server::token::validate(&token, &self.secret_key)?;
        let room_name = claims.video.room.unwrap();
        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());

        let mut server_rooms = self.rooms.lock();
        let room = server_rooms
            .get_mut(&*room_name)
            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
        let client_room = room
            .client_rooms
            .get(&identity)
            .ok_or_else(|| anyhow!("not a participant in room"))?;
        Ok(room
            .video_tracks
            .iter()
            .map(|track| RemoteVideoTrack {
                server_track: track.clone(),
                _room: client_room.downgrade(),
            })
            .collect())
    }

    /// Audio counterpart of `video_tracks`.
    fn audio_tracks(&self, token: String) -> Result<Vec<RemoteAudioTrack>> {
        let claims = livekit_server::token::validate(&token, &self.secret_key)?;
        let room_name = claims.video.room.unwrap();
        let identity = ParticipantIdentity(claims.sub.unwrap().to_string());

        let mut server_rooms = self.rooms.lock();
        let room = server_rooms
            .get_mut(&*room_name)
            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
        let client_room = room
            .client_rooms
            .get(&identity)
            .ok_or_else(|| anyhow!("not a participant in room"))?;
        Ok(room
            .audio_tracks
            .iter()
            .map(|track| RemoteAudioTrack {
                server_track: track.clone(),
                room: client_room.downgrade(),
            })
            .collect())
    }
}
|
||||
|
||||
/// Server-side state for one room: the connected client `Room` handles plus
/// published tracks and permission overrides.
#[cfg(not(target_os = "windows"))]
#[derive(Default, Debug)]
struct TestServerRoom {
    client_rooms: HashMap<ParticipantIdentity, Room>,
    video_tracks: Vec<Arc<TestServerVideoTrack>>,
    audio_tracks: Vec<Arc<TestServerAudioTrack>>,
    // Server-set overrides; take precedence over token claims when publishing.
    participant_permissions: HashMap<ParticipantIdentity, proto::ParticipantPermission>,
}
|
||||
|
||||
/// A video track as recorded by the fake server (no actual frame data).
#[cfg(not(target_os = "windows"))]
#[derive(Debug)]
struct TestServerVideoTrack {
    sid: TrackSid,
    publisher_id: ParticipantIdentity,
    // frames_rx: async_broadcast::Receiver<Frame>,
}
|
||||
|
||||
/// An audio track as recorded by the fake server; only the mute flag is real.
#[cfg(not(target_os = "windows"))]
#[derive(Debug)]
struct TestServerAudioTrack {
    sid: TrackSid,
    publisher_id: ParticipantIdentity,
    // Shared mutable mute state, toggled by `TestServer::set_track_muted`.
    muted: AtomicBool,
}
|
||||
|
||||
/// A `livekit_server::api::Client` implementation that routes every call to
/// the in-process `TestServer` registered under `url`.
pub struct TestApiClient {
    url: String,
}
|
||||
|
||||
/// Test-double mirror of the LiveKit SDK's room event enum; only the variants
/// actually emitted by `TestServer` carry real data in tests.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum RoomEvent {
    ParticipantConnected(RemoteParticipant),
    ParticipantDisconnected(RemoteParticipant),
    LocalTrackPublished {
        publication: LocalTrackPublication,
        track: LocalTrack,
        participant: LocalParticipant,
    },
    LocalTrackUnpublished {
        publication: LocalTrackPublication,
        participant: LocalParticipant,
    },
    TrackSubscribed {
        track: RemoteTrack,
        publication: RemoteTrackPublication,
        participant: RemoteParticipant,
    },
    TrackUnsubscribed {
        track: RemoteTrack,
        publication: RemoteTrackPublication,
        participant: RemoteParticipant,
    },
    TrackSubscriptionFailed {
        participant: RemoteParticipant,
        error: String,
        // TrackSid comes from the livekit crate, which is unavailable on Windows.
        #[cfg(not(target_os = "windows"))]
        track_sid: TrackSid,
    },
    TrackPublished {
        publication: RemoteTrackPublication,
        participant: RemoteParticipant,
    },
    TrackUnpublished {
        publication: RemoteTrackPublication,
        participant: RemoteParticipant,
    },
    TrackMuted {
        participant: Participant,
        publication: TrackPublication,
    },
    TrackUnmuted {
        participant: Participant,
        publication: TrackPublication,
    },
    RoomMetadataChanged {
        old_metadata: String,
        metadata: String,
    },
    ParticipantMetadataChanged {
        participant: Participant,
        old_metadata: String,
        metadata: String,
    },
    ParticipantNameChanged {
        participant: Participant,
        old_name: String,
        name: String,
    },
    ActiveSpeakersChanged {
        speakers: Vec<Participant>,
    },
    #[cfg(not(target_os = "windows"))]
    ConnectionStateChanged(ConnectionState),
    Connected {
        participants_with_tracks: Vec<(RemoteParticipant, Vec<RemoteTrackPublication>)>,
    },
    #[cfg(not(target_os = "windows"))]
    Disconnected {
        reason: DisconnectReason,
    },
    Reconnecting,
    Reconnected,
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
#[async_trait]
impl livekit_server::api::Client for TestApiClient {
    fn url(&self) -> &str {
        &self.url
    }

    /// Delegates room creation to the in-process server behind `self.url`.
    async fn create_room(&self, name: String) -> Result<()> {
        let server = TestServer::get(&self.url)?;
        server.create_room(name).await?;
        Ok(())
    }

    async fn delete_room(&self, name: String) -> Result<()> {
        let server = TestServer::get(&self.url)?;
        server.delete_room(name).await?;
        Ok(())
    }

    async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
        let server = TestServer::get(&self.url)?;
        server
            .remove_participant(room, ParticipantIdentity(identity))
            .await?;
        Ok(())
    }

    async fn update_participant(
        &self,
        room: String,
        identity: String,
        permission: livekit_server::proto::ParticipantPermission,
    ) -> Result<()> {
        let server = TestServer::get(&self.url)?;
        server
            .update_participant(room, identity, permission)
            .await?;
        Ok(())
    }

    /// Mints a join token signed with the server's key pair.
    fn room_token(&self, room: &str, identity: &str) -> Result<String> {
        let server = TestServer::get(&self.url)?;
        token::create(
            &server.api_key,
            &server.secret_key,
            Some(identity),
            token::VideoGrant::to_join(room),
        )
    }

    /// Mints a guest token (restricted grant) for the given room.
    fn guest_token(&self, room: &str, identity: &str) -> Result<String> {
        let server = TestServer::get(&self.url)?;
        token::create(
            &server.api_key,
            &server.secret_key,
            Some(identity),
            token::VideoGrant::for_guest(room),
        )
    }
}
|
||||
|
||||
/// Shared mutable state behind a client `Room` handle.
struct RoomState {
    url: String,
    token: String,
    // Fields below depend on livekit types that are unavailable on Windows.
    #[cfg(not(target_os = "windows"))]
    local_identity: ParticipantIdentity,
    #[cfg(not(target_os = "windows"))]
    connection_state: ConnectionState,
    #[cfg(not(target_os = "windows"))]
    paused_audio_tracks: HashSet<TrackSid>,
    // The fake server pushes RoomEvents to the client through this channel.
    updates_tx: mpsc::Sender<RoomEvent>,
}
|
||||
|
||||
/// Cheaply cloneable client-side room handle (shared `RoomState`).
#[derive(Clone, Debug)]
pub struct Room(Arc<Mutex<RoomState>>);
|
||||
|
||||
/// Non-owning room handle stored inside tracks/participants so they don't
/// keep the room alive.
#[derive(Clone, Debug)]
pub(crate) struct WeakRoom(Weak<Mutex<RoomState>>);
|
||||
|
||||
// Manual Debug: `updates_tx` is deliberately omitted from the output.
#[cfg(not(target_os = "windows"))]
impl std::fmt::Debug for RoomState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Room")
            .field("url", &self.url)
            .field("token", &self.token)
            .field("local_identity", &self.local_identity)
            .field("connection_state", &self.connection_state)
            .field("paused_audio_tracks", &self.paused_audio_tracks)
            .finish()
    }
}
|
||||
|
||||
#[cfg(target_os = "windows")]
impl std::fmt::Debug for RoomState {
    // Windows variant: the cfg-gated fields do not exist on this target, so
    // only `url` and `token` can be printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Room")
            .field("url", &self.url)
            .field("token", &self.token)
            .finish()
    }
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl Room {
|
||||
fn downgrade(&self) -> WeakRoom {
|
||||
WeakRoom(Arc::downgrade(&self.0))
|
||||
}
|
||||
|
||||
pub fn connection_state(&self) -> ConnectionState {
|
||||
self.0.lock().connection_state
|
||||
}
|
||||
|
||||
pub fn local_participant(&self) -> LocalParticipant {
|
||||
let identity = self.0.lock().local_identity.clone();
|
||||
LocalParticipant {
|
||||
identity,
|
||||
room: self.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn connect(
|
||||
url: &str,
|
||||
token: &str,
|
||||
_options: RoomOptions,
|
||||
) -> Result<(Self, mpsc::Receiver<RoomEvent>)> {
|
||||
let server = TestServer::get(&url)?;
|
||||
let (updates_tx, updates_rx) = mpsc::channel(1024);
|
||||
let this = Self(Arc::new(Mutex::new(RoomState {
|
||||
local_identity: ParticipantIdentity(String::new()),
|
||||
url: url.to_string(),
|
||||
token: token.to_string(),
|
||||
connection_state: ConnectionState::Disconnected,
|
||||
paused_audio_tracks: Default::default(),
|
||||
updates_tx,
|
||||
})));
|
||||
|
||||
let identity = server
|
||||
.join_room(token.to_string(), this.clone())
|
||||
.await
|
||||
.context("room join")?;
|
||||
{
|
||||
let mut state = this.0.lock();
|
||||
state.local_identity = identity;
|
||||
state.connection_state = ConnectionState::Connected;
|
||||
}
|
||||
|
||||
Ok((this, updates_rx))
|
||||
}
|
||||
|
||||
pub fn remote_participants(&self) -> HashMap<ParticipantIdentity, RemoteParticipant> {
|
||||
self.test_server()
|
||||
.remote_participants(self.0.lock().token.clone())
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn test_server(&self) -> Arc<TestServer> {
|
||||
TestServer::get(&self.0.lock().url).unwrap()
|
||||
}
|
||||
|
||||
fn token(&self) -> String {
|
||||
self.0.lock().token.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl Drop for RoomState {
|
||||
fn drop(&mut self) {
|
||||
if self.connection_state == ConnectionState::Connected {
|
||||
if let Ok(server) = TestServer::get(&self.url) {
|
||||
let executor = server.executor.clone();
|
||||
let token = self.token.clone();
|
||||
executor
|
||||
.spawn(async move { server.leave_room(token).await.ok() })
|
||||
.detach();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WeakRoom {
|
||||
fn upgrade(&self) -> Option<Room> {
|
||||
self.0.upgrade().map(Room)
|
||||
}
|
||||
}
|
111
crates/livekit_client/src/test/participant.rs
Normal file
111
crates/livekit_client/src/test/participant.rs
Normal file
|
@ -0,0 +1,111 @@
|
|||
use super::*;
|
||||
|
||||
/// Either side of a call: the local client or a remote peer.
#[derive(Clone, Debug)]
pub enum Participant {
    Local(LocalParticipant),
    Remote(RemoteParticipant),
}
|
||||
|
||||
/// The local client's participant; holds a strong reference to its room.
#[derive(Clone, Debug)]
pub struct LocalParticipant {
    #[cfg(not(target_os = "windows"))]
    pub(super) identity: ParticipantIdentity,
    pub(super) room: Room,
}
|
||||
|
||||
/// A remote peer; holds only a weak reference to the room, so it does not
/// keep the room state alive on its own.
#[derive(Clone, Debug)]
pub struct RemoteParticipant {
    #[cfg(not(target_os = "windows"))]
    pub(super) identity: ParticipantIdentity,
    pub(super) room: WeakRoom,
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl Participant {
|
||||
pub fn identity(&self) -> ParticipantIdentity {
|
||||
match self {
|
||||
Participant::Local(participant) => participant.identity.clone(),
|
||||
Participant::Remote(participant) => participant.identity.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl LocalParticipant {
|
||||
pub async fn unpublish_track(&self, track: &TrackSid) -> Result<()> {
|
||||
self.room
|
||||
.test_server()
|
||||
.unpublish_track(self.room.token(), track)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn publish_track(
|
||||
&self,
|
||||
track: LocalTrack,
|
||||
_options: TrackPublishOptions,
|
||||
) -> Result<LocalTrackPublication> {
|
||||
let this = self.clone();
|
||||
let track = track.clone();
|
||||
let server = this.room.test_server();
|
||||
let sid = match track {
|
||||
LocalTrack::Video(track) => {
|
||||
server.publish_video_track(this.room.token(), track).await?
|
||||
}
|
||||
LocalTrack::Audio(track) => {
|
||||
server
|
||||
.publish_audio_track(this.room.token(), &track)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
Ok(LocalTrackPublication {
|
||||
room: self.room.downgrade(),
|
||||
sid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl RemoteParticipant {
|
||||
pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
|
||||
if let Some(room) = self.room.upgrade() {
|
||||
let server = room.test_server();
|
||||
let audio = server
|
||||
.audio_tracks(room.token())
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.filter(|track| track.publisher_id() == self.identity)
|
||||
.map(|track| {
|
||||
(
|
||||
track.sid(),
|
||||
RemoteTrackPublication {
|
||||
sid: track.sid(),
|
||||
room: self.room.clone(),
|
||||
track: RemoteTrack::Audio(track),
|
||||
},
|
||||
)
|
||||
});
|
||||
let video = server
|
||||
.video_tracks(room.token())
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.filter(|track| track.publisher_id() == self.identity)
|
||||
.map(|track| {
|
||||
(
|
||||
track.sid(),
|
||||
RemoteTrackPublication {
|
||||
sid: track.sid(),
|
||||
room: self.room.clone(),
|
||||
track: RemoteTrack::Video(track),
|
||||
},
|
||||
)
|
||||
});
|
||||
audio.chain(video).collect()
|
||||
} else {
|
||||
HashMap::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn identity(&self) -> ParticipantIdentity {
|
||||
self.identity.clone()
|
||||
}
|
||||
}
|
116
crates/livekit_client/src/test/publication.rs
Normal file
116
crates/livekit_client/src/test/publication.rs
Normal file
|
@ -0,0 +1,116 @@
|
|||
use super::*;
|
||||
|
||||
/// A published track, either our own (local) or a remote participant's.
#[derive(Clone, Debug)]
pub enum TrackPublication {
    Local(LocalTrackPublication),
    Remote(RemoteTrackPublication),
}
|
||||
|
||||
/// Handle to a track this client has published to the test server.
#[derive(Clone, Debug)]
pub struct LocalTrackPublication {
    #[cfg(not(target_os = "windows"))]
    pub(crate) sid: TrackSid,
    pub(crate) room: WeakRoom,
}
|
||||
|
||||
/// Handle to a track published by a remote participant.
#[derive(Clone, Debug)]
pub struct RemoteTrackPublication {
    #[cfg(not(target_os = "windows"))]
    pub(crate) sid: TrackSid,
    pub(crate) room: WeakRoom,
    /// The underlying remote track; always available in the test stub.
    pub(crate) track: RemoteTrack,
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl TrackPublication {
|
||||
pub fn sid(&self) -> TrackSid {
|
||||
match self {
|
||||
TrackPublication::Local(track) => track.sid(),
|
||||
TrackPublication::Remote(track) => track.sid(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_muted(&self) -> bool {
|
||||
match self {
|
||||
TrackPublication::Local(track) => track.is_muted(),
|
||||
TrackPublication::Remote(track) => track.is_muted(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl LocalTrackPublication {
|
||||
pub fn sid(&self) -> TrackSid {
|
||||
self.sid.clone()
|
||||
}
|
||||
|
||||
pub fn mute(&self) {
|
||||
self.set_mute(true)
|
||||
}
|
||||
|
||||
pub fn unmute(&self) {
|
||||
self.set_mute(false)
|
||||
}
|
||||
|
||||
fn set_mute(&self, mute: bool) {
|
||||
if let Some(room) = self.room.upgrade() {
|
||||
room.test_server()
|
||||
.set_track_muted(&room.token(), &self.sid, mute)
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_muted(&self) -> bool {
|
||||
if let Some(room) = self.room.upgrade() {
|
||||
room.test_server()
|
||||
.is_track_muted(&room.token(), &self.sid)
|
||||
.unwrap_or(false)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl RemoteTrackPublication {
|
||||
pub fn sid(&self) -> TrackSid {
|
||||
self.sid.clone()
|
||||
}
|
||||
|
||||
pub fn track(&self) -> Option<RemoteTrack> {
|
||||
Some(self.track.clone())
|
||||
}
|
||||
|
||||
pub fn kind(&self) -> TrackKind {
|
||||
self.track.kind()
|
||||
}
|
||||
|
||||
pub fn is_muted(&self) -> bool {
|
||||
if let Some(room) = self.room.upgrade() {
|
||||
room.test_server()
|
||||
.is_track_muted(&room.token(), &self.sid)
|
||||
.unwrap_or(false)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_enabled(&self) -> bool {
|
||||
if let Some(room) = self.room.upgrade() {
|
||||
!room.0.lock().paused_audio_tracks.contains(&self.sid)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_enabled(&self, enabled: bool) {
|
||||
if let Some(room) = self.room.upgrade() {
|
||||
let paused_audio_tracks = &mut room.0.lock().paused_audio_tracks;
|
||||
if enabled {
|
||||
paused_audio_tracks.remove(&self.sid);
|
||||
} else {
|
||||
paused_audio_tracks.insert(self.sid.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
201
crates/livekit_client/src/test/track.rs
Normal file
201
crates/livekit_client/src/test/track.rs
Normal file
|
@ -0,0 +1,201 @@
|
|||
use super::*;
|
||||
#[cfg(not(windows))]
|
||||
use webrtc::{audio_source::RtcAudioSource, video_source::RtcVideoSource};
|
||||
|
||||
#[cfg(not(windows))]
|
||||
pub use livekit::track::{TrackKind, TrackSource};
|
||||
|
||||
/// A track produced by the local client, ready to be published.
#[derive(Clone, Debug)]
pub enum LocalTrack {
    Audio(LocalAudioTrack),
    Video(LocalVideoTrack),
}
|
||||
|
||||
/// A track published by a remote participant.
#[derive(Clone, Debug)]
pub enum RemoteTrack {
    Audio(RemoteAudioTrack),
    Video(RemoteVideoTrack),
}
|
||||
|
||||
/// Test stand-in for a local video track; carries no state.
#[derive(Clone, Debug)]
pub struct LocalVideoTrack {}
|
||||
|
||||
/// Test stand-in for a local audio track; carries no state.
#[derive(Clone, Debug)]
pub struct LocalAudioTrack {}
|
||||
|
||||
/// Remote video track backed by state held on the test server.
#[derive(Clone, Debug)]
pub struct RemoteVideoTrack {
    #[cfg(not(target_os = "windows"))]
    pub(super) server_track: Arc<TestServerVideoTrack>,
    // Kept only to tie the track's lifetime to the room; never read.
    pub(super) _room: WeakRoom,
}
|
||||
|
||||
/// Remote audio track backed by state held on the test server.
#[derive(Clone, Debug)]
pub struct RemoteAudioTrack {
    #[cfg(not(target_os = "windows"))]
    pub(super) server_track: Arc<TestServerAudioTrack>,
    /// Weak room handle used to toggle the paused state of this track.
    pub(super) room: WeakRoom,
}
|
||||
|
||||
/// Low-level (test stub) WebRTC track, audio or video.
pub enum RtcTrack {
    Audio(RtcAudioTrack),
    Video(RtcVideoTrack),
}
|
||||
|
||||
/// Low-level test audio track; shares server-side state with the
/// [`RemoteAudioTrack`] it was derived from.
pub struct RtcAudioTrack {
    #[cfg(not(target_os = "windows"))]
    pub(super) server_track: Arc<TestServerAudioTrack>,
    pub(super) room: WeakRoom,
}
|
||||
|
||||
/// Low-level test video track.
pub struct RtcVideoTrack {
    // Kept only to hold the server-side track alive; never read.
    #[cfg(not(target_os = "windows"))]
    pub(super) _server_track: Arc<TestServerVideoTrack>,
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl RemoteTrack {
|
||||
pub fn sid(&self) -> TrackSid {
|
||||
match self {
|
||||
RemoteTrack::Audio(track) => track.sid(),
|
||||
RemoteTrack::Video(track) => track.sid(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn kind(&self) -> TrackKind {
|
||||
match self {
|
||||
RemoteTrack::Audio(_) => TrackKind::Audio,
|
||||
RemoteTrack::Video(_) => TrackKind::Video,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn publisher_id(&self) -> ParticipantIdentity {
|
||||
match self {
|
||||
RemoteTrack::Audio(track) => track.publisher_id(),
|
||||
RemoteTrack::Video(track) => track.publisher_id(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rtc_track(&self) -> RtcTrack {
|
||||
match self {
|
||||
RemoteTrack::Audio(track) => RtcTrack::Audio(track.rtc_track()),
|
||||
RemoteTrack::Video(track) => RtcTrack::Video(track.rtc_track()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
impl LocalVideoTrack {
|
||||
pub fn create_video_track(_name: &str, _source: RtcVideoSource) -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
impl LocalAudioTrack {
|
||||
pub fn create_audio_track(_name: &str, _source: RtcAudioSource) -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl RemoteAudioTrack {
|
||||
pub fn sid(&self) -> TrackSid {
|
||||
self.server_track.sid.clone()
|
||||
}
|
||||
|
||||
pub fn publisher_id(&self) -> ParticipantIdentity {
|
||||
self.server_track.publisher_id.clone()
|
||||
}
|
||||
|
||||
pub fn start(&self) {
|
||||
if let Some(room) = self.room.upgrade() {
|
||||
room.0
|
||||
.lock()
|
||||
.paused_audio_tracks
|
||||
.remove(&self.server_track.sid);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
if let Some(room) = self.room.upgrade() {
|
||||
room.0
|
||||
.lock()
|
||||
.paused_audio_tracks
|
||||
.insert(self.server_track.sid.clone());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rtc_track(&self) -> RtcAudioTrack {
|
||||
RtcAudioTrack {
|
||||
server_track: self.server_track.clone(),
|
||||
room: self.room.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl RemoteVideoTrack {
|
||||
pub fn sid(&self) -> TrackSid {
|
||||
self.server_track.sid.clone()
|
||||
}
|
||||
|
||||
pub fn publisher_id(&self) -> ParticipantIdentity {
|
||||
self.server_track.publisher_id.clone()
|
||||
}
|
||||
|
||||
pub fn rtc_track(&self) -> RtcVideoTrack {
|
||||
RtcVideoTrack {
|
||||
_server_track: self.server_track.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl RtcTrack {
|
||||
pub fn enabled(&self) -> bool {
|
||||
match self {
|
||||
RtcTrack::Audio(track) => track.enabled(),
|
||||
RtcTrack::Video(track) => track.enabled(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_enabled(&self, enabled: bool) {
|
||||
match self {
|
||||
RtcTrack::Audio(track) => track.set_enabled(enabled),
|
||||
RtcTrack::Video(_) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
impl RtcAudioTrack {
|
||||
pub fn set_enabled(&self, enabled: bool) {
|
||||
if let Some(room) = self.room.upgrade() {
|
||||
let paused_audio_tracks = &mut room.0.lock().paused_audio_tracks;
|
||||
if enabled {
|
||||
paused_audio_tracks.remove(&self.server_track.sid);
|
||||
} else {
|
||||
paused_audio_tracks.insert(self.server_track.sid.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn enabled(&self) -> bool {
|
||||
if let Some(room) = self.room.upgrade() {
|
||||
!room
|
||||
.0
|
||||
.lock()
|
||||
.paused_audio_tracks
|
||||
.contains(&self.server_track.sid)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RtcVideoTrack {
    /// Test video tracks cannot be paused, so they are always enabled.
    pub fn enabled(&self) -> bool {
        true
    }
}
|
136
crates/livekit_client/src/test/webrtc.rs
Normal file
136
crates/livekit_client/src/test/webrtc.rs
Normal file
|
@ -0,0 +1,136 @@
|
|||
use super::track::{RtcAudioTrack, RtcVideoTrack};
|
||||
use futures::Stream;
|
||||
use livekit::webrtc as real;
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
/// Test stub mirroring `livekit::webrtc`'s video stream API.
pub mod video_stream {
    use super::*;

    pub mod native {
        use super::*;
        use real::video_frame::BoxVideoFrame;

        /// Test stand-in for the native video stream; never yields frames.
        pub struct NativeVideoStream {
            pub track: RtcVideoTrack,
        }

        impl NativeVideoStream {
            pub fn new(track: RtcVideoTrack) -> Self {
                Self { track }
            }
        }

        impl Stream for NativeVideoStream {
            type Item = BoxVideoFrame;

            // Never produces a frame and never registers a waker; test
            // streams simply stay pending forever.
            fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
                Poll::Pending
            }
        }
    }
}
|
||||
|
||||
/// Test stub mirroring `livekit::webrtc`'s audio stream API.
pub mod audio_stream {
    use super::*;

    pub mod native {
        use super::*;
        use real::audio_frame::AudioFrame;

        /// Test stand-in for the native audio stream; never yields frames.
        pub struct NativeAudioStream {
            pub track: RtcAudioTrack,
        }

        impl NativeAudioStream {
            // Sample rate and channel count are accepted for API parity with
            // the real stream but ignored by the stub.
            pub fn new(track: RtcAudioTrack, _sample_rate: i32, _num_channels: i32) -> Self {
                Self { track }
            }
        }

        impl Stream for NativeAudioStream {
            type Item = AudioFrame<'static>;

            // Never produces a frame and never registers a waker; test
            // streams simply stay pending forever.
            fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
                Poll::Pending
            }
        }
    }
}
|
||||
|
||||
/// Test stub mirroring `livekit::webrtc`'s audio source API.
pub mod audio_source {
    use super::*;

    pub use real::audio_source::AudioSourceOptions;

    pub mod native {
        use std::sync::Arc;

        use super::*;
        use real::{audio_frame::AudioFrame, RtcError};

        /// Test audio source; records its configuration and discards frames.
        #[derive(Clone)]
        pub struct NativeAudioSource {
            pub options: Arc<AudioSourceOptions>,
            pub sample_rate: u32,
            pub num_channels: u32,
        }

        impl NativeAudioSource {
            // `_queue_size_ms` is accepted for API parity with the real
            // source but ignored by the stub.
            pub fn new(
                options: AudioSourceOptions,
                sample_rate: u32,
                num_channels: u32,
                _queue_size_ms: u32,
            ) -> Self {
                Self {
                    options: Arc::new(options),
                    sample_rate,
                    num_channels,
                }
            }

            // Frames are dropped; tests only need capture to succeed.
            pub async fn capture_frame(&self, _frame: &AudioFrame<'_>) -> Result<(), RtcError> {
                Ok(())
            }
        }
    }

    pub enum RtcAudioSource {
        Native(native::NativeAudioSource),
    }
}
|
||||
|
||||
pub use livekit::webrtc::audio_frame;
|
||||
pub use livekit::webrtc::video_frame;
|
||||
|
||||
/// Test stub mirroring `livekit::webrtc`'s video source API.
pub mod video_source {
    use super::*;
    pub use real::video_source::VideoResolution;

    pub struct RTCVideoSource;

    pub mod native {
        use super::*;
        use real::video_frame::{VideoBuffer, VideoFrame};

        /// Test video source; records its resolution and discards frames.
        #[derive(Clone)]
        pub struct NativeVideoSource {
            pub resolution: VideoResolution,
        }

        impl NativeVideoSource {
            pub fn new(resolution: super::VideoResolution) -> Self {
                Self { resolution }
            }

            // Frames are dropped; tests only need capture to be callable.
            pub fn capture_frame<T: AsRef<dyn VideoBuffer>>(&self, _frame: &VideoFrame<T>) {}
        }
    }

    pub enum RtcVideoSource {
        Native(native::NativeVideoSource),
    }
}
|
Loading…
Add table
Add a link
Reference in a new issue