From 526a55d0d7e3550c6f7aa941ac22adb007cc7140 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Fri, 2 Apr 2021 12:42:23 -0600 Subject: [PATCH] Complete finish_pending_tasks future when tasks are cancelled Co-Authored-By: Max Brunsfeld --- gpui/src/app.rs | 62 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 4 deletions(-) diff --git a/gpui/src/app.rs b/gpui/src/app.rs index 3bff74f10b..5ef5588726 100644 --- a/gpui/src/app.rs +++ b/gpui/src/app.rs @@ -983,6 +983,8 @@ impl MutableAppContext { task_id, task, TaskHandlerMap::Future(self.future_handlers.clone()), + self.task_done.0.clone(), + self.background.clone(), ) } @@ -1020,6 +1022,8 @@ impl MutableAppContext { task_id, task, TaskHandlerMap::Stream(self.stream_handlers.clone()), + self.task_done.0.clone(), + self.background.clone(), ) } @@ -1039,7 +1043,6 @@ impl MutableAppContext { result = Some(callback(model.as_any_mut(), output, self, model_id)); self.ctx.models.insert(model_id, model); } - self.task_done(task_id); } FutureHandler::View { window_id, @@ -1060,11 +1063,11 @@ impl MutableAppContext { .views .insert(view_id, view); } - self.task_done(task_id); } }; self.flush_effects(); + self.task_done(task_id); result } @@ -1119,6 +1122,8 @@ impl MutableAppContext { } fn stream_completed(&mut self, task_id: usize) -> Option> { + self.pending_flushes += 1; + let stream_handler = self.stream_handlers.borrow_mut().remove(&task_id).unwrap(); let result = match stream_handler { StreamHandler::Model { @@ -1159,6 +1164,8 @@ impl MutableAppContext { } } }; + + self.flush_effects(); self.task_done(task_id); result } @@ -2350,6 +2357,8 @@ pub struct Task { id: usize, task: Option>, handler_map: TaskHandlerMap, + background: Arc, + task_done: channel::Sender, } enum TaskHandlerMap { @@ -2359,11 +2368,19 @@ enum TaskHandlerMap { } impl Task { - fn new(id: usize, task: executor::Task, handler_map: TaskHandlerMap) -> Self { + fn new( + id: usize, + task: executor::Task, + handler_map: TaskHandlerMap, + task_done: channel::Sender, + background: Arc, + ) -> Self { Self { id, task: Some(task), handler_map, + task_done, + background, } } @@ -2393,7 +2410,9 @@ impl Future for Task { impl Drop for Task { fn drop(self: &mut Self) { match &self.handler_map { - TaskHandlerMap::Detached => {} + TaskHandlerMap::Detached => { + return; + } TaskHandlerMap::Future(map) => { map.borrow_mut().remove(&self.id); } @@ -2401,6 +2420,13 @@ impl Drop for Task { map.borrow_mut().remove(&self.id); } } + let task_done = self.task_done.clone(); + let task_id = self.id; + self.background + .spawn(async move { + let _ = task_done.send(task_id).await; + }) + .detach() } } @@ -3382,6 +3408,34 @@ mod tests { app.finish_pending_tasks().await; assert!(app.0.borrow().stream_handlers.borrow().is_empty()); app.finish_pending_tasks().await; // Don't block if there are no tasks + + // Tasks are considered finished when we drop handles + let mut tasks = Vec::new(); + model.update(&mut app, |_, ctx| { + tasks.push(Box::new(ctx.spawn(async {}, |_, _, _| {}))); + tasks.push(Box::new(ctx.spawn_stream( + smol::stream::iter(vec![1, 2, 3]), + |_, _, _| {}, + |_, _| {}, + ))); + }); + + view.update(&mut app, |_, ctx| { + tasks.push(Box::new(ctx.spawn(async {}, |_, _, _| {}))); + tasks.push(Box::new(ctx.spawn_stream( + smol::stream::iter(vec![1, 2, 3]), + |_, _, _| {}, + |_, _| {}, + ))); + }); + + assert!(!app.0.borrow().stream_handlers.borrow().is_empty()); + + let finish_pending_tasks = app.finish_pending_tasks(); + drop(tasks); + finish_pending_tasks.await; + assert!(app.0.borrow().stream_handlers.borrow().is_empty()); + app.finish_pending_tasks().await; // Don't block if there are no tasks }); } }