Add a facility for delaying quit until critical tasks finish

Co-Authored-By: Antonio Scandurra <me@as-cii.com>
This commit is contained in:
Max Brunsfeld 2021-11-01 11:56:49 -07:00
parent 6e5ec2a00d
commit b8994c2a89
7 changed files with 70 additions and 4 deletions

View file

@ -38,9 +38,13 @@ pub enum Foreground {
}
pub enum Background {
Deterministic(Arc<Deterministic>),
Deterministic {
executor: Arc<Deterministic>,
critical_tasks: Mutex<Vec<Task<()>>>,
},
Production {
executor: Arc<smol::Executor<'static>>,
critical_tasks: Mutex<Vec<Task<()>>>,
_stop: channel::Sender<()>,
},
}
@ -500,6 +504,7 @@ impl Background {
Self::Production {
executor,
critical_tasks: Default::default(),
_stop: stop.0,
}
}
@ -516,11 +521,36 @@ impl Background {
let future = any_future(future);
let any_task = match self {
Self::Production { executor, .. } => executor.spawn(future),
Self::Deterministic(executor) => executor.spawn(future),
Self::Deterministic { executor, .. } => executor.spawn(future),
};
Task::send(any_task)
}
pub fn spawn_critical<T, F>(&self, future: F)
where
T: 'static + Send,
F: Send + Future<Output = T> + 'static,
{
let task = self.spawn(async move {
future.await;
});
match self {
Self::Production { critical_tasks, .. }
| Self::Deterministic { critical_tasks, .. } => critical_tasks.lock().push(task),
}
}
pub fn block_on_critical_tasks(&self, timeout: Duration) -> bool {
match self {
Background::Production { critical_tasks, .. }
| Self::Deterministic { critical_tasks, .. } => {
let tasks = mem::take(&mut *critical_tasks.lock());
self.block_with_timeout(timeout, futures::future::join_all(tasks))
.is_ok()
}
}
}
pub fn block_with_timeout<F, T>(
&self,
timeout: Duration,
@ -534,7 +564,7 @@ impl Background {
if !timeout.is_zero() {
let output = match self {
Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
Self::Deterministic(executor) => executor.block_on(&mut future),
Self::Deterministic { executor, .. } => executor.block_on(&mut future),
};
if let Some(output) = output {
return Ok(*output.downcast().unwrap());
@ -587,7 +617,10 @@ pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
let executor = Arc::new(Deterministic::new(seed));
(
Rc::new(Foreground::Deterministic(executor.clone())),
Arc::new(Background::Deterministic(executor)),
Arc::new(Background::Deterministic {
executor,
critical_tasks: Default::default(),
}),
)
}