lsp: Watch paths outside of worktrees at language servers request (#17499)
Another stab at https://github.com/zed-industries/zed/pull/17173, this time fixing the segfault found in https://github.com/zed-industries/zed/pull/17206 Release Notes: - Improved language server reliability in multi-worktree projects and monorepo. We now notify the language server more reliably about which files have changed.
This commit is contained in:
parent
938c90fd3b
commit
903f92045a
7 changed files with 446 additions and 102 deletions
|
@ -45,6 +45,25 @@ pub trait Watcher: Send + Sync {
|
|||
fn remove(&self, path: &Path) -> Result<()>;
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub enum PathEventKind {
|
||||
Removed,
|
||||
Created,
|
||||
Changed,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct PathEvent {
|
||||
pub path: PathBuf,
|
||||
pub kind: Option<PathEventKind>,
|
||||
}
|
||||
|
||||
impl From<PathEvent> for PathBuf {
|
||||
fn from(event: PathEvent) -> Self {
|
||||
event.path
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Fs: Send + Sync {
|
||||
async fn create_dir(&self, path: &Path) -> Result<()>;
|
||||
|
@ -92,7 +111,7 @@ pub trait Fs: Send + Sync {
|
|||
path: &Path,
|
||||
latency: Duration,
|
||||
) -> (
|
||||
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
|
||||
Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
|
||||
Arc<dyn Watcher>,
|
||||
);
|
||||
|
||||
|
@ -469,17 +488,38 @@ impl Fs for RealFs {
|
|||
path: &Path,
|
||||
latency: Duration,
|
||||
) -> (
|
||||
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
|
||||
Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
|
||||
Arc<dyn Watcher>,
|
||||
) {
|
||||
use fsevent::EventStream;
|
||||
use fsevent::{EventStream, StreamFlags};
|
||||
|
||||
let (tx, rx) = smol::channel::unbounded();
|
||||
let (stream, handle) = EventStream::new(&[path], latency);
|
||||
std::thread::spawn(move || {
|
||||
stream.run(move |events| {
|
||||
smol::block_on(tx.send(events.into_iter().map(|event| event.path).collect()))
|
||||
.is_ok()
|
||||
smol::block_on(
|
||||
tx.send(
|
||||
events
|
||||
.into_iter()
|
||||
.map(|event| {
|
||||
let kind = if event.flags.contains(StreamFlags::ITEM_REMOVED) {
|
||||
Some(PathEventKind::Removed)
|
||||
} else if event.flags.contains(StreamFlags::ITEM_CREATED) {
|
||||
Some(PathEventKind::Created)
|
||||
} else if event.flags.contains(StreamFlags::ITEM_MODIFIED) {
|
||||
Some(PathEventKind::Changed)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
PathEvent {
|
||||
path: event.path,
|
||||
kind,
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
)
|
||||
.is_ok()
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -498,32 +538,46 @@ impl Fs for RealFs {
|
|||
path: &Path,
|
||||
latency: Duration,
|
||||
) -> (
|
||||
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
|
||||
Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
|
||||
Arc<dyn Watcher>,
|
||||
) {
|
||||
use notify::EventKind;
|
||||
use parking_lot::Mutex;
|
||||
|
||||
let (tx, rx) = smol::channel::unbounded();
|
||||
let pending_paths: Arc<Mutex<Vec<PathBuf>>> = Default::default();
|
||||
let pending_paths: Arc<Mutex<Vec<PathEvent>>> = Default::default();
|
||||
let root_path = path.to_path_buf();
|
||||
|
||||
watcher::global(|g| {
|
||||
let tx = tx.clone();
|
||||
let pending_paths = pending_paths.clone();
|
||||
g.add(move |event: ¬ify::Event| {
|
||||
let kind = match event.kind {
|
||||
EventKind::Create(_) => Some(PathEventKind::Created),
|
||||
EventKind::Modify(_) => Some(PathEventKind::Changed),
|
||||
EventKind::Remove(_) => Some(PathEventKind::Removed),
|
||||
_ => None,
|
||||
};
|
||||
let mut paths = event
|
||||
.paths
|
||||
.iter()
|
||||
.filter(|path| path.starts_with(&root_path))
|
||||
.cloned()
|
||||
.filter_map(|path| {
|
||||
path.starts_with(&root_path).then(|| PathEvent {
|
||||
path: path.clone(),
|
||||
kind,
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if !paths.is_empty() {
|
||||
paths.sort();
|
||||
let mut pending_paths = pending_paths.lock();
|
||||
if pending_paths.is_empty() {
|
||||
tx.try_send(()).ok();
|
||||
}
|
||||
util::extend_sorted(&mut *pending_paths, paths, usize::MAX, PathBuf::cmp);
|
||||
util::extend_sorted(&mut *pending_paths, paths, usize::MAX, |a, b| {
|
||||
a.path.cmp(&b.path)
|
||||
});
|
||||
}
|
||||
})
|
||||
})
|
||||
|
@ -561,10 +615,10 @@ impl Fs for RealFs {
|
|||
path: &Path,
|
||||
_latency: Duration,
|
||||
) -> (
|
||||
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
|
||||
Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
|
||||
Arc<dyn Watcher>,
|
||||
) {
|
||||
use notify::Watcher;
|
||||
use notify::{EventKind, Watcher};
|
||||
|
||||
let (tx, rx) = smol::channel::unbounded();
|
||||
|
||||
|
@ -572,7 +626,21 @@ impl Fs for RealFs {
|
|||
let tx = tx.clone();
|
||||
move |event: Result<notify::Event, _>| {
|
||||
if let Some(event) = event.log_err() {
|
||||
tx.try_send(event.paths).ok();
|
||||
let kind = match event.kind {
|
||||
EventKind::Create(_) => Some(PathEventKind::Created),
|
||||
EventKind::Modify(_) => Some(PathEventKind::Changed),
|
||||
EventKind::Remove(_) => Some(PathEventKind::Removed),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
tx.try_send(
|
||||
event
|
||||
.paths
|
||||
.into_iter()
|
||||
.map(|path| PathEvent { path, kind })
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
})
|
||||
|
@ -682,9 +750,9 @@ struct FakeFsState {
|
|||
root: Arc<Mutex<FakeFsEntry>>,
|
||||
next_inode: u64,
|
||||
next_mtime: SystemTime,
|
||||
event_txs: Vec<smol::channel::Sender<Vec<PathBuf>>>,
|
||||
event_txs: Vec<smol::channel::Sender<Vec<PathEvent>>>,
|
||||
events_paused: bool,
|
||||
buffered_events: Vec<PathBuf>,
|
||||
buffered_events: Vec<PathEvent>,
|
||||
metadata_call_count: usize,
|
||||
read_dir_call_count: usize,
|
||||
}
|
||||
|
@ -793,11 +861,14 @@ impl FakeFsState {
|
|||
|
||||
fn emit_event<I, T>(&mut self, paths: I)
|
||||
where
|
||||
I: IntoIterator<Item = T>,
|
||||
I: IntoIterator<Item = (T, Option<PathEventKind>)>,
|
||||
T: Into<PathBuf>,
|
||||
{
|
||||
self.buffered_events
|
||||
.extend(paths.into_iter().map(Into::into));
|
||||
.extend(paths.into_iter().map(|(path, kind)| PathEvent {
|
||||
path: path.into(),
|
||||
kind,
|
||||
}));
|
||||
|
||||
if !self.events_paused {
|
||||
self.flush_events(self.buffered_events.len());
|
||||
|
@ -872,7 +943,7 @@ impl FakeFs {
|
|||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
state.emit_event([path.to_path_buf()]);
|
||||
state.emit_event([(path.to_path_buf(), None)]);
|
||||
}
|
||||
|
||||
pub async fn insert_file(&self, path: impl AsRef<Path>, content: Vec<u8>) {
|
||||
|
@ -895,7 +966,7 @@ impl FakeFs {
|
|||
}
|
||||
})
|
||||
.unwrap();
|
||||
state.emit_event([path]);
|
||||
state.emit_event([(path, None)]);
|
||||
}
|
||||
|
||||
fn write_file_internal(&self, path: impl AsRef<Path>, content: Vec<u8>) -> Result<()> {
|
||||
|
@ -910,18 +981,24 @@ impl FakeFs {
|
|||
mtime,
|
||||
content,
|
||||
}));
|
||||
state.write_path(path, move |entry| {
|
||||
match entry {
|
||||
btree_map::Entry::Vacant(e) => {
|
||||
e.insert(file);
|
||||
}
|
||||
btree_map::Entry::Occupied(mut e) => {
|
||||
*e.get_mut() = file;
|
||||
let mut kind = None;
|
||||
state.write_path(path, {
|
||||
let kind = &mut kind;
|
||||
move |entry| {
|
||||
match entry {
|
||||
btree_map::Entry::Vacant(e) => {
|
||||
*kind = Some(PathEventKind::Created);
|
||||
e.insert(file);
|
||||
}
|
||||
btree_map::Entry::Occupied(mut e) => {
|
||||
*kind = Some(PathEventKind::Changed);
|
||||
*e.get_mut() = file;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
})?;
|
||||
state.emit_event([path]);
|
||||
state.emit_event([(path, kind)]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -1030,7 +1107,7 @@ impl FakeFs {
|
|||
f(&mut repo_state);
|
||||
|
||||
if emit_git_event {
|
||||
state.emit_event([dot_git]);
|
||||
state.emit_event([(dot_git, None)]);
|
||||
}
|
||||
} else {
|
||||
panic!("not a directory");
|
||||
|
@ -1081,7 +1158,7 @@ impl FakeFs {
|
|||
self.state.lock().emit_event(
|
||||
statuses
|
||||
.iter()
|
||||
.map(|(path, _)| dot_git.parent().unwrap().join(path)),
|
||||
.map(|(path, _)| (dot_git.parent().unwrap().join(path), None)),
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -1251,7 +1328,7 @@ impl Fs for FakeFs {
|
|||
state.next_inode += 1;
|
||||
state.write_path(&cur_path, |entry| {
|
||||
entry.or_insert_with(|| {
|
||||
created_dirs.push(cur_path.clone());
|
||||
created_dirs.push((cur_path.clone(), Some(PathEventKind::Created)));
|
||||
Arc::new(Mutex::new(FakeFsEntry::Dir {
|
||||
inode,
|
||||
mtime,
|
||||
|
@ -1263,7 +1340,7 @@ impl Fs for FakeFs {
|
|||
})?
|
||||
}
|
||||
|
||||
self.state.lock().emit_event(&created_dirs);
|
||||
self.state.lock().emit_event(created_dirs);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -1279,10 +1356,12 @@ impl Fs for FakeFs {
|
|||
mtime,
|
||||
content: Vec::new(),
|
||||
}));
|
||||
let mut kind = Some(PathEventKind::Created);
|
||||
state.write_path(path, |entry| {
|
||||
match entry {
|
||||
btree_map::Entry::Occupied(mut e) => {
|
||||
if options.overwrite {
|
||||
kind = Some(PathEventKind::Changed);
|
||||
*e.get_mut() = file;
|
||||
} else if !options.ignore_if_exists {
|
||||
return Err(anyhow!("path already exists: {}", path.display()));
|
||||
|
@ -1294,7 +1373,7 @@ impl Fs for FakeFs {
|
|||
}
|
||||
Ok(())
|
||||
})?;
|
||||
state.emit_event([path]);
|
||||
state.emit_event([(path, kind)]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -1313,7 +1392,8 @@ impl Fs for FakeFs {
|
|||
}
|
||||
})
|
||||
.unwrap();
|
||||
state.emit_event([path]);
|
||||
state.emit_event([(path, None)]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -1388,7 +1468,10 @@ impl Fs for FakeFs {
|
|||
})
|
||||
.unwrap();
|
||||
|
||||
state.emit_event(&[old_path, new_path]);
|
||||
state.emit_event([
|
||||
(old_path, Some(PathEventKind::Removed)),
|
||||
(new_path, Some(PathEventKind::Created)),
|
||||
]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -1403,9 +1486,11 @@ impl Fs for FakeFs {
|
|||
state.next_mtime += Duration::from_nanos(1);
|
||||
let source_entry = state.read_path(&source)?;
|
||||
let content = source_entry.lock().file_content(&source)?.clone();
|
||||
let mut kind = Some(PathEventKind::Created);
|
||||
let entry = state.write_path(&target, |e| match e {
|
||||
btree_map::Entry::Occupied(e) => {
|
||||
if options.overwrite {
|
||||
kind = Some(PathEventKind::Changed);
|
||||
Ok(Some(e.get().clone()))
|
||||
} else if !options.ignore_if_exists {
|
||||
return Err(anyhow!("{target:?} already exists"));
|
||||
|
@ -1425,7 +1510,7 @@ impl Fs for FakeFs {
|
|||
if let Some(entry) = entry {
|
||||
entry.lock().set_file_content(&target, content)?;
|
||||
}
|
||||
state.emit_event(&[target]);
|
||||
state.emit_event([(target, kind)]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -1462,7 +1547,7 @@ impl Fs for FakeFs {
|
|||
e.remove();
|
||||
}
|
||||
}
|
||||
state.emit_event(&[path]);
|
||||
state.emit_event([(path, Some(PathEventKind::Removed))]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -1491,7 +1576,7 @@ impl Fs for FakeFs {
|
|||
e.remove();
|
||||
}
|
||||
}
|
||||
state.emit_event(&[path]);
|
||||
state.emit_event([(path, Some(PathEventKind::Removed))]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -1632,7 +1717,7 @@ impl Fs for FakeFs {
|
|||
path: &Path,
|
||||
_: Duration,
|
||||
) -> (
|
||||
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
|
||||
Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>,
|
||||
Arc<dyn Watcher>,
|
||||
) {
|
||||
self.simulate_random_delay().await;
|
||||
|
@ -1642,7 +1727,9 @@ impl Fs for FakeFs {
|
|||
let executor = self.executor.clone();
|
||||
(
|
||||
Box::pin(futures::StreamExt::filter(rx, move |events| {
|
||||
let result = events.iter().any(|evt_path| evt_path.starts_with(&path));
|
||||
let result = events
|
||||
.iter()
|
||||
.any(|evt_path| evt_path.path.starts_with(&path));
|
||||
let executor = executor.clone();
|
||||
async move {
|
||||
executor.simulate_random_delay().await;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue