crates/gpui/src/executor.rs
Nathan Sobo a6ffcdd0cf Track open buffers when handling sync requests
When a host sends a buffer to a guest for the first time, it records that
it has done so in a set tied to that guest's peer id. When the guest
reconnects and syncs buffers, it does so under a different peer id, so we
need to track which buffers we have already sent to avoid sending the same
buffer twice, which would violate the guest's assumptions.
2023-01-02 20:27:59 -07:00


use anyhow::{anyhow, Result};
use async_task::Runnable;
use futures::channel::mpsc;
use smol::{channel, prelude::*, Executor};
use std::{
any::Any,
fmt::{self, Display},
marker::PhantomData,
mem,
pin::Pin,
rc::Rc,
sync::Arc,
task::{Context, Poll},
thread,
time::Duration,
};
use crate::{
platform::{self, Dispatcher},
util, MutableAppContext,
};
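/// An executor for futures that must run on the main thread.
///
/// In production this wraps the platform's main-thread dispatcher; in tests a
/// shared `Deterministic` executor is used instead so task interleavings are
/// reproducible.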
pub enum Foreground {
Platform {
dispatcher: Arc<dyn platform::Dispatcher>,
_not_send_or_sync: PhantomData<Rc<()>>,
},
#[cfg(any(test, feature = "test-support"))]
Deterministic {
cx_id: usize,
executor: Arc<Deterministic>,
},
}
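/// An executor for futures that may run on any thread.
///
/// The production variant drives a `smol` executor on a pool of background
/// threads; the test variant delegates to the shared `Deterministic` executor.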
pub enum Background {
#[cfg(any(test, feature = "test-support"))]
Deterministic { executor: Arc<Deterministic> },
Production {
executor: Arc<smol::Executor<'static>>,
_stop: channel::Sender<()>,
},
}
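// Type-erased futures and tasks, so executors can store and schedule work
// without knowing each future's concrete output type.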
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
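/// A handle to a spawned future.
///
/// Dropping a `Task` cancels the underlying future; call [`Task::detach`] to
/// let it run to completion in the background.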
#[must_use]
pub enum Task<T> {
Ready(Option<T>),
Local {
any_task: AnyLocalTask,
result_type: PhantomData<T>,
},
Send {
any_task: AnyTask,
result_type: PhantomData<T>,
},
}
unsafe impl<T: Send> Send for Task<T> {}
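/// Shared state of the deterministic test executor: the seeded RNG that picks
/// the next runnable, queues of scheduled work, a virtual clock with pending
/// timers, and bookkeeping used to detect nondeterminism between runs.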
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
rng: rand::prelude::StdRng,
seed: u64,
scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
scheduled_from_background: Vec<BackgroundRunnable>,
forbid_parking: bool,
block_on_ticks: std::ops::RangeInclusive<usize>,
now: std::time::Instant,
next_timer_id: usize,
pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
waiting_backtrace: Option<backtrace::Backtrace>,
next_runnable_id: usize,
poll_history: Vec<ExecutorEvent>,
previous_poll_history: Option<Vec<ExecutorEvent>>,
enable_runnable_backtraces: bool,
runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
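/// An entry in the deterministic executor's poll history, used to compare the
/// current run against a previous one and detect nondeterminism.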
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
PollRunnable { id: usize },
EnqueuRunnable { id: usize },
}
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
id: usize,
runnable: Runnable,
main: bool,
}
#[cfg(any(test, feature = "test-support"))]
struct BackgroundRunnable {
id: usize,
runnable: Runnable,
}
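/// A test-only executor that interleaves foreground and background runnables
/// in a random but reproducible order derived from a seed, and that virtualizes
/// time so tests can advance the clock explicitly.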
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
state: Arc<parking_lot::Mutex<DeterministicState>>,
parker: parking_lot::Mutex<parking::Parker>,
}
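/// A future that resolves after a duration: a real `smol` timer in production,
/// or a virtual-clock timer under the deterministic executor.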
pub enum Timer {
Production(smol::Timer),
#[cfg(any(test, feature = "test-support"))]
Deterministic(DeterministicTimer),
}
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
rx: postage::barrier::Receiver,
id: usize,
state: Arc<parking_lot::Mutex<DeterministicState>>,
}
#[cfg(any(test, feature = "test-support"))]
impl Deterministic {
pub fn new(seed: u64) -> Arc<Self> {
use rand::prelude::*;
Arc::new(Self {
state: Arc::new(parking_lot::Mutex::new(DeterministicState {
rng: StdRng::seed_from_u64(seed),
seed,
scheduled_from_foreground: Default::default(),
scheduled_from_background: Default::default(),
forbid_parking: false,
block_on_ticks: 0..=1000,
now: std::time::Instant::now(),
next_timer_id: Default::default(),
pending_timers: Default::default(),
waiting_backtrace: None,
next_runnable_id: 0,
poll_history: Default::default(),
previous_poll_history: Default::default(),
enable_runnable_backtraces: false,
runnable_backtraces: Default::default(),
})),
parker: Default::default(),
})
}
pub fn execution_history(&self) -> Vec<ExecutorEvent> {
self.state.lock().poll_history.clone()
}
pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
self.state.lock().previous_poll_history = history;
}
pub fn enable_runnable_backtrace(&self) {
self.state.lock().enable_runnable_backtraces = true;
}
pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
backtrace.resolve();
backtrace
}
pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
Arc::new(Background::Deterministic {
executor: self.clone(),
})
}
pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
Rc::new(Foreground::Deterministic {
cx_id: id,
executor: self.clone(),
})
}
fn spawn_from_foreground(
&self,
cx_id: usize,
future: AnyLocalFuture,
main: bool,
) -> AnyLocalTask {
let state = self.state.clone();
let id;
{
let mut state = state.lock();
id = util::post_inc(&mut state.next_runnable_id);
if state.enable_runnable_backtraces {
state
.runnable_backtraces
.insert(id, backtrace::Backtrace::new_unresolved());
}
}
let unparker = self.parker.lock().unparker();
let (runnable, task) = async_task::spawn_local(future, move |runnable| {
let mut state = state.lock();
state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
state
.scheduled_from_foreground
.entry(cx_id)
.or_default()
.push(ForegroundRunnable { id, runnable, main });
unparker.unpark();
});
runnable.schedule();
task
}
fn spawn(&self, future: AnyFuture) -> AnyTask {
let state = self.state.clone();
let id;
{
let mut state = state.lock();
id = util::post_inc(&mut state.next_runnable_id);
if state.enable_runnable_backtraces {
state
.runnable_backtraces
.insert(id, backtrace::Backtrace::new_unresolved());
}
}
let unparker = self.parker.lock().unparker();
let (runnable, task) = async_task::spawn(future, move |runnable| {
let mut state = state.lock();
state
.poll_history
.push(ExecutorEvent::EnqueuRunnable { id });
state
.scheduled_from_background
.push(BackgroundRunnable { id, runnable });
unparker.unpark();
});
runnable.schedule();
task
}
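/// Runs `main_future` to completion, interleaving it with all other scheduled
/// runnables; parks the thread when no work is ready and the main future is
/// still pending.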
fn run<'a>(
&self,
cx_id: usize,
main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
) -> Box<dyn Any> {
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
let woken = Arc::new(AtomicBool::new(false));
let state = self.state.clone();
let id;
{
let mut state = state.lock();
id = util::post_inc(&mut state.next_runnable_id);
if state.enable_runnable_backtraces {
state
.runnable_backtraces
.insert(id, backtrace::Backtrace::new_unresolved());
}
}
let unparker = self.parker.lock().unparker();
let (runnable, mut main_task) = unsafe {
async_task::spawn_unchecked(main_future, move |runnable| {
let state = &mut *state.lock();
state
.scheduled_from_foreground
.entry(cx_id)
.or_default()
.push(ForegroundRunnable {
id: util::post_inc(&mut state.next_runnable_id),
runnable,
main: true,
});
unparker.unpark();
})
};
runnable.schedule();
loop {
if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
return result;
}
if !woken.load(SeqCst) {
self.state.lock().will_park();
}
woken.store(false, SeqCst);
self.parker.lock().park();
}
}
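/// Runs scheduled foreground and background runnables until no work remains,
/// without blocking for new work.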
pub fn run_until_parked(&self) {
use std::sync::atomic::AtomicBool;
let woken = Arc::new(AtomicBool::new(false));
self.run_internal(woken, None);
}
fn run_internal(
&self,
woken: Arc<std::sync::atomic::AtomicBool>,
mut main_task: Option<&mut AnyLocalTask>,
) -> Option<Box<dyn Any>> {
use rand::prelude::*;
use std::sync::atomic::Ordering::SeqCst;
let unparker = self.parker.lock().unparker();
let waker = waker_fn::waker_fn(move || {
woken.store(true, SeqCst);
unparker.unpark();
});
let mut cx = Context::from_waker(&waker);
loop {
let mut state = self.state.lock();
if state.scheduled_from_foreground.is_empty()
&& state.scheduled_from_background.is_empty()
{
if let Some(main_task) = main_task {
if let Poll::Ready(result) = main_task.poll(&mut cx) {
return Some(result);
}
}
return None;
}
if !state.scheduled_from_background.is_empty() && state.rng.gen() {
let background_len = state.scheduled_from_background.len();
let ix = state.rng.gen_range(0..background_len);
let background_runnable = state.scheduled_from_background.remove(ix);
state.push_to_history(ExecutorEvent::PollRunnable {
id: background_runnable.id,
});
drop(state);
background_runnable.runnable.run();
} else if !state.scheduled_from_foreground.is_empty() {
let available_cx_ids = state
.scheduled_from_foreground
.keys()
.copied()
.collect::<Vec<_>>();
let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
let scheduled_from_cx = state
.scheduled_from_foreground
.get_mut(&cx_id_to_run)
.unwrap();
let foreground_runnable = scheduled_from_cx.remove(0);
if scheduled_from_cx.is_empty() {
state.scheduled_from_foreground.remove(&cx_id_to_run);
}
state.push_to_history(ExecutorEvent::PollRunnable {
id: foreground_runnable.id,
});
drop(state);
foreground_runnable.runnable.run();
if let Some(main_task) = main_task.as_mut() {
if foreground_runnable.main {
if let Poll::Ready(result) = main_task.poll(&mut cx) {
return Some(result);
}
}
}
}
}
}
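/// Polls `future` for up to `max_ticks` iterations, randomly interleaving it
/// with scheduled background runnables; returns `None` if the future has not
/// completed within that budget.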
fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
where
F: Unpin + Future<Output = T>,
{
use rand::prelude::*;
let unparker = self.parker.lock().unparker();
let waker = waker_fn::waker_fn(move || {
unparker.unpark();
});
let mut cx = Context::from_waker(&waker);
for _ in 0..max_ticks {
let mut state = self.state.lock();
let runnable_count = state.scheduled_from_background.len();
let ix = state.rng.gen_range(0..=runnable_count);
if ix < state.scheduled_from_background.len() {
let background_runnable = state.scheduled_from_background.remove(ix);
state.push_to_history(ExecutorEvent::PollRunnable {
id: background_runnable.id,
});
drop(state);
background_runnable.runnable.run();
} else {
drop(state);
if let Poll::Ready(result) = future.poll(&mut cx) {
return Some(result);
}
let mut state = self.state.lock();
if state.scheduled_from_background.is_empty() {
state.will_park();
drop(state);
self.parker.lock().park();
}
continue;
}
}
None
}
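/// Creates a timer that fires once the virtual clock reaches `now + duration`.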
pub fn timer(&self, duration: Duration) -> Timer {
let (tx, rx) = postage::barrier::channel();
let mut state = self.state.lock();
let wakeup_at = state.now + duration;
let id = util::post_inc(&mut state.next_timer_id);
match state
.pending_timers
.binary_search_by_key(&wakeup_at, |e| e.1)
{
Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
}
let state = self.state.clone();
Timer::Deterministic(DeterministicTimer { rx, id, state })
}
pub fn now(&self) -> std::time::Instant {
let state = self.state.lock();
state.now
}
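/// Advances the virtual clock by `duration`, running scheduled work and firing
/// any timers whose deadlines are reached along the way.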
pub fn advance_clock(&self, duration: Duration) {
let new_now = self.state.lock().now + duration;
loop {
self.run_until_parked();
let mut state = self.state.lock();
if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
let wakeup_time = *wakeup_time;
if wakeup_time <= new_now {
let timer_count = state
.pending_timers
.iter()
.take_while(|(_, t, _)| *t == wakeup_time)
.count();
state.now = wakeup_time;
let timers_to_wake = state
.pending_timers
.drain(0..timer_count)
.collect::<Vec<_>>();
drop(state);
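// Dropping the expired timers drops their barrier senders, which wakes the
// corresponding `Timer` futures.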
drop(timers_to_wake);
continue;
}
}
break;
}
self.state.lock().now = new_now;
}
pub fn start_waiting(&self) {
self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
}
pub fn finish_waiting(&self) {
self.state.lock().waiting_backtrace.take();
}
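/// Causes any later attempt to park the executor to panic, and reseeds the RNG
/// from the original seed.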
pub fn forbid_parking(&self) {
use rand::prelude::*;
let mut state = self.state.lock();
state.forbid_parking = true;
state.rng = StdRng::seed_from_u64(state.seed);
}
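/// With 20% probability, yields to the executor between one and ten times,
/// simulating a random scheduling delay.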
pub async fn simulate_random_delay(&self) {
use rand::prelude::*;
use smol::future::yield_now;
if self.state.lock().rng.gen_bool(0.2) {
let yields = self.state.lock().rng.gen_range(1..=10);
for _ in 0..yields {
yield_now().await;
}
}
}
pub fn record_backtrace(&self) {
let mut state = self.state.lock();
if state.enable_runnable_backtraces {
let current_id = state
.poll_history
.iter()
.rev()
.find_map(|event| match event {
ExecutorEvent::PollRunnable { id } => Some(*id),
_ => None,
});
if let Some(id) = current_id {
state
.runnable_backtraces
.insert(id, backtrace::Backtrace::new_unresolved());
}
}
}
}
impl Drop for Timer {
fn drop(&mut self) {
#[cfg(any(test, feature = "test-support"))]
if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
state
.lock()
.pending_timers
.retain(|(timer_id, _, _)| timer_id != id)
}
}
}
impl Future for Timer {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match &mut *self {
#[cfg(any(test, feature = "test-support"))]
Self::Deterministic(DeterministicTimer { rx, .. }) => {
use postage::stream::{PollRecv, Stream as _};
smol::pin!(rx);
match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
PollRecv::Pending => Poll::Pending,
}
}
Self::Production(timer) => {
smol::pin!(timer);
match timer.poll(cx) {
Poll::Ready(_) => Poll::Ready(()),
Poll::Pending => Poll::Pending,
}
}
}
}
}
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
fn push_to_history(&mut self, event: ExecutorEvent) {
use std::fmt::Write as _;
self.poll_history.push(event);
if let Some(prev_history) = &self.previous_poll_history {
let ix = self.poll_history.len() - 1;
let prev_event = prev_history[ix];
if event != prev_event {
let mut message = String::new();
writeln!(
&mut message,
"current runnable backtrace:\n{:?}",
self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
trace.resolve();
util::CwdBacktrace(trace)
})
)
.unwrap();
writeln!(
&mut message,
"previous runnable backtrace:\n{:?}",
self.runnable_backtraces
.get_mut(&prev_event.id())
.map(|trace| {
trace.resolve();
util::CwdBacktrace(trace)
})
)
.unwrap();
panic!("detected non-determinism after {ix}. {message}");
}
}
}
fn will_park(&mut self) {
if self.forbid_parking {
let mut backtrace_message = String::new();
#[cfg(any(test, feature = "test-support"))]
if let Some(backtrace) = self.waiting_backtrace.as_mut() {
backtrace.resolve();
backtrace_message = format!(
"\nbacktrace of waiting future:\n{:?}",
util::CwdBacktrace(backtrace)
);
}
panic!(
"deterministic executor parked after a call to forbid_parking{}",
backtrace_message
);
}
}
}
#[cfg(any(test, feature = "test-support"))]
impl ExecutorEvent {
pub fn id(&self) -> usize {
match self {
ExecutorEvent::PollRunnable { id } => *id,
ExecutorEvent::EnqueuRunnable { id } => *id,
}
}
}
impl Foreground {
pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
if dispatcher.is_main_thread() {
Ok(Self::Platform {
dispatcher,
_not_send_or_sync: PhantomData,
})
} else {
Err(anyhow!("must be constructed on main thread"))
}
}
pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
let future = any_local_future(future);
let any_task = match self {
#[cfg(any(test, feature = "test-support"))]
Self::Deterministic { cx_id, executor } => {
executor.spawn_from_foreground(*cx_id, future, false)
}
Self::Platform { dispatcher, .. } => {
fn spawn_inner(
future: AnyLocalFuture,
dispatcher: &Arc<dyn Dispatcher>,
) -> AnyLocalTask {
let dispatcher = dispatcher.clone();
let schedule =
move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
let (runnable, task) = async_task::spawn_local(future, schedule);
runnable.schedule();
task
}
spawn_inner(future, dispatcher)
}
};
Task::local(any_task)
}
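/// Blocks until `future` completes, driving it on the deterministic executor;
/// panics if called on a platform executor.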
#[cfg(any(test, feature = "test-support"))]
pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
let result = match self {
Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
};
*result.downcast().unwrap()
}
#[cfg(any(test, feature = "test-support"))]
pub fn run_until_parked(&self) {
match self {
Self::Deterministic { executor, .. } => executor.run_until_parked(),
_ => panic!("this method can only be called on a deterministic executor"),
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn parking_forbidden(&self) -> bool {
match self {
Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
_ => panic!("this method can only be called on a deterministic executor"),
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn start_waiting(&self) {
match self {
Self::Deterministic { executor, .. } => executor.start_waiting(),
_ => panic!("this method can only be called on a deterministic executor"),
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn finish_waiting(&self) {
match self {
Self::Deterministic { executor, .. } => executor.finish_waiting(),
_ => panic!("this method can only be called on a deterministic executor"),
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn forbid_parking(&self) {
match self {
Self::Deterministic { executor, .. } => executor.forbid_parking(),
_ => panic!("this method can only be called on a deterministic executor"),
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn advance_clock(&self, duration: Duration) {
match self {
Self::Deterministic { executor, .. } => executor.advance_clock(duration),
_ => panic!("this method can only be called on a deterministic executor"),
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
match self {
Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
_ => panic!("this method can only be called on a deterministic executor"),
}
}
}
impl Background {
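/// Creates the production executor, spawning two worker threads per CPU that
/// run until the executor is dropped.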
pub fn new() -> Self {
let executor = Arc::new(Executor::new());
let stop = channel::unbounded::<()>();
for i in 0..2 * num_cpus::get() {
let executor = executor.clone();
let stop = stop.1.clone();
thread::Builder::new()
.name(format!("background-executor-{}", i))
.spawn(move || smol::block_on(executor.run(stop.recv())))
.unwrap();
}
Self::Production {
executor,
_stop: stop.0,
}
}
pub fn num_cpus(&self) -> usize {
num_cpus::get()
}
pub fn spawn<T, F>(&self, future: F) -> Task<T>
where
T: 'static + Send,
F: Send + Future<Output = T> + 'static,
{
let future = any_future(future);
let any_task = match self {
Self::Production { executor, .. } => executor.spawn(future),
#[cfg(any(test, feature = "test-support"))]
Self::Deterministic { executor } => executor.spawn(future),
};
Task::send(any_task)
}
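/// Blocks the current thread until `future` completes.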
pub fn block<F, T>(&self, future: F) -> T
where
F: Future<Output = T>,
{
smol::pin!(future);
match self {
Self::Production { .. } => smol::block_on(&mut future),
#[cfg(any(test, feature = "test-support"))]
Self::Deterministic { executor, .. } => {
executor.block(&mut future, usize::MAX).unwrap()
}
}
}
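/// Blocks on `future` for at most `timeout` (a randomly chosen number of ticks
/// under the deterministic executor), returning the still-pending future on
/// timeout.
///
/// Illustrative usage sketch; `background` and `work` are assumed names, not
/// part of this file:
///
/// ```ignore
/// // `background: Arc<Background>`, `work: impl Future<Output = T> + Unpin + 'static`
/// let value = match background.block_with_timeout(Duration::from_millis(100), work) {
///     Ok(value) => value,       // completed within the timeout
///     Err(rest) => rest.await,  // timed out; await the remainder later
/// };
/// ```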
pub fn block_with_timeout<F, T>(
&self,
timeout: Duration,
future: F,
) -> Result<T, impl Future<Output = T>>
where
T: 'static,
F: 'static + Unpin + Future<Output = T>,
{
let mut future = any_local_future(future);
if !timeout.is_zero() {
let output = match self {
Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
#[cfg(any(test, feature = "test-support"))]
Self::Deterministic { executor, .. } => {
use rand::prelude::*;
let max_ticks = {
let mut state = executor.state.lock();
let range = state.block_on_ticks.clone();
state.rng.gen_range(range)
};
executor.block(&mut future, max_ticks)
}
};
if let Some(output) = output {
return Ok(*output.downcast().unwrap());
}
}
Err(async { *future.await.downcast().unwrap() })
}
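/// Spawns the futures registered by `scheduler` on this executor and waits for
/// all of them to finish. Unlike `spawn`, the futures may borrow from the
/// caller's stack.
///
/// Illustrative usage sketch; `background` and `items` are assumed names, not
/// part of this file:
///
/// ```ignore
/// // `background: Arc<Background>`
/// let items = vec![1, 2, 3];
/// background
///     .scoped(|scope| {
///         for item in &items {
///             scope.spawn(async move {
///                 // `item` borrows from the enclosing stack frame.
///                 println!("{item}");
///             });
///         }
///     })
///     .await;
/// ```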
pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
where
F: FnOnce(&mut Scope<'scope>),
{
let mut scope = Scope::new(self.clone());
(scheduler)(&mut scope);
let spawned = mem::take(&mut scope.futures)
.into_iter()
.map(|f| self.spawn(f))
.collect::<Vec<_>>();
for task in spawned {
task.await;
}
}
pub fn timer(&self, duration: Duration) -> Timer {
match self {
Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
#[cfg(any(test, feature = "test-support"))]
Background::Deterministic { executor } => executor.timer(duration),
}
}
pub fn now(&self) -> std::time::Instant {
match self {
Background::Production { .. } => std::time::Instant::now(),
#[cfg(any(test, feature = "test-support"))]
Background::Deterministic { executor } => executor.now(),
}
}
#[cfg(any(test, feature = "test-support"))]
pub async fn simulate_random_delay(&self) {
match self {
Self::Deterministic { executor, .. } => {
executor.simulate_random_delay().await;
}
_ => {
panic!("this method can only be called on a deterministic executor")
}
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn record_backtrace(&self) {
match self {
Self::Deterministic { executor, .. } => executor.record_backtrace(),
_ => {
panic!("this method can only be called on a deterministic executor")
}
}
}
}
impl Default for Background {
fn default() -> Self {
Self::new()
}
}
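/// A set of borrowed futures spawned via [`Background::scoped`]; dropping the
/// scope blocks until every spawned future has completed.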
pub struct Scope<'a> {
executor: Arc<Background>,
futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
tx: Option<mpsc::Sender<()>>,
rx: mpsc::Receiver<()>,
_phantom: PhantomData<&'a ()>,
}
impl<'a> Scope<'a> {
fn new(executor: Arc<Background>) -> Self {
let (tx, rx) = mpsc::channel(1);
Self {
executor,
tx: Some(tx),
rx,
futures: Default::default(),
_phantom: PhantomData,
}
}
pub fn spawn<F>(&mut self, f: F)
where
F: Future<Output = ()> + Send + 'a,
{
let tx = self.tx.clone().unwrap();
// Safety: The 'a lifetime is guaranteed to outlive any of these futures because
// dropping this `Scope` blocks until all of the futures have resolved.
let f = unsafe {
mem::transmute::<
Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
>(Box::pin(async move {
f.await;
drop(tx);
}))
};
self.futures.push(f);
}
}
impl<'a> Drop for Scope<'a> {
fn drop(&mut self) {
self.tx.take().unwrap();
// Wait until the channel is closed, which means that all of the spawned
// futures have resolved.
self.executor.block(self.rx.next());
}
}
impl<T> Task<T> {
pub fn ready(value: T) -> Self {
Self::Ready(Some(value))
}
fn local(any_task: AnyLocalTask) -> Self {
Self::Local {
any_task,
result_type: PhantomData,
}
}
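/// Detaches the task so the underlying future keeps running after this handle
/// is dropped (dropping an undetached task cancels it).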
pub fn detach(self) {
match self {
Task::Ready(_) => {}
Task::Local { any_task, .. } => any_task.detach(),
Task::Send { any_task, .. } => any_task.detach(),
}
}
}
impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
cx.spawn(|_| async move {
if let Err(err) = self.await {
log::error!("{}", err);
}
})
.detach();
}
}
impl<T: Send> Task<T> {
fn send(any_task: AnyTask) -> Self {
Self::Send {
any_task,
result_type: PhantomData,
}
}
}
impl<T: fmt::Debug> fmt::Debug for Task<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Task::Ready(value) => value.fmt(f),
Task::Local { any_task, .. } => any_task.fmt(f),
Task::Send { any_task, .. } => any_task.fmt(f),
}
}
}
impl<T: 'static> Future for Task<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match unsafe { self.get_unchecked_mut() } {
Task::Ready(value) => Poll::Ready(value.take().unwrap()),
Task::Local { any_task, .. } => {
any_task.poll(cx).map(|value| *value.downcast().unwrap())
}
Task::Send { any_task, .. } => {
any_task.poll(cx).map(|value| *value.downcast().unwrap())
}
}
}
}
fn any_future<T, F>(future: F) -> AnyFuture
where
T: 'static + Send,
F: Future<Output = T> + Send + 'static,
{
async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
}
fn any_local_future<T, F>(future: F) -> AnyLocalFuture
where
T: 'static,
F: Future<Output = T> + 'static,
{
async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
}