From 019a14bcde3ec262216d4fa09ff634e35d1c951d Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 6 Jun 2025 18:00:09 +0200 Subject: [PATCH] Replace `async-watch` with a custom watch (#32245) The `async-watch` crate doesn't seem to be maintained and we noticed several panics coming from it, such as: ``` [bug] failed to observe change after notificaton. zed::reliability::init_panic_hook::{{closure}}::hea8cdcb6299fad6b+154543526 std::panicking::rust_panic_with_hook::h33b18b24045abff4+127578547 std::panicking::begin_panic_handler::{{closure}}::hf8313cc2fd0126bc+127577770 std::sys::backtrace::__rust_end_short_backtrace::h57fe07c8aea5c98a+127571385 __rustc[95feac21a9532783]::rust_begin_unwind+127576909 core::panicking::panic_fmt::hd54fb667be51beea+9433328 core::option::expect_failed::h8456634a3dada3e4+9433291 assistant_tools::edit_agent::EditAgent::apply_edit_chunks::{{closure}}::habe2e1a32b267fd4+26921553 gpui::app::async_context::AsyncApp::spawn::{{closure}}::h12f5f25757f572ea+25923441 async_task::raw::RawTask::run::h3cca0d402690ccba+25186815 ::run::h26264aefbcfbc14b+73961666 gpui::platform::linux::platform::::run::hb12dcd4abad715b5+73562509 gpui::app::Application::run::h0f936a5f855a3f9f+150676820 zed::main::ha17f9a25fe257d35+154788471 std::sys::backtrace::__rust_begin_short_backtrace::h1edd02429370b2bd+154624579 std::rt::lang_start::{{closure}}::h3d2e300f10059b0a+154264777 std::rt::lang_start_internal::h418648f91f5be3a1+127502049 main+154806636 __libc_start_main+46051972301573 _start+12358494 ``` I didn't find an executor-agnostic watch crate that was well maintained (we already tried postage and async-watch), so decided to implement it our own version. Release Notes: - Fixed a panic that could sometimes occur when the agent performed edits. --- Cargo.lock | 38 +-- Cargo.toml | 3 +- crates/agent/Cargo.toml | 2 +- crates/agent/src/inline_assistant.rs | 8 +- crates/assistant_tool/Cargo.toml | 2 +- crates/assistant_tool/src/action_log.rs | 2 +- crates/assistant_tools/Cargo.toml | 2 +- crates/assistant_tools/src/edit_agent.rs | 4 +- crates/eval/Cargo.toml | 2 +- crates/eval/src/eval.rs | 2 +- crates/language/Cargo.toml | 2 +- crates/language/src/buffer.rs | 3 +- crates/node_runtime/Cargo.toml | 2 +- crates/node_runtime/src/node_runtime.rs | 6 +- crates/remote_server/Cargo.toml | 2 +- crates/remote_server/src/unix.rs | 4 +- crates/watch/Cargo.toml | 24 ++ crates/watch/LICENSE-APACHE | 1 + crates/watch/src/error.rs | 25 ++ crates/watch/src/watch.rs | 279 +++++++++++++++++++++++ crates/zed/Cargo.toml | 2 +- crates/zed/src/main.rs | 2 +- 22 files changed, 375 insertions(+), 42 deletions(-) create mode 100644 crates/watch/Cargo.toml create mode 120000 crates/watch/LICENSE-APACHE create mode 100644 crates/watch/src/error.rs create mode 100644 crates/watch/src/watch.rs diff --git a/Cargo.lock b/Cargo.lock index d572bd1f78..abf5705a78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,7 +60,6 @@ dependencies = [ "assistant_slash_commands", "assistant_tool", "assistant_tools", - "async-watch", "audio", "buffer_diff", "chrono", @@ -131,6 +130,7 @@ dependencies = [ "urlencoding", "util", "uuid", + "watch", "workspace", "workspace-hack", "zed_actions", @@ -631,7 +631,6 @@ name = "assistant_tool" version = "0.1.0" dependencies = [ "anyhow", - "async-watch", "buffer_diff", "clock", "collections", @@ -653,6 +652,7 @@ dependencies = [ "settings", "text", "util", + "watch", "workspace", "workspace-hack", "zlog", @@ -665,7 +665,6 @@ dependencies = [ "agent_settings", "anyhow", "assistant_tool", - "async-watch", "buffer_diff", "chrono", "client", @@ -716,6 +715,7 @@ dependencies = [ "ui", "unindent", "util", + "watch", "web_search", "which 6.0.3", "workspace", @@ -1074,15 +1074,6 @@ dependencies = [ "tungstenite 0.26.2", ] -[[package]] -name = "async-watch" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a078faf4e27c0c6cc0efb20e5da59dcccc04968ebf2801d8e0b2195124cdcdb2" -dependencies = [ - "event-listener 2.5.3", -] - [[package]] name = "async_zip" version = "0.0.17" @@ -5013,7 +5004,6 @@ dependencies = [ "assistant_tool", "assistant_tools", "async-trait", - "async-watch", "buffer_diff", "chrono", "clap", @@ -5055,6 +5045,7 @@ dependencies = [ "unindent", "util", "uuid", + "watch", "workspace-hack", "zed_llm_client", ] @@ -8739,7 +8730,6 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "async-watch", "clock", "collections", "ctor", @@ -8789,6 +8779,7 @@ dependencies = [ "unicase", "unindent", "util", + "watch", "workspace-hack", "zlog", ] @@ -10147,7 +10138,6 @@ dependencies = [ "async-std", "async-tar", "async-trait", - "async-watch", "futures 0.3.31", "http_client", "log", @@ -10157,6 +10147,7 @@ dependencies = [ "serde_json", "smol", "util", + "watch", "which 6.0.3", "workspace-hack", ] @@ -13007,7 +12998,6 @@ dependencies = [ "askpass", "assistant_tool", "assistant_tools", - "async-watch", "backtrace", "cargo_toml", "chrono", @@ -13054,6 +13044,7 @@ dependencies = [ "toml 0.8.20", "unindent", "util", + "watch", "worktree", "zlog", ] @@ -17915,6 +17906,19 @@ dependencies = [ "leb128", ] +[[package]] +name = "watch" +version = "0.1.0" +dependencies = [ + "ctor", + "futures 0.3.31", + "gpui", + "parking_lot", + "rand 0.8.5", + "workspace-hack", + "zlog", +] + [[package]] name = "wayland-backend" version = "0.3.8" @@ -19726,7 +19730,6 @@ dependencies = [ "assistant_context_editor", "assistant_tool", "assistant_tools", - "async-watch", "audio", "auto_update", "auto_update_ui", @@ -19843,6 +19846,7 @@ dependencies = [ "uuid", "vim", "vim_mode_setting", + "watch", "web_search", "web_search_providers", "welcome", diff --git a/Cargo.toml b/Cargo.toml index f8ab21eedd..39d15f8c1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -165,6 +165,7 @@ members = [ "crates/util_macros", "crates/vim", "crates/vim_mode_setting", + "crates/watch", "crates/web_search", "crates/web_search_providers", "crates/welcome", @@ -373,6 +374,7 @@ util = { path = "crates/util" } util_macros = { path = "crates/util_macros" } vim = { path = "crates/vim" } vim_mode_setting = { path = "crates/vim_mode_setting" } +watch = { path = "crates/watch" } web_search = { path = "crates/web_search" } web_search_providers = { path = "crates/web_search_providers" } welcome = { path = "crates/welcome" } @@ -403,7 +405,6 @@ async-recursion = "1.0.0" async-tar = "0.5.0" async-trait = "0.1" async-tungstenite = "0.29.1" -async-watch = "0.3.1" async_zip = { version = "0.0.17", features = ["deflate", "deflate64"] } aws-config = { version = "1.6.1", features = ["behavior-version-latest"] } aws-credential-types = { version = "1.2.2", features = [ diff --git a/crates/agent/Cargo.toml b/crates/agent/Cargo.toml index 1b07d94605..cf0badcff6 100644 --- a/crates/agent/Cargo.toml +++ b/crates/agent/Cargo.toml @@ -25,7 +25,6 @@ assistant_context_editor.workspace = true assistant_slash_command.workspace = true assistant_slash_commands.workspace = true assistant_tool.workspace = true -async-watch.workspace = true audio.workspace = true buffer_diff.workspace = true chrono.workspace = true @@ -95,6 +94,7 @@ ui_input.workspace = true urlencoding.workspace = true util.workspace = true uuid.workspace = true +watch.workspace = true workspace-hack.workspace = true workspace.workspace = true zed_actions.workspace = true diff --git a/crates/agent/src/inline_assistant.rs b/crates/agent/src/inline_assistant.rs index ca286ffb6b..622098c6b8 100644 --- a/crates/agent/src/inline_assistant.rs +++ b/crates/agent/src/inline_assistant.rs @@ -1011,7 +1011,7 @@ impl InlineAssistant { self.update_editor_highlights(&editor, cx); } } else { - entry.get().highlight_updates.send(()).ok(); + entry.get_mut().highlight_updates.send(()).ok(); } } @@ -1519,7 +1519,7 @@ impl InlineAssistant { struct EditorInlineAssists { assist_ids: Vec, scroll_lock: Option, - highlight_updates: async_watch::Sender<()>, + highlight_updates: watch::Sender<()>, _update_highlights: Task>, _subscriptions: Vec, } @@ -1531,7 +1531,7 @@ struct InlineAssistScrollLock { impl EditorInlineAssists { fn new(editor: &Entity, window: &mut Window, cx: &mut App) -> Self { - let (highlight_updates_tx, mut highlight_updates_rx) = async_watch::channel(()); + let (highlight_updates_tx, mut highlight_updates_rx) = watch::channel(()); Self { assist_ids: Vec::new(), scroll_lock: None, @@ -1689,7 +1689,7 @@ impl InlineAssist { if let Some(editor) = editor.upgrade() { InlineAssistant::update_global(cx, |this, cx| { if let Some(editor_assists) = - this.assists_by_editor.get(&editor.downgrade()) + this.assists_by_editor.get_mut(&editor.downgrade()) { editor_assists.highlight_updates.send(()).ok(); } diff --git a/crates/assistant_tool/Cargo.toml b/crates/assistant_tool/Cargo.toml index 9409e2063f..a8df1131c6 100644 --- a/crates/assistant_tool/Cargo.toml +++ b/crates/assistant_tool/Cargo.toml @@ -13,7 +13,6 @@ path = "src/assistant_tool.rs" [dependencies] anyhow.workspace = true -async-watch.workspace = true buffer_diff.workspace = true clock.workspace = true collections.workspace = true @@ -30,6 +29,7 @@ serde.workspace = true serde_json.workspace = true text.workspace = true util.workspace = true +watch.workspace = true workspace.workspace = true workspace-hack.workspace = true diff --git a/crates/assistant_tool/src/action_log.rs b/crates/assistant_tool/src/action_log.rs index 69c7b06366..34223e454c 100644 --- a/crates/assistant_tool/src/action_log.rs +++ b/crates/assistant_tool/src/action_log.rs @@ -204,7 +204,7 @@ impl ActionLog { git_store.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx) })?; - let (git_diff_updates_tx, mut git_diff_updates_rx) = async_watch::channel(()); + let (mut git_diff_updates_tx, mut git_diff_updates_rx) = watch::channel(()); let _repo_subscription = if let Some((git_diff, (buffer_repo, _))) = git_diff.as_ref().zip(buffer_repo) { cx.update(|cx| { diff --git a/crates/assistant_tools/Cargo.toml b/crates/assistant_tools/Cargo.toml index 8abe78a98f..ded54460d7 100644 --- a/crates/assistant_tools/Cargo.toml +++ b/crates/assistant_tools/Cargo.toml @@ -18,7 +18,6 @@ eval = [] agent_settings.workspace = true anyhow.workspace = true assistant_tool.workspace = true -async-watch.workspace = true buffer_diff.workspace = true chrono.workspace = true collections.workspace = true @@ -58,6 +57,7 @@ terminal_view.workspace = true theme.workspace = true ui.workspace = true util.workspace = true +watch.workspace = true web_search.workspace = true which.workspace = true workspace-hack.workspace = true diff --git a/crates/assistant_tools/src/edit_agent.rs b/crates/assistant_tools/src/edit_agent.rs index 0821719b7c..a247d5f4de 100644 --- a/crates/assistant_tools/src/edit_agent.rs +++ b/crates/assistant_tools/src/edit_agent.rs @@ -420,12 +420,12 @@ impl EditAgent { cx: &mut AsyncApp, ) -> ( Task)>>, - async_watch::Receiver>>, + watch::Receiver>>, ) where T: 'static + Send + Unpin + Stream>, { - let (old_range_tx, old_range_rx) = async_watch::channel(None); + let (mut old_range_tx, old_range_rx) = watch::channel(None); let task = cx.background_spawn(async move { let mut matcher = StreamingFuzzyMatcher::new(snapshot); while let Some(edit_event) = edit_events.next().await { diff --git a/crates/eval/Cargo.toml b/crates/eval/Cargo.toml index 1dff8ad7b6..1e1e3d16e4 100644 --- a/crates/eval/Cargo.toml +++ b/crates/eval/Cargo.toml @@ -24,7 +24,6 @@ anyhow.workspace = true assistant_tool.workspace = true assistant_tools.workspace = true async-trait.workspace = true -async-watch.workspace = true buffer_diff.workspace = true chrono.workspace = true clap.workspace = true @@ -66,5 +65,6 @@ toml.workspace = true unindent.workspace = true util.workspace = true uuid.workspace = true +watch.workspace = true workspace-hack.workspace = true zed_llm_client.workspace = true diff --git a/crates/eval/src/eval.rs b/crates/eval/src/eval.rs index 41dbe25d96..93c3618409 100644 --- a/crates/eval/src/eval.rs +++ b/crates/eval/src/eval.rs @@ -385,7 +385,7 @@ pub fn init(cx: &mut App) -> Arc { extension::init(cx); - let (tx, rx) = async_watch::channel(None); + let (mut tx, rx) = watch::channel(None); cx.observe_global::(move |cx| { let settings = &ProjectSettings::get_global(cx).node; let options = NodeBinaryOptions { diff --git a/crates/language/Cargo.toml b/crates/language/Cargo.toml index a776790403..278976d3cd 100644 --- a/crates/language/Cargo.toml +++ b/crates/language/Cargo.toml @@ -28,7 +28,6 @@ test-support = [ [dependencies] anyhow.workspace = true async-trait.workspace = true -async-watch.workspace = true clock.workspace = true collections.workspace = true ec4rs.workspace = true @@ -66,6 +65,7 @@ tree-sitter-typescript = { workspace = true, optional = true } tree-sitter.workspace = true unicase = "2.6" util.workspace = true +watch.workspace = true workspace-hack.workspace = true diffy = "0.4.2" diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index 93c46efd7f..08c2acb875 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -18,7 +18,6 @@ use crate::{ text_diff::text_diff, }; use anyhow::{Context as _, Result}; -use async_watch as watch; pub use clock::ReplicaId; use clock::{AGENT_REPLICA_ID, Lamport}; use collections::HashMap; @@ -945,7 +944,7 @@ impl Buffer { reparse: None, non_text_state_update_count: 0, sync_parse_timeout: Duration::from_millis(1), - parse_status: async_watch::channel(ParseStatus::Idle), + parse_status: watch::channel(ParseStatus::Idle), autoindent_requests: Default::default(), pending_autoindent: Default::default(), language: None, diff --git a/crates/node_runtime/Cargo.toml b/crates/node_runtime/Cargo.toml index 71d281a801..144fc2ae85 100644 --- a/crates/node_runtime/Cargo.toml +++ b/crates/node_runtime/Cargo.toml @@ -18,7 +18,6 @@ test-support = [] [dependencies] anyhow.workspace = true async-compression.workspace = true -async-watch.workspace = true async-tar.workspace = true async-trait.workspace = true futures.workspace = true @@ -30,6 +29,7 @@ serde.workspace = true serde_json.workspace = true smol.workspace = true util.workspace = true +watch.workspace = true which.workspace = true workspace-hack.workspace = true diff --git a/crates/node_runtime/src/node_runtime.rs b/crates/node_runtime/src/node_runtime.rs index 6057d2af80..08698a1d6c 100644 --- a/crates/node_runtime/src/node_runtime.rs +++ b/crates/node_runtime/src/node_runtime.rs @@ -36,7 +36,7 @@ struct NodeRuntimeState { http: Arc, instance: Option>, last_options: Option, - options: async_watch::Receiver>, + options: watch::Receiver>, shell_env_loaded: Shared>, } @@ -44,7 +44,7 @@ impl NodeRuntime { pub fn new( http: Arc, shell_env_loaded: Option>, - options: async_watch::Receiver>, + options: watch::Receiver>, ) -> Self { NodeRuntime(Arc::new(Mutex::new(NodeRuntimeState { http, @@ -60,7 +60,7 @@ impl NodeRuntime { http: Arc::new(http_client::BlockedHttpClient), instance: None, last_options: None, - options: async_watch::channel(Some(NodeBinaryOptions::default())).1, + options: watch::channel(Some(NodeBinaryOptions::default())).1, shell_env_loaded: oneshot::channel().1.shared(), }))) } diff --git a/crates/remote_server/Cargo.toml b/crates/remote_server/Cargo.toml index 207f93cd32..2dbe51b605 100644 --- a/crates/remote_server/Cargo.toml +++ b/crates/remote_server/Cargo.toml @@ -24,7 +24,6 @@ test-support = ["fs/test-support"] [dependencies] anyhow.workspace = true askpass.workspace = true -async-watch.workspace = true backtrace = "0.3" chrono.workspace = true clap.workspace = true @@ -63,6 +62,7 @@ smol.workspace = true sysinfo.workspace = true telemetry_events.workspace = true util.workspace = true +watch.workspace = true worktree.workspace = true [target.'cfg(not(windows))'.dependencies] diff --git a/crates/remote_server/src/unix.rs b/crates/remote_server/src/unix.rs index be551c44ce..48b4e483b4 100644 --- a/crates/remote_server/src/unix.rs +++ b/crates/remote_server/src/unix.rs @@ -756,7 +756,7 @@ fn initialize_settings( session: Arc, fs: Arc, cx: &mut App, -) -> async_watch::Receiver> { +) -> watch::Receiver> { let user_settings_file_rx = watch_config_file( &cx.background_executor(), fs, @@ -791,7 +791,7 @@ fn initialize_settings( } }); - let (tx, rx) = async_watch::channel(None); + let (mut tx, rx) = watch::channel(None); cx.observe_global::(move |cx| { let settings = &ProjectSettings::get_global(cx).node; log::info!("Got new node settings: {:?}", settings); diff --git a/crates/watch/Cargo.toml b/crates/watch/Cargo.toml new file mode 100644 index 0000000000..439a9af49f --- /dev/null +++ b/crates/watch/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "watch" +version = "0.1.0" +edition.workspace = true +publish.workspace = true +license = "Apache-2.0" + +[lints] +workspace = true + +[lib] +path = "src/watch.rs" +doctest = true + +[dependencies] +parking_lot.workspace = true +workspace-hack.workspace = true + +[dev-dependencies] +ctor.workspace = true +futures.workspace = true +gpui = { workspace = true, features = ["test-support"] } +rand.workspace = true +zlog.workspace = true diff --git a/crates/watch/LICENSE-APACHE b/crates/watch/LICENSE-APACHE new file mode 120000 index 0000000000..1cd601d0a3 --- /dev/null +++ b/crates/watch/LICENSE-APACHE @@ -0,0 +1 @@ +../../LICENSE-APACHE \ No newline at end of file diff --git a/crates/watch/src/error.rs b/crates/watch/src/error.rs new file mode 100644 index 0000000000..231676cb73 --- /dev/null +++ b/crates/watch/src/error.rs @@ -0,0 +1,25 @@ +//! Watch error types. + +use std::fmt; + +#[derive(Debug, Eq, PartialEq)] +pub struct NoReceiverError; + +impl fmt::Display for NoReceiverError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "all receivers were dropped") + } +} + +impl std::error::Error for NoReceiverError {} + +#[derive(Debug, Eq, PartialEq)] +pub struct NoSenderError; + +impl fmt::Display for NoSenderError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "sender was dropped") + } +} + +impl std::error::Error for NoSenderError {} diff --git a/crates/watch/src/watch.rs b/crates/watch/src/watch.rs new file mode 100644 index 0000000000..a4a0ca6df4 --- /dev/null +++ b/crates/watch/src/watch.rs @@ -0,0 +1,279 @@ +mod error; + +pub use error::*; +use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard}; +use std::{ + collections::BTreeMap, + mem, + pin::Pin, + sync::Arc, + task::{Context, Poll, Waker}, +}; + +pub fn channel(value: T) -> (Sender, Receiver) { + let state = Arc::new(RwLock::new(State { + value, + wakers: BTreeMap::new(), + next_waker_id: WakerId::default(), + version: 0, + closed: false, + })); + + ( + Sender { + state: state.clone(), + }, + Receiver { state, version: 0 }, + ) +} + +#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +struct WakerId(usize); + +impl WakerId { + fn post_inc(&mut self) -> Self { + let id = *self; + self.0 = id.0.wrapping_add(1); + *self + } +} + +struct State { + value: T, + wakers: BTreeMap, + next_waker_id: WakerId, + version: usize, + closed: bool, +} + +pub struct Sender { + state: Arc>>, +} + +impl Sender { + pub fn receiver(&self) -> Receiver { + let version = self.state.read().version; + Receiver { + state: self.state.clone(), + version, + } + } + + pub fn send(&mut self, value: T) -> Result<(), NoReceiverError> { + if let Some(state) = Arc::get_mut(&mut self.state) { + let state = state.get_mut(); + state.value = value; + debug_assert_eq!(state.wakers.len(), 0); + Err(NoReceiverError) + } else { + let mut state = self.state.write(); + state.value = value; + state.version = state.version.wrapping_add(1); + let wakers = mem::take(&mut state.wakers); + drop(state); + + for (_, waker) in wakers { + waker.wake(); + } + + Ok(()) + } + } +} + +impl Drop for Sender { + fn drop(&mut self) { + let mut state = self.state.write(); + state.closed = true; + for (_, waker) in mem::take(&mut state.wakers) { + waker.wake(); + } + } +} + +#[derive(Clone)] +pub struct Receiver { + state: Arc>>, + version: usize, +} + +struct Changed<'a, T> { + receiver: &'a mut Receiver, + pending_waker_id: Option, +} + +impl Future for Changed<'_, T> { + type Output = Result<(), NoSenderError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = &mut *self; + + let state = this.receiver.state.upgradable_read(); + if state.version != this.receiver.version { + // The sender produced a new value. Avoid unregistering the pending + // waker, because the sender has already done so. + this.pending_waker_id = None; + this.receiver.version = state.version; + Poll::Ready(Ok(())) + } else if state.closed { + Poll::Ready(Err(NoSenderError)) + } else { + let mut state = RwLockUpgradableReadGuard::upgrade(state); + + // Unregister the pending waker. This should happen automatically + // when the waker gets awoken by the sender, but if this future was + // polled again without an explicit call to `wake` (e.g., a spurious + // wake by the executor), we need to remove it manually. + if let Some(pending_waker_id) = this.pending_waker_id.take() { + state.wakers.remove(&pending_waker_id); + } + + // Register the waker for this future. + let waker_id = state.next_waker_id.post_inc(); + state.wakers.insert(waker_id, cx.waker().clone()); + this.pending_waker_id = Some(waker_id); + + Poll::Pending + } + } +} + +impl Drop for Changed<'_, T> { + fn drop(&mut self) { + // If this future gets dropped before the waker has a chance of being + // awoken, we need to clear it to avoid a memory leak. + if let Some(waker_id) = self.pending_waker_id { + let mut state = self.receiver.state.write(); + state.wakers.remove(&waker_id); + } + } +} + +impl Receiver { + pub fn borrow(&mut self) -> parking_lot::MappedRwLockReadGuard { + let state = self.state.read(); + self.version = state.version; + RwLockReadGuard::map(state, |state| &state.value) + } + + pub fn changed(&mut self) -> impl Future> { + Changed { + receiver: self, + pending_waker_id: None, + } + } +} + +impl Receiver { + pub async fn recv(&mut self) -> Result { + self.changed().await?; + Ok(self.borrow().clone()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::{FutureExt, select_biased}; + use gpui::{AppContext, TestAppContext}; + use std::{ + pin::pin, + sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}, + }; + + #[gpui::test] + async fn test_basic_watch() { + let (mut sender, mut receiver) = channel(0); + assert_eq!(sender.send(1), Ok(())); + assert_eq!(receiver.recv().await, Ok(1)); + + assert_eq!(sender.send(2), Ok(())); + assert_eq!(sender.send(3), Ok(())); + assert_eq!(receiver.recv().await, Ok(3)); + + drop(receiver); + assert_eq!(sender.send(4), Err(NoReceiverError)); + + let mut receiver = sender.receiver(); + assert_eq!(sender.send(5), Ok(())); + assert_eq!(receiver.recv().await, Ok(5)); + + // Ensure `changed` doesn't resolve if we just read the latest value + // using `borrow`. + assert_eq!(sender.send(6), Ok(())); + assert_eq!(*receiver.borrow(), 6); + assert_eq!(receiver.changed().now_or_never(), None); + + assert_eq!(sender.send(7), Ok(())); + drop(sender); + assert_eq!(receiver.recv().await, Ok(7)); + assert_eq!(receiver.recv().await, Err(NoSenderError)); + } + + #[gpui::test(iterations = 1000)] + async fn test_watch_random(cx: &mut TestAppContext) { + let next_id = Arc::new(AtomicUsize::new(1)); + let closed = Arc::new(AtomicBool::new(false)); + let (mut tx, rx) = channel(0); + let mut tasks = Vec::new(); + + tasks.push(cx.background_spawn({ + let executor = cx.executor().clone(); + let next_id = next_id.clone(); + let closed = closed.clone(); + async move { + for _ in 0..16 { + executor.simulate_random_delay().await; + let id = next_id.fetch_add(1, SeqCst); + zlog::info!("sending {}", id); + tx.send(id).ok(); + } + closed.store(true, SeqCst); + } + })); + + for receiver_id in 0..16 { + let executor = cx.executor().clone(); + let next_id = next_id.clone(); + let closed = closed.clone(); + let mut rx = rx.clone(); + let mut prev_observed_value = *rx.borrow(); + tasks.push(cx.background_spawn(async move { + for _ in 0..16 { + executor.simulate_random_delay().await; + + zlog::info!("{}: receiving", receiver_id); + let mut timeout = executor.simulate_random_delay().fuse(); + let mut recv = pin!(rx.recv().fuse()); + select_biased! { + _ = timeout => { + zlog::info!("{}: dropping recv future", receiver_id); + } + result = recv => { + match result { + Ok(value) => { + zlog::info!("{}: received {}", receiver_id, value); + assert_eq!(value, next_id.load(SeqCst) - 1); + assert_ne!(value, prev_observed_value); + prev_observed_value = value; + } + Err(NoSenderError) => { + zlog::info!("{}: closed", receiver_id); + assert!(closed.load(SeqCst)); + break; + } + } + } + } + } + })); + } + + futures::future::join_all(tasks).await; + } + + #[ctor::ctor] + fn init_logger() { + zlog::init_test(); + } +} diff --git a/crates/zed/Cargo.toml b/crates/zed/Cargo.toml index c40ea4cb98..060f0163e6 100644 --- a/crates/zed/Cargo.toml +++ b/crates/zed/Cargo.toml @@ -28,7 +28,6 @@ assets.workspace = true assistant_context_editor.workspace = true assistant_tool.workspace = true assistant_tools.workspace = true -async-watch.workspace = true audio.workspace = true auto_update.workspace = true auto_update_ui.workspace = true @@ -142,6 +141,7 @@ util.workspace = true uuid.workspace = true vim.workspace = true vim_mode_setting.workspace = true +watch.workspace = true web_search.workspace = true web_search_providers.workspace = true welcome.workspace = true diff --git a/crates/zed/src/main.rs b/crates/zed/src/main.rs index 490f5b8d67..eeed1e9b7c 100644 --- a/crates/zed/src/main.rs +++ b/crates/zed/src/main.rs @@ -412,7 +412,7 @@ Error: Running Zed as root or via sudo is unsupported. let mut languages = LanguageRegistry::new(cx.background_executor().clone()); languages.set_language_server_download_dir(paths::languages_dir().clone()); let languages = Arc::new(languages); - let (tx, rx) = async_watch::channel(None); + let (mut tx, rx) = watch::channel(None); cx.observe_global::(move |cx| { let settings = &ProjectSettings::get_global(cx).node; let options = NodeBinaryOptions {