diff --git a/Cargo.lock b/Cargo.lock index 30be5d582c..f0481db850 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13874,7 +13874,7 @@ dependencies = [ [[package]] name = "rodio" version = "0.21.1" -source = "git+https://github.com/RustAudio/rodio?branch=microphone#9af0d470b35c6e1ec6bca88d87bbf4bc52f1f018" +source = "git+https://github.com/RustAudio/rodio?branch=microphone#bb560f30b17d330459b81afc918f2a4a123c41aa" dependencies = [ "cpal", "dasp_sample", diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index 63ce404bc2..c7882e1f20 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -265,12 +265,12 @@ impl AudioStack { num_channels: u32, ) -> Result<()> { use crate::livekit_client::playback::source::RodioExt; - thread::spawn(|| { + thread::spawn(move || { let stream = rodio::microphone::MicrophoneBuilder::new() - .with_default_device()? - .with_default_config()? + .default_device()? + .default_config()? .open_stream()?; - let stream = UniformSourceIterator::new( + let mut stream = UniformSourceIterator::new( stream, NonZero::new(1).expect("1 is not zero"), NonZero::new(SAMPLE_RATE).expect("constant is not zero"), @@ -281,20 +281,28 @@ impl AudioStack { apm.lock() .process_stream(&mut int_buffer, sample_rate as i32, num_channels as i32) .unwrap(); // TODO dvdsk fix this - for (sample, processed) in buffer.iter().zip(&int_buffer) { + for (sample, processed) in buffer.iter_mut().zip(&int_buffer) { *sample = (*processed).to_sample_(); } }) .automatic_gain_control(1.0, 4.0, 0.0, 5.0); loop { - frame_tx.unbounded_send(AudioFrame { - data: Cow::Owned(sampled), - sample_rate, - num_channels, - samples_per_channel: sample_rate / 100, - }) + let sampled = stream.by_ref().take(1000).map(|s| s.to_sample()).collect(); + + if frame_tx + .unbounded_send(AudioFrame { + data: Cow::Owned(sampled), + sample_rate, + num_channels, + samples_per_channel: sample_rate / 100, + }) + .is_err() + { + break; + } } + Ok::<(), anyhow::Error>(()) }); Ok(()) diff --git a/crates/livekit_client/src/livekit_client/playback/source.rs b/crates/livekit_client/src/livekit_client/playback/source.rs index 0ce3e6aa1c..b54bc7e821 100644 --- a/crates/livekit_client/src/livekit_client/playback/source.rs +++ b/crates/livekit_client/src/livekit_client/playback/source.rs @@ -73,27 +73,21 @@ impl Source for LiveKitStream { } pub trait RodioExt: Source + Sized { - fn process_buffer( - self, - callback: impl FnMut(&mut [rodio::Sample; 200]), - ) -> ProcessBuffer + fn process_buffer(self, callback: F) -> ProcessBuffer where - F: FnMut(&mut [rodio::Sample]); + F: FnMut(&mut [rodio::Sample; 200]); } impl RodioExt for S { - fn process_buffer( - self, - callback: impl FnMut(&mut [rodio::Sample; 200]), - ) -> ProcessBuffer + fn process_buffer(self, callback: F) -> ProcessBuffer where - F: FnMut(&mut [rodio::Sample]), + F: FnMut(&mut [rodio::Sample; 200]), { ProcessBuffer { inner: self, callback, - in_buffer: [0.0; 200], - out_buffer: [0.0; 200], + buffer: [0.0; 200], + next: 200, } } } @@ -105,8 +99,8 @@ where { inner: S, callback: F, - in_buffer: [rodio::Sample; 200], - out_buffer: std::array::IntoIter, + buffer: [rodio::Sample; 200], + next: usize, } impl Iterator for ProcessBuffer @@ -117,30 +111,41 @@ where type Item = rodio::Sample; fn next(&mut self) -> Option { - for sample in &mut in_buffer { - *sample = self.inner.next()?; + self.next += 1; + if self.next < self.buffer.len() { + let sample = self.buffer[self.next]; + return Some(sample); } + + for sample in &mut self.buffer { + *sample = self.inner.next()? + } + (self.callback)(&mut self.buffer); + + self.next = 0; + Some(self.buffer[0]) } } +// TODO dvdsk this should be a spanless Source impl Source for ProcessBuffer where S: Source + Sized, F: FnMut(&mut [rodio::Sample; 200]), { fn current_span_len(&self) -> Option { - todo!() + None } fn channels(&self) -> rodio::ChannelCount { - todo!() + self.inner.channels() } fn sample_rate(&self) -> rodio::SampleRate { - todo!() + self.inner.sample_rate() } fn total_duration(&self) -> Option { - todo!() + self.inner.total_duration() } }