diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index 115359231c..b92ce67aaf 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -1,5 +1,5 @@ use crate::{AppContext, PlatformDispatcher}; -use futures::{channel::mpsc, pin_mut, FutureExt}; +use futures::channel::mpsc; use smol::prelude::*; use std::{ fmt::Debug, @@ -9,7 +9,7 @@ use std::{ pin::Pin, rc::Rc, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}, + atomic::{AtomicUsize, Ordering::SeqCst}, Arc, }, task::{Context, Poll}, @@ -164,7 +164,7 @@ impl BackgroundExecutor { #[cfg(any(test, feature = "test-support"))] #[track_caller] pub fn block_test(&self, future: impl Future) -> R { - if let Ok(value) = self.block_internal(false, future, usize::MAX) { + if let Ok(value) = self.block_internal(false, future, None) { value } else { unreachable!() @@ -174,24 +174,75 @@ impl BackgroundExecutor { /// Block the current thread until the given future resolves. /// Consider using `block_with_timeout` instead. pub fn block(&self, future: impl Future) -> R { - if let Ok(value) = self.block_internal(true, future, usize::MAX) { + if let Ok(value) = self.block_internal(true, future, None) { value } else { unreachable!() } } + #[cfg(not(any(test, feature = "test-support")))] + pub(crate) fn block_internal( + &self, + _background_only: bool, + future: impl Future, + timeout: Option, + ) -> Result> { + use std::time::Instant; + + let mut future = Box::pin(future); + if timeout == Some(Duration::ZERO) { + return Err(future); + } + let deadline = timeout.map(|timeout| Instant::now() + timeout); + + let unparker = self.dispatcher.unparker(); + let waker = waker_fn(move || { + unparker.unpark(); + }); + let mut cx = std::task::Context::from_waker(&waker); + + loop { + match future.as_mut().poll(&mut cx) { + Poll::Ready(result) => return Ok(result), + Poll::Pending => { + let timeout = + deadline.map(|deadline| deadline.saturating_duration_since(Instant::now())); + if !self.dispatcher.park(timeout) { + if deadline.is_some_and(|deadline| deadline < Instant::now()) { + return Err(future); + } + } + } + } + } + } + + #[cfg(any(test, feature = "test-support"))] #[track_caller] pub(crate) fn block_internal( &self, background_only: bool, future: impl Future, - mut max_ticks: usize, - ) -> Result { - pin_mut!(future); + timeout: Option, + ) -> Result> { + use std::sync::atomic::AtomicBool; + + let mut future = Box::pin(future); + if timeout == Some(Duration::ZERO) { + return Err(future); + } + let Some(dispatcher) = self.dispatcher.as_test() else { + return Err(future); + }; + + let mut max_ticks = if timeout.is_some() { + dispatcher.gen_block_on_ticks() + } else { + usize::MAX + }; let unparker = self.dispatcher.unparker(); let awoken = Arc::new(AtomicBool::new(false)); - let waker = waker_fn({ let awoken = awoken.clone(); move || { @@ -206,34 +257,30 @@ impl BackgroundExecutor { Poll::Ready(result) => return Ok(result), Poll::Pending => { if max_ticks == 0 { - return Err(()); + return Err(future); } max_ticks -= 1; - if !self.dispatcher.tick(background_only) { + if !dispatcher.tick(background_only) { if awoken.swap(false, SeqCst) { continue; } - #[cfg(any(test, feature = "test-support"))] - if let Some(test) = self.dispatcher.as_test() { - if !test.parking_allowed() { - let mut backtrace_message = String::new(); - let mut waiting_message = String::new(); - if let Some(backtrace) = test.waiting_backtrace() { - backtrace_message = - format!("\nbacktrace of waiting future:\n{:?}", backtrace); - } - if let Some(waiting_hint) = test.waiting_hint() { - waiting_message = format!("\n waiting on: {}\n", waiting_hint); - } - panic!( + if !dispatcher.parking_allowed() { + let mut backtrace_message = String::new(); + let mut waiting_message = String::new(); + if let Some(backtrace) = dispatcher.waiting_backtrace() { + backtrace_message = + format!("\nbacktrace of waiting future:\n{:?}", backtrace); + } + if let Some(waiting_hint) = dispatcher.waiting_hint() { + waiting_message = format!("\n waiting on: {}\n", waiting_hint); + } + panic!( "parked with nothing left to run{waiting_message}{backtrace_message}", ) - } } - - self.dispatcher.park(); + self.dispatcher.park(None); } } } @@ -247,31 +294,7 @@ impl BackgroundExecutor { duration: Duration, future: impl Future, ) -> Result> { - let mut future = Box::pin(future.fuse()); - if duration.is_zero() { - return Err(future); - } - - #[cfg(any(test, feature = "test-support"))] - let max_ticks = self - .dispatcher - .as_test() - .map_or(usize::MAX, |dispatcher| dispatcher.gen_block_on_ticks()); - #[cfg(not(any(test, feature = "test-support")))] - let max_ticks = usize::MAX; - - let mut timer = self.timer(duration).fuse(); - - let timeout = async { - futures::select_biased! { - value = future => Ok(value), - _ = timer => Err(()), - } - }; - match self.block_internal(true, timeout, max_ticks) { - Ok(Ok(value)) => Ok(value), - _ => Err(future), - } + self.block_internal(true, future, Some(duration)) } /// Scoped lets you start a number of tasks and waits diff --git a/crates/gpui/src/platform.rs b/crates/gpui/src/platform.rs index 083c6d5fe1..25f6a3e628 100644 --- a/crates/gpui/src/platform.rs +++ b/crates/gpui/src/platform.rs @@ -240,8 +240,7 @@ pub trait PlatformDispatcher: Send + Sync { fn dispatch(&self, runnable: Runnable, label: Option); fn dispatch_on_main_thread(&self, runnable: Runnable); fn dispatch_after(&self, duration: Duration, runnable: Runnable); - fn tick(&self, background_only: bool) -> bool; - fn park(&self); + fn park(&self, timeout: Option) -> bool; fn unparker(&self) -> Unparker; #[cfg(any(test, feature = "test-support"))] diff --git a/crates/gpui/src/platform/linux/dispatcher.rs b/crates/gpui/src/platform/linux/dispatcher.rs index 1557b87ac1..8be712d274 100644 --- a/crates/gpui/src/platform/linux/dispatcher.rs +++ b/crates/gpui/src/platform/linux/dispatcher.rs @@ -110,12 +110,13 @@ impl PlatformDispatcher for LinuxDispatcher { .ok(); } - fn tick(&self, background_only: bool) -> bool { - false - } - - fn park(&self) { - self.parker.lock().park(); + fn park(&self, timeout: Option) -> bool { + if let Some(timeout) = timeout { + self.parker.lock().park_timeout(timeout) + } else { + self.parker.lock().park(); + true + } } fn unparker(&self) -> Unparker { diff --git a/crates/gpui/src/platform/mac/dispatcher.rs b/crates/gpui/src/platform/mac/dispatcher.rs index d5ad010200..776ca4fe04 100644 --- a/crates/gpui/src/platform/mac/dispatcher.rs +++ b/crates/gpui/src/platform/mac/dispatcher.rs @@ -87,12 +87,13 @@ impl PlatformDispatcher for MacDispatcher { } } - fn tick(&self, _background_only: bool) -> bool { - false - } - - fn park(&self) { - self.parker.lock().park() + fn park(&self, timeout: Option) -> bool { + if let Some(timeout) = timeout { + self.parker.lock().park_timeout(timeout) + } else { + self.parker.lock().park(); + true + } } fn unparker(&self) -> Unparker { diff --git a/crates/gpui/src/platform/test/dispatcher.rs b/crates/gpui/src/platform/test/dispatcher.rs index 24850187fc..e9cab32e96 100644 --- a/crates/gpui/src/platform/test/dispatcher.rs +++ b/crates/gpui/src/platform/test/dispatcher.rs @@ -111,6 +111,68 @@ impl TestDispatcher { } } + pub fn tick(&self, background_only: bool) -> bool { + let mut state = self.state.lock(); + + while let Some((deadline, _)) = state.delayed.first() { + if *deadline > state.time { + break; + } + let (_, runnable) = state.delayed.remove(0); + state.background.push(runnable); + } + + let foreground_len: usize = if background_only { + 0 + } else { + state + .foreground + .values() + .map(|runnables| runnables.len()) + .sum() + }; + let background_len = state.background.len(); + + let runnable; + let main_thread; + if foreground_len == 0 && background_len == 0 { + let deprioritized_background_len = state.deprioritized_background.len(); + if deprioritized_background_len == 0 { + return false; + } + let ix = state.random.gen_range(0..deprioritized_background_len); + main_thread = false; + runnable = state.deprioritized_background.swap_remove(ix); + } else { + main_thread = state.random.gen_ratio( + foreground_len as u32, + (foreground_len + background_len) as u32, + ); + if main_thread { + let state = &mut *state; + runnable = state + .foreground + .values_mut() + .filter(|runnables| !runnables.is_empty()) + .choose(&mut state.random) + .unwrap() + .pop_front() + .unwrap(); + } else { + let ix = state.random.gen_range(0..background_len); + runnable = state.background.swap_remove(ix); + }; + }; + + let was_main_thread = state.is_main_thread; + state.is_main_thread = main_thread; + drop(state); + runnable.run(); + self.state.lock().is_main_thread = was_main_thread; + + true + } + pub fn deprioritize(&self, task_label: TaskLabel) { self.state .lock() @@ -221,71 +283,9 @@ impl PlatformDispatcher for TestDispatcher { }; state.delayed.insert(ix, (next_time, runnable)); } - - fn tick(&self, background_only: bool) -> bool { - let mut state = self.state.lock(); - - while let Some((deadline, _)) = state.delayed.first() { - if *deadline > state.time { - break; - } - let (_, runnable) = state.delayed.remove(0); - state.background.push(runnable); - } - - let foreground_len: usize = if background_only { - 0 - } else { - state - .foreground - .values() - .map(|runnables| runnables.len()) - .sum() - }; - let background_len = state.background.len(); - - let runnable; - let main_thread; - if foreground_len == 0 && background_len == 0 { - let deprioritized_background_len = state.deprioritized_background.len(); - if deprioritized_background_len == 0 { - return false; - } - let ix = state.random.gen_range(0..deprioritized_background_len); - main_thread = false; - runnable = state.deprioritized_background.swap_remove(ix); - } else { - main_thread = state.random.gen_ratio( - foreground_len as u32, - (foreground_len + background_len) as u32, - ); - if main_thread { - let state = &mut *state; - runnable = state - .foreground - .values_mut() - .filter(|runnables| !runnables.is_empty()) - .choose(&mut state.random) - .unwrap() - .pop_front() - .unwrap(); - } else { - let ix = state.random.gen_range(0..background_len); - runnable = state.background.swap_remove(ix); - }; - }; - - let was_main_thread = state.is_main_thread; - state.is_main_thread = main_thread; - drop(state); - runnable.run(); - self.state.lock().is_main_thread = was_main_thread; - - true - } - - fn park(&self) { + fn park(&self, _: Option) -> bool { self.parker.lock().park(); + true } fn unparker(&self) -> Unparker { diff --git a/crates/gpui/src/platform/windows/dispatcher.rs b/crates/gpui/src/platform/windows/dispatcher.rs index 5932132210..724b83a6d8 100644 --- a/crates/gpui/src/platform/windows/dispatcher.rs +++ b/crates/gpui/src/platform/windows/dispatcher.rs @@ -122,12 +122,13 @@ impl PlatformDispatcher for WindowsDispatcher { self.dispatch_on_threadpool_after(runnable, duration); } - fn tick(&self, _background_only: bool) -> bool { - false - } - - fn park(&self) { - self.parker.lock().park(); + fn park(&self, timeout: Option) -> bool { + if let Some(timeout) = timeout { + self.parker.lock().park_timeout(timeout) + } else { + self.parker.lock().park(); + true + } } fn unparker(&self) -> parking::Unparker {