From e809d6119afc42f2eeacbac8ffdb1a715715ec86 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Thu, 18 Mar 2021 20:10:32 -0600 Subject: [PATCH] Return tasks from spawn and spawn_stream Also, eliminate the background spawning methods. We can spawn futures on the executor and then spawn those on the app if we need to wait for the result of running one. --- Cargo.lock | 21 ++ gpui/Cargo.toml | 1 + gpui/src/app.rs | 511 +++++++++++++++++----------------- gpui/src/executor.rs | 23 +- zed/src/editor/buffer_view.rs | 4 +- zed/src/watch.rs | 4 +- zed/src/worktree/worktree.rs | 20 +- 7 files changed, 317 insertions(+), 267 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 58aff982bf..5c3136f082 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -782,6 +782,7 @@ dependencies = [ "parking_lot", "pathfinder_color", "pathfinder_geometry", + "pin-project", "rand 0.8.3", "smallvec", "smol", @@ -1076,6 +1077,26 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pin-project" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96fa8ebb90271c4477f144354485b8068bd8f6b78b428b01ba892ca26caf0b63" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "758669ae3558c6f74bd2a18b41f7ac0b5a195aea6639d6a9b5e5d1ad5ba24c0b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.4" diff --git a/gpui/Cargo.toml b/gpui/Cargo.toml index 43a95abcc7..edeea5be5a 100644 --- a/gpui/Cargo.toml +++ b/gpui/Cargo.toml @@ -12,6 +12,7 @@ ordered-float = "2.1.1" parking_lot = "0.11.1" pathfinder_color = "0.5" pathfinder_geometry = "0.5" +pin-project = "1.0.5" rand = "0.8.3" smallvec = "1.6.1" smol = "1.2" diff --git a/gpui/src/app.rs b/gpui/src/app.rs index d377afc2db..cf58a389b6 100644 --- a/gpui/src/app.rs +++ b/gpui/src/app.rs @@ -1,6 +1,6 @@ use crate::{ elements::Element, - executor::{self}, + executor::{self, ForegroundTask}, keymap::{self, Keystroke}, platform::{self, App as _}, util::post_inc, @@ -292,7 +292,8 @@ pub struct MutableAppContext { HashMap>, foreground: Rc, background: Arc, - task_callbacks: HashMap, + future_handlers: HashMap, + stream_handlers: HashMap, task_done: (channel::Sender, channel::Receiver), pending_effects: VecDeque, pending_flushes: usize, @@ -321,7 +322,8 @@ impl MutableAppContext { invalidation_callbacks: HashMap::new(), foreground, background: Arc::new(executor::Background::new()), - task_callbacks: HashMap::new(), + future_handlers: HashMap::new(), + stream_handlers: HashMap::new(), task_done: channel::unbounded(), pending_effects: VecDeque::new(), pending_flushes: 0, @@ -869,97 +871,71 @@ impl MutableAppContext { self.flush_effects(); } - fn spawn_local(&mut self, future: F) -> usize + fn spawn(&mut self, future: F) -> (usize, ForegroundTask>) where F: 'static + Future, + T: 'static, { let task_id = post_inc(&mut self.next_task_id); - let app = self.weak_self.as_ref().unwrap().clone(); - self.foreground - .spawn(async move { - let output = future.await; - if let Some(app) = app.upgrade() { - app.borrow_mut() - .relay_task_output(task_id, Box::new(output)); - } - }) - .detach(); - task_id + let app = self.weak_self.as_ref().unwrap().upgrade().unwrap(); + let task = self.foreground.spawn(async move { + let output = future.await; + app.borrow_mut() + .handle_future_output(task_id, Box::new(output)) + .map(|result| *result.downcast::().unwrap()) + }); + (task_id, task) } - fn spawn_stream_local(&mut self, mut stream: F, done_tx: channel::Sender<()>) -> usize + fn spawn_stream(&mut self, mut stream: F) -> (usize, ForegroundTask>) where F: 'static + Stream + Unpin, + T: 'static, { let task_id = post_inc(&mut self.next_task_id); - let app = self.weak_self.as_ref().unwrap().clone(); - self.foreground - .spawn(async move { - loop { - match stream.next().await { - item @ Some(_) => { - if let Some(app) = app.upgrade() { - let mut app = app.borrow_mut(); - if app.relay_task_output(task_id, Box::new(item)) { - app.stream_completed(task_id); - break; - } - } else { - break; - } - } - item @ None => { - if let Some(app) = app.upgrade() { - let mut app = app.borrow_mut(); - app.relay_task_output(task_id, Box::new(item)); - app.stream_completed(task_id); - } - let _ = done_tx.send(()).await; + let app = self.weak_self.as_ref().unwrap().upgrade().unwrap(); + let task = self.foreground.spawn(async move { + loop { + match stream.next().await { + Some(item) => { + let mut app = app.borrow_mut(); + if app.handle_stream_item(task_id, Box::new(item)) { break; } } + None => { + break; + } } - }) - .detach(); - task_id + } + + app.borrow_mut() + .stream_completed(task_id) + .map(|result| *result.downcast::().unwrap()) + }); + + (task_id, task) } - fn relay_task_output(&mut self, task_id: usize, output: Box) -> bool { + fn handle_future_output( + &mut self, + task_id: usize, + output: Box, + ) -> Option> { self.pending_flushes += 1; - let task_callback = self.task_callbacks.remove(&task_id).unwrap(); + let future_callback = self.future_handlers.remove(&task_id).unwrap(); - let halt = match task_callback { - TaskCallback::OnModelFromFuture { model_id, callback } => { + let mut result = None; + + match future_callback { + FutureHandler::Model { model_id, callback } => { if let Some(mut model) = self.ctx.models.remove(&model_id) { - callback( - model.as_any_mut(), - output, - self, - model_id, - self.foreground.clone(), - ); + result = Some(callback(model.as_any_mut(), output, self, model_id)); self.ctx.models.insert(model_id, model); } self.task_done(task_id); - true } - TaskCallback::OnModelFromStream { - model_id, - mut callback, - } => { - if let Some(mut model) = self.ctx.models.remove(&model_id) { - let halt = callback(model.as_any_mut(), output, self, model_id); - self.ctx.models.insert(model_id, model); - self.task_callbacks.insert( - task_id, - TaskCallback::OnModelFromStream { model_id, callback }, - ); - halt - } else { - true - } - } - TaskCallback::OnViewFromFuture { + FutureHandler::View { window_id, view_id, callback, @@ -970,14 +946,7 @@ impl MutableAppContext { .get_mut(&window_id) .and_then(|w| w.views.remove(&view_id)) { - callback( - view.as_mut(), - output, - self, - window_id, - view_id, - self.foreground.clone(), - ); + result = Some(callback(view.as_mut(), output, self, window_id, view_id)); self.ctx .windows .get_mut(&window_id) @@ -986,12 +955,37 @@ impl MutableAppContext { .insert(view_id, view); } self.task_done(task_id); - true } - TaskCallback::OnViewFromStream { + }; + + self.flush_effects(); + result + } + + fn handle_stream_item(&mut self, task_id: usize, output: Box) -> bool { + self.pending_flushes += 1; + + let mut handler = self.stream_handlers.remove(&task_id).unwrap(); + let halt = match &mut handler { + StreamHandler::Model { + model_id, + item_callback, + .. + } => { + if let Some(mut model) = self.ctx.models.remove(&model_id) { + let halt = item_callback(model.as_any_mut(), output, self, *model_id); + self.ctx.models.insert(*model_id, model); + self.stream_handlers.insert(task_id, handler); + halt + } else { + true + } + } + StreamHandler::View { window_id, view_id, - mut callback, + item_callback, + .. } => { if let Some(mut view) = self .ctx @@ -999,34 +993,67 @@ impl MutableAppContext { .get_mut(&window_id) .and_then(|w| w.views.remove(&view_id)) { - let halt = callback(view.as_mut(), output, self, window_id, view_id); + let halt = item_callback(view.as_mut(), output, self, *window_id, *view_id); self.ctx .windows .get_mut(&window_id) .unwrap() .views - .insert(view_id, view); - self.task_callbacks.insert( - task_id, - TaskCallback::OnViewFromStream { - window_id, - view_id, - callback, - }, - ); + .insert(*view_id, view); + self.stream_handlers.insert(task_id, handler); halt } else { true } } }; + self.flush_effects(); halt } - fn stream_completed(&mut self, task_id: usize) { - self.task_callbacks.remove(&task_id); + fn stream_completed(&mut self, task_id: usize) -> Option> { + let result = match self.stream_handlers.remove(&task_id).unwrap() { + StreamHandler::Model { + model_id, + done_callback, + .. + } => { + if let Some(mut model) = self.ctx.models.remove(&model_id) { + let result = done_callback(model.as_any_mut(), self, model_id); + self.ctx.models.insert(model_id, model); + Some(result) + } else { + None + } + } + StreamHandler::View { + window_id, + view_id, + done_callback, + .. + } => { + if let Some(mut view) = self + .ctx + .windows + .get_mut(&window_id) + .and_then(|w| w.views.remove(&view_id)) + { + let result = done_callback(view.as_mut(), self, window_id, view_id); + self.ctx + .windows + .get_mut(&window_id) + .unwrap() + .views + .insert(view_id, view); + Some(result) + } else { + None + } + } + }; self.task_done(task_id); + result } fn task_done(&self, task_id: usize) { @@ -1039,7 +1066,7 @@ impl MutableAppContext { } pub fn finish_pending_tasks(&self) -> impl Future { - let mut pending_tasks = self.task_callbacks.keys().cloned().collect::>(); + let mut pending_tasks = self.future_handlers.keys().cloned().collect::>(); let task_done = self.task_done.1.clone(); async move { @@ -1404,82 +1431,68 @@ impl<'a, T: Entity> ModelContext<'a, T> { }); } - pub fn spawn_local(&mut self, future: S, callback: F) -> impl Future + pub fn spawn(&mut self, future: S, callback: F) -> ForegroundTask> where S: 'static + Future, F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext) -> U, U: 'static, { - let (tx, rx) = channel::bounded(1); + let (task_id, task) = self.app.spawn::(future); - let task_id = self.app.spawn_local(future); - - self.app.task_callbacks.insert( + self.app.future_handlers.insert( task_id, - TaskCallback::OnModelFromFuture { - model_id: self.model_id, - callback: Box::new(move |model, output, app, model_id, executor| { - let model = model.downcast_mut().unwrap(); - let output = *output.downcast().unwrap(); - let result = callback(model, output, &mut ModelContext::new(app, model_id)); - executor - .spawn(async move { tx.send(result).await }) - .detach(); - }), - }, - ); - - async move { rx.recv().await.unwrap() } - } - - pub fn spawn(&mut self, future: S, callback: F) -> impl Future - where - S: 'static + Future + Send, - S::Output: Send, - F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext) -> U, - U: 'static, - { - let (tx, rx) = channel::bounded(1); - - self.app - .background - .spawn(async move { - if let Err(e) = tx.send(future.await).await { - log::error!("error sending background task result to main thread: {}", e); - } - }) - .detach(); - - self.spawn_local(async move { rx.recv().await.unwrap() }, callback) - } - - pub fn spawn_stream_local( - &mut self, - stream: S, - mut callback: F, - ) -> impl Future - where - S: 'static + Stream + Unpin, - F: 'static + FnMut(&mut T, Option, &mut ModelContext), - { - let (tx, rx) = channel::bounded(1); - - let task_id = self.app.spawn_stream_local(stream, tx); - self.app.task_callbacks.insert( - task_id, - TaskCallback::OnModelFromStream { + FutureHandler::Model { model_id: self.model_id, callback: Box::new(move |model, output, app, model_id| { let model = model.downcast_mut().unwrap(); let output = *output.downcast().unwrap(); - let mut ctx = ModelContext::new(app, model_id); - callback(model, output, &mut ctx); - ctx.halt_stream + Box::new(callback( + model, + output, + &mut ModelContext::new(app, model_id), + )) }), }, ); - async move { rx.recv().await.unwrap() } + task + } + + pub fn spawn_stream( + &mut self, + stream: S, + mut item_callback: F, + done_callback: G, + ) -> ForegroundTask> + where + S: 'static + Stream + Unpin, + F: 'static + FnMut(&mut T, S::Item, &mut ModelContext), + G: 'static + FnOnce(&mut T, &mut ModelContext) -> U, + U: 'static + Any, + { + let (task_id, task) = self.app.spawn_stream(stream); + self.app.stream_handlers.insert( + task_id, + StreamHandler::Model { + model_id: self.model_id, + item_callback: Box::new(move |model, output, app, model_id| { + let model = model.downcast_mut().unwrap(); + let output = *output.downcast().unwrap(); + let mut ctx = ModelContext::new(app, model_id); + item_callback(model, output, &mut ctx); + ctx.halt_stream + }), + done_callback: Box::new( + move |model: &mut dyn Any, app: &mut MutableAppContext, model_id| { + let model = model.downcast_mut().unwrap(); + let mut ctx = ModelContext::new(app, model_id); + Box::new(done_callback(model, &mut ctx)) + }, + ), + }, + ); + + task } } @@ -1674,85 +1687,67 @@ impl<'a, T: View> ViewContext<'a, T> { self.halt_stream = true; } - pub fn spawn_local(&mut self, future: S, callback: F) -> impl Future + pub fn spawn(&mut self, future: S, callback: F) -> ForegroundTask> where S: 'static + Future, F: 'static + FnOnce(&mut T, S::Output, &mut ViewContext) -> U, U: 'static, { - let (tx, rx) = channel::bounded(1); + let (task_id, task) = self.app.spawn(future); - let task_id = self.app.spawn_local(future); - - self.app.task_callbacks.insert( + self.app.future_handlers.insert( task_id, - TaskCallback::OnViewFromFuture { - window_id: self.window_id, - view_id: self.view_id, - callback: Box::new(move |view, output, app, window_id, view_id, executor| { - let view = view.as_any_mut().downcast_mut().unwrap(); - let output = *output.downcast().unwrap(); - let result = - callback(view, output, &mut ViewContext::new(app, window_id, view_id)); - executor - .spawn(async move { tx.send(result).await }) - .detach(); - }), - }, - ); - - async move { rx.recv().await.unwrap() } - } - - pub fn spawn(&mut self, future: S, callback: F) -> impl Future - where - S: 'static + Future + Send, - S::Output: Send, - F: 'static + FnOnce(&mut T, S::Output, &mut ViewContext) -> U, - U: 'static, - { - let (tx, rx) = channel::bounded(1); - - self.app - .background - .spawn(async move { - if let Err(_) = tx.send(future.await).await { - log::error!("Error sending background task result to main thread",); - } - }) - .detach(); - - self.spawn_local(async move { rx.recv().await.unwrap() }, callback) - } - - pub fn spawn_stream_local( - &mut self, - stream: S, - mut callback: F, - ) -> impl Future - where - S: 'static + Stream + Unpin, - F: 'static + FnMut(&mut T, Option, &mut ViewContext), - { - let (tx, rx) = channel::bounded(1); - - let task_id = self.app.spawn_stream_local(stream, tx); - self.app.task_callbacks.insert( - task_id, - TaskCallback::OnViewFromStream { + FutureHandler::View { window_id: self.window_id, view_id: self.view_id, callback: Box::new(move |view, output, app, window_id, view_id| { let view = view.as_any_mut().downcast_mut().unwrap(); let output = *output.downcast().unwrap(); - let mut ctx = ViewContext::new(app, window_id, view_id); - callback(view, output, &mut ctx); - ctx.halt_stream + Box::new(callback( + view, + output, + &mut ViewContext::new(app, window_id, view_id), + )) }), }, ); - async move { rx.recv().await.unwrap() } + task + } + + pub fn spawn_stream( + &mut self, + stream: S, + mut item_callback: F, + done_callback: G, + ) -> ForegroundTask> + where + S: 'static + Stream + Unpin, + F: 'static + FnMut(&mut T, S::Item, &mut ViewContext), + G: 'static + FnOnce(&mut T, &mut ViewContext) -> U, + U: 'static + Any, + { + let (task_id, task) = self.app.spawn_stream(stream); + self.app.stream_handlers.insert( + task_id, + StreamHandler::View { + window_id: self.window_id, + view_id: self.view_id, + item_callback: Box::new(move |view, output, app, window_id, view_id| { + let view = view.as_any_mut().downcast_mut().unwrap(); + let output = *output.downcast().unwrap(); + let mut ctx = ViewContext::new(app, window_id, view_id); + item_callback(view, output, &mut ctx); + ctx.halt_stream + }), + done_callback: Box::new(move |view, app, window_id, view_id| { + let view = view.as_any_mut().downcast_mut().unwrap(); + let mut ctx = ViewContext::new(app, window_id, view_id); + Box::new(done_callback(view, &mut ctx)) + }), + }, + ); + task } } @@ -2192,24 +2187,14 @@ enum Observation { }, } -enum TaskCallback { - OnModelFromFuture { +enum FutureHandler { + Model { model_id: usize, callback: Box< - dyn FnOnce( - &mut dyn Any, - Box, - &mut MutableAppContext, - usize, - Rc, - ), + dyn FnOnce(&mut dyn Any, Box, &mut MutableAppContext, usize) -> Box, >, }, - OnModelFromStream { - model_id: usize, - callback: Box, &mut MutableAppContext, usize) -> bool>, - }, - OnViewFromFuture { + View { window_id: usize, view_id: usize, callback: Box< @@ -2219,16 +2204,26 @@ enum TaskCallback { &mut MutableAppContext, usize, usize, - Rc, - ), + ) -> Box, >, }, - OnViewFromStream { +} + +enum StreamHandler { + Model { + model_id: usize, + item_callback: + Box, &mut MutableAppContext, usize) -> bool>, + done_callback: Box Box>, + }, + View { window_id: usize, view_id: usize, - callback: Box< + item_callback: Box< dyn FnMut(&mut dyn AnyView, Box, &mut MutableAppContext, usize, usize) -> bool, >, + done_callback: + Box Box>, }, } @@ -2395,7 +2390,7 @@ mod tests { let handle = app.add_model(|_| Model::default()); handle .update(&mut app, |_, c| { - c.spawn_local(async { 7 }, |model, output, _| { + c.spawn(async { 7 }, |model, output, _| { model.count = output; }) }) @@ -2428,9 +2423,15 @@ mod tests { let handle = app.add_model(|_| Model::default()); handle .update(&mut app, |_, c| { - c.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |model, output, _| { - model.events.push(output); - }) + c.spawn_stream( + smol::stream::iter(vec![1, 2, 3]), + |model, output, _| { + model.events.push(Some(output)); + }, + |model, _| { + model.events.push(None); + }, + ) }) .await; @@ -2802,7 +2803,7 @@ mod tests { let (_, handle) = app.add_window(|_| View::default()); handle .update(&mut app, |_, c| { - c.spawn_local(async { 7 }, |me, output, _| { + c.spawn(async { 7 }, |me, output, _| { me.count = output; }) }) @@ -2844,9 +2845,15 @@ mod tests { let (_, handle) = app.add_window(|_| View::default()); handle .update(&mut app, |_, c| { - c.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |me, output, _| { - me.events.push(output); - }) + c.spawn_stream( + smol::stream::iter(vec![1_usize, 2, 3]), + |me, output, _| { + me.events.push(Some(output)); + }, + |me, _| { + me.events.push(None); + }, + ) }) .await; @@ -3159,19 +3166,21 @@ mod tests { model.update(&mut app, |_, ctx| { let _ = ctx.spawn(async {}, |_, _, _| {}); - let _ = ctx.spawn_local(async {}, |_, _, _| {}); - let _ = ctx.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}); + let _ = ctx.spawn(async {}, |_, _, _| {}); + let _ = + ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {}); }); view.update(&mut app, |_, ctx| { let _ = ctx.spawn(async {}, |_, _, _| {}); - let _ = ctx.spawn_local(async {}, |_, _, _| {}); - let _ = ctx.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}); + let _ = ctx.spawn(async {}, |_, _, _| {}); + let _ = + ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {}); }); - assert!(!app.0.borrow().task_callbacks.is_empty()); + assert!(!app.0.borrow().future_handlers.is_empty()); app.finish_pending_tasks().await; - assert!(app.0.borrow().task_callbacks.is_empty()); + assert!(app.0.borrow().future_handlers.is_empty()); app.finish_pending_tasks().await; // Don't block if there are no tasks }); } diff --git a/gpui/src/executor.rs b/gpui/src/executor.rs index 1261ec9e87..e9900ead11 100644 --- a/gpui/src/executor.rs +++ b/gpui/src/executor.rs @@ -1,6 +1,6 @@ -// #[cfg(not(test))] use anyhow::{anyhow, Result}; use async_task::Runnable; +use pin_project::pin_project; use smol::prelude::*; use smol::{channel, Executor}; use std::rc::Rc; @@ -17,9 +17,24 @@ pub enum Foreground { Test(smol::LocalExecutor<'static>), } +#[pin_project(project = ForegroundTaskProject)] pub enum ForegroundTask { - Platform(async_task::Task), - Test(smol::Task), + Platform(#[pin] async_task::Task), + Test(#[pin] smol::Task), +} + +impl Future for ForegroundTask { + type Output = T; + + fn poll( + self: std::pin::Pin<&mut Self>, + ctx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + match self.project() { + ForegroundTaskProject::Platform(task) => task.poll(ctx), + ForegroundTaskProject::Test(task) => task.poll(ctx), + } + } } pub struct Background { @@ -27,6 +42,7 @@ pub struct Background { _stop: channel::Sender<()>, } +#[must_use] pub type BackgroundTask = smol::Task; impl Foreground { @@ -69,6 +85,7 @@ impl Foreground { } } +#[must_use] impl ForegroundTask { pub fn detach(self) { match self { diff --git a/zed/src/editor/buffer_view.rs b/zed/src/editor/buffer_view.rs index 28513f70d7..d6fd9672f4 100644 --- a/zed/src/editor/buffer_view.rs +++ b/zed/src/editor/buffer_view.rs @@ -1066,7 +1066,7 @@ impl BufferView { ctx.notify(); let epoch = self.next_blink_epoch(); - let _ = ctx.spawn_local( + let _ = ctx.spawn( async move { Timer::after(CURSOR_BLINK_INTERVAL).await; epoch @@ -1088,7 +1088,7 @@ impl BufferView { ctx.notify(); let epoch = self.next_blink_epoch(); - let _ = ctx.spawn_local( + let _ = ctx.spawn( async move { Timer::after(CURSOR_BLINK_INTERVAL).await; epoch diff --git a/zed/src/watch.rs b/zed/src/watch.rs index 7cc7f59fbe..9405338c98 100644 --- a/zed/src/watch.rs +++ b/zed/src/watch.rs @@ -38,14 +38,14 @@ impl Receiver { impl Receiver { pub fn notify_model_on_change(&self, ctx: &mut ModelContext) { let watch = self.clone(); - let _ = ctx.spawn_local(async move { watch.updated().await }, |_, _, ctx| { + let _ = ctx.spawn(async move { watch.updated().await }, |_, _, ctx| { ctx.notify() }); } pub fn notify_view_on_change(&self, ctx: &mut ViewContext) { let watch = self.clone(); - let _ = ctx.spawn_local(async move { watch.updated().await }, |_, _, ctx| { + let _ = ctx.spawn(async move { watch.updated().await }, |_, _, ctx| { ctx.notify() }); } diff --git a/zed/src/worktree/worktree.rs b/zed/src/worktree/worktree.rs index f22603da87..ab8d41c46b 100644 --- a/zed/src/worktree/worktree.rs +++ b/zed/src/worktree/worktree.rs @@ -62,18 +62,19 @@ impl Worktree { let tree = tree.clone(); let (tx, rx) = smol::channel::bounded(1); - ctx.background_executor() - .spawn(async move { - tx.send(tree.scan_dirs()).await.unwrap(); - }) - .detach(); + let task = ctx.background_executor().spawn(async move { + let _ = tx.send(tree.scan_dirs()?).await; + Ok(()) + }); - let _ = ctx.spawn_local(async move { rx.recv().await.unwrap() }, Self::done_scanning); + ctx.spawn(task, Self::done_scanning).detach(); - let _ = ctx.spawn_stream_local( + ctx.spawn_stream( timer::repeat(Duration::from_millis(100)).map(|_| ()), Self::scanning, - ); + |_, _| {}, + ) + .detach(); } tree @@ -347,7 +348,7 @@ impl Worktree { } } - fn scanning(&mut self, _: Option<()>, ctx: &mut ModelContext) { + fn scanning(&mut self, _: (), ctx: &mut ModelContext) { if self.0.read().scanning { ctx.notify(); } else { @@ -356,6 +357,7 @@ impl Worktree { } fn done_scanning(&mut self, result: io::Result<()>, ctx: &mut ModelContext) { + log::info!("done scanning"); self.0.write().scanning = false; if let Err(error) = result { log::error!("error populating worktree: {}", error);