Restructure background scanner to handle refresh requests even while scanning

This commit is contained in:
Max Brunsfeld 2023-04-11 15:15:45 -07:00
parent 6d8635fa29
commit 2d97387f49
3 changed files with 283 additions and 360 deletions

View file

@ -2183,7 +2183,7 @@ async fn test_apply_code_actions_with_commands(cx: &mut gpui::TestAppContext) {
}); });
} }
#[gpui::test] #[gpui::test(iterations = 10)]
async fn test_save_file(cx: &mut gpui::TestAppContext) { async fn test_save_file(cx: &mut gpui::TestAppContext) {
let fs = FakeFs::new(cx.background()); let fs = FakeFs::new(cx.background());
fs.insert_tree( fs.insert_tree(

View file

@ -12,7 +12,9 @@ use futures::{
mpsc::{self, UnboundedSender}, mpsc::{self, UnboundedSender},
oneshot, oneshot,
}, },
select_biased, Stream, StreamExt, select_biased,
task::Poll,
Stream, StreamExt,
}; };
use fuzzy::CharBag; use fuzzy::CharBag;
use git::{DOT_GIT, GITIGNORE}; use git::{DOT_GIT, GITIGNORE};
@ -41,11 +43,11 @@ use std::{
mem, mem,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
path::{Path, PathBuf}, path::{Path, PathBuf},
pin::Pin,
sync::{ sync::{
atomic::{AtomicUsize, Ordering::SeqCst}, atomic::{AtomicUsize, Ordering::SeqCst},
Arc, Arc,
}, },
task::Poll,
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet}; use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet};
@ -154,20 +156,12 @@ impl DerefMut for LocalSnapshot {
} }
enum ScanState { enum ScanState {
/// The worktree is performing its initial scan of the filesystem. Started,
Initializing {
snapshot: LocalSnapshot,
barrier: Option<barrier::Sender>,
},
Initialized {
snapshot: LocalSnapshot,
},
/// The worktree is updating in response to filesystem events.
Updating,
Updated { Updated {
snapshot: LocalSnapshot, snapshot: LocalSnapshot,
changes: HashMap<Arc<Path>, PathChange>, changes: HashMap<Arc<Path>, PathChange>,
barrier: Option<barrier::Sender>, barrier: Option<barrier::Sender>,
scanning: bool,
}, },
} }
@ -244,9 +238,24 @@ impl Worktree {
cx.spawn_weak(|this, mut cx| async move { cx.spawn_weak(|this, mut cx| async move {
while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade(&cx)) { while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade(&cx)) {
this.update(&mut cx, |this, cx| { this.update(&mut cx, |this, cx| {
this.as_local_mut() let this = this.as_local_mut().unwrap();
.unwrap() match state {
.background_scanner_updated(state, cx); ScanState::Started => {
*this.is_scanning.0.borrow_mut() = true;
}
ScanState::Updated {
snapshot,
changes,
barrier,
scanning,
} => {
*this.is_scanning.0.borrow_mut() = scanning;
this.set_snapshot(snapshot, cx);
cx.emit(Event::UpdatedEntries(changes));
drop(barrier);
}
}
cx.notify();
}); });
} }
}) })
@ -258,9 +267,15 @@ impl Worktree {
let background = cx.background().clone(); let background = cx.background().clone();
async move { async move {
let events = fs.watch(&abs_path, Duration::from_millis(100)).await; let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
BackgroundScanner::new(snapshot, scan_states_tx, fs, background) BackgroundScanner::new(
.run(events, path_changes_rx) snapshot,
.await; fs,
scan_states_tx,
background,
path_changes_rx,
)
.run(events)
.await;
} }
}); });
@ -533,38 +548,6 @@ impl LocalWorktree {
Ok(updated) Ok(updated)
} }
fn background_scanner_updated(
&mut self,
scan_state: ScanState,
cx: &mut ModelContext<Worktree>,
) {
match scan_state {
ScanState::Initializing { snapshot, barrier } => {
*self.is_scanning.0.borrow_mut() = true;
self.set_snapshot(snapshot, cx);
drop(barrier);
}
ScanState::Initialized { snapshot } => {
*self.is_scanning.0.borrow_mut() = false;
self.set_snapshot(snapshot, cx);
}
ScanState::Updating => {
*self.is_scanning.0.borrow_mut() = true;
}
ScanState::Updated {
snapshot,
changes,
barrier,
} => {
*self.is_scanning.0.borrow_mut() = false;
cx.emit(Event::UpdatedEntries(changes));
self.set_snapshot(snapshot, cx);
drop(barrier);
}
}
cx.notify();
}
fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext<Worktree>) { fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext<Worktree>) {
let updated_repos = Self::changed_repos( let updated_repos = Self::changed_repos(
&self.snapshot.git_repositories, &self.snapshot.git_repositories,
@ -1337,14 +1320,6 @@ impl Snapshot {
&self.root_name &self.root_name
} }
pub fn scan_started(&mut self) {
self.scan_id += 1;
}
pub fn scan_completed(&mut self) {
self.completed_scan_id = self.scan_id;
}
pub fn scan_id(&self) -> usize { pub fn scan_id(&self) -> usize {
self.scan_id self.scan_id
} }
@ -1539,17 +1514,20 @@ impl LocalSnapshot {
return; return;
}; };
match parent_entry.kind {
EntryKind::PendingDir => {
parent_entry.kind = EntryKind::Dir;
}
EntryKind::Dir => {}
_ => return,
}
if let Some(ignore) = ignore { if let Some(ignore) = ignore {
self.ignores_by_parent_abs_path.insert( self.ignores_by_parent_abs_path.insert(
self.abs_path.join(&parent_path).into(), self.abs_path.join(&parent_path).into(),
(ignore, self.scan_id), (ignore, self.scan_id),
); );
} }
if matches!(parent_entry.kind, EntryKind::PendingDir) {
parent_entry.kind = EntryKind::Dir;
} else {
unreachable!();
}
if parent_path.file_name() == Some(&DOT_GIT) { if parent_path.file_name() == Some(&DOT_GIT) {
let abs_path = self.abs_path.join(&parent_path); let abs_path = self.abs_path.join(&parent_path);
@ -2135,53 +2113,47 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
} }
struct BackgroundScanner { struct BackgroundScanner {
fs: Arc<dyn Fs>,
snapshot: Mutex<LocalSnapshot>, snapshot: Mutex<LocalSnapshot>,
notify: UnboundedSender<ScanState>, fs: Arc<dyn Fs>,
status_updates_tx: UnboundedSender<ScanState>,
executor: Arc<executor::Background>, executor: Arc<executor::Background>,
refresh_requests_rx: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
prev_state: Mutex<(Snapshot, Vec<Arc<Path>>)>,
finished_initial_scan: bool,
} }
impl BackgroundScanner { impl BackgroundScanner {
fn new( fn new(
snapshot: LocalSnapshot, snapshot: LocalSnapshot,
notify: UnboundedSender<ScanState>,
fs: Arc<dyn Fs>, fs: Arc<dyn Fs>,
status_updates_tx: UnboundedSender<ScanState>,
executor: Arc<executor::Background>, executor: Arc<executor::Background>,
refresh_requests_rx: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
) -> Self { ) -> Self {
Self { Self {
fs, fs,
snapshot: Mutex::new(snapshot), status_updates_tx,
notify,
executor, executor,
refresh_requests_rx,
prev_state: Mutex::new((snapshot.snapshot.clone(), Vec::new())),
snapshot: Mutex::new(snapshot),
finished_initial_scan: false,
} }
} }
fn abs_path(&self) -> Arc<Path> {
self.snapshot.lock().abs_path.clone()
}
async fn run( async fn run(
self, &mut self,
events_rx: impl Stream<Item = Vec<fsevent::Event>>, mut events_rx: Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>,
mut changed_paths: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
) { ) {
use futures::FutureExt as _; use futures::FutureExt as _;
// Retrieve the basic properties of the root node. let (root_abs_path, root_inode) = {
let root_char_bag; let snapshot = self.snapshot.lock();
let root_abs_path; (
let root_inode; snapshot.abs_path.clone(),
let root_is_dir; snapshot.root_entry().map(|e| e.inode),
let next_entry_id; )
{ };
let mut snapshot = self.snapshot.lock();
snapshot.scan_started();
root_char_bag = snapshot.root_char_bag;
root_abs_path = snapshot.abs_path.clone();
root_inode = snapshot.root_entry().map(|e| e.inode);
root_is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir());
next_entry_id = snapshot.next_entry_id.clone();
}
// Populate ignores above the root. // Populate ignores above the root.
let ignore_stack; let ignore_stack;
@ -2205,198 +2177,191 @@ impl BackgroundScanner {
} }
}; };
if root_is_dir { // Perform an initial scan of the directory.
let mut ancestor_inodes = TreeSet::default(); let (scan_job_tx, scan_job_rx) = channel::unbounded();
if let Some(root_inode) = root_inode { smol::block_on(scan_job_tx.send(ScanJob {
ancestor_inodes.insert(root_inode); abs_path: root_abs_path,
path: Arc::from(Path::new("")),
ignore_stack,
ancestor_inodes: TreeSet::from_ordered_entries(root_inode),
scan_queue: scan_job_tx.clone(),
}))
.unwrap();
drop(scan_job_tx);
self.scan_dirs(true, scan_job_rx).await;
// Process any any FS events that occurred while performing the initial scan.
// For these events, update events cannot be as precise, because we didn't
// have the previous state loaded yet.
if let Poll::Ready(Some(events)) = futures::poll!(events_rx.next()) {
let mut paths = events.into_iter().map(|e| e.path).collect::<Vec<_>>();
while let Poll::Ready(Some(more_events)) = futures::poll!(events_rx.next()) {
paths.extend(more_events.into_iter().map(|e| e.path));
} }
self.process_events(paths).await;
let (tx, rx) = channel::unbounded();
self.executor
.block(tx.send(ScanJob {
abs_path: root_abs_path.to_path_buf(),
path: Arc::from(Path::new("")),
ignore_stack,
ancestor_inodes,
scan_queue: tx.clone(),
}))
.unwrap();
drop(tx);
let progress_update_count = AtomicUsize::new(0);
self.executor
.scoped(|scope| {
for _ in 0..self.executor.num_cpus() {
scope.spawn(async {
let mut last_progress_update_count = 0;
let progress_update_timer = self.pause_between_progress_updates().fuse();
futures::pin_mut!(progress_update_timer);
loop {
select_biased! {
// Send periodic progress updates to the worktree. Use an atomic counter
// to ensure that only one of the workers sends a progress update after
// the update interval elapses.
_ = progress_update_timer => {
match progress_update_count.compare_exchange(
last_progress_update_count,
last_progress_update_count + 1,
SeqCst,
SeqCst
) {
Ok(_) => {
last_progress_update_count += 1;
if self
.notify
.unbounded_send(ScanState::Initializing {
snapshot: self.snapshot.lock().clone(),
barrier: None,
})
.is_err()
{
break;
}
}
Err(current_count) => last_progress_update_count = current_count,
}
progress_update_timer.set(self.pause_between_progress_updates().fuse());
}
// Refresh any paths requested by the main thread.
job = changed_paths.recv().fuse() => {
let Ok((abs_paths, barrier)) = job else { break };
self.update_entries_for_paths(abs_paths, None).await;
if self
.notify
.unbounded_send(ScanState::Initializing {
snapshot: self.snapshot.lock().clone(),
barrier: Some(barrier),
})
.is_err()
{
break;
}
}
// Recursively load directories from the file system.
job = rx.recv().fuse() => {
let Ok(job) = job else { break };
if let Err(err) = self
.scan_dir(root_char_bag, next_entry_id.clone(), &job)
.await
{
log::error!("error scanning {:?}: {}", job.abs_path, err);
}
}
}
}
});
}
})
.await;
} }
self.snapshot.lock().scan_completed(); self.finished_initial_scan = true;
// Continue processing events until the worktree is dropped.
loop {
select_biased! {
// Process any path refresh requests from the worktree. Prioritize
// these before handling changes reported by the filesystem.
request = self.refresh_requests_rx.recv().fuse() => {
let Ok((paths, barrier)) = request else { break };
self.reload_entries_for_paths(paths, None).await;
if !self.send_status_update(false, Some(barrier)) {
break;
}
}
events = events_rx.next().fuse() => {
let Some(events) = events else { break };
let mut paths = events.into_iter().map(|e| e.path).collect::<Vec<_>>();
while let Poll::Ready(Some(more_events)) = futures::poll!(events_rx.next()) {
paths.extend(more_events.into_iter().map(|e| e.path));
}
self.process_events(paths).await;
}
}
}
}
async fn process_events(&mut self, paths: Vec<PathBuf>) {
let (scan_job_tx, scan_job_rx) = channel::unbounded();
if let Some(mut paths) = self
.reload_entries_for_paths(paths, Some(scan_job_tx.clone()))
.await
{
paths.sort_unstable();
util::extend_sorted(&mut self.prev_state.lock().1, paths, usize::MAX, Ord::cmp);
}
drop(scan_job_tx);
self.scan_dirs(false, scan_job_rx).await;
}
async fn scan_dirs(
&self,
enable_progress_updates: bool,
scan_jobs_rx: channel::Receiver<ScanJob>,
) {
use futures::FutureExt as _;
self.snapshot.lock().scan_id += 1;
if self if self
.notify .status_updates_tx
.unbounded_send(ScanState::Initialized { .unbounded_send(ScanState::Started)
snapshot: self.snapshot.lock().clone(),
})
.is_err() .is_err()
{ {
return; return;
} }
// Process any events that occurred while performing the initial scan. These let progress_update_count = AtomicUsize::new(0);
// events can't be reported as precisely, because there is no snapshot of the self.executor
// worktree before they occurred. .scoped(|scope| {
futures::pin_mut!(events_rx); for _ in 0..self.executor.num_cpus() {
if let Poll::Ready(Some(mut events)) = futures::poll!(events_rx.next()) { scope.spawn(async {
while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) { let mut last_progress_update_count = 0;
events.extend(additional_events); let progress_update_timer = self.progress_timer(enable_progress_updates).fuse();
} futures::pin_mut!(progress_update_timer);
let abs_paths = events.into_iter().map(|e| e.path).collect();
if self.notify.unbounded_send(ScanState::Updating).is_err() {
return;
}
if let Some(changes) = self.process_events(abs_paths, true).await {
if self
.notify
.unbounded_send(ScanState::Updated {
snapshot: self.snapshot.lock().clone(),
changes,
barrier: None,
})
.is_err()
{
return;
}
} else {
return;
}
}
// Continue processing events until the worktree is dropped. loop {
loop { select_biased! {
let barrier; // Process any path refresh requests before moving on to process
let abs_paths; // the scan queue, so that user operations are prioritized.
select_biased! { request = self.refresh_requests_rx.recv().fuse() => {
request = changed_paths.next().fuse() => { let Ok((paths, barrier)) = request else { break };
let Some((paths, b)) = request else { break }; self.reload_entries_for_paths(paths, None).await;
abs_paths = paths; if !self.send_status_update(false, Some(barrier)) {
barrier = Some(b); return;
} }
events = events_rx.next().fuse() => { }
let Some(events) = events else { break };
abs_paths = events.into_iter().map(|e| e.path).collect();
barrier = None;
}
}
if self.notify.unbounded_send(ScanState::Updating).is_err() { // Send periodic progress updates to the worktree. Use an atomic counter
return; // to ensure that only one of the workers sends a progress update after
} // the update interval elapses.
if let Some(changes) = self.process_events(abs_paths, false).await { _ = progress_update_timer => {
if self match progress_update_count.compare_exchange(
.notify last_progress_update_count,
.unbounded_send(ScanState::Updated { last_progress_update_count + 1,
snapshot: self.snapshot.lock().clone(), SeqCst,
changes, SeqCst
barrier, ) {
Ok(_) => {
last_progress_update_count += 1;
self.send_status_update(true, None);
}
Err(count) => {
last_progress_update_count = count;
}
}
progress_update_timer.set(self.progress_timer(enable_progress_updates).fuse());
}
// Recursively load directories from the file system.
job = scan_jobs_rx.recv().fuse() => {
let Ok(job) = job else { break };
if let Err(err) = self.scan_dir(&job).await {
if job.path.as_ref() != Path::new("") {
log::error!("error scanning directory {:?}: {}", job.abs_path, err);
}
}
}
}
}
}) })
.is_err()
{
return;
} }
} else { })
return; .await;
}
} self.update_ignore_statuses().await;
let mut snapshot = self.snapshot.lock();
let mut git_repositories = mem::take(&mut snapshot.git_repositories);
git_repositories.retain(|repo| snapshot.entry_for_path(&repo.git_dir_path).is_some());
snapshot.git_repositories = git_repositories;
snapshot.removed_entry_ids.clear();
snapshot.completed_scan_id = snapshot.scan_id;
drop(snapshot);
self.send_status_update(false, None);
} }
async fn pause_between_progress_updates(&self) { fn send_status_update(&self, scanning: bool, barrier: Option<barrier::Sender>) -> bool {
#[cfg(any(test, feature = "test-support"))] let mut prev_state = self.prev_state.lock();
if self.fs.is_fake() { let snapshot = self.snapshot.lock().clone();
return self.executor.simulate_random_delay().await; let mut old_snapshot = snapshot.snapshot.clone();
} mem::swap(&mut old_snapshot, &mut prev_state.0);
smol::Timer::after(Duration::from_millis(100)).await; let changed_paths = mem::take(&mut prev_state.1);
let changes = self.build_change_set(&old_snapshot, &snapshot.snapshot, changed_paths);
self.status_updates_tx
.unbounded_send(ScanState::Updated {
snapshot,
changes,
scanning,
barrier,
})
.is_ok()
} }
async fn scan_dir( async fn scan_dir(&self, job: &ScanJob) -> Result<()> {
&self,
root_char_bag: CharBag,
next_entry_id: Arc<AtomicUsize>,
job: &ScanJob,
) -> Result<()> {
let mut new_entries: Vec<Entry> = Vec::new(); let mut new_entries: Vec<Entry> = Vec::new();
let mut new_jobs: Vec<Option<ScanJob>> = Vec::new(); let mut new_jobs: Vec<Option<ScanJob>> = Vec::new();
let mut ignore_stack = job.ignore_stack.clone(); let mut ignore_stack = job.ignore_stack.clone();
let mut new_ignore = None; let mut new_ignore = None;
let (root_abs_path, root_char_bag, next_entry_id) = {
let snapshot = self.snapshot.lock();
(
snapshot.abs_path().clone(),
snapshot.root_char_bag,
snapshot.next_entry_id.clone(),
)
};
let mut child_paths = self.fs.read_dir(&job.abs_path).await?; let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
while let Some(child_abs_path) = child_paths.next().await { while let Some(child_abs_path) = child_paths.next().await {
let child_abs_path = match child_abs_path { let child_abs_path: Arc<Path> = match child_abs_path {
Ok(child_abs_path) => child_abs_path, Ok(child_abs_path) => child_abs_path.into(),
Err(error) => { Err(error) => {
log::error!("error processing entry {:?}", error); log::error!("error processing entry {:?}", error);
continue; continue;
@ -2419,8 +2384,7 @@ impl BackgroundScanner {
match build_gitignore(&child_abs_path, self.fs.as_ref()).await { match build_gitignore(&child_abs_path, self.fs.as_ref()).await {
Ok(ignore) => { Ok(ignore) => {
let ignore = Arc::new(ignore); let ignore = Arc::new(ignore);
ignore_stack = ignore_stack = ignore_stack.append(job.abs_path.clone(), ignore.clone());
ignore_stack.append(job.abs_path.as_path().into(), ignore.clone());
new_ignore = Some(ignore); new_ignore = Some(ignore);
} }
Err(error) => { Err(error) => {
@ -2438,7 +2402,7 @@ impl BackgroundScanner {
// new jobs as well. // new jobs as well.
let mut new_jobs = new_jobs.iter_mut(); let mut new_jobs = new_jobs.iter_mut();
for entry in &mut new_entries { for entry in &mut new_entries {
let entry_abs_path = self.abs_path().join(&entry.path); let entry_abs_path = root_abs_path.join(&entry.path);
entry.is_ignored = entry.is_ignored =
ignore_stack.is_abs_path_ignored(&entry_abs_path, entry.is_dir()); ignore_stack.is_abs_path_ignored(&entry_abs_path, entry.is_dir());
@ -2507,60 +2471,7 @@ impl BackgroundScanner {
Ok(()) Ok(())
} }
async fn process_events( async fn reload_entries_for_paths(
&self,
abs_paths: Vec<PathBuf>,
received_before_initialized: bool,
) -> Option<HashMap<Arc<Path>, PathChange>> {
let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
let prev_snapshot = {
let mut snapshot = self.snapshot.lock();
snapshot.scan_started();
snapshot.clone()
};
let event_paths = self
.update_entries_for_paths(abs_paths, Some(scan_queue_tx))
.await?;
// Scan any directories that were created as part of this event batch.
self.executor
.scoped(|scope| {
for _ in 0..self.executor.num_cpus() {
scope.spawn(async {
while let Ok(job) = scan_queue_rx.recv().await {
if let Err(err) = self
.scan_dir(
prev_snapshot.root_char_bag,
prev_snapshot.next_entry_id.clone(),
&job,
)
.await
{
log::error!("error scanning {:?}: {}", job.abs_path, err);
}
}
});
}
})
.await;
// Attempt to detect renames only over a single batch of file-system events.
self.snapshot.lock().removed_entry_ids.clear();
self.update_ignore_statuses().await;
self.update_git_repositories();
let changes = self.build_change_set(
prev_snapshot.snapshot,
event_paths,
received_before_initialized,
);
self.snapshot.lock().scan_completed();
Some(changes)
}
async fn update_entries_for_paths(
&self, &self,
mut abs_paths: Vec<PathBuf>, mut abs_paths: Vec<PathBuf>,
scan_queue_tx: Option<Sender<ScanJob>>, scan_queue_tx: Option<Sender<ScanJob>>,
@ -2569,7 +2480,7 @@ impl BackgroundScanner {
abs_paths.dedup_by(|a, b| a.starts_with(&b)); abs_paths.dedup_by(|a, b| a.starts_with(&b));
let root_abs_path = self.snapshot.lock().abs_path.clone(); let root_abs_path = self.snapshot.lock().abs_path.clone();
let root_canonical_path = self.fs.canonicalize(&root_abs_path).await.ok()?; let root_canonical_path = self.fs.canonicalize(&root_abs_path).await.log_err()?;
let metadata = futures::future::join_all( let metadata = futures::future::join_all(
abs_paths abs_paths
.iter() .iter()
@ -2579,29 +2490,29 @@ impl BackgroundScanner {
.await; .await;
let mut snapshot = self.snapshot.lock(); let mut snapshot = self.snapshot.lock();
if scan_queue_tx.is_some() { let doing_recursive_update = scan_queue_tx.is_some();
for abs_path in &abs_paths {
if let Ok(path) = abs_path.strip_prefix(&root_canonical_path) { // Remove any entries for paths that no longer exist or are being recursively
// refreshed. Do this before adding any new entries, so that renames can be
// detected regardless of the order of the paths.
let mut event_paths = Vec::<Arc<Path>>::with_capacity(abs_paths.len());
for (abs_path, metadata) in abs_paths.iter().zip(metadata.iter()) {
if let Ok(path) = abs_path.strip_prefix(&root_canonical_path) {
if matches!(metadata, Ok(None)) || doing_recursive_update {
snapshot.remove_path(path); snapshot.remove_path(path);
} }
event_paths.push(path.into());
} else {
log::error!(
"unexpected event {:?} for root path {:?}",
abs_path,
root_canonical_path
);
} }
} }
let mut event_paths = Vec::with_capacity(abs_paths.len()); for (path, metadata) in event_paths.iter().cloned().zip(metadata.into_iter()) {
for (abs_path, metadata) in abs_paths.into_iter().zip(metadata.into_iter()) { let abs_path: Arc<Path> = root_abs_path.join(&path).into();
let path: Arc<Path> = match abs_path.strip_prefix(&root_canonical_path) {
Ok(path) => Arc::from(path.to_path_buf()),
Err(_) => {
log::error!(
"unexpected event {:?} for root path {:?}",
abs_path,
root_canonical_path
);
continue;
}
};
event_paths.push(path.clone());
let abs_path = root_abs_path.join(&path);
match metadata { match metadata {
Ok(Some(metadata)) => { Ok(Some(metadata)) => {
@ -2626,15 +2537,14 @@ impl BackgroundScanner {
let mut ancestor_inodes = snapshot.ancestor_inodes_for_path(&path); let mut ancestor_inodes = snapshot.ancestor_inodes_for_path(&path);
if metadata.is_dir && !ancestor_inodes.contains(&metadata.inode) { if metadata.is_dir && !ancestor_inodes.contains(&metadata.inode) {
ancestor_inodes.insert(metadata.inode); ancestor_inodes.insert(metadata.inode);
self.executor smol::block_on(scan_queue_tx.send(ScanJob {
.block(scan_queue_tx.send(ScanJob { abs_path,
abs_path, path,
path, ignore_stack,
ignore_stack, ancestor_inodes,
ancestor_inodes, scan_queue: scan_queue_tx.clone(),
scan_queue: scan_queue_tx.clone(), }))
})) .unwrap();
.unwrap();
} }
} }
} }
@ -2710,13 +2620,6 @@ impl BackgroundScanner {
.await; .await;
} }
fn update_git_repositories(&self) {
let mut snapshot = self.snapshot.lock();
let mut git_repositories = mem::take(&mut snapshot.git_repositories);
git_repositories.retain(|repo| snapshot.entry_for_path(&repo.git_dir_path).is_some());
snapshot.git_repositories = git_repositories;
}
async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &LocalSnapshot) { async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &LocalSnapshot) {
let mut ignore_stack = job.ignore_stack; let mut ignore_stack = job.ignore_stack;
if let Some((ignore, _)) = snapshot.ignores_by_parent_abs_path.get(&job.abs_path) { if let Some((ignore, _)) = snapshot.ignores_by_parent_abs_path.get(&job.abs_path) {
@ -2728,7 +2631,7 @@ impl BackgroundScanner {
let path = job.abs_path.strip_prefix(&snapshot.abs_path).unwrap(); let path = job.abs_path.strip_prefix(&snapshot.abs_path).unwrap();
for mut entry in snapshot.child_entries(path).cloned() { for mut entry in snapshot.child_entries(path).cloned() {
let was_ignored = entry.is_ignored; let was_ignored = entry.is_ignored;
let abs_path = self.abs_path().join(&entry.path); let abs_path = snapshot.abs_path().join(&entry.path);
entry.is_ignored = ignore_stack.is_abs_path_ignored(&abs_path, entry.is_dir()); entry.is_ignored = ignore_stack.is_abs_path_ignored(&abs_path, entry.is_dir());
if entry.is_dir() { if entry.is_dir() {
let child_ignore_stack = if entry.is_ignored { let child_ignore_stack = if entry.is_ignored {
@ -2762,16 +2665,16 @@ impl BackgroundScanner {
fn build_change_set( fn build_change_set(
&self, &self,
old_snapshot: Snapshot, old_snapshot: &Snapshot,
new_snapshot: &Snapshot,
event_paths: Vec<Arc<Path>>, event_paths: Vec<Arc<Path>>,
received_before_initialized: bool,
) -> HashMap<Arc<Path>, PathChange> { ) -> HashMap<Arc<Path>, PathChange> {
use PathChange::{Added, AddedOrUpdated, Removed, Updated}; use PathChange::{Added, AddedOrUpdated, Removed, Updated};
let new_snapshot = self.snapshot.lock();
let mut changes = HashMap::default(); let mut changes = HashMap::default();
let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>(); let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>();
let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>(); let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>();
let received_before_initialized = !self.finished_initial_scan;
for path in event_paths { for path in event_paths {
let path = PathKey(path); let path = PathKey(path);
@ -2799,9 +2702,9 @@ impl BackgroundScanner {
// If the worktree was not fully initialized when this event was generated, // If the worktree was not fully initialized when this event was generated,
// we can't know whether this entry was added during the scan or whether // we can't know whether this entry was added during the scan or whether
// it was merely updated. // it was merely updated.
changes.insert(old_entry.path.clone(), AddedOrUpdated); changes.insert(new_entry.path.clone(), AddedOrUpdated);
} else if old_entry.mtime != new_entry.mtime { } else if old_entry.mtime != new_entry.mtime {
changes.insert(old_entry.path.clone(), Updated); changes.insert(new_entry.path.clone(), Updated);
} }
old_paths.next(&()); old_paths.next(&());
new_paths.next(&()); new_paths.next(&());
@ -2826,6 +2729,19 @@ impl BackgroundScanner {
} }
changes changes
} }
async fn progress_timer(&self, running: bool) {
if !running {
return futures::future::pending().await;
}
#[cfg(any(test, feature = "test-support"))]
if self.fs.is_fake() {
return self.executor.simulate_random_delay().await;
}
smol::Timer::after(Duration::from_millis(100)).await;
}
} }
fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag { fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
@ -2839,7 +2755,7 @@ fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
} }
struct ScanJob { struct ScanJob {
abs_path: PathBuf, abs_path: Arc<Path>,
path: Arc<Path>, path: Arc<Path>,
ignore_stack: Arc<IgnoreStack>, ignore_stack: Arc<IgnoreStack>,
scan_queue: Sender<ScanJob>, scan_queue: Sender<ScanJob>,
@ -3524,7 +3440,7 @@ mod tests {
let fs = FakeFs::new(cx.background()); let fs = FakeFs::new(cx.background());
fs.insert_tree( fs.insert_tree(
"/a", "/root",
json!({ json!({
"b": {}, "b": {},
"c": {}, "c": {},
@ -3535,7 +3451,7 @@ mod tests {
let tree = Worktree::local( let tree = Worktree::local(
client, client,
"/a".as_ref(), "/root".as_ref(),
true, true,
fs, fs,
Default::default(), Default::default(),
@ -3555,6 +3471,7 @@ mod tests {
assert!(entry.is_dir()); assert!(entry.is_dir());
cx.foreground().run_until_parked(); cx.foreground().run_until_parked();
tree.read_with(cx, |tree, _| { tree.read_with(cx, |tree, _| {
assert_eq!(tree.entry_for_path("a/e").unwrap().kind, EntryKind::Dir); assert_eq!(tree.entry_for_path("a/e").unwrap().kind, EntryKind::Dir);
}); });

View file

@ -154,6 +154,12 @@ impl<K> TreeSet<K>
where where
K: Clone + Debug + Default + Ord, K: Clone + Debug + Default + Ord,
{ {
pub fn from_ordered_entries(entries: impl IntoIterator<Item = K>) -> Self {
Self(TreeMap::from_ordered_entries(
entries.into_iter().map(|key| (key, ())),
))
}
pub fn insert(&mut self, key: K) { pub fn insert(&mut self, key: K) {
self.0.insert(key, ()); self.0.insert(key, ());
} }