From 5a4f284e456a3c995ab50dc9f30e0dd67380c19a Mon Sep 17 00:00:00 2001 From: David Kleingeld Date: Tue, 26 Aug 2025 12:37:10 +0200 Subject: [PATCH] adds error handling to audio input pipeline --- .../src/livekit_client/playback.rs | 36 ++++++++++++------- .../src/livekit_client/playback/source.rs | 4 +-- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index c6b317e37f..b908c27ed2 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -26,9 +26,10 @@ use std::cell::RefCell; use std::num::NonZero; use std::sync::Weak; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; +use std::sync::mpsc::{TryRecvError, channel}; use std::time::Duration; use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread}; -use util::{ResultExt as _, maybe}; +use util::{ResultExt as _, debug_panic, maybe}; mod source; @@ -268,6 +269,8 @@ impl AudioStack { const NUM_CHANNELS: usize = 1; const LIVEKIT_BUFFER_SIZE: usize = (SAMPLE_RATE as usize / 100) * NUM_CHANNELS as usize; + let (stream_error_tx, stream_error_rx) = channel(); + thread::spawn(move || { let stream = rodio::microphone::MicrophoneBuilder::new() .default_device()? @@ -281,11 +284,16 @@ impl AudioStack { .limit(LimitSettings::live_performance()) .process_buffer::(|buffer| { let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample()); - apm.lock() - .process_stream(&mut int_buffer, sample_rate as i32, num_channels as i32) - .unwrap(); // TODO dvdsk fix this - for (sample, processed) in buffer.iter_mut().zip(&int_buffer) { - *sample = (*processed).to_sample_(); + if let Err(e) = apm + .lock() + .process_stream(&mut int_buffer, SAMPLE_RATE as i32, NUM_CHANNELS as i32) + .context("livekit audio processor error") + { + let _ = stream_error_tx.send(e); + } else { + for (sample, processed) in buffer.iter_mut().zip(&int_buffer) { + *sample = (*processed).to_sample_(); + } } }) .automatic_gain_control(1.0, 4.0, 0.0, 5.0); @@ -297,19 +305,23 @@ impl AudioStack { .map(|s| s.to_sample()) .collect(); - if frame_tx + match stream_error_rx.try_recv() { + Ok(apm_error) => return Err::<(), _>(apm_error), + Err(TryRecvError::Disconnected) => { + debug_panic!("Stream should end on its own without sending an error") + } + Err(TryRecvError::Empty) => (), + } + + frame_tx .unbounded_send(AudioFrame { data: Cow::Owned(sampled), sample_rate, num_channels, samples_per_channel: sample_rate / 100, }) - .is_err() - { - break; - } + .context("Failed to send audio frame")? } - Ok::<(), anyhow::Error>(()) }); Ok(()) diff --git a/crates/livekit_client/src/livekit_client/playback/source.rs b/crates/livekit_client/src/livekit_client/playback/source.rs index 89e1fd0b21..7f9f5e1b2a 100644 --- a/crates/livekit_client/src/livekit_client/playback/source.rs +++ b/crates/livekit_client/src/livekit_client/playback/source.rs @@ -103,7 +103,7 @@ where next: usize, } -impl Iterator for ProcessBuffer +impl Iterator for ProcessBuffer where S: Source + Sized, F: FnMut(&mut [rodio::Sample; N]), @@ -127,13 +127,13 @@ where } } -// TODO dvdsk this should be a spanless Source impl Source for ProcessBuffer where S: Source + Sized, F: FnMut(&mut [rodio::Sample; N]), { fn current_span_len(&self) -> Option { + // TODO dvdsk this should be a spanless Source None }