Fix hang when dropping ::scoped future under deterministic executor

Co-authored-by: Nathan Sobo <nathan@zed.dev>
Co-authored-by: Keith Simmons <keith@zed.dev>
This commit is contained in:
Max Brunsfeld 2022-04-15 13:25:21 -07:00
parent df0b5779a8
commit c56e2ead23

View file

@ -1,5 +1,6 @@
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use async_task::Runnable; use async_task::Runnable;
use futures::channel::mpsc;
use smol::{channel, prelude::*, Executor}; use smol::{channel, prelude::*, Executor};
use std::{ use std::{
any::Any, any::Any,
@ -8,7 +9,7 @@ use std::{
mem, mem,
pin::Pin, pin::Pin,
rc::Rc, rc::Rc,
sync::{mpsc, Arc}, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
thread, thread,
time::Duration, time::Duration,
@ -621,11 +622,11 @@ impl Background {
Err(async { *future.await.downcast().unwrap() }) Err(async { *future.await.downcast().unwrap() })
} }
pub async fn scoped<'scope, F>(&self, scheduler: F) pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
where where
F: FnOnce(&mut Scope<'scope>), F: FnOnce(&mut Scope<'scope>),
{ {
let mut scope = Scope::new(); let mut scope = Scope::new(self.clone());
(scheduler)(&mut scope); (scheduler)(&mut scope);
let spawned = mem::take(&mut scope.futures) let spawned = mem::take(&mut scope.futures)
.into_iter() .into_iter()
@ -664,6 +665,7 @@ impl Background {
} }
pub struct Scope<'a> { pub struct Scope<'a> {
executor: Arc<Background>,
futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>, futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
tx: Option<mpsc::Sender<()>>, tx: Option<mpsc::Sender<()>>,
rx: mpsc::Receiver<()>, rx: mpsc::Receiver<()>,
@ -671,9 +673,10 @@ pub struct Scope<'a> {
} }
impl<'a> Scope<'a> { impl<'a> Scope<'a> {
fn new() -> Self { fn new(executor: Arc<Background>) -> Self {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel(1);
Self { Self {
executor,
tx: Some(tx), tx: Some(tx),
rx, rx,
futures: Default::default(), futures: Default::default(),
@ -708,7 +711,7 @@ impl<'a> Drop for Scope<'a> {
// Wait until the channel is closed, which means that all of the spawned // Wait until the channel is closed, which means that all of the spawned
// futures have resolved. // futures have resolved.
self.rx.recv().ok(); self.executor.block(self.rx.next());
} }
} }