ZIm/crates/livekit_client/src/livekit_client.rs
Maksim Bondarenkov 64a5153bb5
collab: Make unsupported for MinGW toolchain (#23518)
Closes #23451

reverts #23117 for MinGW. collab can't be compiled for MinGW because
webrtc itself doesn't support MinGW compilers

Release Notes:

- N/A
2025-01-26 11:01:20 +02:00

679 lines
23 KiB
Rust

#![cfg_attr(all(target_os = "windows", target_env = "gnu"), allow(unused))]
mod remote_video_track_view;
#[cfg(any(
test,
feature = "test-support",
all(target_os = "windows", target_env = "gnu")
))]
pub mod test;
use anyhow::{anyhow, Context as _, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
use futures::{io, Stream, StreamExt as _};
use gpui::{
BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
};
use parking_lot::Mutex;
use std::{borrow::Cow, collections::VecDeque, future::Future, pin::Pin, sync::Arc, thread};
use util::{debug_panic, ResultExt as _};
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
use webrtc::{
audio_frame::AudioFrame,
audio_source::{native::NativeAudioSource, AudioSourceOptions, RtcAudioSource},
audio_stream::native::NativeAudioStream,
video_frame::{VideoBuffer, VideoFrame, VideoRotation},
video_source::{native::NativeVideoSource, RtcVideoSource, VideoResolution},
video_stream::native::NativeVideoStream,
};
#[cfg(all(
not(any(test, feature = "test-support")),
not(all(target_os = "windows", target_env = "gnu"))
))]
use livekit::track::RemoteAudioTrack;
#[cfg(all(
not(any(test, feature = "test-support")),
not(all(target_os = "windows", target_env = "gnu"))
))]
pub use livekit::*;
#[cfg(any(
test,
feature = "test-support",
all(target_os = "windows", target_env = "gnu")
))]
use test::track::RemoteAudioTrack;
#[cfg(any(
test,
feature = "test-support",
all(target_os = "windows", target_env = "gnu")
))]
pub use test::*;
pub use remote_video_track_view::{RemoteVideoTrackView, RemoteVideoTrackViewEvent};
pub enum AudioStream {
Input {
_thread_handle: std::sync::mpsc::Sender<()>,
_transmit_task: Task<()>,
},
Output {
_task: Task<()>,
},
}
struct Dispatcher(Arc<dyn gpui::PlatformDispatcher>);
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
impl livekit::dispatcher::Dispatcher for Dispatcher {
fn dispatch(&self, runnable: livekit::dispatcher::Runnable) {
self.0.dispatch(runnable, None);
}
fn dispatch_after(
&self,
duration: std::time::Duration,
runnable: livekit::dispatcher::Runnable,
) {
self.0.dispatch_after(duration, runnable);
}
}
struct HttpClientAdapter(Arc<dyn http_client::HttpClient>);
fn http_2_status(status: http_client::http::StatusCode) -> http_2::StatusCode {
http_2::StatusCode::from_u16(status.as_u16())
.expect("valid status code to status code conversion")
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
impl livekit::dispatcher::HttpClient for HttpClientAdapter {
fn get(
&self,
url: &str,
) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
let http_client = self.0.clone();
let url = url.to_string();
Box::pin(async move {
let response = http_client
.get(&url, http_client::AsyncBody::empty(), false)
.await
.map_err(io::Error::other)?;
Ok(livekit::dispatcher::Response {
status: http_2_status(response.status()),
body: Box::pin(response.into_body()),
})
})
}
fn send_async(
&self,
request: http_2::Request<Vec<u8>>,
) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
let http_client = self.0.clone();
let mut builder = http_client::http::Request::builder()
.method(request.method().as_str())
.uri(request.uri().to_string());
for (key, value) in request.headers().iter() {
builder = builder.header(key.as_str(), value.as_bytes());
}
if !request.extensions().is_empty() {
debug_panic!(
"Livekit sent an HTTP request with a protocol extension that Zed doesn't support!"
);
}
let request = builder
.body(http_client::AsyncBody::from_bytes(
request.into_body().into(),
))
.unwrap();
Box::pin(async move {
let response = http_client.send(request).await.map_err(io::Error::other)?;
Ok(livekit::dispatcher::Response {
status: http_2_status(response.status()),
body: Box::pin(response.into_body()),
})
})
}
}
#[cfg(all(target_os = "windows", target_env = "gnu"))]
pub fn init(
dispatcher: Arc<dyn gpui::PlatformDispatcher>,
http_client: Arc<dyn http_client::HttpClient>,
) {
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub fn init(
dispatcher: Arc<dyn gpui::PlatformDispatcher>,
http_client: Arc<dyn http_client::HttpClient>,
) {
livekit::dispatcher::set_dispatcher(Dispatcher(dispatcher));
livekit::dispatcher::set_http_client(HttpClientAdapter(http_client));
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub async fn capture_local_video_track(
capture_source: &dyn ScreenCaptureSource,
) -> Result<(track::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
let resolution = capture_source.resolution()?;
let track_source = NativeVideoSource::new(VideoResolution {
width: resolution.width.0 as u32,
height: resolution.height.0 as u32,
});
let capture_stream = capture_source
.stream({
let track_source = track_source.clone();
Box::new(move |frame| {
if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
track_source.capture_frame(&VideoFrame {
rotation: VideoRotation::VideoRotation0,
timestamp_us: 0,
buffer,
});
}
})
})
.await??;
Ok((
track::LocalVideoTrack::create_video_track(
"screen share",
RtcVideoSource::Native(track_source),
),
capture_stream,
))
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub fn capture_local_audio_track(
background_executor: &BackgroundExecutor,
) -> Result<Task<(track::LocalAudioTrack, AudioStream)>> {
use util::maybe;
let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
let (thread_handle, thread_kill_rx) = std::sync::mpsc::channel::<()>();
let sample_rate;
let channels;
if cfg!(any(test, feature = "test-support")) {
sample_rate = 2;
channels = 1;
} else {
let (device, config) = default_device(true)?;
sample_rate = config.sample_rate().0;
channels = config.channels() as u32;
thread::spawn(move || {
maybe!({
if let Some(name) = device.name().ok() {
log::info!("Using microphone: {}", name)
} else {
log::info!("Using microphone: <unknown>");
}
let stream = device
.build_input_stream_raw(
&config.config(),
cpal::SampleFormat::I16,
move |data, _: &_| {
frame_tx
.unbounded_send(AudioFrame {
data: Cow::Owned(data.as_slice::<i16>().unwrap().to_vec()),
sample_rate,
num_channels: channels,
samples_per_channel: data.len() as u32 / channels,
})
.ok();
},
|err| log::error!("error capturing audio track: {:?}", err),
None,
)
.context("failed to build input stream")?;
stream.play()?;
// Keep the thread alive and holding onto the `stream`
thread_kill_rx.recv().ok();
anyhow::Ok(Some(()))
})
.log_err();
});
}
Ok(background_executor.spawn({
let background_executor = background_executor.clone();
async move {
let source = NativeAudioSource::new(
AudioSourceOptions {
echo_cancellation: true,
noise_suppression: true,
auto_gain_control: true,
},
sample_rate,
channels,
100,
);
let transmit_task = background_executor.spawn({
let source = source.clone();
async move {
while let Some(frame) = frame_rx.next().await {
source.capture_frame(&frame).await.log_err();
}
}
});
let track = track::LocalAudioTrack::create_audio_track(
"microphone",
RtcAudioSource::Native(source),
);
(
track,
AudioStream::Input {
_thread_handle: thread_handle,
_transmit_task: transmit_task,
},
)
}
}))
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub fn play_remote_audio_track(
track: &RemoteAudioTrack,
background_executor: &BackgroundExecutor,
) -> Result<AudioStream> {
let track = track.clone();
// We track device changes in our output because Livekit has a resampler built in,
// and it's easy to create a new native audio stream when the device changes.
if cfg!(any(test, feature = "test-support")) {
Ok(AudioStream::Output {
_task: background_executor.spawn(async {}),
})
} else {
let mut default_change_listener = DeviceChangeListener::new(false)?;
let (output_device, output_config) = default_device(false)?;
let _task = background_executor.spawn({
let background_executor = background_executor.clone();
async move {
let (mut _receive_task, mut _thread) =
start_output_stream(output_config, output_device, &track, &background_executor);
while let Some(_) = default_change_listener.next().await {
let Some((output_device, output_config)) = get_default_output().log_err()
else {
continue;
};
if let Ok(name) = output_device.name() {
log::info!("Using speaker: {}", name)
} else {
log::info!("Using speaker: <unknown>")
}
(_receive_task, _thread) = start_output_stream(
output_config,
output_device,
&track,
&background_executor,
);
}
futures::future::pending::<()>().await;
}
});
Ok(AudioStream::Output { _task })
}
}
fn default_device(input: bool) -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
let device;
let config;
if input {
device = cpal::default_host()
.default_input_device()
.ok_or_else(|| anyhow!("no audio input device available"))?;
config = device
.default_input_config()
.context("failed to get default input config")?;
} else {
device = cpal::default_host()
.default_output_device()
.ok_or_else(|| anyhow!("no audio output device available"))?;
config = device
.default_output_config()
.context("failed to get default output config")?;
}
Ok((device, config))
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
fn get_default_output() -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
let host = cpal::default_host();
let output_device = host
.default_output_device()
.context("failed to read default output device")?;
let output_config = output_device.default_output_config()?;
Ok((output_device, output_config))
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
fn start_output_stream(
output_config: cpal::SupportedStreamConfig,
output_device: cpal::Device,
track: &track::RemoteAudioTrack,
background_executor: &BackgroundExecutor,
) -> (Task<()>, std::sync::mpsc::Sender<()>) {
let buffer = Arc::new(Mutex::new(VecDeque::<i16>::new()));
let sample_rate = output_config.sample_rate();
let mut stream = NativeAudioStream::new(
track.rtc_track(),
sample_rate.0 as i32,
output_config.channels() as i32,
);
let receive_task = background_executor.spawn({
let buffer = buffer.clone();
async move {
const MS_OF_BUFFER: u32 = 100;
const MS_IN_SEC: u32 = 1000;
while let Some(frame) = stream.next().await {
let frame_size = frame.samples_per_channel * frame.num_channels;
debug_assert!(frame.data.len() == frame_size as usize);
let buffer_size =
((frame.sample_rate * frame.num_channels) / MS_IN_SEC * MS_OF_BUFFER) as usize;
let mut buffer = buffer.lock();
let new_size = buffer.len() + frame.data.len();
if new_size > buffer_size {
let overflow = new_size - buffer_size;
buffer.drain(0..overflow);
}
buffer.extend(frame.data.iter());
}
}
});
// The _output_stream needs to be on it's own thread because it's !Send
// and we experienced a deadlock when it's created on the main thread.
let (thread, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
thread::spawn(move || {
if cfg!(any(test, feature = "test-support")) {
// Can't play audio in tests
return;
}
let output_stream = output_device.build_output_stream(
&output_config.config(),
{
let buffer = buffer.clone();
move |data, _info| {
let mut buffer = buffer.lock();
if buffer.len() < data.len() {
// Instead of partially filling a buffer, output silence. If a partial
// buffer was outputted then this could lead to a perpetual state of
// outputting partial buffers as it never gets filled enough for a full
// frame.
data.fill(0);
} else {
// SAFETY: We know that buffer has at least data.len() values in it.
// because we just checked
let mut drain = buffer.drain(..data.len());
data.fill_with(|| unsafe { drain.next().unwrap_unchecked() });
}
}
},
|error| log::error!("error playing audio track: {:?}", error),
None,
);
let Some(output_stream) = output_stream.log_err() else {
return;
};
output_stream.play().log_err();
// Block forever to keep the output stream alive
end_on_drop_rx.recv().ok();
});
(receive_task, thread)
}
#[cfg(all(target_os = "windows", target_env = "gnu"))]
pub fn play_remote_video_track(
track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
futures::stream::empty()
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub fn play_remote_video_track(
track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
NativeVideoStream::new(track.rtc_track())
.filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
}
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = media::core_video::CVImageBuffer;
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
use core_foundation::base::TCFType as _;
use media::core_video::CVImageBuffer;
let buffer = buffer.as_native()?;
let pixel_buffer = buffer.get_cv_pixel_buffer();
if pixel_buffer.is_null() {
return None;
}
unsafe { Some(CVImageBuffer::wrap_under_get_rule(pixel_buffer as _)) }
}
#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
#[cfg(not(any(target_os = "macos", all(target_os = "windows", target_env = "gnu"))))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
use gpui::RenderImage;
use image::{Frame, RgbaImage};
use livekit::webrtc::prelude::VideoFormatType;
use smallvec::SmallVec;
use std::alloc::{alloc, Layout};
let width = buffer.width();
let height = buffer.height();
let stride = width * 4;
let byte_len = (stride * height) as usize;
let argb_image = unsafe {
// Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
// will write all bytes anyway.
let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
if start_ptr.is_null() {
return None;
}
let bgra_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
buffer.to_argb(
VideoFormatType::ARGB, // For some reason, this displays correctly while RGBA (the correct format) does not
bgra_frame_slice,
stride,
width as i32,
height as i32,
);
Vec::from_raw_parts(start_ptr, byte_len, byte_len)
};
Some(Arc::new(RenderImage::new(SmallVec::from_elem(
Frame::new(
RgbaImage::from_raw(width, height, argb_image)
.with_context(|| "Bug: not enough bytes allocated for image.")
.log_err()?,
),
1,
))))
}
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
use core_foundation::base::TCFType as _;
let pixel_buffer = frame.0.as_concrete_TypeRef();
std::mem::forget(frame.0);
unsafe {
Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
}
}
#[cfg(not(any(target_os = "macos", all(target_os = "windows", target_env = "gnu"))))]
fn video_frame_buffer_to_webrtc(_frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
None as Option<Box<dyn VideoBuffer>>
}
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
fn new(input: bool) -> Result<Self>;
}
#[cfg(target_os = "macos")]
mod macos {
use coreaudio::sys::{
kAudioHardwarePropertyDefaultInputDevice, kAudioHardwarePropertyDefaultOutputDevice,
kAudioObjectPropertyElementMaster, kAudioObjectPropertyScopeGlobal,
kAudioObjectSystemObject, AudioObjectAddPropertyListener, AudioObjectID,
AudioObjectPropertyAddress, AudioObjectRemovePropertyListener, OSStatus,
};
use futures::{channel::mpsc::UnboundedReceiver, StreamExt};
use crate::DeviceChangeListenerApi;
/// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
pub struct CoreAudioDefaultDeviceChangeListener {
rx: UnboundedReceiver<()>,
callback: Box<PropertyListenerCallbackWrapper>,
input: bool,
}
trait _AssertSend: Send {}
impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}
struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);
unsafe extern "C" fn property_listener_handler_shim(
_: AudioObjectID,
_: u32,
_: *const AudioObjectPropertyAddress,
callback: *mut ::std::os::raw::c_void,
) -> OSStatus {
let wrapper = callback as *mut PropertyListenerCallbackWrapper;
(*wrapper).0();
0
}
impl DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
fn new(input: bool) -> gpui::Result<Self> {
let (tx, rx) = futures::channel::mpsc::unbounded();
let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
tx.unbounded_send(()).ok();
})));
unsafe {
coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
kAudioObjectSystemObject,
&AudioObjectPropertyAddress {
mSelector: if input {
kAudioHardwarePropertyDefaultInputDevice
} else {
kAudioHardwarePropertyDefaultOutputDevice
},
mScope: kAudioObjectPropertyScopeGlobal,
mElement: kAudioObjectPropertyElementMaster,
},
Some(property_listener_handler_shim),
&*callback as *const _ as *mut _,
))?;
}
Ok(Self {
rx,
callback,
input,
})
}
}
impl Drop for CoreAudioDefaultDeviceChangeListener {
fn drop(&mut self) {
unsafe {
AudioObjectRemovePropertyListener(
kAudioObjectSystemObject,
&AudioObjectPropertyAddress {
mSelector: if self.input {
kAudioHardwarePropertyDefaultInputDevice
} else {
kAudioHardwarePropertyDefaultOutputDevice
},
mScope: kAudioObjectPropertyScopeGlobal,
mElement: kAudioObjectPropertyElementMaster,
},
Some(property_listener_handler_shim),
&*self.callback as *const _ as *mut _,
);
}
}
}
impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
type Item = ();
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.rx.poll_next_unpin(cx)
}
}
}
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
use std::task::Poll;
use crate::DeviceChangeListenerApi;
pub struct NoopOutputDeviceChangelistener {}
impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
fn new(_input: bool) -> anyhow::Result<Self> {
Ok(NoopOutputDeviceChangelistener {})
}
}
impl futures::Stream for NoopOutputDeviceChangelistener {
type Item = ();
fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Pending
}
}
}
#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;