diff --git a/Cargo.lock b/Cargo.lock index f0481db850..6f901fe829 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1381,6 +1381,8 @@ dependencies = [ "anyhow", "collections", "gpui", + "libwebrtc", + "parking_lot", "rodio", "schemars", "serde", @@ -2681,7 +2683,7 @@ dependencies = [ "cap-primitives", "cap-std", "io-lifetimes", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -2710,7 +2712,7 @@ dependencies = [ "maybe-owned", "rustix 1.0.7", "rustix-linux-procfs", - "windows-sys 0.59.0", + "windows-sys 0.52.0", "winx", ] @@ -5327,7 +5329,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -5714,7 +5716,7 @@ checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" dependencies = [ "cfg-if", "rustix 1.0.7", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -6092,7 +6094,7 @@ checksum = "94e7099f6313ecacbe1256e8ff9d617b75d1bcb16a6fddef94866d225a01a14a" dependencies = [ "io-lifetimes", "rustix 1.0.7", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -8551,7 +8553,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2285ddfe3054097ef4b2fe909ef8c3bcd1ea52a8f0d274416caebeef39f04a65" dependencies = [ "io-lifetimes", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -8624,7 +8626,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ "hermit-abi 0.5.0", "libc", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -8706,7 +8708,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -9406,7 +9408,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -13062,7 +13064,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -13874,7 +13876,7 @@ dependencies = [ [[package]] name = "rodio" version = "0.21.1" -source = "git+https://github.com/RustAudio/rodio?branch=microphone#bb560f30b17d330459b81afc918f2a4a123c41aa" +source = "git+https://github.com/RustAudio/rodio?branch=microphone#cad73716a363a5ba92fcb73ec37a4b98a7d7de5f" dependencies = [ "cpal", "dasp_sample", @@ -14104,7 +14106,7 @@ dependencies = [ "itoa", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -14117,7 +14119,7 @@ dependencies = [ "errno 0.3.11", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -14239,7 +14241,7 @@ dependencies = [ "security-framework 3.2.0", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -15591,7 +15593,7 @@ dependencies = [ "cfg-if", "libc", "psm", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -16249,7 +16251,7 @@ dependencies = [ "fd-lock", "io-lifetimes", "rustix 0.38.44", - "windows-sys 0.59.0", + "windows-sys 0.52.0", "winx", ] @@ -16431,7 +16433,7 @@ dependencies = [ "getrandom 0.3.2", "once_cell", "rustix 1.0.7", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -18993,7 +18995,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -19652,7 +19654,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f3fd376f71958b862e7afb20cfe5a22830e1963462f3a17f49d82a6c1d1f42d" dependencies = [ "bitflags 2.9.0", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] diff --git a/crates/audio/Cargo.toml b/crates/audio/Cargo.toml index ae7eb52fd3..8826b5f49c 100644 --- a/crates/audio/Cargo.toml +++ b/crates/audio/Cargo.toml @@ -19,6 +19,10 @@ gpui.workspace = true settings.workspace = true schemars.workspace = true serde.workspace = true +parking_lot.workspace = true rodio = { workspace = true, features = [ "wav", "playback", "tracing" ] } util.workspace = true workspace-hack.workspace = true + +[target.'cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))'.dependencies] +libwebrtc = { rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d", git = "https://github.com/zed-industries/livekit-rust-sdks" } diff --git a/crates/audio/src/audio.rs b/crates/audio/src/audio.rs index b4f2c24fef..a5a4e721d3 100644 --- a/crates/audio/src/audio.rs +++ b/crates/audio/src/audio.rs @@ -1,12 +1,15 @@ use anyhow::{Context as _, Result, anyhow}; use collections::HashMap; use gpui::{App, BorrowAppContext, Global}; -use rodio::{Decoder, OutputStream, OutputStreamBuilder, Source, source::Buffered}; +use libwebrtc::native::apm; +use parking_lot::Mutex; +use rodio::{Decoder, OutputStream, OutputStreamBuilder, Source, mixer::Mixer, source::Buffered}; use settings::Settings; -use std::io::Cursor; +use std::{io::Cursor, sync::Arc}; use util::ResultExt; mod audio_settings; +mod rodio_ext; pub use audio_settings::AudioSettings; pub fn init(cx: &mut App) { @@ -38,18 +41,46 @@ impl Sound { } } -#[derive(Default)] pub struct Audio { output_handle: Option, + output_mixer: Option, + echo_canceller: Arc>, source_cache: HashMap>>>>, } +impl Default for Audio { + fn default() -> Self { + Self { + output_handle: Default::default(), + output_mixer: Default::default(), + echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new( + true, false, false, false, + ))), + source_cache: Default::default(), + } + } +} + impl Global for Audio {} impl Audio { fn ensure_output_exists(&mut self) -> Option<&OutputStream> { if self.output_handle.is_none() { self.output_handle = OutputStreamBuilder::open_default_stream().log_err(); + if let Some(output_handle) = self.output_handle { + let config = output_handle.config(); + let (mixer, source) = + rodio::mixer::mixer(config.channel_count(), config.sample_rate()); + self.output_mixer = Some(mixer); + + let echo_canceller = Arc::clone(&self.echo_canceller); + let source = source.inspect_buffered( + |buffer| echo_canceller.lock().process_reverse_stream(&mut buf), + config.sample_rate().get() as i32, + config.channel_count().get().into(), + ); + output_handle.mixer().add(source); + } } self.output_handle.as_ref() @@ -72,6 +103,7 @@ impl Audio { cx.update_default_global(|this: &mut Self, cx| { let source = this.sound_source(sound, cx).log_err()?; let output_handle = this.ensure_output_exists()?; + output_handle.mixer().add(source); Some(()) }); diff --git a/crates/audio/src/rodio_ext.rs b/crates/audio/src/rodio_ext.rs new file mode 100644 index 0000000000..055ff9003a --- /dev/null +++ b/crates/audio/src/rodio_ext.rs @@ -0,0 +1,93 @@ +use rodio::Source; + +pub trait RodioExt: Source + Sized { + fn process_buffer(self, callback: F) -> ProcessBuffer + where + F: FnMut(&mut [rodio::Sample; N]); + fn inspect_buffer(self, callback: F) -> ProcessBuffer + where + F: FnMut(&[rodio::Sample; N]); +} + +impl RodioExt for S { + fn process_buffer(self, callback: F) -> ProcessBuffer + where + F: FnMut(&mut [rodio::Sample; N]), + { + ProcessBuffer { + inner: self, + callback, + buffer: [0.0; N], + next: N, + } + } + fn inspect_buffer(self, callback: F) -> ProcessBuffer + where + F: FnMut(&[rodio::Sample; N]), + { + InspectBuffer { + inner: self, + callback, + buffer: [0.0; N], + next: N, + } + } +} + +pub struct ProcessBuffer +where + S: Source + Sized, + F: FnMut(&mut [rodio::Sample; N]), +{ + inner: S, + callback: F, + buffer: [rodio::Sample; N], + next: usize, +} + +impl Iterator for ProcessBuffer +where + S: Source + Sized, + F: FnMut(&mut [rodio::Sample; N]), +{ + type Item = rodio::Sample; + + fn next(&mut self) -> Option { + self.next += 1; + if self.next < self.buffer.len() { + let sample = self.buffer[self.next]; + return Some(sample); + } + + for sample in &mut self.buffer { + *sample = self.inner.next()? + } + (self.callback)(&mut self.buffer); + + self.next = 0; + Some(self.buffer[0]) + } +} + +impl Source for ProcessBuffer +where + S: Source + Sized, + F: FnMut(&mut [rodio::Sample; N]), +{ + fn current_span_len(&self) -> Option { + // TODO dvdsk this should be a spanless Source + None + } + + fn channels(&self) -> rodio::ChannelCount { + self.inner.channels() + } + + fn sample_rate(&self) -> rodio::SampleRate { + self.inner.sample_rate() + } + + fn total_duration(&self) -> Option { + self.inner.total_duration() + } +} diff --git a/crates/livekit_client/src/livekit_client.rs b/crates/livekit_client/src/livekit_client.rs index 0751b014f4..c14a29bee3 100644 --- a/crates/livekit_client/src/livekit_client.rs +++ b/crates/livekit_client/src/livekit_client.rs @@ -99,7 +99,7 @@ impl Room { &self, cx: &mut AsyncApp, ) -> Result<(LocalTrackPublication, playback::AudioStream)> { - let (track, stream) = self.playback.capture_local_microphone_track()?; + let (track, stream) = self.playback.capture_local_microphone_track(&cx)?; let publication = self .local_participant() .publish_track( diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index b908c27ed2..8fa0e7929a 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -1,12 +1,14 @@ use anyhow::{Context as _, Result}; +use audio::AudioSettings; use cpal::Sample; use cpal::traits::{DeviceTrait, StreamTrait as _}; use dasp_sample::ToSample; use futures::channel::mpsc::UnboundedSender; use futures::{Stream, StreamExt as _}; use gpui::{ - BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task, + AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, + Task, }; use libwebrtc::native::{apm, audio_mixer, audio_resampler}; use livekit::track; @@ -19,9 +21,11 @@ use livekit::webrtc::{ video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource}, video_stream::native::NativeVideoStream, }; +use log::info; use parking_lot::Mutex; use rodio::Source; use rodio::source::{LimitSettings, UniformSourceIterator}; +use settings::Settings; use std::cell::RefCell; use std::num::NonZero; use std::sync::Weak; @@ -45,8 +49,8 @@ pub(crate) struct AudioStack { // 16kHz, 32kHz and 48kHz. As 48 is the most common "next step up" // for audio output devices like speakers/bluetooth, we just hard-code // this; and downsample when we need to. -const SAMPLE_RATE: u32 = 48000; -const NUM_CHANNELS: u32 = 2; +const SAMPLE_RATE: NonZero = NonZero::new(48000).expect("not zero"); +const NUM_CHANNELS: NonZero = NonZero::new(2).expect("not zero"); pub(crate) fn play_remote_audio_track( track: &livekit::track::RemoteAudioTrack, @@ -55,6 +59,7 @@ pub(crate) fn play_remote_audio_track( let stop_handle = Arc::new(AtomicBool::new(false)); let stop_handle_clone = stop_handle.clone(); let stream = source::LiveKitStream::new(cx.background_executor(), track) + .process_buffer(|| apm) .stoppable() .periodic_access(Duration::from_millis(50), move |s| { if stop_handle.load(Ordering::Relaxed) { @@ -95,8 +100,8 @@ impl AudioStack { let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed); let source = AudioMixerSource { ssrc: next_ssrc, - sample_rate: SAMPLE_RATE, - num_channels: NUM_CHANNELS, + sample_rate: SAMPLE_RATE.get(), + num_channels: NUM_CHANNELS.get() as u32, buffer: Arc::default(), }; self.mixer.lock().add_source(source.clone()); @@ -136,7 +141,7 @@ impl AudioStack { let apm = self.apm.clone(); let mixer = self.mixer.clone(); async move { - Self::play_output(apm, mixer, SAMPLE_RATE, NUM_CHANNELS) + Self::play_output(apm, mixer, SAMPLE_RATE.get(), NUM_CHANNELS.get().into()) .await .log_err(); } @@ -147,12 +152,13 @@ impl AudioStack { pub(crate) fn capture_local_microphone_track( &self, + cx: &AsyncApp, ) -> Result<(crate::LocalAudioTrack, AudioStream)> { let source = NativeAudioSource::new( // n.b. this struct's options are always ignored, noise cancellation is provided by apm. AudioSourceOptions::default(), - SAMPLE_RATE, - NUM_CHANNELS, + SAMPLE_RATE.get(), + NUM_CHANNELS.get().into(), 10, ); @@ -171,8 +177,16 @@ impl AudioStack { } } }); + let rodio_pipeline = + AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or(false); let capture_task = self.executor.spawn(async move { - Self::capture_input(apm, frame_tx, SAMPLE_RATE, NUM_CHANNELS).await + if rodio_pipeline { + info!("Using experimental.rodio_audio audio pipeline"); + Self::capture_input_rodio(apm, frame_tx).await + } else { + Self::capture_input(apm, frame_tx, SAMPLE_RATE.get(), NUM_CHANNELS.get().into()) + .await + } }); let on_drop = util::defer(|| { @@ -262,12 +276,11 @@ impl AudioStack { async fn capture_input_rodio( apm: Arc>, frame_tx: UnboundedSender>, - sample_rate: u32, - num_channels: u32, ) -> Result<()> { use crate::livekit_client::playback::source::RodioExt; const NUM_CHANNELS: usize = 1; - const LIVEKIT_BUFFER_SIZE: usize = (SAMPLE_RATE as usize / 100) * NUM_CHANNELS as usize; + const LIVEKIT_BUFFER_SIZE: usize = + (SAMPLE_RATE.get() as usize / 100) * NUM_CHANNELS as usize; let (stream_error_tx, stream_error_rx) = channel(); @@ -275,18 +288,31 @@ impl AudioStack { let stream = rodio::microphone::MicrophoneBuilder::new() .default_device()? .default_config()? + .prefer_sample_rates([ + SAMPLE_RATE, + SAMPLE_RATE.saturating_mul(NonZero::new(2).expect("not zero")), + ]) + .prefer_channel_counts([ + NonZero::new(1).expect("not zero"), + NonZero::new(2).expect("not zero"), + ]) + .prefer_buffer_sizes(512..) .open_stream()?; let mut stream = UniformSourceIterator::new( stream, NonZero::new(1).expect("1 is not zero"), - NonZero::new(SAMPLE_RATE).expect("constant is not zero"), + SAMPLE_RATE, ) .limit(LimitSettings::live_performance()) .process_buffer::(|buffer| { let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample()); if let Err(e) = apm .lock() - .process_stream(&mut int_buffer, SAMPLE_RATE as i32, NUM_CHANNELS as i32) + .process_stream( + &mut int_buffer, + SAMPLE_RATE.get() as i32, + NUM_CHANNELS as i32, + ) .context("livekit audio processor error") { let _ = stream_error_tx.send(e); @@ -299,7 +325,7 @@ impl AudioStack { .automatic_gain_control(1.0, 4.0, 0.0, 5.0); loop { - let sampled = stream + let sampled: Vec<_> = stream .by_ref() .take(LIVEKIT_BUFFER_SIZE) .map(|s| s.to_sample()) @@ -315,10 +341,10 @@ impl AudioStack { frame_tx .unbounded_send(AudioFrame { + sample_rate: SAMPLE_RATE.get(), + num_channels: NUM_CHANNELS as u32, + samples_per_channel: sampled.len() as u32 / NUM_CHANNELS as u32, data: Cow::Owned(sampled), - sample_rate, - num_channels, - samples_per_channel: sample_rate / 100, }) .context("Failed to send audio frame")? } @@ -419,6 +445,8 @@ impl AudioStack { } } +use crate::livekit_client::playback::source::RodioExt; + use super::LocalVideoTrack; pub enum AudioStream { diff --git a/crates/livekit_client/src/livekit_client/playback/source.rs b/crates/livekit_client/src/livekit_client/playback/source.rs index 7f9f5e1b2a..62d949f95d 100644 --- a/crates/livekit_client/src/livekit_client/playback/source.rs +++ b/crates/livekit_client/src/livekit_client/playback/source.rs @@ -26,8 +26,11 @@ pub struct LiveKitStream { impl LiveKitStream { pub fn new(executor: &gpui::BackgroundExecutor, track: &RemoteAudioTrack) -> Self { - let mut stream = - NativeAudioStream::new(track.rtc_track(), SAMPLE_RATE as i32, NUM_CHANNELS as i32); + let mut stream = NativeAudioStream::new( + track.rtc_track(), + SAMPLE_RATE.get() as i32, + NUM_CHANNELS.get().into(), + ); let (queue_input, queue_output) = rodio::queue::queue(true); // spawn rtc stream let receiver_task = executor.spawn({ @@ -71,81 +74,3 @@ impl Source for LiveKitStream { self.inner.total_duration() } } - -pub trait RodioExt: Source + Sized { - fn process_buffer(self, callback: F) -> ProcessBuffer - where - F: FnMut(&mut [rodio::Sample; N]); -} - -impl RodioExt for S { - fn process_buffer(self, callback: F) -> ProcessBuffer - where - F: FnMut(&mut [rodio::Sample; N]), - { - ProcessBuffer { - inner: self, - callback, - buffer: [0.0; N], - next: N, - } - } -} - -pub struct ProcessBuffer -where - S: Source + Sized, - F: FnMut(&mut [rodio::Sample; N]), -{ - inner: S, - callback: F, - buffer: [rodio::Sample; N], - next: usize, -} - -impl Iterator for ProcessBuffer -where - S: Source + Sized, - F: FnMut(&mut [rodio::Sample; N]), -{ - type Item = rodio::Sample; - - fn next(&mut self) -> Option { - self.next += 1; - if self.next < self.buffer.len() { - let sample = self.buffer[self.next]; - return Some(sample); - } - - for sample in &mut self.buffer { - *sample = self.inner.next()? - } - (self.callback)(&mut self.buffer); - - self.next = 0; - Some(self.buffer[0]) - } -} - -impl Source for ProcessBuffer -where - S: Source + Sized, - F: FnMut(&mut [rodio::Sample; N]), -{ - fn current_span_len(&self) -> Option { - // TODO dvdsk this should be a spanless Source - None - } - - fn channels(&self) -> rodio::ChannelCount { - self.inner.channels() - } - - fn sample_rate(&self) -> rodio::SampleRate { - self.inner.sample_rate() - } - - fn total_duration(&self) -> Option { - self.inner.total_duration() - } -}