adds ProcessBuffer as rodio Source

This commit is contained in:
David Kleingeld 2025-08-26 12:12:01 +02:00
parent 3a5d02cb7d
commit 9521f540fd
3 changed files with 45 additions and 32 deletions

2
Cargo.lock generated
View file

@ -13874,7 +13874,7 @@ dependencies = [
[[package]] [[package]]
name = "rodio" name = "rodio"
version = "0.21.1" version = "0.21.1"
source = "git+https://github.com/RustAudio/rodio?branch=microphone#9af0d470b35c6e1ec6bca88d87bbf4bc52f1f018" source = "git+https://github.com/RustAudio/rodio?branch=microphone#bb560f30b17d330459b81afc918f2a4a123c41aa"
dependencies = [ dependencies = [
"cpal", "cpal",
"dasp_sample", "dasp_sample",

View file

@ -265,12 +265,12 @@ impl AudioStack {
num_channels: u32, num_channels: u32,
) -> Result<()> { ) -> Result<()> {
use crate::livekit_client::playback::source::RodioExt; use crate::livekit_client::playback::source::RodioExt;
thread::spawn(|| { thread::spawn(move || {
let stream = rodio::microphone::MicrophoneBuilder::new() let stream = rodio::microphone::MicrophoneBuilder::new()
.with_default_device()? .default_device()?
.with_default_config()? .default_config()?
.open_stream()?; .open_stream()?;
let stream = UniformSourceIterator::new( let mut stream = UniformSourceIterator::new(
stream, stream,
NonZero::new(1).expect("1 is not zero"), NonZero::new(1).expect("1 is not zero"),
NonZero::new(SAMPLE_RATE).expect("constant is not zero"), NonZero::new(SAMPLE_RATE).expect("constant is not zero"),
@ -281,20 +281,28 @@ impl AudioStack {
apm.lock() apm.lock()
.process_stream(&mut int_buffer, sample_rate as i32, num_channels as i32) .process_stream(&mut int_buffer, sample_rate as i32, num_channels as i32)
.unwrap(); // TODO dvdsk fix this .unwrap(); // TODO dvdsk fix this
for (sample, processed) in buffer.iter().zip(&int_buffer) { for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
*sample = (*processed).to_sample_(); *sample = (*processed).to_sample_();
} }
}) })
.automatic_gain_control(1.0, 4.0, 0.0, 5.0); .automatic_gain_control(1.0, 4.0, 0.0, 5.0);
loop { loop {
frame_tx.unbounded_send(AudioFrame { let sampled = stream.by_ref().take(1000).map(|s| s.to_sample()).collect();
if frame_tx
.unbounded_send(AudioFrame {
data: Cow::Owned(sampled), data: Cow::Owned(sampled),
sample_rate, sample_rate,
num_channels, num_channels,
samples_per_channel: sample_rate / 100, samples_per_channel: sample_rate / 100,
}) })
.is_err()
{
break;
} }
}
Ok::<(), anyhow::Error>(())
}); });
Ok(()) Ok(())

View file

@ -73,27 +73,21 @@ impl Source for LiveKitStream {
} }
pub trait RodioExt: Source + Sized { pub trait RodioExt: Source + Sized {
fn process_buffer<F>( fn process_buffer<F>(self, callback: F) -> ProcessBuffer<Self, F>
self,
callback: impl FnMut(&mut [rodio::Sample; 200]),
) -> ProcessBuffer<Self, F>
where where
F: FnMut(&mut [rodio::Sample]); F: FnMut(&mut [rodio::Sample; 200]);
} }
impl<S: Source> RodioExt for S { impl<S: Source> RodioExt for S {
fn process_buffer<F>( fn process_buffer<F>(self, callback: F) -> ProcessBuffer<Self, F>
self,
callback: impl FnMut(&mut [rodio::Sample; 200]),
) -> ProcessBuffer<Self, F>
where where
F: FnMut(&mut [rodio::Sample]), F: FnMut(&mut [rodio::Sample; 200]),
{ {
ProcessBuffer { ProcessBuffer {
inner: self, inner: self,
callback, callback,
in_buffer: [0.0; 200], buffer: [0.0; 200],
out_buffer: [0.0; 200], next: 200,
} }
} }
} }
@ -105,8 +99,8 @@ where
{ {
inner: S, inner: S,
callback: F, callback: F,
in_buffer: [rodio::Sample; 200], buffer: [rodio::Sample; 200],
out_buffer: std::array::IntoIter<rodio::Sample, N>, next: usize,
} }
impl<S, F> Iterator for ProcessBuffer<S, F> impl<S, F> Iterator for ProcessBuffer<S, F>
@ -117,30 +111,41 @@ where
type Item = rodio::Sample; type Item = rodio::Sample;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
for sample in &mut in_buffer { self.next += 1;
*sample = self.inner.next()?; if self.next < self.buffer.len() {
let sample = self.buffer[self.next];
return Some(sample);
} }
for sample in &mut self.buffer {
*sample = self.inner.next()?
}
(self.callback)(&mut self.buffer);
self.next = 0;
Some(self.buffer[0])
} }
} }
// TODO dvdsk this should be a spanless Source
impl<S, F> Source for ProcessBuffer<S, F> impl<S, F> Source for ProcessBuffer<S, F>
where where
S: Source + Sized, S: Source + Sized,
F: FnMut(&mut [rodio::Sample; 200]), F: FnMut(&mut [rodio::Sample; 200]),
{ {
fn current_span_len(&self) -> Option<usize> { fn current_span_len(&self) -> Option<usize> {
todo!() None
} }
fn channels(&self) -> rodio::ChannelCount { fn channels(&self) -> rodio::ChannelCount {
todo!() self.inner.channels()
} }
fn sample_rate(&self) -> rodio::SampleRate { fn sample_rate(&self) -> rodio::SampleRate {
todo!() self.inner.sample_rate()
} }
fn total_duration(&self) -> Option<std::time::Duration> { fn total_duration(&self) -> Option<std::time::Duration> {
todo!() self.inner.total_duration()
} }
} }