diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs index c87187a32e..3033a6e9fa 100644 --- a/crates/worktree/src/worktree.rs +++ b/crates/worktree/src/worktree.rs @@ -13,6 +13,7 @@ use futures::{ mpsc::{self, UnboundedSender}, oneshot, }, + future::join_all, select_biased, task::Poll, FutureExt as _, Stream, StreamExt, @@ -450,6 +451,7 @@ struct BackgroundScannerState { changed_paths: Vec>, prev_snapshot: Snapshot, git_hosting_provider_registry: Option>, + repository_scans: HashMap, Task<()>>, } #[derive(Debug, Clone)] @@ -1336,7 +1338,7 @@ impl LocalWorktree { scan_requests_rx, path_prefixes_to_scan_rx, next_entry_id, - state: Mutex::new(BackgroundScannerState { + state: Arc::new(Mutex::new(BackgroundScannerState { prev_snapshot: snapshot.snapshot.clone(), snapshot, scanned_dirs: Default::default(), @@ -1344,8 +1346,9 @@ impl LocalWorktree { paths_to_scan: Default::default(), removed_entries: Default::default(), changed_paths: Default::default(), + repository_scans: HashMap::default(), git_hosting_provider_registry, - }), + })), phase: BackgroundScannerPhase::InitialScan, share_private_files, settings, @@ -4083,7 +4086,7 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey { } struct BackgroundScanner { - state: Mutex, + state: Arc>, fs: Arc, fs_case_sensitive: bool, status_updates_tx: UnboundedSender, @@ -4097,7 +4100,7 @@ struct BackgroundScanner { share_private_files: bool, } -#[derive(PartialEq)] +#[derive(Copy, Clone, PartialEq)] enum BackgroundScannerPhase { InitialScan, EventsReceivedDuringInitialScan, @@ -4106,8 +4109,6 @@ enum BackgroundScannerPhase { impl BackgroundScanner { async fn run(&mut self, mut fs_events_rx: Pin>>>) { - use futures::FutureExt as _; - // If the worktree root does not contain a git repository, then find // the git repository in an ancestor directory. Find any gitignore files // in ancestor directories. @@ -4418,22 +4419,33 @@ impl BackgroundScanner { self.update_ignore_statuses(scan_job_tx).await; self.scan_dirs(false, scan_job_rx).await; - if !dot_git_abs_paths.is_empty() { - self.update_git_repositories(dot_git_abs_paths).await; - } + let status_update = if !dot_git_abs_paths.is_empty() { + Some(self.schedule_git_repositories_update(dot_git_abs_paths)) + } else { + None + }; - { - let mut state = self.state.lock(); - state.snapshot.completed_scan_id = state.snapshot.scan_id; - for (_, entry) in mem::take(&mut state.removed_entries) { - state.scanned_dirs.remove(&entry.id); - } - } + let phase = self.phase; + let status_update_tx = self.status_updates_tx.clone(); + let state = self.state.clone(); + self.executor + .spawn(async move { + if let Some(status_update) = status_update { + status_update.await; + } - #[cfg(test)] - self.state.lock().snapshot.check_git_invariants(); - - self.send_status_update(false, SmallVec::new()); + { + let mut state = state.lock(); + state.snapshot.completed_scan_id = state.snapshot.scan_id; + for (_, entry) in mem::take(&mut state.removed_entries) { + state.scanned_dirs.remove(&entry.id); + } + #[cfg(test)] + state.snapshot.check_git_invariants(); + } + send_status_update_inner(phase, state, status_update_tx, false, SmallVec::new()); + }) + .detach(); } async fn forcibly_load_paths(&self, paths: &[Arc]) -> bool { @@ -4467,8 +4479,6 @@ impl BackgroundScanner { enable_progress_updates: bool, scan_jobs_rx: channel::Receiver, ) { - use futures::FutureExt as _; - if self .status_updates_tx .unbounded_send(ScanState::Started) @@ -4536,24 +4546,13 @@ impl BackgroundScanner { } fn send_status_update(&self, scanning: bool, barrier: SmallVec<[barrier::Sender; 1]>) -> bool { - let mut state = self.state.lock(); - if state.changed_paths.is_empty() && scanning { - return true; - } - - let new_snapshot = state.snapshot.clone(); - let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone()); - let changes = self.build_change_set(&old_snapshot, &new_snapshot, &state.changed_paths); - state.changed_paths.clear(); - - self.status_updates_tx - .unbounded_send(ScanState::Updated { - snapshot: new_snapshot, - changes, - scanning, - barrier, - }) - .is_ok() + send_status_update_inner( + self.phase, + self.state.clone(), + self.status_updates_tx.clone(), + scanning, + barrier, + ) } async fn scan_dir(&self, job: &ScanJob) -> Result<()> { @@ -4609,9 +4608,7 @@ impl BackgroundScanner { ); if let Some(local_repo) = repo { - self.update_git_statuses(UpdateGitStatusesJob { - local_repository: local_repo, - }); + let _ = self.schedule_git_statuses_update(local_repo); } } else if child_name == *GITIGNORE { match build_gitignore(&child_abs_path, self.fs.as_ref()).await { @@ -4968,8 +4965,6 @@ impl BackgroundScanner { } async fn update_ignore_statuses(&self, scan_job_tx: Sender) { - use futures::FutureExt as _; - let mut ignores_to_update = Vec::new(); let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded(); let prev_snapshot; @@ -5119,10 +5114,10 @@ impl BackgroundScanner { state.snapshot.entries_by_id.edit(entries_by_id_edits, &()); } - async fn update_git_repositories(&self, dot_git_paths: Vec) { + fn schedule_git_repositories_update(&self, dot_git_paths: Vec) -> Task<()> { log::debug!("reloading repositories: {dot_git_paths:?}"); - let mut repo_updates = Vec::new(); + let mut repos_to_update = Vec::new(); { let mut state = self.state.lock(); let scan_id = state.snapshot.scan_id; @@ -5182,7 +5177,7 @@ impl BackgroundScanner { } }; - repo_updates.push(UpdateGitStatusesJob { local_repository }); + repos_to_update.push(local_repository); } // Remove any git repositories whose .git entry no longer exists. @@ -5213,223 +5208,98 @@ impl BackgroundScanner { }); } - let (mut updates_done_tx, mut updates_done_rx) = barrier::channel(); - self.executor - .scoped(|scope| { - scope.spawn(async { - for repo_update in repo_updates { - self.update_git_statuses(repo_update); - } - updates_done_tx.blocking_send(()).ok(); - }); - - scope.spawn(async { - loop { - select_biased! { - // Process any path refresh requests before moving on to process - // the queue of git statuses. - request = self.next_scan_request().fuse() => { - let Ok(request) = request else { break }; - if !self.process_scan_request(request, true).await { - return; - } - } - _ = updates_done_rx.recv().fuse() => break, - } - } - }); - }) - .await; + let mut status_updates = Vec::new(); + for local_repository in repos_to_update { + status_updates.push(self.schedule_git_statuses_update(local_repository)); + } + self.executor.spawn(async move { + let _updates_finished: Vec> = + join_all(status_updates).await; + }) } /// Update the git statuses for a given batch of entries. - fn update_git_statuses(&self, job: UpdateGitStatusesJob) { - log::trace!( - "updating git statuses for repo {:?}", - job.local_repository.work_directory.path - ); - let t0 = Instant::now(); - - let Some(statuses) = job - .local_repository - .repo() - .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()]) - .log_err() - else { - return; - }; - log::trace!( - "computed git statuses for repo {:?} in {:?}", - job.local_repository.work_directory.path, - t0.elapsed() - ); - - let t0 = Instant::now(); - let mut changed_paths = Vec::new(); - let snapshot = self.state.lock().snapshot.snapshot.clone(); - - let Some(mut repository) = - snapshot.repository(job.local_repository.work_directory.path_key()) - else { - log::error!("Got an UpdateGitStatusesJob for a repository that isn't in the snapshot"); - debug_assert!(false); - return; - }; - - let mut new_entries_by_path = SumTree::new(&()); - for (repo_path, status) in statuses.entries.iter() { - let project_path = repository.work_directory.unrelativize(repo_path); - - new_entries_by_path.insert_or_replace( - StatusEntry { - repo_path: repo_path.clone(), - status: *status, - }, - &(), - ); - - if let Some(path) = project_path { - changed_paths.push(path); - } - } - - repository.statuses_by_path = new_entries_by_path; - let mut state = self.state.lock(); - state - .snapshot - .repositories - .insert_or_replace(repository, &()); - - util::extend_sorted( - &mut state.changed_paths, - changed_paths, - usize::MAX, - Ord::cmp, - ); - - log::trace!( - "applied git status updates for repo {:?} in {:?}", - job.local_repository.work_directory.path, - t0.elapsed(), - ); - } - - fn build_change_set( + fn schedule_git_statuses_update( &self, - old_snapshot: &Snapshot, - new_snapshot: &Snapshot, - event_paths: &[Arc], - ) -> UpdatedEntriesSet { - use BackgroundScannerPhase::*; - use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated}; + local_repository: LocalRepositoryEntry, + ) -> oneshot::Receiver<()> { + let repository_path = local_repository.work_directory.path.clone(); + let state = self.state.clone(); + let (tx, rx) = oneshot::channel(); - // Identify which paths have changed. Use the known set of changed - // parent paths to optimize the search. - let mut changes = Vec::new(); - let mut old_paths = old_snapshot.entries_by_path.cursor::(&()); - let mut new_paths = new_snapshot.entries_by_path.cursor::(&()); - let mut last_newly_loaded_dir_path = None; - old_paths.next(&()); - new_paths.next(&()); - for path in event_paths { - let path = PathKey(path.clone()); - if old_paths.item().map_or(false, |e| e.path < path.0) { - old_paths.seek_forward(&path, Bias::Left, &()); - } - if new_paths.item().map_or(false, |e| e.path < path.0) { - new_paths.seek_forward(&path, Bias::Left, &()); - } - loop { - match (old_paths.item(), new_paths.item()) { - (Some(old_entry), Some(new_entry)) => { - if old_entry.path > path.0 - && new_entry.path > path.0 - && !old_entry.path.starts_with(&path.0) - && !new_entry.path.starts_with(&path.0) - { - break; - } + self.state.lock().repository_scans.insert( + repository_path.clone(), + self.executor.spawn(async move { + log::trace!("updating git statuses for repo {repository_path:?}",); + let t0 = Instant::now(); - match Ord::cmp(&old_entry.path, &new_entry.path) { - Ordering::Less => { - changes.push((old_entry.path.clone(), old_entry.id, Removed)); - old_paths.next(&()); - } - Ordering::Equal => { - if self.phase == EventsReceivedDuringInitialScan { - if old_entry.id != new_entry.id { - changes.push(( - old_entry.path.clone(), - old_entry.id, - Removed, - )); - } - // If the worktree was not fully initialized when this event was generated, - // we can't know whether this entry was added during the scan or whether - // it was merely updated. - changes.push(( - new_entry.path.clone(), - new_entry.id, - AddedOrUpdated, - )); - } else if old_entry.id != new_entry.id { - changes.push((old_entry.path.clone(), old_entry.id, Removed)); - changes.push((new_entry.path.clone(), new_entry.id, Added)); - } else if old_entry != new_entry { - if old_entry.kind.is_unloaded() { - last_newly_loaded_dir_path = Some(&new_entry.path); - changes.push(( - new_entry.path.clone(), - new_entry.id, - Loaded, - )); - } else { - changes.push(( - new_entry.path.clone(), - new_entry.id, - Updated, - )); - } - } - old_paths.next(&()); - new_paths.next(&()); - } - Ordering::Greater => { - let is_newly_loaded = self.phase == InitialScan - || last_newly_loaded_dir_path - .as_ref() - .map_or(false, |dir| new_entry.path.starts_with(dir)); - changes.push(( - new_entry.path.clone(), - new_entry.id, - if is_newly_loaded { Loaded } else { Added }, - )); - new_paths.next(&()); - } - } + let Some(statuses) = local_repository + .repo() + .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()]) + .log_err() + else { + return; + }; + log::trace!( + "computed git statuses for repo {:?} in {:?}", + repository_path, + t0.elapsed() + ); + + let t0 = Instant::now(); + let mut changed_paths = Vec::new(); + let snapshot = state.lock().snapshot.snapshot.clone(); + + let Some(mut repository) = + snapshot.repository(local_repository.work_directory.path_key()) + else { + log::error!( + "Tried to update git statuses for a repository that isn't in the snapshot" + ); + debug_assert!(false); + return; + }; + + let mut new_entries_by_path = SumTree::new(&()); + for (repo_path, status) in statuses.entries.iter() { + let project_path = repository.work_directory.unrelativize(repo_path); + + new_entries_by_path.insert_or_replace( + StatusEntry { + repo_path: repo_path.clone(), + status: *status, + }, + &(), + ); + + if let Some(path) = project_path { + changed_paths.push(path); } - (Some(old_entry), None) => { - changes.push((old_entry.path.clone(), old_entry.id, Removed)); - old_paths.next(&()); - } - (None, Some(new_entry)) => { - let is_newly_loaded = self.phase == InitialScan - || last_newly_loaded_dir_path - .as_ref() - .map_or(false, |dir| new_entry.path.starts_with(dir)); - changes.push(( - new_entry.path.clone(), - new_entry.id, - if is_newly_loaded { Loaded } else { Added }, - )); - new_paths.next(&()); - } - (None, None) => break, } - } - } - changes.into() + repository.statuses_by_path = new_entries_by_path; + let mut state = state.lock(); + state + .snapshot + .repositories + .insert_or_replace(repository, &()); + + util::extend_sorted( + &mut state.changed_paths, + changed_paths, + usize::MAX, + Ord::cmp, + ); + + log::trace!( + "applied git status updates for repo {:?} in {:?}", + repository_path, + t0.elapsed(), + ); + tx.send(()).ok(); + }), + ); + rx } async fn progress_timer(&self, running: bool) { @@ -5459,6 +5329,139 @@ impl BackgroundScanner { } } +fn send_status_update_inner( + phase: BackgroundScannerPhase, + state: Arc>, + status_updates_tx: UnboundedSender, + scanning: bool, + barrier: SmallVec<[barrier::Sender; 1]>, +) -> bool { + let mut state = state.lock(); + if state.changed_paths.is_empty() && scanning { + return true; + } + + let new_snapshot = state.snapshot.clone(); + let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone()); + let changes = build_change_set(phase, &old_snapshot, &new_snapshot, &state.changed_paths); + state.changed_paths.clear(); + + status_updates_tx + .unbounded_send(ScanState::Updated { + snapshot: new_snapshot, + changes, + scanning, + barrier, + }) + .is_ok() +} + +fn build_change_set( + phase: BackgroundScannerPhase, + old_snapshot: &Snapshot, + new_snapshot: &Snapshot, + event_paths: &[Arc], +) -> UpdatedEntriesSet { + use BackgroundScannerPhase::*; + use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated}; + + // Identify which paths have changed. Use the known set of changed + // parent paths to optimize the search. + let mut changes = Vec::new(); + let mut old_paths = old_snapshot.entries_by_path.cursor::(&()); + let mut new_paths = new_snapshot.entries_by_path.cursor::(&()); + let mut last_newly_loaded_dir_path = None; + old_paths.next(&()); + new_paths.next(&()); + for path in event_paths { + let path = PathKey(path.clone()); + if old_paths.item().map_or(false, |e| e.path < path.0) { + old_paths.seek_forward(&path, Bias::Left, &()); + } + if new_paths.item().map_or(false, |e| e.path < path.0) { + new_paths.seek_forward(&path, Bias::Left, &()); + } + loop { + match (old_paths.item(), new_paths.item()) { + (Some(old_entry), Some(new_entry)) => { + if old_entry.path > path.0 + && new_entry.path > path.0 + && !old_entry.path.starts_with(&path.0) + && !new_entry.path.starts_with(&path.0) + { + break; + } + + match Ord::cmp(&old_entry.path, &new_entry.path) { + Ordering::Less => { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + old_paths.next(&()); + } + Ordering::Equal => { + if phase == EventsReceivedDuringInitialScan { + if old_entry.id != new_entry.id { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + } + // If the worktree was not fully initialized when this event was generated, + // we can't know whether this entry was added during the scan or whether + // it was merely updated. + changes.push(( + new_entry.path.clone(), + new_entry.id, + AddedOrUpdated, + )); + } else if old_entry.id != new_entry.id { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + changes.push((new_entry.path.clone(), new_entry.id, Added)); + } else if old_entry != new_entry { + if old_entry.kind.is_unloaded() { + last_newly_loaded_dir_path = Some(&new_entry.path); + changes.push((new_entry.path.clone(), new_entry.id, Loaded)); + } else { + changes.push((new_entry.path.clone(), new_entry.id, Updated)); + } + } + old_paths.next(&()); + new_paths.next(&()); + } + Ordering::Greater => { + let is_newly_loaded = phase == InitialScan + || last_newly_loaded_dir_path + .as_ref() + .map_or(false, |dir| new_entry.path.starts_with(dir)); + changes.push(( + new_entry.path.clone(), + new_entry.id, + if is_newly_loaded { Loaded } else { Added }, + )); + new_paths.next(&()); + } + } + } + (Some(old_entry), None) => { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + old_paths.next(&()); + } + (None, Some(new_entry)) => { + let is_newly_loaded = phase == InitialScan + || last_newly_loaded_dir_path + .as_ref() + .map_or(false, |dir| new_entry.path.starts_with(dir)); + changes.push(( + new_entry.path.clone(), + new_entry.id, + if is_newly_loaded { Loaded } else { Added }, + )); + new_paths.next(&()); + } + (None, None) => break, + } + } + } + + changes.into() +} + fn swap_to_front(child_paths: &mut Vec, file: &OsStr) { let position = child_paths .iter() @@ -5521,10 +5524,6 @@ struct UpdateIgnoreStatusJob { scan_queue: Sender, } -struct UpdateGitStatusesJob { - local_repository: LocalRepositoryEntry, -} - pub trait WorktreeModelHandle { #[cfg(any(test, feature = "test-support"))] fn flush_fs_events<'a>( diff --git a/crates/worktree/src/worktree_tests.rs b/crates/worktree/src/worktree_tests.rs index 2cee728aec..34e1f0063e 100644 --- a/crates/worktree/src/worktree_tests.rs +++ b/crates/worktree/src/worktree_tests.rs @@ -24,6 +24,7 @@ use std::{ mem, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; use util::{test::TempTree, ResultExt}; @@ -1504,6 +1505,7 @@ async fn test_bump_mtime_of_git_repo_workdir(cx: &mut TestAppContext) { &[(Path::new("b/c.txt"), StatusCode::Modified.index())], ); cx.executor().run_until_parked(); + cx.executor().advance_clock(Duration::from_secs(1)); let snapshot = tree.read_with(cx, |tree, _| tree.snapshot());