From 768dfc8b6bf94b09c7b30dfe07784724707bdb3d Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Fri, 14 Mar 2025 18:20:24 -0400 Subject: [PATCH] Reinstate failing worktree tests (#26733) Just debugging for now Release Notes: - N/A --- crates/worktree/src/worktree.rs | 248 +++++++++++++------------- crates/worktree/src/worktree_tests.rs | 20 +-- 2 files changed, 131 insertions(+), 137 deletions(-) diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs index dd7381eff1..8cd3b09fe7 100644 --- a/crates/worktree/src/worktree.rs +++ b/crates/worktree/src/worktree.rs @@ -61,7 +61,7 @@ use std::{ path::{Component, Path, PathBuf}, pin::Pin, sync::{ - atomic::{self, AtomicU32, AtomicUsize, Ordering::SeqCst}, + atomic::{self, AtomicI32, AtomicUsize, Ordering::SeqCst}, Arc, }, time::{Duration, Instant}, @@ -1525,6 +1525,7 @@ impl LocalWorktree { fs, fs_case_sensitive, status_updates_tx: scan_states_tx, + scans_running: Arc::new(AtomicI32::new(0)), executor: background, scan_requests_rx, path_prefixes_to_scan_rx, @@ -4249,11 +4250,6 @@ struct PathEntry { scan_id: usize, } -#[derive(Debug, Default)] -struct FsScanned { - status_scans: Arc, -} - impl sum_tree::Item for PathEntry { type Summary = PathEntrySummary; @@ -4321,6 +4317,7 @@ struct BackgroundScanner { fs: Arc, fs_case_sensitive: bool, status_updates_tx: UnboundedSender, + scans_running: Arc, executor: BackgroundExecutor, scan_requests_rx: channel::Receiver, path_prefixes_to_scan_rx: channel::Receiver, @@ -4428,13 +4425,13 @@ impl BackgroundScanner { // Perform an initial scan of the directory. drop(scan_job_tx); - let scans_running = self.scan_dirs(true, scan_job_rx).await; + self.scan_dirs(true, scan_job_rx).await; { let mut state = self.state.lock(); state.snapshot.completed_scan_id = state.snapshot.scan_id; } - let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0; + let scanning = self.scans_running.load(atomic::Ordering::Acquire) > 0; self.send_status_update(scanning, SmallVec::new()); // Process any any FS events that occurred while performing the initial scan. @@ -4461,7 +4458,7 @@ impl BackgroundScanner { // these before handling changes reported by the filesystem. request = self.next_scan_request().fuse() => { let Ok(request) = request else { break }; - let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0; + let scanning = self.scans_running.load(atomic::Ordering::Acquire) > 0; if !self.process_scan_request(request, scanning).await { return; } @@ -4484,7 +4481,7 @@ impl BackgroundScanner { self.process_events(vec![abs_path]).await; } } - let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0; + let scanning = self.scans_running.load(atomic::Ordering::Acquire) > 0; self.send_status_update(scanning, request.done); } @@ -4678,7 +4675,7 @@ impl BackgroundScanner { .await; self.update_ignore_statuses(scan_job_tx).await; - let scans_running = self.scan_dirs(false, scan_job_rx).await; + self.scan_dirs(false, scan_job_rx).await; let status_update = if !dot_git_abs_paths.is_empty() { Some(self.update_git_repositories(dot_git_abs_paths)) @@ -4689,6 +4686,7 @@ impl BackgroundScanner { let phase = self.phase; let status_update_tx = self.status_updates_tx.clone(); let state = self.state.clone(); + let scans_running = self.scans_running.clone(); self.executor .spawn(async move { if let Some(status_update) = status_update { @@ -4704,7 +4702,7 @@ impl BackgroundScanner { #[cfg(test)] state.snapshot.check_git_invariants(); } - let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0; + let scanning = scans_running.load(atomic::Ordering::Acquire) > 0; send_status_update_inner(phase, state, status_update_tx, scanning, SmallVec::new()); }) .detach(); @@ -4729,9 +4727,8 @@ impl BackgroundScanner { } drop(scan_job_tx); } - let scans_running = Arc::new(AtomicU32::new(0)); while let Ok(job) = scan_job_rx.recv().await { - self.scan_dir(&scans_running, &job).await.log_err(); + self.scan_dir(&job).await.log_err(); } !mem::take(&mut self.state.lock().paths_to_scan).is_empty() @@ -4741,16 +4738,16 @@ impl BackgroundScanner { &self, enable_progress_updates: bool, scan_jobs_rx: channel::Receiver, - ) -> FsScanned { + ) { if self .status_updates_tx .unbounded_send(ScanState::Started) .is_err() { - return FsScanned::default(); + return; } - let scans_running = Arc::new(AtomicU32::new(1)); + inc_scans_running(&self.scans_running); let progress_update_count = AtomicUsize::new(0); self.executor .scoped(|scope| { @@ -4795,7 +4792,7 @@ impl BackgroundScanner { // Recursively load directories from the file system. job = scan_jobs_rx.recv().fuse() => { let Ok(job) = job else { break }; - if let Err(err) = self.scan_dir(&scans_running, &job).await { + if let Err(err) = self.scan_dir(&job).await { if job.path.as_ref() != Path::new("") { log::error!("error scanning directory {:?}: {}", job.abs_path, err); } @@ -4808,10 +4805,7 @@ impl BackgroundScanner { }) .await; - scans_running.fetch_sub(1, atomic::Ordering::Release); - FsScanned { - status_scans: scans_running, - } + dec_scans_running(&self.scans_running, 1); } fn send_status_update(&self, scanning: bool, barrier: SmallVec<[barrier::Sender; 1]>) -> bool { @@ -4824,7 +4818,7 @@ impl BackgroundScanner { ) } - async fn scan_dir(&self, scans_running: &Arc, job: &ScanJob) -> Result<()> { + async fn scan_dir(&self, job: &ScanJob) -> Result<()> { let root_abs_path; let root_char_bag; { @@ -4879,7 +4873,7 @@ impl BackgroundScanner { self.watcher.as_ref(), ); if let Some(local_repo) = repo { - scans_running.fetch_add(1, atomic::Ordering::Release); + inc_scans_running(&self.scans_running); git_status_update_jobs .push(self.schedule_git_statuses_update(&mut state, local_repo)); } @@ -5002,7 +4996,7 @@ impl BackgroundScanner { let task_state = self.state.clone(); let phase = self.phase; let status_updates_tx = self.status_updates_tx.clone(); - let scans_running = scans_running.clone(); + let scans_running = self.scans_running.clone(); self.executor .spawn(async move { if !git_status_update_jobs.is_empty() { @@ -5010,7 +5004,7 @@ impl BackgroundScanner { let status_updated = status_updates .iter() .any(|update_result| update_result.is_ok()); - scans_running.fetch_sub(status_updates.len() as u32, atomic::Ordering::Release); + dec_scans_running(&scans_running, status_updates.len() as i32); if status_updated { let scanning = scans_running.load(atomic::Ordering::Acquire) > 0; send_status_update_inner( @@ -5512,106 +5506,15 @@ impl BackgroundScanner { fn schedule_git_statuses_update( &self, state: &mut BackgroundScannerState, - mut local_repository: LocalRepositoryEntry, + local_repository: LocalRepositoryEntry, ) -> oneshot::Receiver<()> { - let repository_name = local_repository.work_directory.display_name(); - let path_key = local_repository.work_directory.path_key(); - let job_state = self.state.clone(); let (tx, rx) = oneshot::channel(); state.repository_scans.insert( - path_key.clone(), - self.executor.spawn(async move { - update_branches(&job_state, &mut local_repository) - .await - .log_err(); - log::trace!("updating git statuses for repo {repository_name}",); - let t0 = Instant::now(); - - let Some(statuses) = local_repository - .repo() - .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()]) - .log_err() - else { - return; - }; - log::trace!( - "computed git statuses for repo {repository_name} in {:?}", - t0.elapsed() - ); - - let t0 = Instant::now(); - let mut changed_paths = Vec::new(); - let snapshot = job_state.lock().snapshot.snapshot.clone(); - - let Some(mut repository) = snapshot - .repository(path_key) - .context( - "Tried to update git statuses for a repository that isn't in the snapshot", - ) - .log_err() - else { - return; - }; - - let merge_head_shas = local_repository.repo().merge_head_shas(); - if merge_head_shas != local_repository.current_merge_head_shas { - mem::take(&mut repository.current_merge_conflicts); - } - - let mut new_entries_by_path = SumTree::new(&()); - for (repo_path, status) in statuses.entries.iter() { - let project_path = repository.work_directory.try_unrelativize(repo_path); - - new_entries_by_path.insert_or_replace( - StatusEntry { - repo_path: repo_path.clone(), - status: *status, - }, - &(), - ); - if status.is_conflicted() { - repository.current_merge_conflicts.insert(repo_path.clone()); - } - - if let Some(path) = project_path { - changed_paths.push(path); - } - } - - repository.statuses_by_path = new_entries_by_path; - let mut state = job_state.lock(); - state - .snapshot - .repositories - .insert_or_replace(repository, &()); - state.snapshot.git_repositories.update( - &local_repository.work_directory_id, - |entry| { - entry.current_merge_head_shas = merge_head_shas; - entry.merge_message = std::fs::read_to_string( - local_repository.dot_git_dir_abs_path.join("MERGE_MSG"), - ) - .ok() - .and_then(|merge_msg| Some(merge_msg.lines().next()?.to_owned())); - entry.status_scan_id += 1; - }, - ); - - util::extend_sorted( - &mut state.changed_paths, - changed_paths, - usize::MAX, - Ord::cmp, - ); - - log::trace!( - "applied git status updates for repo {repository_name} in {:?}", - t0.elapsed(), - ); - tx.send(()).ok(); - }), + local_repository.work_directory.path_key(), + self.executor + .spawn(do_git_status_update(job_state, local_repository, tx)), ); rx } @@ -5643,6 +5546,15 @@ impl BackgroundScanner { } } +fn inc_scans_running(scans_running: &AtomicI32) { + scans_running.fetch_add(1, atomic::Ordering::Release); +} + +fn dec_scans_running(scans_running: &AtomicI32, by: i32) { + let old = scans_running.fetch_sub(by, atomic::Ordering::Release); + debug_assert!(old >= by); +} + fn send_status_update_inner( phase: BackgroundScannerPhase, state: Arc>, @@ -5690,6 +5602,100 @@ async fn update_branches( Ok(()) } +async fn do_git_status_update( + job_state: Arc>, + mut local_repository: LocalRepositoryEntry, + tx: oneshot::Sender<()>, +) { + let repository_name = local_repository.work_directory.display_name(); + log::trace!("updating git branches for repo {repository_name}"); + update_branches(&job_state, &mut local_repository) + .await + .log_err(); + let t0 = Instant::now(); + + log::trace!("updating git statuses for repo {repository_name}"); + let Some(statuses) = local_repository + .repo() + .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()]) + .log_err() + else { + return; + }; + log::trace!( + "computed git statuses for repo {repository_name} in {:?}", + t0.elapsed() + ); + + let t0 = Instant::now(); + let mut changed_paths = Vec::new(); + let snapshot = job_state.lock().snapshot.snapshot.clone(); + + let Some(mut repository) = snapshot + .repository(local_repository.work_directory.path_key()) + .context("Tried to update git statuses for a repository that isn't in the snapshot") + .log_err() + else { + return; + }; + + let merge_head_shas = local_repository.repo().merge_head_shas(); + if merge_head_shas != local_repository.current_merge_head_shas { + mem::take(&mut repository.current_merge_conflicts); + } + + let mut new_entries_by_path = SumTree::new(&()); + for (repo_path, status) in statuses.entries.iter() { + let project_path = repository.work_directory.try_unrelativize(repo_path); + + new_entries_by_path.insert_or_replace( + StatusEntry { + repo_path: repo_path.clone(), + status: *status, + }, + &(), + ); + if status.is_conflicted() { + repository.current_merge_conflicts.insert(repo_path.clone()); + } + + if let Some(path) = project_path { + changed_paths.push(path); + } + } + + repository.statuses_by_path = new_entries_by_path; + let mut state = job_state.lock(); + state + .snapshot + .repositories + .insert_or_replace(repository, &()); + state + .snapshot + .git_repositories + .update(&local_repository.work_directory_id, |entry| { + entry.current_merge_head_shas = merge_head_shas; + entry.merge_message = + std::fs::read_to_string(local_repository.dot_git_dir_abs_path.join("MERGE_MSG")) + .ok() + .and_then(|merge_msg| Some(merge_msg.lines().next()?.to_owned())); + entry.status_scan_id += 1; + }); + + util::extend_sorted( + &mut state.changed_paths, + changed_paths, + usize::MAX, + Ord::cmp, + ); + + log::trace!( + "applied git status updates for repo {repository_name} in {:?}", + t0.elapsed(), + ); + tx.send(()).ok(); +} + fn build_diff( phase: BackgroundScannerPhase, old_snapshot: &Snapshot, diff --git a/crates/worktree/src/worktree_tests.rs b/crates/worktree/src/worktree_tests.rs index e0aa1e9331..2ba8ac39e4 100644 --- a/crates/worktree/src/worktree_tests.rs +++ b/crates/worktree/src/worktree_tests.rs @@ -845,9 +845,7 @@ async fn test_update_gitignore(cx: &mut TestAppContext) { }); } -// TODO: Fix flaky test. -// #[gpui::test] -#[allow(unused)] +#[gpui::test] async fn test_write_file(cx: &mut TestAppContext) { init_test(cx); cx.executor().allow_parking(); @@ -2432,9 +2430,7 @@ async fn test_git_repository_for_path(cx: &mut TestAppContext) { // you can't rename a directory which some program has already open. This is a // limitation of the Windows. See: // https://stackoverflow.com/questions/41365318/access-is-denied-when-renaming-folder -// TODO: Fix flaky test. -// #[gpui::test] -#[allow(unused)] +#[gpui::test] #[cfg_attr(target_os = "windows", ignore)] async fn test_file_status(cx: &mut TestAppContext) { init_test(cx); @@ -2627,9 +2623,7 @@ async fn test_file_status(cx: &mut TestAppContext) { }); } -// TODO: Fix flaky test. -// #[gpui::test] -#[allow(unused)] +#[gpui::test] async fn test_git_repository_status(cx: &mut TestAppContext) { init_test(cx); cx.executor().allow_parking(); @@ -2743,9 +2737,7 @@ async fn test_git_repository_status(cx: &mut TestAppContext) { }); } -// TODO: Fix flaky test. -// #[gpui::test] -#[allow(unused)] +#[gpui::test] async fn test_git_status_postprocessing(cx: &mut TestAppContext) { init_test(cx); cx.executor().allow_parking(); @@ -3541,8 +3533,6 @@ fn git_cherry_pick(commit: &git2::Commit<'_>, repo: &git2::Repository) { repo.cherrypick(commit, None).expect("Failed to cherrypick"); } -// TODO: Remove allow(unused) once flaky tests are reinstated -#[allow(unused)] #[track_caller] fn git_stash(repo: &mut git2::Repository) { use git2::Signature; @@ -3552,8 +3542,6 @@ fn git_stash(repo: &mut git2::Repository) { .expect("Failed to stash"); } -// TODO: Remove allow(unused) once flaky tests are reinstated -#[allow(unused)] #[track_caller] fn git_reset(offset: usize, repo: &git2::Repository) { let head = repo.head().expect("Couldn't get repo head");