Replace async-watch with a custom watch (#32245)

The `async-watch` crate doesn't seem to be maintained and we noticed
several panics coming from it, such as:

```
[bug] failed to observe change after notificaton.
zed::reliability::init_panic_hook::{{closure}}::hea8cdcb6299fad6b+154543526
std::panicking::rust_panic_with_hook::h33b18b24045abff4+127578547
std::panicking::begin_panic_handler::{{closure}}::hf8313cc2fd0126bc+127577770
std::sys::backtrace::__rust_end_short_backtrace::h57fe07c8aea5c98a+127571385
__rustc[95feac21a9532783]::rust_begin_unwind+127576909
core::panicking::panic_fmt::hd54fb667be51beea+9433328
core::option::expect_failed::h8456634a3dada3e4+9433291
assistant_tools::edit_agent::EditAgent::apply_edit_chunks::{{closure}}::habe2e1a32b267fd4+26921553
gpui::app::async_context::AsyncApp::spawn::{{closure}}::h12f5f25757f572ea+25923441
async_task::raw::RawTask<F,T,S,M>::run::h3cca0d402690ccba+25186815
<gpui::platform::linux::x11::client::X11Client as gpui::platform::linux::platform::LinuxClient>::run::h26264aefbcfbc14b+73961666
gpui::platform::linux::platform::<impl gpui::platform::Platform for P>::run::hb12dcd4abad715b5+73562509
gpui::app::Application::run::h0f936a5f855a3f9f+150676820
zed::main::ha17f9a25fe257d35+154788471
std::sys::backtrace::__rust_begin_short_backtrace::h1edd02429370b2bd+154624579
std::rt::lang_start::{{closure}}::h3d2e300f10059b0a+154264777
std::rt::lang_start_internal::h418648f91f5be3a1+127502049
main+154806636
__libc_start_main+46051972301573
_start+12358494
```

I didn't find an executor-agnostic watch crate that was well maintained
(we had already tried `postage` and `async-watch`), so we decided to
implement our own version.
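
For context, here's a minimal sketch of how the new crate is meant to be used,
based on the API added below (`futures::executor::block_on` is used here only
to drive the async calls; any executor works):

```rust
use futures::executor::block_on;

fn main() {
    block_on(async {
        // Create a channel seeded with an initial value.
        let (mut tx, mut rx) = watch::channel(0);

        // Each send overwrites the stored value; receivers only ever observe
        // the most recent one.
        tx.send(1).ok();
        tx.send(2).ok();

        // `recv` waits for a value newer than the last one this receiver saw
        // and returns a clone of it; the intermediate `1` is skipped.
        assert_eq!(rx.recv().await, Ok(2));

        // `borrow` reads the current value synchronously and marks it as seen,
        // so `changed`/`recv` won't resolve again until a newer value arrives.
        assert_eq!(*rx.borrow(), 2);

        // Dropping the sender resolves pending receivers with `NoSenderError`.
        drop(tx);
        assert_eq!(rx.recv().await, Err(watch::NoSenderError));
    });
}
```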

Release Notes:

- Fixed a panic that could sometimes occur when the agent performed
edits.
Antonio Scandurra 2025-06-06 18:00:09 +02:00 committed by GitHub
parent 95d78ff8d5
commit 019a14bcde
22 changed files with 375 additions and 42 deletions

crates/watch/Cargo.toml (new file, +24)

@@ -0,0 +1,24 @@
[package]
name = "watch"
version = "0.1.0"
edition.workspace = true
publish.workspace = true
license = "Apache-2.0"
[lints]
workspace = true
[lib]
path = "src/watch.rs"
doctest = true
[dependencies]
parking_lot.workspace = true
workspace-hack.workspace = true
[dev-dependencies]
ctor.workspace = true
futures.workspace = true
gpui = { workspace = true, features = ["test-support"] }
rand.workspace = true
zlog.workspace = true

crates/watch/LICENSE-APACHE (new symbolic link, +1)

@@ -0,0 +1 @@
../../LICENSE-APACHE

crates/watch/src/error.rs (new file, +25)

@@ -0,0 +1,25 @@
//! Watch error types.
use std::fmt;
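/// Error returned by the sender when all receivers have been dropped.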
#[derive(Debug, Eq, PartialEq)]
pub struct NoReceiverError;
impl fmt::Display for NoReceiverError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "all receivers were dropped")
}
}
impl std::error::Error for NoReceiverError {}
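/// Error returned by receivers when the sender has been dropped.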
#[derive(Debug, Eq, PartialEq)]
pub struct NoSenderError;
impl fmt::Display for NoSenderError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "sender was dropped")
}
}
impl std::error::Error for NoSenderError {}

crates/watch/src/watch.rs (new file, +279)

@@ -0,0 +1,279 @@
mod error;
pub use error::*;
use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard};
use std::{
collections::BTreeMap,
mem,
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
};
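/// Creates a watch channel seeded with `value`, returning its sender and
/// receiver halves. Receivers only ever observe the most recently sent value.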
pub fn channel<T>(value: T) -> (Sender<T>, Receiver<T>) {
let state = Arc::new(RwLock::new(State {
value,
wakers: BTreeMap::new(),
next_waker_id: WakerId::default(),
version: 0,
closed: false,
}));
(
Sender {
state: state.clone(),
},
Receiver { state, version: 0 },
)
}
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct WakerId(usize);
impl WakerId {
fn post_inc(&mut self) -> Self {
let id = *self;
self.0 = id.0.wrapping_add(1);
id
}
}
struct State<T> {
value: T,
wakers: BTreeMap<WakerId, Waker>,
next_waker_id: WakerId,
version: usize,
closed: bool,
}
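/// The writing half of a watch channel.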
pub struct Sender<T> {
state: Arc<RwLock<State<T>>>,
}
impl<T> Sender<T> {
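/// Creates a new receiver attached to this sender, starting at the current value.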
pub fn receiver(&self) -> Receiver<T> {
let version = self.state.read().version;
Receiver {
state: self.state.clone(),
version,
}
}
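/// Stores `value` and wakes every receiver currently waiting in `changed`.
/// Returns `Err(NoReceiverError)` if all receivers have been dropped.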
pub fn send(&mut self, value: T) -> Result<(), NoReceiverError> {
if let Some(state) = Arc::get_mut(&mut self.state) {
let state = state.get_mut();
state.value = value;
debug_assert_eq!(state.wakers.len(), 0);
Err(NoReceiverError)
} else {
let mut state = self.state.write();
state.value = value;
state.version = state.version.wrapping_add(1);
let wakers = mem::take(&mut state.wakers);
drop(state);
for (_, waker) in wakers {
waker.wake();
}
Ok(())
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let mut state = self.state.write();
state.closed = true;
for (_, waker) in mem::take(&mut state.wakers) {
waker.wake();
}
}
}
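/// The reading half of a watch channel. Cloning yields an independent receiver
/// that tracks its own last-seen value.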
#[derive(Clone)]
pub struct Receiver<T> {
state: Arc<RwLock<State<T>>>,
version: usize,
}
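/// Future returned by `Receiver::changed`.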
struct Changed<'a, T> {
receiver: &'a mut Receiver<T>,
pending_waker_id: Option<WakerId>,
}
impl<T> Future for Changed<'_, T> {
type Output = Result<(), NoSenderError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = &mut *self;
let state = this.receiver.state.upgradable_read();
if state.version != this.receiver.version {
// The sender produced a new value. Avoid unregistering the pending
// waker, because the sender has already done so.
this.pending_waker_id = None;
this.receiver.version = state.version;
Poll::Ready(Ok(()))
} else if state.closed {
Poll::Ready(Err(NoSenderError))
} else {
let mut state = RwLockUpgradableReadGuard::upgrade(state);
// Unregister the pending waker. This should happen automatically
// when the waker gets awoken by the sender, but if this future was
// polled again without an explicit call to `wake` (e.g., a spurious
// wake by the executor), we need to remove it manually.
if let Some(pending_waker_id) = this.pending_waker_id.take() {
state.wakers.remove(&pending_waker_id);
}
// Register the waker for this future.
let waker_id = state.next_waker_id.post_inc();
state.wakers.insert(waker_id, cx.waker().clone());
this.pending_waker_id = Some(waker_id);
Poll::Pending
}
}
}
impl<T> Drop for Changed<'_, T> {
fn drop(&mut self) {
// If this future gets dropped before the waker has a chance of being
// awoken, we need to clear it to avoid a memory leak.
if let Some(waker_id) = self.pending_waker_id {
let mut state = self.receiver.state.write();
state.wakers.remove(&waker_id);
}
}
}
impl<T> Receiver<T> {
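/// Returns a read guard to the current value and marks it as seen, so a
/// subsequent `changed` won't resolve until a newer value is sent.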
pub fn borrow(&mut self) -> parking_lot::MappedRwLockReadGuard<T> {
let state = self.state.read();
self.version = state.version;
RwLockReadGuard::map(state, |state| &state.value)
}
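/// Resolves once a value newer than the last one seen is sent, or with
/// `Err(NoSenderError)` after the sender is dropped.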
pub fn changed(&mut self) -> impl Future<Output = Result<(), NoSenderError>> {
Changed {
receiver: self,
pending_waker_id: None,
}
}
}
impl<T: Clone> Receiver<T> {
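/// Waits for a new value and returns a clone of it. Intermediate values may be
/// skipped; only the latest value is observed.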
pub async fn recv(&mut self) -> Result<T, NoSenderError> {
self.changed().await?;
Ok(self.borrow().clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{FutureExt, select_biased};
use gpui::{AppContext, TestAppContext};
use std::{
pin::pin,
sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
};
#[gpui::test]
async fn test_basic_watch() {
let (mut sender, mut receiver) = channel(0);
assert_eq!(sender.send(1), Ok(()));
assert_eq!(receiver.recv().await, Ok(1));
assert_eq!(sender.send(2), Ok(()));
assert_eq!(sender.send(3), Ok(()));
assert_eq!(receiver.recv().await, Ok(3));
drop(receiver);
assert_eq!(sender.send(4), Err(NoReceiverError));
let mut receiver = sender.receiver();
assert_eq!(sender.send(5), Ok(()));
assert_eq!(receiver.recv().await, Ok(5));
// Ensure `changed` doesn't resolve if we just read the latest value
// using `borrow`.
assert_eq!(sender.send(6), Ok(()));
assert_eq!(*receiver.borrow(), 6);
assert_eq!(receiver.changed().now_or_never(), None);
assert_eq!(sender.send(7), Ok(()));
drop(sender);
assert_eq!(receiver.recv().await, Ok(7));
assert_eq!(receiver.recv().await, Err(NoSenderError));
}
#[gpui::test(iterations = 1000)]
async fn test_watch_random(cx: &mut TestAppContext) {
let next_id = Arc::new(AtomicUsize::new(1));
let closed = Arc::new(AtomicBool::new(false));
let (mut tx, rx) = channel(0);
let mut tasks = Vec::new();
tasks.push(cx.background_spawn({
let executor = cx.executor().clone();
let next_id = next_id.clone();
let closed = closed.clone();
async move {
for _ in 0..16 {
executor.simulate_random_delay().await;
let id = next_id.fetch_add(1, SeqCst);
zlog::info!("sending {}", id);
tx.send(id).ok();
}
closed.store(true, SeqCst);
}
}));
for receiver_id in 0..16 {
let executor = cx.executor().clone();
let next_id = next_id.clone();
let closed = closed.clone();
let mut rx = rx.clone();
let mut prev_observed_value = *rx.borrow();
tasks.push(cx.background_spawn(async move {
for _ in 0..16 {
executor.simulate_random_delay().await;
zlog::info!("{}: receiving", receiver_id);
let mut timeout = executor.simulate_random_delay().fuse();
let mut recv = pin!(rx.recv().fuse());
select_biased! {
_ = timeout => {
zlog::info!("{}: dropping recv future", receiver_id);
}
result = recv => {
match result {
Ok(value) => {
zlog::info!("{}: received {}", receiver_id, value);
assert_eq!(value, next_id.load(SeqCst) - 1);
assert_ne!(value, prev_observed_value);
prev_observed_value = value;
}
Err(NoSenderError) => {
zlog::info!("{}: closed", receiver_id);
assert!(closed.load(SeqCst));
break;
}
}
}
}
}
}));
}
futures::future::join_all(tasks).await;
}
#[ctor::ctor]
fn init_logger() {
zlog::init_test();
}
}