From debb694d971cfc398f5dd8d87b7c2bb9e67d682b Mon Sep 17 00:00:00 2001
From: Max Brunsfeld
Date: Thu, 13 Apr 2023 16:51:11 -0700
Subject: [PATCH 1/3] Always bump scan_id when refreshing an entry

The scan_id needs to be bumped even if a scan is already in progress, so
that worktree updates can detect that entries have changed. This means
that the worktree's completed_scan_id may increase by more than one at
the end of a scan.
---
 crates/project/src/worktree.rs | 130 +++++++++++++++++++--------------
 1 file changed, 75 insertions(+), 55 deletions(-)

diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs
index 3459bd7e5d..29aec15610 100644
--- a/crates/project/src/worktree.rs
+++ b/crates/project/src/worktree.rs
@@ -95,7 +95,17 @@ pub struct Snapshot {
     root_char_bag: CharBag,
     entries_by_path: SumTree<Entry>,
     entries_by_id: SumTree<PathEntry>,
+
+    /// A number that increases every time the worktree begins scanning
+    /// a set of paths from the filesystem. This scanning could be caused
+    /// by some operation performed on the worktree, such as reading or
+    /// writing a file, or by an event reported by the filesystem.
     scan_id: usize,
+
+    /// The latest scan id that has completed, and whose preceding scans
+    /// have all completed. The current `scan_id` could be more than one
+    /// greater than the `completed_scan_id` if operations are performed
+    /// on the worktree while it is processing a file-system event.
     completed_scan_id: usize,
 }
@@ -2168,6 +2178,7 @@ impl BackgroundScanner {
         }
         {
             let mut snapshot = self.snapshot.lock();
+            snapshot.scan_id += 1;
             ignore_stack = snapshot.ignore_stack_for_abs_path(&root_abs_path, true);
             if ignore_stack.is_all() {
                 if let Some(mut root_entry) = snapshot.root_entry().cloned() {
@@ -2189,6 +2200,10 @@
             .unwrap();
         drop(scan_job_tx);
         self.scan_dirs(true, scan_job_rx).await;
+        {
+            let mut snapshot = self.snapshot.lock();
+            snapshot.completed_scan_id = snapshot.scan_id;
+        }
         self.send_status_update(false, None);
 
         // Process any any FS events that occurred while performing the initial scan.
@@ -2200,7 +2215,6 @@
                 paths.extend(more_events.into_iter().map(|e| e.path));
             }
             self.process_events(paths).await;
-            self.send_status_update(false, None);
         }
 
         self.finished_initial_scan = true;
@@ -2212,9 +2226,8 @@
                 // these before handling changes reported by the filesystem.
                request = self.refresh_requests_rx.recv().fuse() => {
                    let Ok((paths, barrier)) = request else { break };
-                    self.reload_entries_for_paths(paths, None).await;
-                    if !self.send_status_update(false, Some(barrier)) {
-                        break;
+                    if !self.process_refresh_request(paths, barrier).await {
+                        return;
                    }
                }
 
@@ -2225,15 +2238,17 @@
                        paths.extend(more_events.into_iter().map(|e| e.path));
                    }
                    self.process_events(paths).await;
-                    self.send_status_update(false, None);
                }
            }
        }
    }
 
-    async fn process_events(&mut self, paths: Vec<PathBuf>) {
-        use futures::FutureExt as _;
+    async fn process_refresh_request(&self, paths: Vec<PathBuf>, barrier: barrier::Sender) -> bool {
+        self.reload_entries_for_paths(paths, None).await;
+        self.send_status_update(false, Some(barrier))
+    }
 
+    async fn process_events(&mut self, paths: Vec<PathBuf>) {
        let (scan_job_tx, scan_job_rx) = channel::unbounded();
        if let Some(mut paths) = self
            .reload_entries_for_paths(paths, Some(scan_job_tx.clone()))
@@ -2245,35 +2260,7 @@
        drop(scan_job_tx);
        self.scan_dirs(false, scan_job_rx).await;
 
-        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
-        let snapshot = self.update_ignore_statuses(ignore_queue_tx);
-        self.executor
-            .scoped(|scope| {
-                for _ in 0..self.executor.num_cpus() {
-                    scope.spawn(async {
-                        loop {
-                            select_biased! {
-                                // Process any path refresh requests before moving on to process
-                                // the queue of ignore statuses.
-                                request = self.refresh_requests_rx.recv().fuse() => {
-                                    let Ok((paths, barrier)) = request else { break };
-                                    self.reload_entries_for_paths(paths, None).await;
-                                    if !self.send_status_update(false, Some(barrier)) {
-                                        return;
-                                    }
-                                }
-
-                                // Recursively process directories whose ignores have changed.
-                                job = ignore_queue_rx.recv().fuse() => {
-                                    let Ok(job) = job else { break };
-                                    self.update_ignore_status(job, &snapshot).await;
-                                }
-                            }
-                        }
-                    });
-                }
-            })
-            .await;
+        self.update_ignore_statuses().await;
 
        let mut snapshot = self.snapshot.lock();
        let mut git_repositories = mem::take(&mut snapshot.git_repositories);
@@ -2281,6 +2268,9 @@
        snapshot.git_repositories = git_repositories;
        snapshot.removed_entry_ids.clear();
        snapshot.completed_scan_id = snapshot.scan_id;
+        drop(snapshot);
+
+        self.send_status_update(false, None);
    }
 
    async fn scan_dirs(
@@ -2313,8 +2303,7 @@
                                // the scan queue, so that user operations are prioritized.
                                request = self.refresh_requests_rx.recv().fuse() => {
                                    let Ok((paths, barrier)) = request else { break };
-                                    self.reload_entries_for_paths(paths, None).await;
-                                    if !self.send_status_update(false, Some(barrier)) {
+                                    if !self.process_refresh_request(paths, barrier).await {
                                        return;
                                    }
                                }
 
@@ -2521,12 +2510,10 @@
            .await;
 
        let mut snapshot = self.snapshot.lock();
-
-        if snapshot.completed_scan_id == snapshot.scan_id {
-            snapshot.scan_id += 1;
-            if !doing_recursive_update {
-                snapshot.completed_scan_id = snapshot.scan_id;
-            }
+        let is_idle = snapshot.completed_scan_id == snapshot.scan_id;
+        snapshot.scan_id += 1;
+        if is_idle && !doing_recursive_update {
+            snapshot.completed_scan_id = snapshot.scan_id;
        }
 
        // Remove any entries for paths that no longer exist or are being recursively
@@ -2596,16 +2583,17 @@
        Some(event_paths)
    }
 
-    fn update_ignore_statuses(
-        &self,
-        ignore_queue_tx: Sender<UpdateIgnoreStatusJob>,
-    ) -> LocalSnapshot {
+    async fn update_ignore_statuses(&self) {
+        use futures::FutureExt as _;
+
        let mut snapshot = self.snapshot.lock().clone();
        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_abs_path, (_, scan_id)) in &snapshot.ignores_by_parent_abs_path {
            if let Ok(parent_path) = parent_abs_path.strip_prefix(&snapshot.abs_path) {
-                if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
+                if *scan_id > snapshot.completed_scan_id
+                    && snapshot.entry_for_path(parent_path).is_some()
+                {
                    ignores_to_update.push(parent_abs_path.clone());
                }
 
@@ -2624,6 +2612,7 @@
                .remove(&parent_abs_path);
        }
 
+        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_abs_path) = ignores_to_update.next() {
@@ -2642,8 +2631,34 @@
                }))
                .unwrap();
        }
+        drop(ignore_queue_tx);
 
-        snapshot
+        self.executor
+            .scoped(|scope| {
+                for _ in 0..self.executor.num_cpus() {
+                    scope.spawn(async {
+                        loop {
+                            select_biased! {
+                                // Process any path refresh requests before moving on to process
+                                // the queue of ignore statuses.
+                                request = self.refresh_requests_rx.recv().fuse() => {
+                                    let Ok((paths, barrier)) = request else { break };
+                                    if !self.process_refresh_request(paths, barrier).await {
+                                        return;
+                                    }
+                                }
+
+                                // Recursively process directories whose ignores have changed.
+                                job = ignore_queue_rx.recv().fuse() => {
+                                    let Ok(job) = job else { break };
+                                    self.update_ignore_status(job, &snapshot).await;
+                                }
+                            }
+                        }
+                    });
+                }
+            })
+            .await;
    }
 
    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &LocalSnapshot) {
@@ -3054,12 +3069,11 @@ mod tests {
    use fs::repository::FakeGitRepository;
    use fs::{FakeFs, RealFs};
    use gpui::{executor::Deterministic, TestAppContext};
+    use pretty_assertions::assert_eq;
    use rand::prelude::*;
    use serde_json::json;
    use std::{env, fmt::Write};
-    use util::http::FakeHttpClient;
-
-    use util::test::temp_tree;
+    use util::{http::FakeHttpClient, test::temp_tree};
 
    #[gpui::test]
    async fn test_traversal(cx: &mut TestAppContext) {
@@ -3461,7 +3475,7 @@
    }
 
    #[gpui::test(iterations = 30)]
-    async fn test_create_directory(cx: &mut TestAppContext) {
+    async fn test_create_directory_during_initial_scan(cx: &mut TestAppContext) {
        let client = cx.read(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
 
        let fs = FakeFs::new(cx.background());
@@ -3486,6 +3500,8 @@
        .await
        .unwrap();
 
+        let mut snapshot1 = tree.update(cx, |tree, _| tree.as_local().unwrap().snapshot());
+
        let entry = tree
            .update(cx, |tree, cx| {
                tree.as_local_mut()
@@ -3497,10 +3513,14 @@
        assert!(entry.is_dir());
 
        cx.foreground().run_until_parked();
-
        tree.read_with(cx, |tree, _| {
            assert_eq!(tree.entry_for_path("a/e").unwrap().kind, EntryKind::Dir);
        });
+
+        let snapshot2 = tree.update(cx, |tree, _| tree.as_local().unwrap().snapshot());
+        let update = snapshot2.build_update(&snapshot1, 0, 0, true);
+        snapshot1.apply_remote_update(update).unwrap();
+        assert_eq!(snapshot1.to_vec(true), snapshot2.to_vec(true),);
    }
 
    #[gpui::test(iterations = 100)]

From bb1cfd51b83e396f99a9680bbac8284571c32a27 Mon Sep 17 00:00:00 2001
From: Max Brunsfeld
Date: Thu, 13 Apr 2023 22:34:03 -0700
Subject: [PATCH 2/3] Add randomized test for mutating worktree during initial scan

---
 crates/fs/src/fs.rs            |  57 ++++++------
 crates/project/src/worktree.rs | 165 +++++++++++++++++++++++++++++++--
 2 files changed, 188 insertions(+), 34 deletions(-)

diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs
index c53c20c774..d856b71e39 100644
--- a/crates/fs/src/fs.rs
+++ b/crates/fs/src/fs.rs
@@ -523,31 +523,7 @@ impl FakeFs {
    }
 
    pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) {
-        let mut state = self.state.lock();
-        let path = path.as_ref();
-        let inode = state.next_inode;
-        let mtime = state.next_mtime;
-        state.next_inode += 1;
-        state.next_mtime += Duration::from_nanos(1);
-        let file = Arc::new(Mutex::new(FakeFsEntry::File {
-            inode,
-            mtime,
-            content,
-        }));
-        state
-            .write_path(path, move |entry| {
-                match entry {
-                    btree_map::Entry::Vacant(e) => {
-                        e.insert(file);
-                    }
-                    btree_map::Entry::Occupied(mut e) => {
-                        *e.get_mut() = file;
-                    }
-                }
-                Ok(())
-            })
-            .unwrap();
-        state.emit_event(&[path]);
+        self.write_file_internal(path, content).unwrap()
    }
 
    pub async fn insert_symlink(&self, path: impl AsRef<Path>, target: PathBuf) {
@@ -569,6 +545,33 @@ impl FakeFs {
        state.emit_event(&[path]);
    }
 
+    fn write_file_internal(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
+        let mut state = self.state.lock();
+        let path = path.as_ref();
+        let inode = state.next_inode;
+        let mtime = state.next_mtime;
+        state.next_inode += 1;
+        state.next_mtime += Duration::from_nanos(1);
+        let file = Arc::new(Mutex::new(FakeFsEntry::File {
+            inode,
+            mtime,
+            content,
+        }));
+        state.write_path(path, move |entry| {
+            match entry {
+                btree_map::Entry::Vacant(e) => {
+                    e.insert(file);
+                }
+                btree_map::Entry::Occupied(mut e) => {
+                    *e.get_mut() = file;
+                }
+            }
+            Ok(())
+        })?;
+        state.emit_event(&[path]);
+        Ok(())
+    }
+
    pub async fn pause_events(&self) {
        self.state.lock().events_paused = true;
    }
@@ -952,7 +955,7 @@ impl Fs for FakeFs {
    async fn atomic_write(&self, path: PathBuf, data: String) -> Result<()> {
        self.simulate_random_delay().await;
        let path = normalize_path(path.as_path());
-        self.insert_file(path, data.to_string()).await;
+        self.write_file_internal(path, data.to_string())?;
        Ok(())
    }
 
@@ -961,7 +964,7 @@
        self.simulate_random_delay().await;
        let path = normalize_path(path);
        let content = chunks(text, line_ending).collect();
-        self.insert_file(path, content).await;
+        self.write_file_internal(path, content)?;
        Ok(())
    }
 
diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs
index 29aec15610..7a826740f1 100644
--- a/crates/project/src/worktree.rs
+++ b/crates/project/src/worktree.rs
@@ -3523,6 +3523,83 @@ mod tests {
        assert_eq!(snapshot1.to_vec(true), snapshot2.to_vec(true),);
    }
 
+    #[gpui::test(iterations = 100)]
+    async fn test_random_worktree_operations_during_initial_scan(
+        cx: &mut TestAppContext,
+        mut rng: StdRng,
+    ) {
+        let operations = env::var("OPERATIONS")
+            .map(|o| o.parse().unwrap())
+            .unwrap_or(5);
+        let initial_entries = env::var("INITIAL_ENTRIES")
+            .map(|o| o.parse().unwrap())
+            .unwrap_or(20);
+
+        let root_dir = Path::new("/test");
+        let fs = FakeFs::new(cx.background()) as Arc<dyn Fs>;
+        fs.as_fake().insert_tree(root_dir, json!({})).await;
+        for _ in 0..initial_entries {
+            randomly_mutate_fs(&fs, root_dir, 1.0, &mut rng).await;
+        }
+        log::info!("generated initial tree");
+
+        let client = cx.read(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+        let worktree = Worktree::local(
+            client.clone(),
+            root_dir,
+            true,
+            fs.clone(),
+            Default::default(),
+            &mut cx.to_async(),
+        )
+        .await
+        .unwrap();
+
+        let mut snapshot = worktree.update(cx, |tree, _| tree.as_local().unwrap().snapshot());
+
+        for _ in 0..operations {
+            worktree
+                .update(cx, |worktree, cx| {
+                    randomly_mutate_worktree(worktree, &mut rng, cx)
+                })
+                .await
+                .log_err();
+            worktree.read_with(cx, |tree, _| {
+                tree.as_local().unwrap().snapshot.check_invariants()
+            });
+
+            if rng.gen_bool(0.6) {
+                let new_snapshot =
+                    worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot());
+                let update = new_snapshot.build_update(&snapshot, 0, 0, true);
+                snapshot.apply_remote_update(update.clone()).unwrap();
+                assert_eq!(
+                    snapshot.to_vec(true),
+                    new_snapshot.to_vec(true),
+                    "incorrect snapshot after update {:?}",
+                    update
+                );
+            }
+        }
+
+        worktree
+            .update(cx, |tree, _| tree.as_local_mut().unwrap().scan_complete())
+            .await;
+        worktree.read_with(cx, |tree, _| {
+            tree.as_local().unwrap().snapshot.check_invariants()
+        });
+
+        let new_snapshot = worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot());
+        let update = new_snapshot.build_update(&snapshot, 0, 0, true);
+        snapshot.apply_remote_update(update.clone()).unwrap();
+        assert_eq!(
+            snapshot.to_vec(true),
+            new_snapshot.to_vec(true),
+            "incorrect snapshot after update {:?}",
+            update
+        );
+    }
+
    #[gpui::test(iterations = 100)]
    async fn test_random_worktree_changes(cx: &mut TestAppContext, mut rng: StdRng) {
        let operations = env::var("OPERATIONS")
@@ -3536,18 +3613,17 @@
        let fs = FakeFs::new(cx.background()) as Arc<dyn Fs>;
        fs.as_fake().insert_tree(root_dir, json!({})).await;
        for _ in 0..initial_entries {
-            randomly_mutate_tree(&fs, root_dir, 1.0, &mut rng).await;
+            randomly_mutate_fs(&fs, root_dir, 1.0, &mut rng).await;
        }
        log::info!("generated initial tree");
 
-        let next_entry_id = Arc::new(AtomicUsize::default());
        let client = cx.read(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
        let worktree = Worktree::local(
            client.clone(),
            root_dir,
            true,
            fs.clone(),
-            next_entry_id.clone(),
+            Default::default(),
            &mut cx.to_async(),
        )
        .await
@@ -3603,14 +3679,14 @@
        let mut snapshots = Vec::new();
        let mut mutations_len = operations;
        while mutations_len > 1 {
-            randomly_mutate_tree(&fs, root_dir, 1.0, &mut rng).await;
+            randomly_mutate_fs(&fs, root_dir, 1.0, &mut rng).await;
            let buffered_event_count = fs.as_fake().buffered_event_count().await;
            if buffered_event_count > 0 && rng.gen_bool(0.3) {
                let len = rng.gen_range(0..=buffered_event_count);
                log::info!("flushing {} events", len);
                fs.as_fake().flush_events(len).await;
            } else {
-                randomly_mutate_tree(&fs, root_dir, 0.6, &mut rng).await;
+                randomly_mutate_fs(&fs, root_dir, 0.6, &mut rng).await;
                mutations_len -= 1;
            }
 
@@ -3635,7 +3711,7 @@
            root_dir,
            true,
            fs.clone(),
-            next_entry_id,
+            Default::default(),
            &mut cx.to_async(),
        )
        .await
@@ -3679,7 +3755,67 @@
        }
    }
 
-    async fn randomly_mutate_tree(
+    fn randomly_mutate_worktree(
+        worktree: &mut Worktree,
+        rng: &mut impl Rng,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Task<Result<()>> {
+        let worktree = worktree.as_local_mut().unwrap();
+        let snapshot = worktree.snapshot();
+        let entry = snapshot.entries(false).choose(rng).unwrap();
+
+        match rng.gen_range(0_u32..100) {
+            0..=33 if entry.path.as_ref() != Path::new("") => {
+                log::info!("deleting entry {:?} ({})", entry.path, entry.id.0);
+                worktree.delete_entry(entry.id, cx).unwrap()
+            }
+            ..=66 if entry.path.as_ref() != Path::new("") => {
+                let other_entry = snapshot.entries(false).choose(rng).unwrap();
+                let new_parent_path = if other_entry.is_dir() {
+                    other_entry.path.clone()
+                } else {
+                    other_entry.path.parent().unwrap().into()
+                };
+                let mut new_path = new_parent_path.join(gen_name(rng));
+                if new_path.starts_with(&entry.path) {
+                    new_path = gen_name(rng).into();
+                }
+
+                log::info!(
+                    "renaming entry {:?} ({}) to {:?}",
+                    entry.path,
+                    entry.id.0,
+                    new_path
+                );
+                let task = worktree.rename_entry(entry.id, new_path, cx).unwrap();
+                cx.foreground().spawn(async move {
+                    task.await?;
+                    Ok(())
+                })
+            }
+            _ => {
+                let task = if entry.is_dir() {
+                    let child_path = entry.path.join(gen_name(rng));
+                    let is_dir = rng.gen_bool(0.3);
+                    log::info!(
+                        "creating {} at {:?}",
+                        if is_dir { "dir" } else { "file" },
+                        child_path,
+                    );
+                    worktree.create_entry(child_path, is_dir, cx)
+                } else {
+                    log::info!("overwriting file {:?} ({})", entry.path, entry.id.0);
+                    worktree.write_file(entry.path.clone(), "".into(), Default::default(), cx)
+                };
+                cx.foreground().spawn(async move {
+                    task.await?;
+                    Ok(())
+                })
+            }
+        }
+    }
+
+    async fn randomly_mutate_fs(
        fs: &Arc<dyn Fs>,
        root_path: &Path,
        insertion_probability: f64,
@@ -3847,6 +3983,20 @@ impl LocalSnapshot {
        fn check_invariants(&self) {
+            assert_eq!(
+                self.entries_by_path
+                    .cursor::<()>()
+                    .map(|e| (&e.path, e.id))
+                    .collect::<Vec<_>>(),
+                self.entries_by_id
+                    .cursor::<()>()
+                    .map(|e| (&e.path, e.id))
+                    .collect::<collections::BTreeSet<_>>()
+                    .into_iter()
+                    .collect::<Vec<_>>(),
+                "entries_by_path and entries_by_id are inconsistent"
+            );
+
            let mut files = self.files(true, 0);
            let mut visible_files = self.files(false, 0);
            for entry in self.entries_by_path.cursor::<()>() {
                if entry.is_file() {
                    assert_eq!(files.next().unwrap().inode, entry.inode);
                    if !entry.is_ignored {
                        assert_eq!(visible_files.next().unwrap().inode, entry.inode);
                    }
                }
            }
@@ -3857,6 +4007,7 @@ impl LocalSnapshot {
+
            assert!(files.next().is_none());
            assert!(visible_files.next().is_none());

From 5ea49b3ae3d37710f2b97a04b7338dbb56283942 Mon Sep 17 00:00:00 2001
From: Max Brunsfeld
Date: Thu, 13 Apr 2023 22:34:34 -0700
Subject: [PATCH 3/3] Fix inconsistent worktree state when renaming entries while scanning

---
 crates/project/src/worktree.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs
index 7a826740f1..d0cf2faa7e 100644
--- a/crates/project/src/worktree.rs
+++ b/crates/project/src/worktree.rs
@@ -1491,7 +1491,12 @@ impl LocalSnapshot {
        }
 
        let scan_id = self.scan_id;
-        self.entries_by_path.insert_or_replace(entry.clone(), &());
+        let removed = self.entries_by_path.insert_or_replace(entry.clone(), &());
+        if let Some(removed) = removed {
+            if removed.id != entry.id {
+                self.entries_by_id.remove(&removed.id, &());
+            }
+        }
        self.entries_by_id.insert_or_replace(
            PathEntry {
                id: entry.id,
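
A minimal, self-contained sketch (simplified standalone types, not the actual zed worktree API; ScanState, begin_scan, and finish_scan are illustrative names) of the bookkeeping patch 1 relies on: scan_id is bumped every time a batch of paths starts being (re)scanned, while completed_scan_id only catches up once the scanner drains its queue, so it can advance by more than one at the end of a scan.

    // Simplified model: scan_id advances whenever a new batch of paths is
    // (re)scanned; completed_scan_id only catches up when the scanner finishes,
    // so it may jump by more than one step at once.
    #[derive(Default)]
    struct ScanState {
        scan_id: usize,
        completed_scan_id: usize,
    }

    impl ScanState {
        fn begin_scan(&mut self) {
            self.scan_id += 1;
        }

        fn finish_scan(&mut self) {
            // Everything up to the current scan_id is now complete, even if
            // several scans were requested while one was already in progress.
            self.completed_scan_id = self.scan_id;
        }

        fn is_idle(&self) -> bool {
            self.completed_scan_id == self.scan_id
        }
    }

    fn main() {
        let mut state = ScanState::default();
        state.begin_scan(); // initial scan starts
        state.begin_scan(); // a refresh request arrives mid-scan
        assert!(!state.is_idle());
        state.finish_scan(); // completed_scan_id jumps from 0 to 2
        assert!(state.is_idle());
        assert_eq!(state.completed_scan_id, 2);
    }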
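
Similarly, a standalone sketch of the invariant patch 3 restores, using plain HashMaps in place of zed's SumTree indices (the Snapshot, Entry, and insert_entry names here are hypothetical simplifications): when a path is re-inserted with a different entry id, the stale id must be evicted from the by-id index, otherwise the two indices drift apart.

    use std::collections::HashMap;

    #[derive(Clone)]
    struct Entry {
        id: usize,
        path: String,
    }

    #[derive(Default)]
    struct Snapshot {
        entries_by_path: HashMap<String, Entry>,
        entries_by_id: HashMap<usize, String>,
    }

    impl Snapshot {
        fn insert_entry(&mut self, entry: Entry) {
            // Mirrors the patched insert_entry: if the path was already occupied
            // by a different entry, drop that entry's stale id mapping.
            if let Some(removed) = self.entries_by_path.insert(entry.path.clone(), entry.clone()) {
                if removed.id != entry.id {
                    self.entries_by_id.remove(&removed.id);
                }
            }
            self.entries_by_id.insert(entry.id, entry.path);
        }

        fn check_invariants(&self) {
            // Both indices must describe the same set of entries.
            assert_eq!(self.entries_by_path.len(), self.entries_by_id.len());
        }
    }

    fn main() {
        let mut snapshot = Snapshot::default();
        snapshot.insert_entry(Entry { id: 1, path: "a/b".into() });
        // A rescan re-creates the same path under a fresh id, as a rename can
        // while a scan is still in progress.
        snapshot.insert_entry(Entry { id: 2, path: "a/b".into() });
        snapshot.check_invariants(); // would fail without the removal above
    }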