lsp: Watch paths outside of worktrees at language servers request (#17173)

Context: https://x.com/fasterthanlime/status/1819120238228570598

Up to this PR:
- We were not watching paths outside of a worktree when language server
requested it.
- We expected GlobPattern used for file watching to be always rooted at
the worktree root.

'1 mattered for observing global files (e.g. global RA config) and both
points had impact on "monorepos".
Let's picture the following scenario:
You're working on a Rust project that has two crates: bin and lib crate:
```
my-rust-project/
  bin-crate/
  lib-crate/
```
Up to this PR, making changes like changing field visibility in
lib-crate **was not reflected** in bin-crate until RA was restarted. RA
for bin-crate asked us to watch lib-crate. Now, depending on if you had
this project open as:
- a project with one worktree rooted at my-rust-project:
- due to '2, we never noticed that we have to notify RA instance for
bin-crate about changes in lib-crate.
- a project with two worktrees (bin-crate and lib-crate):
- due to '1 (as lib-crate is not within bin-crate's worktree), we once
again missed the fact that we have to watch for changes in lib-crate.

This PR solves this by introducing a side-channel - we just store fs
watchers for abs paths at the Project level. Worktree changes handling
is left relatively untouched - as it's used for other changes besides
LSP change notifying, I've figured to better leave it as is, as right
now we have 1 worktree change watcher; if we were to change it, we'd
have `(language server) + 1` watchers per worktree, which seems.. pretty
horrid.

What's the end effect? At the very least fasterthanlime should be a tad
happier; in reality though, I expect it to have some impact on LS
reliability in monorepo setups.

TODO
- [x] Wire through FileChangeType into `fs::watch` interface.

Release Notes:

- Improved language server reliability in multi-worktree projects and
monorepo. We now notify the language server more reliably about which
files have changed.
This commit is contained in:
Piotr Osiewicz 2024-08-31 01:32:33 +02:00 committed by GitHub
parent d2cb45e9bb
commit a850731b0e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 438 additions and 102 deletions

View file

