adds error handling to audio input pipeline

This commit is contained in:
David Kleingeld 2025-08-26 12:37:10 +02:00
parent efd9edb5ff
commit 5a4f284e45
2 changed files with 26 additions and 14 deletions

View file

@ -26,9 +26,10 @@ use std::cell::RefCell;
use std::num::NonZero;
use std::sync::Weak;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::mpsc::{TryRecvError, channel};
use std::time::Duration;
use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
use util::{ResultExt as _, maybe};
use util::{ResultExt as _, debug_panic, maybe};
mod source;
@ -268,6 +269,8 @@ impl AudioStack {
const NUM_CHANNELS: usize = 1;
const LIVEKIT_BUFFER_SIZE: usize = (SAMPLE_RATE as usize / 100) * NUM_CHANNELS as usize;
let (stream_error_tx, stream_error_rx) = channel();
thread::spawn(move || {
let stream = rodio::microphone::MicrophoneBuilder::new()
.default_device()?
@ -281,11 +284,16 @@ impl AudioStack {
.limit(LimitSettings::live_performance())
.process_buffer::<LIVEKIT_BUFFER_SIZE, _>(|buffer| {
let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
apm.lock()
.process_stream(&mut int_buffer, sample_rate as i32, num_channels as i32)
.unwrap(); // TODO dvdsk fix this
for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
*sample = (*processed).to_sample_();
if let Err(e) = apm
.lock()
.process_stream(&mut int_buffer, SAMPLE_RATE as i32, NUM_CHANNELS as i32)
.context("livekit audio processor error")
{
let _ = stream_error_tx.send(e);
} else {
for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
*sample = (*processed).to_sample_();
}
}
})
.automatic_gain_control(1.0, 4.0, 0.0, 5.0);
@ -297,19 +305,23 @@ impl AudioStack {
.map(|s| s.to_sample())
.collect();
if frame_tx
match stream_error_rx.try_recv() {
Ok(apm_error) => return Err::<(), _>(apm_error),
Err(TryRecvError::Disconnected) => {
debug_panic!("Stream should end on its own without sending an error")
}
Err(TryRecvError::Empty) => (),
}
frame_tx
.unbounded_send(AudioFrame {
data: Cow::Owned(sampled),
sample_rate,
num_channels,
samples_per_channel: sample_rate / 100,
})
.is_err()
{
break;
}
.context("Failed to send audio frame")?
}
Ok::<(), anyhow::Error>(())
});
Ok(())

View file

@ -103,7 +103,7 @@ where
next: usize,
}
impl<const N: usize, S, F> Iterator for ProcessBuffer<S, F, N>
impl<const N: usize, S, F> Iterator for ProcessBuffer<N, S, F>
where
S: Source + Sized,
F: FnMut(&mut [rodio::Sample; N]),
@ -127,13 +127,13 @@ where
}
}
// TODO dvdsk this should be a spanless Source
impl<const N: usize, S, F> Source for ProcessBuffer<N, S, F>
where
S: Source + Sized,
F: FnMut(&mut [rodio::Sample; N]),
{
fn current_span_len(&self) -> Option<usize> {
// TODO dvdsk this should be a spanless Source
None
}