Maintain on the background scanner a set of expanded directories

This commit is contained in:
Max Brunsfeld 2023-06-13 16:01:53 -07:00
parent a305d93567
commit 4c03231863
7 changed files with 250 additions and 48 deletions

View file

@ -5,7 +5,7 @@ use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
use anyhow::{anyhow, Context, Result};
use client::{proto, Client};
use clock::ReplicaId;
use collections::{HashMap, VecDeque};
use collections::{HashMap, HashSet, VecDeque};
use fs::{
repository::{GitFileStatus, GitRepository, RepoPath},
Fs, LineEnding,
@ -67,7 +67,7 @@ pub enum Worktree {
pub struct LocalWorktree {
snapshot: LocalSnapshot,
path_changes_tx: channel::Sender<(Vec<PathBuf>, barrier::Sender)>,
scan_requests_tx: channel::Sender<ScanRequest>,
is_scanning: (watch::Sender<bool>, watch::Receiver<bool>),
_background_scanner_task: Task<()>,
share: Option<ShareState>,
@ -84,6 +84,18 @@ pub struct LocalWorktree {
visible: bool,
}
enum ScanRequest {
RescanPaths {
paths: Vec<PathBuf>,
done: barrier::Sender,
},
SetDirExpanded {
entry_id: ProjectEntryId,
replica_id: ReplicaId,
is_expanded: bool,
},
}
pub struct RemoteWorktree {
snapshot: Snapshot,
background_snapshot: Arc<Mutex<Snapshot>>,
@ -214,6 +226,7 @@ pub struct LocalSnapshot {
struct BackgroundScannerState {
snapshot: LocalSnapshot,
expanded_dirs: HashSet<(ProjectEntryId, ReplicaId)>,
/// The ids of all of the entries that were removed from the snapshot
/// as part of the current update. These entry ids may be re-used
/// if the same inode is discovered at a new path, or if the given
@ -330,7 +343,7 @@ impl Worktree {
);
}
let (path_changes_tx, path_changes_rx) = channel::unbounded();
let (scan_requests_tx, scan_requests_rx) = channel::unbounded();
let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded();
cx.spawn_weak(|this, mut cx| async move {
@ -370,7 +383,7 @@ impl Worktree {
fs,
scan_states_tx,
background,
path_changes_rx,
scan_requests_rx,
)
.run(events)
.await;
@ -381,7 +394,7 @@ impl Worktree {
snapshot,
is_scanning: watch::channel_with(true),
share: None,
path_changes_tx,
scan_requests_tx,
_background_scanner_task: background_scanner_task,
diagnostics: Default::default(),
diagnostic_summaries: Default::default(),
@ -1068,8 +1081,11 @@ impl LocalWorktree {
this.update(&mut cx, |this, _| {
this.as_local_mut()
.unwrap()
.path_changes_tx
.try_send((vec![abs_path], tx))
.scan_requests_tx
.try_send(ScanRequest::RescanPaths {
paths: vec![abs_path],
done: tx,
})
})?;
rx.recv().await;
Ok(())
@ -1135,6 +1151,22 @@ impl LocalWorktree {
}))
}
pub fn mark_entry_expanded(
&mut self,
entry_id: ProjectEntryId,
is_expanded: bool,
replica_id: ReplicaId,
_cx: &mut ModelContext<Worktree>,
) {
self.scan_requests_tx
.try_send(ScanRequest::SetDirExpanded {
entry_id,
replica_id,
is_expanded,
})
.ok();
}
fn refresh_entry(
&self,
path: Arc<Path>,
@ -1143,7 +1175,7 @@ impl LocalWorktree {
) -> Task<Result<Entry>> {
let fs = self.fs.clone();
let abs_root_path = self.abs_path.clone();
let path_changes_tx = self.path_changes_tx.clone();
let path_changes_tx = self.scan_requests_tx.clone();
cx.spawn_weak(move |this, mut cx| async move {
let abs_path = fs.canonicalize(&abs_root_path).await?;
let mut paths = Vec::with_capacity(2);
@ -1161,7 +1193,7 @@ impl LocalWorktree {
}
let (tx, mut rx) = barrier::channel();
path_changes_tx.try_send((paths, tx))?;
path_changes_tx.try_send(ScanRequest::RescanPaths { paths, done: tx })?;
rx.recv().await;
this.upgrade(&cx)
.ok_or_else(|| anyhow!("worktree was dropped"))?
@ -2784,7 +2816,7 @@ struct BackgroundScanner {
fs: Arc<dyn Fs>,
status_updates_tx: UnboundedSender<ScanState>,
executor: Arc<executor::Background>,
refresh_requests_rx: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
scan_requests_rx: channel::Receiver<ScanRequest>,
next_entry_id: Arc<AtomicUsize>,
phase: BackgroundScannerPhase,
}
@ -2803,17 +2835,18 @@ impl BackgroundScanner {
fs: Arc<dyn Fs>,
status_updates_tx: UnboundedSender<ScanState>,
executor: Arc<executor::Background>,
refresh_requests_rx: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
scan_requests_rx: channel::Receiver<ScanRequest>,
) -> Self {
Self {
fs,
status_updates_tx,
executor,
refresh_requests_rx,
scan_requests_rx,
next_entry_id,
state: Mutex::new(BackgroundScannerState {
prev_snapshot: snapshot.snapshot.clone(),
snapshot,
expanded_dirs: Default::default(),
removed_entry_ids: Default::default(),
changed_paths: Default::default(),
}),
@ -2898,9 +2931,9 @@ impl BackgroundScanner {
select_biased! {
// Process any path refresh requests from the worktree. Prioritize
// these before handling changes reported by the filesystem.
request = self.refresh_requests_rx.recv().fuse() => {
let Ok((paths, barrier)) = request else { break };
if !self.process_refresh_request(paths.clone(), barrier).await {
request = self.scan_requests_rx.recv().fuse() => {
let Ok(request) = request else { break };
if !self.process_scan_request(request).await {
return;
}
}
@ -2917,9 +2950,29 @@ impl BackgroundScanner {
}
}
async fn process_refresh_request(&self, paths: Vec<PathBuf>, barrier: barrier::Sender) -> bool {
self.reload_entries_for_paths(paths, None).await;
self.send_status_update(false, Some(barrier))
async fn process_scan_request(&self, request: ScanRequest) -> bool {
match request {
ScanRequest::RescanPaths { paths, done } => {
self.reload_entries_for_paths(paths, None).await;
self.send_status_update(false, Some(done))
}
ScanRequest::SetDirExpanded {
entry_id,
replica_id,
is_expanded,
} => {
let mut state = self.state.lock();
if is_expanded {
state.expanded_dirs.insert((entry_id, replica_id));
} else {
state.expanded_dirs.remove(&(entry_id, replica_id));
}
// todo
true
}
}
}
async fn process_events(&mut self, paths: Vec<PathBuf>) {
@ -2995,9 +3048,9 @@ impl BackgroundScanner {
select_biased! {
// Process any path refresh requests before moving on to process
// the scan queue, so that user operations are prioritized.
request = self.refresh_requests_rx.recv().fuse() => {
let Ok((paths, barrier)) = request else { break };
if !self.process_refresh_request(paths, barrier).await {
request = self.scan_requests_rx.recv().fuse() => {
let Ok(request) = request else { break };
if !self.process_scan_request(request).await {
return;
}
}
@ -3487,9 +3540,9 @@ impl BackgroundScanner {
select_biased! {
// Process any path refresh requests before moving on to process
// the queue of ignore statuses.
request = self.refresh_requests_rx.recv().fuse() => {
let Ok((paths, barrier)) = request else { break };
if !self.process_refresh_request(paths, barrier).await {
request = self.scan_requests_rx.recv().fuse() => {
let Ok(request) = request else { break };
if !self.process_scan_request(request).await {
return;
}
}