Fix redundant FS file watches due to LSP path watching (#27957)

Release Notes:

- Fixed a bug where Zed sometimes added multiple redundant FS watchers
when language servers requested to watch paths. This could cause saves
and git operations to fail if Zed exceeded the file descriptor limit.

---------

Co-authored-by: Piotr <piotr@zed.dev>
This commit is contained in:
Max Brunsfeld 2025-04-02 13:36:28 -07:00 committed by GitHub
parent 9f9746872e
commit b9f10c0adb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 279 additions and 170 deletions

View file

@ -861,7 +861,7 @@ struct FakeFsState {
next_inode: u64, next_inode: u64,
next_mtime: SystemTime, next_mtime: SystemTime,
git_event_tx: smol::channel::Sender<PathBuf>, git_event_tx: smol::channel::Sender<PathBuf>,
event_txs: Vec<smol::channel::Sender<Vec<PathEvent>>>, event_txs: Vec<(PathBuf, smol::channel::Sender<Vec<PathEvent>>)>,
events_paused: bool, events_paused: bool,
buffered_events: Vec<PathEvent>, buffered_events: Vec<PathEvent>,
metadata_call_count: usize, metadata_call_count: usize,
@ -1013,7 +1013,7 @@ impl FakeFsState {
fn flush_events(&mut self, mut count: usize) { fn flush_events(&mut self, mut count: usize) {
count = count.min(self.buffered_events.len()); count = count.min(self.buffered_events.len());
let events = self.buffered_events.drain(0..count).collect::<Vec<_>>(); let events = self.buffered_events.drain(0..count).collect::<Vec<_>>();
self.event_txs.retain(|tx| { self.event_txs.retain(|(_, tx)| {
let _ = tx.try_send(events.clone()); let _ = tx.try_send(events.clone());
!tx.is_closed() !tx.is_closed()
}); });
@ -1112,7 +1112,7 @@ impl FakeFs {
} }
pub async fn insert_file(&self, path: impl AsRef<Path>, content: Vec<u8>) { pub async fn insert_file(&self, path: impl AsRef<Path>, content: Vec<u8>) {
self.write_file_internal(path, content).unwrap() self.write_file_internal(path, content, true).unwrap()
} }
pub async fn insert_symlink(&self, path: impl AsRef<Path>, target: PathBuf) { pub async fn insert_symlink(&self, path: impl AsRef<Path>, target: PathBuf) {
@ -1134,30 +1134,50 @@ impl FakeFs {
state.emit_event([(path, None)]); state.emit_event([(path, None)]);
} }
fn write_file_internal(&self, path: impl AsRef<Path>, content: Vec<u8>) -> Result<()> { fn write_file_internal(
&self,
path: impl AsRef<Path>,
new_content: Vec<u8>,
recreate_inode: bool,
) -> Result<()> {
let mut state = self.state.lock(); let mut state = self.state.lock();
let file = Arc::new(Mutex::new(FakeFsEntry::File { let new_inode = state.get_and_increment_inode();
inode: state.get_and_increment_inode(), let new_mtime = state.get_and_increment_mtime();
mtime: state.get_and_increment_mtime(), let new_len = new_content.len() as u64;
len: content.len() as u64,
content,
}));
let mut kind = None; let mut kind = None;
state.write_path(path.as_ref(), { state.write_path(path.as_ref(), |entry| {
let kind = &mut kind; match entry {
move |entry| { btree_map::Entry::Vacant(e) => {
match entry { kind = Some(PathEventKind::Created);
btree_map::Entry::Vacant(e) => { e.insert(Arc::new(Mutex::new(FakeFsEntry::File {
*kind = Some(PathEventKind::Created); inode: new_inode,
e.insert(file); mtime: new_mtime,
} len: new_len,
btree_map::Entry::Occupied(mut e) => { content: new_content,
*kind = Some(PathEventKind::Changed); })));
*e.get_mut() = file; }
btree_map::Entry::Occupied(mut e) => {
kind = Some(PathEventKind::Changed);
if let FakeFsEntry::File {
inode,
mtime,
len,
content,
..
} = &mut *e.get_mut().lock()
{
*mtime = new_mtime;
*content = new_content;
*len = new_len;
if recreate_inode {
*inode = new_inode;
}
} else {
anyhow::bail!("not a file")
} }
} }
Ok(())
} }
Ok(())
})?; })?;
state.emit_event([(path.as_ref(), kind)]); state.emit_event([(path.as_ref(), kind)]);
Ok(()) Ok(())
@ -1589,6 +1609,15 @@ impl FakeFs {
self.state.lock().read_dir_call_count self.state.lock().read_dir_call_count
} }
pub fn watched_paths(&self) -> Vec<PathBuf> {
let state = self.state.lock();
state
.event_txs
.iter()
.filter_map(|(path, tx)| Some(path.clone()).filter(|_| !tx.is_closed()))
.collect()
}
/// How many `metadata` calls have been issued. /// How many `metadata` calls have been issued.
pub fn metadata_call_count(&self) -> usize { pub fn metadata_call_count(&self) -> usize {
self.state.lock().metadata_call_count self.state.lock().metadata_call_count
@ -1765,7 +1794,7 @@ impl Fs for FakeFs {
) -> Result<()> { ) -> Result<()> {
let mut bytes = Vec::new(); let mut bytes = Vec::new();
content.read_to_end(&mut bytes).await?; content.read_to_end(&mut bytes).await?;
self.write_file_internal(path, bytes)?; self.write_file_internal(path, bytes, true)?;
Ok(()) Ok(())
} }
@ -1782,7 +1811,7 @@ impl Fs for FakeFs {
let mut bytes = Vec::new(); let mut bytes = Vec::new();
entry.read_to_end(&mut bytes).await?; entry.read_to_end(&mut bytes).await?;
self.create_dir(path.parent().unwrap()).await?; self.create_dir(path.parent().unwrap()).await?;
self.write_file_internal(&path, bytes)?; self.write_file_internal(&path, bytes, true)?;
} }
} }
Ok(()) Ok(())
@ -1976,7 +2005,7 @@ impl Fs for FakeFs {
async fn atomic_write(&self, path: PathBuf, data: String) -> Result<()> { async fn atomic_write(&self, path: PathBuf, data: String) -> Result<()> {
self.simulate_random_delay().await; self.simulate_random_delay().await;
let path = normalize_path(path.as_path()); let path = normalize_path(path.as_path());
self.write_file_internal(path, data.into_bytes())?; self.write_file_internal(path, data.into_bytes(), true)?;
Ok(()) Ok(())
} }
@ -1987,7 +2016,7 @@ impl Fs for FakeFs {
if let Some(path) = path.parent() { if let Some(path) = path.parent() {
self.create_dir(path).await?; self.create_dir(path).await?;
} }
self.write_file_internal(path, content.into_bytes())?; self.write_file_internal(path, content.into_bytes(), false)?;
Ok(()) Ok(())
} }
@ -2107,8 +2136,8 @@ impl Fs for FakeFs {
) { ) {
self.simulate_random_delay().await; self.simulate_random_delay().await;
let (tx, rx) = smol::channel::unbounded(); let (tx, rx) = smol::channel::unbounded();
self.state.lock().event_txs.push(tx);
let path = path.to_path_buf(); let path = path.to_path_buf();
self.state.lock().event_txs.push((path.clone(), tx));
let executor = self.executor.clone(); let executor = self.executor.clone();
( (
Box::pin(futures::StreamExt::filter(rx, move |events| { Box::pin(futures::StreamExt::filter(rx, move |events| {

View file

@ -3026,149 +3026,89 @@ impl LocalLspStore {
language_server_id language_server_id
); );
enum PathToWatch {
Worktree {
literal_prefix: Arc<Path>,
pattern: String,
},
Absolute {
path: Arc<Path>,
pattern: String,
},
}
for watcher in watchers { for watcher in watchers {
let mut found_host = false; if let Some((worktree, literal_prefix, pattern)) =
for worktree in &worktrees { self.worktree_and_path_for_file_watcher(&worktrees, &watcher, cx)
let glob_is_inside_worktree = worktree.update(cx, |tree, _| { {
let worktree_root_path = tree.abs_path(); worktree.update(cx, |worktree, _| {
let path_to_watch = match &watcher.glob_pattern { if let Some((tree, glob)) =
lsp::GlobPattern::String(s) => { worktree.as_local_mut().zip(Glob::new(&pattern).log_err())
let watcher_path = SanitizedPath::from(s); {
match watcher_path.as_path().strip_prefix(&worktree_root_path) { tree.add_path_prefix_to_scan(literal_prefix.into());
Ok(relative) => { worktree_globs
let pattern = relative.to_string_lossy().to_string(); .entry(tree.id())
let literal_prefix = glob_literal_prefix(relative).into(); .or_insert_with(GlobSetBuilder::new)
.add(glob);
PathToWatch::Worktree {
literal_prefix,
pattern,
}
}
Err(_) => {
let path = glob_literal_prefix(watcher_path.as_path());
let pattern = watcher_path
.as_path()
.strip_prefix(&path)
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|e| {
debug_panic!(
"Failed to strip prefix for string pattern: {}, with prefix: {}, with error: {}",
s,
path.display(),
e
);
watcher_path.as_path().to_string_lossy().to_string()
});
let path = if path.components().next().is_none() {
worktree_root_path.clone()
} else {
path.into()
};
PathToWatch::Absolute { path, pattern }
}
}
}
lsp::GlobPattern::Relative(rp) => {
let Ok(mut base_uri) = match &rp.base_uri {
lsp::OneOf::Left(workspace_folder) => &workspace_folder.uri,
lsp::OneOf::Right(base_uri) => base_uri,
}
.to_file_path() else {
return false;
};
match base_uri.strip_prefix(&worktree_root_path) {
Ok(relative) => {
let mut literal_prefix = relative.to_owned();
literal_prefix
.push(glob_literal_prefix(Path::new(&rp.pattern)));
PathToWatch::Worktree {
literal_prefix: literal_prefix.into(),
pattern: rp.pattern.clone(),
}
}
Err(_) => {
let path = glob_literal_prefix(Path::new(&rp.pattern));
let pattern = Path::new(&rp.pattern)
.strip_prefix(&path)
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|e| {
debug_panic!(
"Failed to strip prefix for relative pattern: {}, with prefix: {}, with error: {}",
rp.pattern,
path.display(),
e
);
rp.pattern.clone()
});
base_uri.push(path);
let path = if base_uri.components().next().is_none() {
debug_panic!("base_uri is empty, {}", base_uri.display());
worktree_root_path.clone()
} else {
base_uri.into()
};
PathToWatch::Absolute { path, pattern }
}
}
}
};
match path_to_watch {
PathToWatch::Worktree {
literal_prefix,
pattern,
} => {
if let Some((tree, glob)) =
tree.as_local_mut().zip(Glob::new(&pattern).log_err())
{
tree.add_path_prefix_to_scan(literal_prefix);
worktree_globs
.entry(tree.id())
.or_insert_with(GlobSetBuilder::new)
.add(glob);
} else {
return false;
}
}
PathToWatch::Absolute { path, pattern } => {
if let Some(glob) = Glob::new(&pattern).log_err() {
abs_globs
.entry(path)
.or_insert_with(GlobSetBuilder::new)
.add(glob);
}
}
} }
true
}); });
if glob_is_inside_worktree { } else {
log::trace!( let (path, pattern) = match &watcher.glob_pattern {
"Watcher pattern `{}` has been attached to the worktree at `{}`", lsp::GlobPattern::String(s) => {
serde_json::to_string(&watcher.glob_pattern).unwrap(), let watcher_path = SanitizedPath::from(s);
worktree.read(cx).abs_path().display() let path = glob_literal_prefix(watcher_path.as_path());
); let pattern = watcher_path
found_host = true; .as_path()
.strip_prefix(&path)
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|e| {
debug_panic!(
"Failed to strip prefix for string pattern: {}, with prefix: {}, with error: {}",
s,
path.display(),
e
);
watcher_path.as_path().to_string_lossy().to_string()
});
(path, pattern)
}
lsp::GlobPattern::Relative(rp) => {
let Ok(mut base_uri) = match &rp.base_uri {
lsp::OneOf::Left(workspace_folder) => &workspace_folder.uri,
lsp::OneOf::Right(base_uri) => base_uri,
}
.to_file_path() else {
continue;
};
let path = glob_literal_prefix(Path::new(&rp.pattern));
let pattern = Path::new(&rp.pattern)
.strip_prefix(&path)
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|e| {
debug_panic!(
"Failed to strip prefix for relative pattern: {}, with prefix: {}, with error: {}",
rp.pattern,
path.display(),
e
);
rp.pattern.clone()
});
base_uri.push(path);
(base_uri, pattern)
}
};
if let Some(glob) = Glob::new(&pattern).log_err() {
if !path
.components()
.any(|c| matches!(c, path::Component::Normal(_)))
{
// For an unrooted glob like `**/Cargo.toml`, watch it within each worktree,
// rather than adding a new watcher for `/`.
for worktree in &worktrees {
worktree_globs
.entry(worktree.read(cx).id())
.or_insert_with(GlobSetBuilder::new)
.add(glob.clone());
}
} else {
abs_globs
.entry(path.into())
.or_insert_with(GlobSetBuilder::new)
.add(glob);
}
} }
} }
if !found_host {
log::error!(
"Watcher pattern `{}` has not been attached to any worktree or absolute path",
serde_json::to_string(&watcher.glob_pattern).unwrap()
)
}
} }
let mut watch_builder = LanguageServerWatchedPathsBuilder::default(); let mut watch_builder = LanguageServerWatchedPathsBuilder::default();
@ -3185,6 +3125,45 @@ impl LocalLspStore {
watch_builder watch_builder
} }
fn worktree_and_path_for_file_watcher(
&self,
worktrees: &[Entity<Worktree>],
watcher: &FileSystemWatcher,
cx: &App,
) -> Option<(Entity<Worktree>, PathBuf, String)> {
worktrees.iter().find_map(|worktree| {
let tree = worktree.read(cx);
let worktree_root_path = tree.abs_path();
match &watcher.glob_pattern {
lsp::GlobPattern::String(s) => {
let watcher_path = SanitizedPath::from(s);
let relative = watcher_path
.as_path()
.strip_prefix(&worktree_root_path)
.ok()?;
let literal_prefix = glob_literal_prefix(relative);
Some((
worktree.clone(),
literal_prefix,
relative.to_string_lossy().to_string(),
))
}
lsp::GlobPattern::Relative(rp) => {
let base_uri = match &rp.base_uri {
lsp::OneOf::Left(workspace_folder) => &workspace_folder.uri,
lsp::OneOf::Right(base_uri) => base_uri,
}
.to_file_path()
.ok()?;
let relative = base_uri.strip_prefix(&worktree_root_path).ok()?;
let mut literal_prefix = relative.to_owned();
literal_prefix.push(glob_literal_prefix(Path::new(&rp.pattern)));
Some((worktree.clone(), literal_prefix, rp.pattern.clone()))
}
}
})
}
fn rebuild_watched_paths( fn rebuild_watched_paths(
&mut self, &mut self,
language_server_id: LanguageServerId, language_server_id: LanguageServerId,

View file

@ -27,7 +27,7 @@ use lsp::{
WillRenameFiles, notification::DidRenameFiles, WillRenameFiles, notification::DidRenameFiles,
}; };
use parking_lot::Mutex; use parking_lot::Mutex;
use paths::tasks_file; use paths::{config_dir, tasks_file};
use postage::stream::Stream as _; use postage::stream::Stream as _;
use pretty_assertions::{assert_eq, assert_matches}; use pretty_assertions::{assert_eq, assert_matches};
use serde_json::json; use serde_json::json;
@ -919,6 +919,7 @@ async fn test_reporting_fs_changes_to_language_servers(cx: &mut gpui::TestAppCon
path!("/the-root"), path!("/the-root"),
json!({ json!({
".gitignore": "target\n", ".gitignore": "target\n",
"Cargo.lock": "",
"src": { "src": {
"a.rs": "", "a.rs": "",
"b.rs": "", "b.rs": "",
@ -943,9 +944,37 @@ async fn test_reporting_fs_changes_to_language_servers(cx: &mut gpui::TestAppCon
}), }),
) )
.await; .await;
fs.insert_tree(
path!("/the-registry"),
json!({
"dep1": {
"src": {
"dep1.rs": "",
}
},
"dep2": {
"src": {
"dep2.rs": "",
}
},
}),
)
.await;
fs.insert_tree(
path!("/the/stdlib"),
json!({
"LICENSE": "",
"src": {
"string.rs": "",
}
}),
)
.await;
let project = Project::test(fs.clone(), [path!("/the-root").as_ref()], cx).await; let project = Project::test(fs.clone(), [path!("/the-root").as_ref()], cx).await;
let language_registry = project.read_with(cx, |project, _| project.languages().clone()); let (language_registry, lsp_store) = project.read_with(cx, |project, _| {
(project.languages().clone(), project.lsp_store())
});
language_registry.add(rust_lang()); language_registry.add(rust_lang());
let mut fake_servers = language_registry.register_fake_lsp( let mut fake_servers = language_registry.register_fake_lsp(
"Rust", "Rust",
@ -978,6 +1007,7 @@ async fn test_reporting_fs_changes_to_language_servers(cx: &mut gpui::TestAppCon
&[ &[
(Path::new(""), false), (Path::new(""), false),
(Path::new(".gitignore"), false), (Path::new(".gitignore"), false),
(Path::new("Cargo.lock"), false),
(Path::new("src"), false), (Path::new("src"), false),
(Path::new("src/a.rs"), false), (Path::new("src/a.rs"), false),
(Path::new("src/b.rs"), false), (Path::new("src/b.rs"), false),
@ -988,8 +1018,26 @@ async fn test_reporting_fs_changes_to_language_servers(cx: &mut gpui::TestAppCon
let prev_read_dir_count = fs.read_dir_call_count(); let prev_read_dir_count = fs.read_dir_call_count();
// Keep track of the FS events reported to the language server.
let fake_server = fake_servers.next().await.unwrap(); let fake_server = fake_servers.next().await.unwrap();
let (server_id, server_name) = lsp_store.read_with(cx, |lsp_store, _| {
let (id, status) = lsp_store.language_server_statuses().next().unwrap();
(id, LanguageServerName::from(status.name.as_str()))
});
// Simulate jumping to a definition in a dependency outside of the worktree.
let _out_of_worktree_buffer = project
.update(cx, |project, cx| {
project.open_local_buffer_via_lsp(
lsp::Url::from_file_path(path!("/the-registry/dep1/src/dep1.rs")).unwrap(),
server_id,
server_name.clone(),
cx,
)
})
.await
.unwrap();
// Keep track of the FS events reported to the language server.
let file_changes = Arc::new(Mutex::new(Vec::new())); let file_changes = Arc::new(Mutex::new(Vec::new()));
fake_server fake_server
.request::<lsp::request::RegisterCapability>(lsp::RegistrationParams { .request::<lsp::request::RegisterCapability>(lsp::RegistrationParams {
@ -1017,6 +1065,18 @@ async fn test_reporting_fs_changes_to_language_servers(cx: &mut gpui::TestAppCon
), ),
kind: None, kind: None,
}, },
lsp::FileSystemWatcher {
glob_pattern: lsp::GlobPattern::String(
path!("/the/stdlib/src/**/*.rs").to_string(),
),
kind: None,
},
lsp::FileSystemWatcher {
glob_pattern: lsp::GlobPattern::String(
path!("**/Cargo.lock").to_string(),
),
kind: None,
},
], ],
}, },
) )
@ -1036,12 +1096,23 @@ async fn test_reporting_fs_changes_to_language_servers(cx: &mut gpui::TestAppCon
cx.executor().run_until_parked(); cx.executor().run_until_parked();
assert_eq!(mem::take(&mut *file_changes.lock()), &[]); assert_eq!(mem::take(&mut *file_changes.lock()), &[]);
assert_eq!(fs.read_dir_call_count() - prev_read_dir_count, 4); assert_eq!(fs.read_dir_call_count() - prev_read_dir_count, 5);
let mut new_watched_paths = fs.watched_paths();
new_watched_paths.retain(|path| !path.starts_with(config_dir()));
assert_eq!(
&new_watched_paths,
&[
Path::new(path!("/the-root")),
Path::new(path!("/the-registry/dep1/src/dep1.rs")),
Path::new(path!("/the/stdlib/src"))
]
);
// Now the language server has asked us to watch an ignored directory path, // Now the language server has asked us to watch an ignored directory path,
// so we recursively load it. // so we recursively load it.
project.update(cx, |project, cx| { project.update(cx, |project, cx| {
let worktree = project.worktrees(cx).next().unwrap(); let worktree = project.visible_worktrees(cx).next().unwrap();
assert_eq!( assert_eq!(
worktree worktree
.read(cx) .read(cx)
@ -1052,6 +1123,7 @@ async fn test_reporting_fs_changes_to_language_servers(cx: &mut gpui::TestAppCon
&[ &[
(Path::new(""), false), (Path::new(""), false),
(Path::new(".gitignore"), false), (Path::new(".gitignore"), false),
(Path::new("Cargo.lock"), false),
(Path::new("src"), false), (Path::new("src"), false),
(Path::new("src/a.rs"), false), (Path::new("src/a.rs"), false),
(Path::new("src/b.rs"), false), (Path::new("src/b.rs"), false),
@ -1088,12 +1160,37 @@ async fn test_reporting_fs_changes_to_language_servers(cx: &mut gpui::TestAppCon
) )
.await .await
.unwrap(); .unwrap();
fs.save(
path!("/the-root/Cargo.lock").as_ref(),
&"".into(),
Default::default(),
)
.await
.unwrap();
fs.save(
path!("/the-stdlib/LICENSE").as_ref(),
&"".into(),
Default::default(),
)
.await
.unwrap();
fs.save(
path!("/the/stdlib/src/string.rs").as_ref(),
&"".into(),
Default::default(),
)
.await
.unwrap();
// The language server receives events for the FS mutations that match its watch patterns. // The language server receives events for the FS mutations that match its watch patterns.
cx.executor().run_until_parked(); cx.executor().run_until_parked();
assert_eq!( assert_eq!(
&*file_changes.lock(), &*file_changes.lock(),
&[ &[
lsp::FileEvent {
uri: lsp::Url::from_file_path(path!("/the-root/Cargo.lock")).unwrap(),
typ: lsp::FileChangeType::CHANGED,
},
lsp::FileEvent { lsp::FileEvent {
uri: lsp::Url::from_file_path(path!("/the-root/src/b.rs")).unwrap(), uri: lsp::Url::from_file_path(path!("/the-root/src/b.rs")).unwrap(),
typ: lsp::FileChangeType::DELETED, typ: lsp::FileChangeType::DELETED,
@ -1106,6 +1203,10 @@ async fn test_reporting_fs_changes_to_language_servers(cx: &mut gpui::TestAppCon
uri: lsp::Url::from_file_path(path!("/the-root/target/y/out/y2.rs")).unwrap(), uri: lsp::Url::from_file_path(path!("/the-root/target/y/out/y2.rs")).unwrap(),
typ: lsp::FileChangeType::CREATED, typ: lsp::FileChangeType::CREATED,
}, },
lsp::FileEvent {
uri: lsp::Url::from_file_path(path!("/the/stdlib/src/string.rs")).unwrap(),
typ: lsp::FileChangeType::CHANGED,
},
] ]
); );
} }