@ -45,6 +45,25 @@ pub trait Watcher: Send + Sync {
fn remove(&self, path: &Path) -> Result<()>;
}
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum PathEventKind {
Removed,
Created,
Changed,
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct PathEvent {
pub path: PathBuf,
pub kind: Option<PathEventKind>,
}
impl From<PathEvent> for PathBuf {
fn from(event: PathEvent) -> Self {
event.path
}
}
#[async_trait::async_trait]
pub trait Fs: Send + Sync {
async fn create_dir(&self, path: &Path) -> Result<()>;
@ -92,7 +111,7 @@ pub trait Fs: Send + Sync {
path: &Path,
latency: Duration,
) -> (
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
Arc<dyn Watcher>,
);
@ -469,17 +488,38 @@ impl Fs for RealFs {
path: &Path,
latency: Duration,
) -> (
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
Arc<dyn Watcher>,
) {
use fsevent::EventStream;
use fsevent::{EventStream, StreamFlags};
let (tx, rx) = smol::channel::unbounded();
let (stream, handle) = EventStream::new(&[path], latency);
std::thread::spawn(move || {
stream.run(move |events| {
smol::block_on(tx.send(events.into_iter().map(|event| event.path).collect()))
.is_ok()
smol::block_on(
tx.send(
events
.into_iter()
.map(|event| {
let kind = if event.flags.contains(StreamFlags::ITEM_REMOVED) {
Some(PathEventKind::Removed)
} else if event.flags.contains(StreamFlags::ITEM_CREATED) {
Some(PathEventKind::Created)
} else if event.flags.contains(StreamFlags::ITEM_MODIFIED) {
Some(PathEventKind::Changed)
} else {
None
};
PathEvent {
path: event.path,
kind,
}
})
.collect(),
),
)
.is_ok()
});
});
@ -498,32 +538,46 @@ impl Fs for RealFs {
path: &Path,
latency: Duration,
) -> (
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
Arc<dyn Watcher>,
) {
use notify::EventKind;
use parking_lot::Mutex;
let (tx, rx) = smol::channel::unbounded();
let pending_paths: Arc<Mutex<Vec<PathBuf>>> = Default::default();
let pending_paths: Arc<Mutex<Vec<PathEvent>>> = Default::default();
let root_path = path.to_path_buf();
watcher::global(|g| {
let tx = tx.clone();
let pending_paths = pending_paths.clone();
g.add(move |event: &notify::Event| {
let kind = match event.kind {
EventKind::Create(_) => Some(PathEventKind::Created),
EventKind::Modify(_) => Some(PathEventKind::Changed),
EventKind::Remove(_) => Some(PathEventKind::Removed),
_ => None,
};
let mut paths = event
.paths
.iter()
.filter(|path| path.starts_with(&root_path))
.cloned()
.filter_map(|path| {
path.starts_with(&root_path).then(|| PathEvent {
path: path.clone(),
kind,
})
})
.collect::<Vec<_>>();
if !paths.is_empty() {
paths.sort();
let mut pending_paths = pending_paths.lock();
if pending_paths.is_empty() {
tx.try_send(()).ok();
}
util::extend_sorted(&mut *pending_paths, paths, usize::MAX, PathBuf::cmp);
util::extend_sorted(&mut *pending_paths, paths, usize::MAX, |a, b| {
a.path.cmp(&b.path)
});
}
})
})
@ -561,10 +615,10 @@ impl Fs for RealFs {
path: &Path,
_latency: Duration,
) -> (
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
Arc<dyn Watcher>,
) {
use notify::Watcher;
use notify::{EventKind, Watcher};
let (tx, rx) = smol::channel::unbounded();
@ -572,7 +626,21 @@ impl Fs for RealFs {
let tx = tx.clone();
move |event: Result<notify::Event, _>| {
if let Some(event) = event.log_err() {
tx.try_send(event.paths).ok();
let kind = match event.kind {
EventKind::Create(_) => Some(PathEventKind::Created),
EventKind::Modify(_) => Some(PathEventKind::Changed),
EventKind::Remove(_) => Some(PathEventKind::Removed),
_ => None,
};
tx.try_send(
event
.paths
.into_iter()
.map(|path| PathEvent { path, kind })
.collect::<Vec<_>>(),
)
.ok();
}
}
})
@ -682,9 +750,9 @@ struct FakeFsState {
root: Arc<Mutex<FakeFsEntry>>,
next_inode: u64,
next_mtime: SystemTime,
event_txs: Vec<smol::channel::Sender<Vec<PathBuf>>>,
event_txs: Vec<smol::channel::Sender<Vec<PathEvent>>>,
events_paused: bool,
buffered_events: Vec<PathBuf>,
buffered_events: Vec<PathEvent>,
metadata_call_count: usize,
read_dir_call_count: usize,
}
@ -793,11 +861,14 @@ impl FakeFsState {
fn emit_event<I, T>(&mut self, paths: I)
where
I: IntoIterator<Item = T>,
I: IntoIterator<Item = (T, Option<PathEventKind>)>,
T: Into<PathBuf>,
{
self.buffered_events
.extend(paths.into_iter().map(Into::into));
.extend(paths.into_iter().map(|(path, kind)| PathEvent {
path: path.into(),
kind,
}));
if !self.events_paused {
self.flush_events(self.buffered_events.len());
@ -872,7 +943,7 @@ impl FakeFs {
Ok(())
})
.unwrap();
state.emit_event([path.to_path_buf()]);
state.emit_event([(path.to_path_buf(), None)]);
}
pub async fn insert_file(&self, path: impl AsRef<Path>, content: Vec<u8>) {
@ -895,7 +966,7 @@ impl FakeFs {
}
})
.unwrap();
state.emit_event([path]);
state.emit_event([(path, None)]);
}
fn write_file_internal(&self, path: impl AsRef<Path>, content: Vec<u8>) -> Result<()> {
@ -910,18 +981,24 @@ impl FakeFs {
mtime,
content,
}));
state.write_path(path, move |entry| {
match entry {
btree_map::Entry::Vacant(e) => {
e.insert(file);
}
btree_map::Entry::Occupied(mut e) => {
*e.get_mut() = file;
let mut kind = None;
state.write_path(path, {
let kind = &mut kind;
move |entry| {
match entry {
btree_map::Entry::Vacant(e) => {
*kind = Some(PathEventKind::Created);
e.insert(file);
}
btree_map::Entry::Occupied(mut e) => {
*kind = Some(PathEventKind::Changed);
*e.get_mut() = file;
}
}
Ok(())
}
Ok(())
})?;
state.emit_event([path]);
state.emit_event([(path, kind)]);
Ok(())
}
@ -1030,7 +1107,7 @@ impl FakeFs {
f(&mut repo_state);
if emit_git_event {
state.emit_event([dot_git]);
state.emit_event([(dot_git, None)]);
}
} else {
panic!("not a directory");
@ -1081,7 +1158,7 @@ impl FakeFs {
self.state.lock().emit_event(
statuses
.iter()
.map(|(path, _)| dot_git.parent().unwrap().join(path)),
.map(|(path, _)| (dot_git.parent().unwrap().join(path), None)),
);
}
@ -1251,7 +1328,7 @@ impl Fs for FakeFs {
state.next_inode += 1;
state.write_path(&cur_path, |entry| {
entry.or_insert_with(|| {
created_dirs.push(cur_path.clone());
created_dirs.push((cur_path.clone(), Some(PathEventKind::Created)));
Arc::new(Mutex::new(FakeFsEntry::Dir {
inode,
mtime,
@ -1263,7 +1340,7 @@ impl Fs for FakeFs {
})?
}
self.state.lock().emit_event(&created_dirs);
self.state.lock().emit_event(created_dirs);
Ok(())
}
@ -1279,10 +1356,12 @@ impl Fs for FakeFs {
mtime,
content: Vec::new(),
}));
let mut kind = Some(PathEventKind::Created);
state.write_path(path, |entry| {
match entry {
btree_map::Entry::Occupied(mut e) => {
if options.overwrite {
kind = Some(PathEventKind::Changed);
*e.get_mut() = file;
} else if !options.ignore_if_exists {
return Err(anyhow!("path already exists: {}", path.display()));
@ -1294,7 +1373,7 @@ impl Fs for FakeFs {
}
Ok(())
})?;
state.emit_event([path]);
state.emit_event([(path, kind)]);
Ok(())
}
@ -1313,7 +1392,7 @@ impl Fs for FakeFs {
}
})
.unwrap();
state.emit_event(&[path]);
state.emit_event([(path, None)]);
Ok(())
}
@ -1388,7 +1467,10 @@ impl Fs for FakeFs {
})
.unwrap();
state.emit_event(&[old_path, new_path]);
state.emit_event([
(old_path, Some(PathEventKind::Removed)),
(new_path, Some(PathEventKind::Created)),
]);
Ok(())
}
@ -1403,9 +1485,11 @@ impl Fs for FakeFs {
state.next_mtime += Duration::from_nanos(1);
let source_entry = state.read_path(&source)?;
let content = source_entry.lock().file_content(&source)?.clone();
let mut kind = Some(PathEventKind::Created);
let entry = state.write_path(&target, |e| match e {
btree_map::Entry::Occupied(e) => {
if options.overwrite {
kind = Some(PathEventKind::Changed);
Ok(Some(e.get().clone()))
} else if !options.ignore_if_exists {
return Err(anyhow!("{target:?} already exists"));
@ -1425,7 +1509,7 @@ impl Fs for FakeFs {
if let Some(entry) = entry {
entry.lock().set_file_content(&target, content)?;
}
state.emit_event(&[target]);
state.emit_event([(target, kind)]);
Ok(())
}
@ -1462,7 +1546,7 @@ impl Fs for FakeFs {
e.remove();
}
}
state.emit_event(&[path]);
state.emit_event([(path, Some(PathEventKind::Removed))]);
Ok(())
}
@ -1491,7 +1575,7 @@ impl Fs for FakeFs {
e.remove();
}
}
state.emit_event(&[path]);
state.emit_event([(path, Some(PathEventKind::Removed))]);
Ok(())
}
@ -1632,7 +1716,7 @@ impl Fs for FakeFs {
path: &Path,
_: Duration,
) -> (
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
Arc<dyn Watcher>,
) {
self.simulate_random_delay().await;
@ -1642,7 +1726,9 @@ impl Fs for FakeFs {
let executor = self.executor.clone();
(
Box::pin(futures::StreamExt::filter(rx, move |events| {
let result = events.iter().any(|evt_path| evt_path.starts_with(&path));
let result = events
.iter()
.any(|evt_path| evt_path.path.starts_with(&path));
let executor = executor.clone();
async move {
executor.simulate_random_delay().await;