From 2d79193fb6e0df3cf6e8de823e02ef221c7c551d Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Fri, 2 Apr 2021 12:03:35 -0600 Subject: [PATCH] Remove future/stream handlers when task is dropped Co-Authored-By: Max Brunsfeld --- gpui/src/app.rs | 193 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 139 insertions(+), 54 deletions(-) diff --git a/gpui/src/app.rs b/gpui/src/app.rs index b18830061f..3bff74f10b 100644 --- a/gpui/src/app.rs +++ b/gpui/src/app.rs @@ -1,6 +1,6 @@ use crate::{ elements::ElementBox, - executor::{self, Task}, + executor, keymap::{self, Keystroke}, platform::{self, App as _, WindowOptions}, presenter::Presenter, @@ -311,8 +311,8 @@ pub struct MutableAppContext { HashMap>, foreground: Rc, background: Arc, - future_handlers: HashMap, - stream_handlers: HashMap, + future_handlers: Rc>>, + stream_handlers: Rc>>, task_done: (channel::Sender, channel::Receiver), pending_effects: VecDeque, pending_flushes: usize, @@ -348,8 +348,8 @@ impl MutableAppContext { invalidation_callbacks: HashMap::new(), foreground, background: Arc::new(executor::Background::new()), - future_handlers: HashMap::new(), - stream_handlers: HashMap::new(), + future_handlers: Default::default(), + stream_handlers: Default::default(), task_done: channel::unbounded(), pending_effects: VecDeque::new(), pending_flushes: 0, @@ -963,50 +963,64 @@ impl MutableAppContext { self.flush_effects(); } - fn spawn(&mut self, future: F) -> (usize, Task>) + fn spawn(&mut self, future: F) -> Task> where F: 'static + Future, T: 'static, { let task_id = post_inc(&mut self.next_task_id); let app = self.weak_self.as_ref().unwrap().upgrade().unwrap(); - let task = self.foreground.spawn(async move { - let output = future.await; - app.borrow_mut() - .handle_future_output(task_id, Box::new(output)) - .map(|result| *result.downcast::().unwrap()) - }); - (task_id, task) + let task = { + let app = app.clone(); + self.foreground.spawn(async move { + let output = future.await; + app.borrow_mut() + .handle_future_output(task_id, Box::new(output)) + .map(|result| *result.downcast::().unwrap()) + }) + }; + Task::new( + task_id, + task, + TaskHandlerMap::Future(self.future_handlers.clone()), + ) } - fn spawn_stream(&mut self, mut stream: F) -> (usize, Task>) + fn spawn_stream(&mut self, mut stream: F) -> Task> where F: 'static + Stream + Unpin, T: 'static, { let task_id = post_inc(&mut self.next_task_id); let app = self.weak_self.as_ref().unwrap().upgrade().unwrap(); - let task = self.foreground.spawn(async move { - loop { - match stream.next().await { - Some(item) => { - let mut app = app.borrow_mut(); - if app.handle_stream_item(task_id, Box::new(item)) { + let task = { + let app = app.clone(); + self.foreground.spawn(async move { + loop { + match stream.next().await { + Some(item) => { + let mut app = app.borrow_mut(); + if app.handle_stream_item(task_id, Box::new(item)) { + break; + } + } + None => { break; } } - None => { - break; - } } - } - app.borrow_mut() - .stream_completed(task_id) - .map(|result| *result.downcast::().unwrap()) - }); + app.borrow_mut() + .stream_completed(task_id) + .map(|result| *result.downcast::().unwrap()) + }) + }; - (task_id, task) + Task::new( + task_id, + task, + TaskHandlerMap::Stream(self.stream_handlers.clone()), + ) } fn handle_future_output( @@ -1015,7 +1029,7 @@ impl MutableAppContext { output: Box, ) -> Option> { self.pending_flushes += 1; - let future_callback = self.future_handlers.remove(&task_id).unwrap(); + let future_callback = self.future_handlers.borrow_mut().remove(&task_id).unwrap(); let mut result = None; @@ -1057,7 +1071,7 @@ impl MutableAppContext { fn handle_stream_item(&mut self, task_id: usize, output: Box) -> bool { self.pending_flushes += 1; - let mut handler = self.stream_handlers.remove(&task_id).unwrap(); + let mut handler = self.stream_handlers.borrow_mut().remove(&task_id).unwrap(); let halt = match &mut handler { StreamHandler::Model { model_id, @@ -1067,7 +1081,7 @@ impl MutableAppContext { if let Some(mut model) = self.ctx.models.remove(&model_id) { let halt = item_callback(model.as_any_mut(), output, self, *model_id); self.ctx.models.insert(*model_id, model); - self.stream_handlers.insert(task_id, handler); + self.stream_handlers.borrow_mut().insert(task_id, handler); halt } else { true @@ -1092,7 +1106,7 @@ impl MutableAppContext { .unwrap() .views .insert(*view_id, view); - self.stream_handlers.insert(task_id, handler); + self.stream_handlers.borrow_mut().insert(task_id, handler); halt } else { true @@ -1105,7 +1119,8 @@ impl MutableAppContext { } fn stream_completed(&mut self, task_id: usize) -> Option> { - let result = match self.stream_handlers.remove(&task_id).unwrap() { + let stream_handler = self.stream_handlers.borrow_mut().remove(&task_id).unwrap(); + let result = match stream_handler { StreamHandler::Model { model_id, done_callback, @@ -1158,8 +1173,13 @@ impl MutableAppContext { } pub fn finish_pending_tasks(&self) -> impl Future { - let mut pending_tasks = self.future_handlers.keys().cloned().collect::>(); - pending_tasks.extend(self.stream_handlers.keys()); + let mut pending_tasks = self + .future_handlers + .borrow() + .keys() + .cloned() + .collect::>(); + pending_tasks.extend(self.stream_handlers.borrow().keys()); let task_done = self.task_done.1.clone(); @@ -1531,10 +1551,10 @@ impl<'a, T: Entity> ModelContext<'a, T> { F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext) -> U, U: 'static, { - let (task_id, task) = self.app.spawn::(future); + let task = self.app.spawn::(future); - self.app.future_handlers.insert( - task_id, + self.app.future_handlers.borrow_mut().insert( + task.id, FutureHandler::Model { model_id: self.model_id, callback: Box::new(move |model, output, app, model_id| { @@ -1564,9 +1584,9 @@ impl<'a, T: Entity> ModelContext<'a, T> { G: 'static + FnOnce(&mut T, &mut ModelContext) -> U, U: 'static + Any, { - let (task_id, task) = self.app.spawn_stream(stream); - self.app.stream_handlers.insert( - task_id, + let task = self.app.spawn_stream(stream); + self.app.stream_handlers.borrow_mut().insert( + task.id, StreamHandler::Model { model_id: self.model_id, item_callback: Box::new(move |model, output, app, model_id| { @@ -1791,10 +1811,10 @@ impl<'a, T: View> ViewContext<'a, T> { F: 'static + FnOnce(&mut T, S::Output, &mut ViewContext) -> U, U: 'static, { - let (task_id, task) = self.app.spawn(future); + let task = self.app.spawn(future); - self.app.future_handlers.insert( - task_id, + self.app.future_handlers.borrow_mut().insert( + task.id, FutureHandler::View { window_id: self.window_id, view_id: self.view_id, @@ -1825,9 +1845,9 @@ impl<'a, T: View> ViewContext<'a, T> { G: 'static + FnOnce(&mut T, &mut ViewContext) -> U, U: 'static + Any, { - let (task_id, task) = self.app.spawn_stream(stream); - self.app.stream_handlers.insert( - task_id, + let task = self.app.spawn_stream(stream); + self.app.stream_handlers.borrow_mut().insert( + task.id, StreamHandler::View { window_id: self.window_id, view_id: self.view_id, @@ -2325,6 +2345,65 @@ enum StreamHandler { }, } +#[must_use] +pub struct Task { + id: usize, + task: Option>, + handler_map: TaskHandlerMap, +} + +enum TaskHandlerMap { + Detached, + Future(Rc>>), + Stream(Rc>>), +} + +impl Task { + fn new(id: usize, task: executor::Task, handler_map: TaskHandlerMap) -> Self { + Self { + id, + task: Some(task), + handler_map, + } + } + + pub fn detach(mut self) { + self.handler_map = TaskHandlerMap::Detached; + self.task.take().unwrap().detach(); + } + + pub async fn cancel(mut self) -> Option { + let task = self.task.take().unwrap(); + task.cancel().await + } +} + +impl Future for Task { + type Output = T; + + fn poll( + self: std::pin::Pin<&mut Self>, + ctx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let task = unsafe { self.map_unchecked_mut(|task| task.task.as_mut().unwrap()) }; + task.poll(ctx) + } +} + +impl Drop for Task { + fn drop(self: &mut Self) { + match &self.handler_map { + TaskHandlerMap::Detached => {} + TaskHandlerMap::Future(map) => { + map.borrow_mut().remove(&self.id); + } + TaskHandlerMap::Stream(map) => { + map.borrow_mut().remove(&self.id); + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -3270,32 +3349,38 @@ mod tests { model.update(&mut app, |_, ctx| { ctx.spawn(async {}, |_, _, _| {}).detach(); - ctx.spawn(async {}, |_, _, _| {}).detach(); + // Cancel this task + drop(ctx.spawn(async {}, |_, _, _| {})); }); view.update(&mut app, |_, ctx| { ctx.spawn(async {}, |_, _, _| {}).detach(); - ctx.spawn(async {}, |_, _, _| {}).detach(); + // Cancel this task + drop(ctx.spawn(async {}, |_, _, _| {})); }); - assert!(!app.0.borrow().future_handlers.is_empty()); + assert!(!app.0.borrow().future_handlers.borrow().is_empty()); app.finish_pending_tasks().await; - assert!(app.0.borrow().future_handlers.is_empty()); + assert!(app.0.borrow().future_handlers.borrow().is_empty()); app.finish_pending_tasks().await; // Don't block if there are no tasks model.update(&mut app, |_, ctx| { ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {}) .detach(); + // Cancel this task + drop(ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {})); }); view.update(&mut app, |_, ctx| { ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {}) .detach(); + // Cancel this task + drop(ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {})); }); - assert!(!app.0.borrow().stream_handlers.is_empty()); + assert!(!app.0.borrow().stream_handlers.borrow().is_empty()); app.finish_pending_tasks().await; - assert!(app.0.borrow().stream_handlers.is_empty()); + assert!(app.0.borrow().stream_handlers.borrow().is_empty()); app.finish_pending_tasks().await; // Don't block if there are no tasks }); }