diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 16650b4eac..4351e1a548 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1459,7 +1459,7 @@ impl Project { }; cx.foreground().spawn(async move { - pump_loading_buffer_reciever(loading_watch) + wait_for_loading_buffer(loading_watch) .await .map_err(|error| anyhow!("{}", error)) }) @@ -5057,98 +5057,102 @@ impl Project { fn update_local_worktree_buffers_git_repos( &mut self, worktree_handle: ModelHandle, - repos: &HashMap, LocalRepositoryEntry>, + changed_repos: &UpdatedGitRepositoriesSet, cx: &mut ModelContext, ) { debug_assert!(worktree_handle.read(cx).is_local()); - // Setup the pending buffers + // Identify the loading buffers whose containing repository that has changed. let future_buffers = self .loading_buffers_by_path .iter() - .filter_map(|(path, receiver)| { - let path = &path.path; - let (work_directory, repo) = repos + .filter_map(|(project_path, receiver)| { + if project_path.worktree_id != worktree_handle.read(cx).id() { + return None; + } + let path = &project_path.path; + changed_repos .iter() - .find(|(work_directory, _)| path.starts_with(work_directory))?; - - let repo_relative_path = path.strip_prefix(work_directory).log_err()?; - + .find(|(work_dir, _)| path.starts_with(work_dir))?; let receiver = receiver.clone(); - let repo_ptr = repo.repo_ptr.clone(); - let repo_relative_path = repo_relative_path.to_owned(); + let path = path.clone(); Some(async move { - pump_loading_buffer_reciever(receiver) + wait_for_loading_buffer(receiver) .await .ok() - .map(|buffer| (buffer, repo_relative_path, repo_ptr)) + .map(|buffer| (buffer, path)) }) }) - .collect::>() - .filter_map(|result| async move { - let (buffer_handle, repo_relative_path, repo_ptr) = result?; + .collect::>(); - let lock = repo_ptr.lock(); - lock.load_index_text(&repo_relative_path) - .map(|diff_base| (diff_base, buffer_handle)) - }); + // Identify the current buffers whose containing repository has changed. + let current_buffers = self + .opened_buffers + .values() + .filter_map(|buffer| { + let buffer = buffer.upgrade(cx)?; + let file = File::from_dyn(buffer.read(cx).file())?; + if file.worktree != worktree_handle { + return None; + } + let path = file.path(); + changed_repos + .iter() + .find(|(work_dir, _)| path.starts_with(work_dir))?; + Some((buffer, path.clone())) + }) + .collect::>(); - let update_diff_base_fn = update_diff_base(self); - cx.spawn(|_, mut cx| async move { - let diff_base_tasks = cx + if future_buffers.len() + current_buffers.len() == 0 { + return; + } + + let remote_id = self.remote_id(); + let client = self.client.clone(); + cx.spawn_weak(move |_, mut cx| async move { + // Wait for all of the buffers to load. + let future_buffers = future_buffers.collect::>().await; + + // Reload the diff base for every buffer whose containing git repository has changed. + let snapshot = + worktree_handle.read_with(&cx, |tree, _| tree.as_local().unwrap().snapshot()); + let diff_bases_by_buffer = cx .background() - .spawn(future_buffers.collect::>()) + .spawn(async move { + future_buffers + .into_iter() + .filter_map(|e| e) + .chain(current_buffers) + .filter_map(|(buffer, path)| { + let (work_directory, repo) = + snapshot.repository_and_work_directory_for_path(&path)?; + let repo = snapshot.get_local_repo(&repo)?; + let relative_path = path.strip_prefix(&work_directory).ok()?; + let base_text = repo.repo_ptr.lock().load_index_text(&relative_path); + Some((buffer, base_text)) + }) + .collect::>() + }) .await; - for (diff_base, buffer) in diff_base_tasks.into_iter() { - update_diff_base_fn(Some(diff_base), buffer, &mut cx); + // Assign the new diff bases on all of the buffers. + for (buffer, diff_base) in diff_bases_by_buffer { + let buffer_id = buffer.update(&mut cx, |buffer, cx| { + buffer.set_diff_base(diff_base.clone(), cx); + buffer.remote_id() + }); + if let Some(project_id) = remote_id { + client + .send(proto::UpdateDiffBase { + project_id, + buffer_id, + diff_base, + }) + .log_err(); + } } }) .detach(); - - // And the current buffers - for (_, buffer) in &self.opened_buffers { - if let Some(buffer) = buffer.upgrade(cx) { - let file = match File::from_dyn(buffer.read(cx).file()) { - Some(file) => file, - None => continue, - }; - if file.worktree != worktree_handle { - continue; - } - - let path = file.path().clone(); - - let worktree = worktree_handle.read(cx); - - let (work_directory, repo) = match repos - .iter() - .find(|(work_directory, _)| path.starts_with(work_directory)) - { - Some(repo) => repo.clone(), - None => continue, - }; - - let relative_repo = match path.strip_prefix(work_directory).log_err() { - Some(relative_repo) => relative_repo.to_owned(), - None => continue, - }; - - drop(worktree); - - let update_diff_base_fn = update_diff_base(self); - let git_ptr = repo.repo_ptr.clone(); - let diff_base_task = cx - .background() - .spawn(async move { git_ptr.lock().load_index_text(&relative_repo) }); - - cx.spawn(|_, mut cx| async move { - let diff_base = diff_base_task.await; - update_diff_base_fn(diff_base, buffer, &mut cx); - }) - .detach(); - } - } } pub fn set_active_path(&mut self, entry: Option, cx: &mut ModelContext) { @@ -7070,7 +7074,7 @@ impl Item for Buffer { } } -async fn pump_loading_buffer_reciever( +async fn wait_for_loading_buffer( mut receiver: postage::watch::Receiver, Arc>>>, ) -> Result, Arc> { loop { @@ -7083,26 +7087,3 @@ async fn pump_loading_buffer_reciever( receiver.next().await; } } - -fn update_diff_base( - project: &Project, -) -> impl Fn(Option, ModelHandle, &mut AsyncAppContext) { - let remote_id = project.remote_id(); - let client = project.client().clone(); - move |diff_base, buffer, cx| { - let buffer_id = buffer.update(cx, |buffer, cx| { - buffer.set_diff_base(diff_base.clone(), cx); - buffer.remote_id() - }); - - if let Some(project_id) = remote_id { - client - .send(proto::UpdateDiffBase { - project_id, - buffer_id: buffer_id as u64, - diff_base, - }) - .log_err(); - } - } -} diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index 285e3ee9b6..d20c5e2619 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -2524,29 +2524,21 @@ async fn test_rescan_and_remote_updates( // Create a remote copy of this worktree. let tree = project.read_with(cx, |project, cx| project.worktrees(cx).next().unwrap()); - let initial_snapshot = tree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot()); - let remote = cx.update(|cx| { - Worktree::remote( - 1, - 1, - proto::WorktreeMetadata { - id: initial_snapshot.id().to_proto(), - root_name: initial_snapshot.root_name().into(), - abs_path: initial_snapshot - .abs_path() - .as_os_str() - .to_string_lossy() - .into(), - visible: true, - }, - rpc.clone(), - cx, - ) - }); - remote.update(cx, |remote, _| { - let update = initial_snapshot.build_initial_update(1); - remote.as_remote_mut().unwrap().update_from_remote(update); + + let metadata = tree.read_with(cx, |tree, _| tree.as_local().unwrap().metadata_proto()); + + let updates = Arc::new(Mutex::new(Vec::new())); + tree.update(cx, |tree, cx| { + let _ = tree.as_local_mut().unwrap().observe_updates(0, cx, { + let updates = updates.clone(); + move |update| { + updates.lock().push(update); + async { true } + } + }); }); + + let remote = cx.update(|cx| Worktree::remote(1, 1, metadata, rpc.clone(), cx)); deterministic.run_until_parked(); cx.read(|cx| { @@ -2612,14 +2604,11 @@ async fn test_rescan_and_remote_updates( // Update the remote worktree. Check that it becomes consistent with the // local worktree. - remote.update(cx, |remote, cx| { - let update = tree.read(cx).as_local().unwrap().snapshot().build_update( - &initial_snapshot, - 1, - 1, - true, - ); - remote.as_remote_mut().unwrap().update_from_remote(update); + deterministic.run_until_parked(); + remote.update(cx, |remote, _| { + for update in updates.lock().drain(..) { + remote.as_remote_mut().unwrap().update_from_remote(update); + } }); deterministic.run_until_parked(); remote.read_with(cx, |remote, _| { diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 1f02910404..ea0d88bef9 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -17,7 +17,7 @@ use futures::{ }, select_biased, task::Poll, - Stream, StreamExt, + FutureExt, Stream, StreamExt, }; use fuzzy::CharBag; use git::{DOT_GIT, GITIGNORE}; @@ -55,7 +55,7 @@ use std::{ time::{Duration, SystemTime}, }; use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet}; -use util::{paths::HOME, ResultExt, TakeUntilExt, TryFutureExt}; +use util::{paths::HOME, ResultExt, TakeUntilExt}; #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)] pub struct WorktreeId(usize); @@ -363,7 +363,7 @@ enum ScanState { Started, Updated { snapshot: LocalSnapshot, - changes: Arc<[(Arc, ProjectEntryId, PathChange)]>, + changes: UpdatedEntriesSet, barrier: Option, scanning: bool, }, @@ -371,14 +371,15 @@ enum ScanState { struct ShareState { project_id: u64, - snapshots_tx: watch::Sender, + snapshots_tx: + mpsc::UnboundedSender<(LocalSnapshot, UpdatedEntriesSet, UpdatedGitRepositoriesSet)>, resume_updates: watch::Sender<()>, _maintain_remote_snapshot: Task>, } pub enum Event { - UpdatedEntries(Arc<[(Arc, ProjectEntryId, PathChange)]>), - UpdatedGitRepositories(HashMap, LocalRepositoryEntry>), + UpdatedEntries(UpdatedEntriesSet), + UpdatedGitRepositories(UpdatedGitRepositoriesSet), } impl Entity for Worktree { @@ -453,8 +454,7 @@ impl Worktree { scanning, } => { *this.is_scanning.0.borrow_mut() = scanning; - this.set_snapshot(snapshot, cx); - cx.emit(Event::UpdatedEntries(changes)); + this.set_snapshot(snapshot, changes, cx); drop(barrier); } } @@ -820,73 +820,109 @@ impl LocalWorktree { Ok(!old_summary.is_empty() || !new_summary.is_empty()) } - fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext) { - let updated_repos = - self.changed_repos(&self.git_repositories, &new_snapshot.git_repositories); + fn set_snapshot( + &mut self, + new_snapshot: LocalSnapshot, + entry_changes: UpdatedEntriesSet, + cx: &mut ModelContext, + ) { + let repo_changes = self.changed_repos(&self.snapshot, &new_snapshot); self.snapshot = new_snapshot; if let Some(share) = self.share.as_mut() { - *share.snapshots_tx.borrow_mut() = self.snapshot.clone(); + share + .snapshots_tx + .unbounded_send(( + self.snapshot.clone(), + entry_changes.clone(), + repo_changes.clone(), + )) + .ok(); } - if !updated_repos.is_empty() { - cx.emit(Event::UpdatedGitRepositories(updated_repos)); + if !entry_changes.is_empty() { + cx.emit(Event::UpdatedEntries(entry_changes)); + } + if !repo_changes.is_empty() { + cx.emit(Event::UpdatedGitRepositories(repo_changes)); } } fn changed_repos( &self, - old_repos: &TreeMap, - new_repos: &TreeMap, - ) -> HashMap, LocalRepositoryEntry> { - let mut diff = HashMap::default(); - let mut old_repos = old_repos.iter().peekable(); - let mut new_repos = new_repos.iter().peekable(); + old_snapshot: &LocalSnapshot, + new_snapshot: &LocalSnapshot, + ) -> UpdatedGitRepositoriesSet { + let mut changes = Vec::new(); + let mut old_repos = old_snapshot.git_repositories.iter().peekable(); + let mut new_repos = new_snapshot.git_repositories.iter().peekable(); loop { - match (old_repos.peek(), new_repos.peek()) { - (Some((old_entry_id, old_repo)), Some((new_entry_id, new_repo))) => { - match Ord::cmp(old_entry_id, new_entry_id) { + match (new_repos.peek().map(clone), old_repos.peek().map(clone)) { + (Some((new_entry_id, new_repo)), Some((old_entry_id, old_repo))) => { + match Ord::cmp(&new_entry_id, &old_entry_id) { Ordering::Less => { - if let Some(entry) = self.entry_for_id(**old_entry_id) { - diff.insert(entry.path.clone(), (*old_repo).clone()); + if let Some(entry) = new_snapshot.entry_for_id(new_entry_id) { + changes.push((entry.path.clone(), None)); } - old_repos.next(); + new_repos.next(); } Ordering::Equal => { - if old_repo.git_dir_scan_id != new_repo.git_dir_scan_id { - if let Some(entry) = self.entry_for_id(**new_entry_id) { - diff.insert(entry.path.clone(), (*new_repo).clone()); + if new_repo.git_dir_scan_id != old_repo.git_dir_scan_id { + if let Some(entry) = new_snapshot.entry_for_id(new_entry_id) { + changes.push(( + entry.path.clone(), + old_snapshot + .repository_entries + .get(&RepositoryWorkDirectory(entry.path.clone())) + .cloned(), + )); } } - - old_repos.next(); new_repos.next(); + old_repos.next(); } Ordering::Greater => { - if let Some(entry) = self.entry_for_id(**new_entry_id) { - diff.insert(entry.path.clone(), (*new_repo).clone()); + if let Some(entry) = old_snapshot.entry_for_id(old_entry_id) { + changes.push(( + entry.path.clone(), + old_snapshot + .repository_entries + .get(&RepositoryWorkDirectory(entry.path.clone())) + .cloned(), + )); } - new_repos.next(); + old_repos.next(); } } } - (Some((old_entry_id, old_repo)), None) => { - if let Some(entry) = self.entry_for_id(**old_entry_id) { - diff.insert(entry.path.clone(), (*old_repo).clone()); - } - old_repos.next(); - } - (None, Some((new_entry_id, new_repo))) => { - if let Some(entry) = self.entry_for_id(**new_entry_id) { - diff.insert(entry.path.clone(), (*new_repo).clone()); + (Some((entry_id, _)), None) => { + if let Some(entry) = new_snapshot.entry_for_id(entry_id) { + changes.push((entry.path.clone(), None)); } new_repos.next(); } + (None, Some((entry_id, _))) => { + if let Some(entry) = old_snapshot.entry_for_id(entry_id) { + changes.push(( + entry.path.clone(), + old_snapshot + .repository_entries + .get(&RepositoryWorkDirectory(entry.path.clone())) + .cloned(), + )); + } + old_repos.next(); + } (None, None) => break, } } - diff + + fn clone(value: &(&T, &U)) -> (T, U) { + (value.0.clone(), value.1.clone()) + } + + changes.into() } pub fn scan_complete(&self) -> impl Future { @@ -1227,89 +1263,97 @@ impl LocalWorktree { }) } - pub fn share(&mut self, project_id: u64, cx: &mut ModelContext) -> Task> { + pub fn observe_updates( + &mut self, + project_id: u64, + cx: &mut ModelContext, + callback: F, + ) -> oneshot::Receiver<()> + where + F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut, + Fut: Send + Future, + { + #[cfg(any(test, feature = "test-support"))] + const MAX_CHUNK_SIZE: usize = 2; + #[cfg(not(any(test, feature = "test-support")))] + const MAX_CHUNK_SIZE: usize = 256; + let (share_tx, share_rx) = oneshot::channel(); if let Some(share) = self.share.as_mut() { - let _ = share_tx.send(()); + share_tx.send(()).ok(); *share.resume_updates.borrow_mut() = (); - } else { - let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot()); - let (resume_updates_tx, mut resume_updates_rx) = watch::channel(); - let worktree_id = cx.model_id() as u64; + return share_rx; + } - for (path, summaries) in &self.diagnostic_summaries { - for (&server_id, summary) in summaries { - if let Err(e) = self.client.send(proto::UpdateDiagnosticSummary { - project_id, - worktree_id, - summary: Some(summary.to_proto(server_id, &path)), - }) { - return Task::ready(Err(e)); + let (resume_updates_tx, mut resume_updates_rx) = watch::channel::<()>(); + let (snapshots_tx, mut snapshots_rx) = + mpsc::unbounded::<(LocalSnapshot, UpdatedEntriesSet, UpdatedGitRepositoriesSet)>(); + snapshots_tx + .unbounded_send((self.snapshot(), Arc::from([]), Arc::from([]))) + .ok(); + + let worktree_id = cx.model_id() as u64; + let _maintain_remote_snapshot = cx.background().spawn(async move { + let mut is_first = true; + while let Some((snapshot, entry_changes, repo_changes)) = snapshots_rx.next().await { + let update; + if is_first { + update = snapshot.build_initial_update(project_id, worktree_id); + is_first = false; + } else { + update = + snapshot.build_update(project_id, worktree_id, entry_changes, repo_changes); + } + + for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) { + let _ = resume_updates_rx.try_recv(); + loop { + let result = callback(update.clone()); + if result.await { + break; + } else { + log::info!("waiting to resume updates"); + if resume_updates_rx.next().await.is_none() { + return Some(()); + } + } } } } + share_tx.send(()).ok(); + Some(()) + }); - let _maintain_remote_snapshot = cx.background().spawn({ - let client = self.client.clone(); - async move { - let mut share_tx = Some(share_tx); - let mut prev_snapshot = LocalSnapshot { - ignores_by_parent_abs_path: Default::default(), - git_repositories: Default::default(), - snapshot: Snapshot { - id: WorktreeId(worktree_id as usize), - abs_path: Path::new("").into(), - root_name: Default::default(), - root_char_bag: Default::default(), - entries_by_path: Default::default(), - entries_by_id: Default::default(), - repository_entries: Default::default(), - scan_id: 0, - completed_scan_id: 0, - }, - }; - while let Some(snapshot) = snapshots_rx.recv().await { - #[cfg(any(test, feature = "test-support"))] - const MAX_CHUNK_SIZE: usize = 2; - #[cfg(not(any(test, feature = "test-support")))] - const MAX_CHUNK_SIZE: usize = 256; + self.share = Some(ShareState { + project_id, + snapshots_tx, + resume_updates: resume_updates_tx, + _maintain_remote_snapshot, + }); + share_rx + } - let update = - snapshot.build_update(&prev_snapshot, project_id, worktree_id, true); - for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) { - let _ = resume_updates_rx.try_recv(); - while let Err(error) = client.request(update.clone()).await { - log::error!("failed to send worktree update: {}", error); - log::info!("waiting to resume updates"); - if resume_updates_rx.next().await.is_none() { - return Ok(()); - } - } - } + pub fn share(&mut self, project_id: u64, cx: &mut ModelContext) -> Task> { + let client = self.client.clone(); - if let Some(share_tx) = share_tx.take() { - let _ = share_tx.send(()); - } - - prev_snapshot = snapshot; - } - - Ok::<_, anyhow::Error>(()) + for (path, summaries) in &self.diagnostic_summaries { + for (&server_id, summary) in summaries { + if let Err(e) = self.client.send(proto::UpdateDiagnosticSummary { + project_id, + worktree_id: cx.model_id() as u64, + summary: Some(summary.to_proto(server_id, &path)), + }) { + return Task::ready(Err(e)); } - .log_err() - }); - - self.share = Some(ShareState { - project_id, - snapshots_tx, - resume_updates: resume_updates_tx, - _maintain_remote_snapshot, - }); + } } + let rx = self.observe_updates(project_id, cx, move |update| { + client.request(update).map(|result| result.is_ok()) + }); cx.foreground() - .spawn(async move { share_rx.await.map_err(|_| anyhow!("share ended")) }) + .spawn(async move { rx.await.map_err(|_| anyhow!("share ended")) }) } pub fn unshare(&mut self) { @@ -1518,10 +1562,12 @@ impl Snapshot { pub(crate) fn apply_remote_update(&mut self, mut update: proto::UpdateWorktree) -> Result<()> { let mut entries_by_path_edits = Vec::new(); let mut entries_by_id_edits = Vec::new(); + for entry_id in update.removed_entries { - if let Some(entry) = self.entry_for_id(ProjectEntryId::from_proto(entry_id)) { + let entry_id = ProjectEntryId::from_proto(entry_id); + entries_by_id_edits.push(Edit::Remove(entry_id)); + if let Some(entry) = self.entry_for_id(entry_id) { entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone()))); - entries_by_id_edits.push(Edit::Remove(entry.id)); } } @@ -1530,6 +1576,11 @@ impl Snapshot { if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) { entries_by_path_edits.push(Edit::Remove(PathKey(path.clone()))); } + if let Some(old_entry) = self.entries_by_path.get(&PathKey(entry.path.clone()), &()) { + if old_entry.id != entry.id { + entries_by_id_edits.push(Edit::Remove(old_entry.id)); + } + } entries_by_id_edits.push(Edit::Insert(PathEntry { id: entry.id, path: entry.path.clone(), @@ -1672,20 +1723,19 @@ impl Snapshot { /// Get the repository whose work directory contains the given path. pub fn repository_for_path(&self, path: &Path) -> Option { - let mut max_len = 0; - let mut current_candidate = None; - for (work_directory, repo) in (&self.repository_entries).iter() { - if path.starts_with(&work_directory.0) { - if work_directory.0.as_os_str().len() >= max_len { - current_candidate = Some(repo); - max_len = work_directory.0.as_os_str().len(); - } else { - break; - } - } - } + self.repository_and_work_directory_for_path(path) + .map(|e| e.1) + } - current_candidate.cloned() + pub fn repository_and_work_directory_for_path( + &self, + path: &Path, + ) -> Option<(RepositoryWorkDirectory, RepositoryEntry)> { + self.repository_entries + .iter() + .filter(|(workdir_path, _)| path.starts_with(workdir_path)) + .last() + .map(|(path, repo)| (path.clone(), repo.clone())) } /// Given an ordered iterator of entries, returns an iterator of those entries, @@ -1821,116 +1871,50 @@ impl LocalSnapshot { .find(|(_, repo)| repo.in_dot_git(path)) } - #[cfg(test)] - pub(crate) fn build_initial_update(&self, project_id: u64) -> proto::UpdateWorktree { - let root_name = self.root_name.clone(); - proto::UpdateWorktree { - project_id, - worktree_id: self.id().to_proto(), - abs_path: self.abs_path().to_string_lossy().into(), - root_name, - updated_entries: self.entries_by_path.iter().map(Into::into).collect(), - removed_entries: Default::default(), - scan_id: self.scan_id as u64, - is_last_update: true, - updated_repositories: self.repository_entries.values().map(Into::into).collect(), - removed_repositories: Default::default(), - } - } - - pub(crate) fn build_update( + fn build_update( &self, - other: &Self, project_id: u64, worktree_id: u64, - include_ignored: bool, + entry_changes: UpdatedEntriesSet, + repo_changes: UpdatedGitRepositoriesSet, ) -> proto::UpdateWorktree { let mut updated_entries = Vec::new(); let mut removed_entries = Vec::new(); - let mut self_entries = self - .entries_by_id - .cursor::<()>() - .filter(|e| include_ignored || !e.is_ignored) - .peekable(); - let mut other_entries = other - .entries_by_id - .cursor::<()>() - .filter(|e| include_ignored || !e.is_ignored) - .peekable(); - loop { - match (self_entries.peek(), other_entries.peek()) { - (Some(self_entry), Some(other_entry)) => { - match Ord::cmp(&self_entry.id, &other_entry.id) { - Ordering::Less => { - let entry = self.entry_for_id(self_entry.id).unwrap().into(); - updated_entries.push(entry); - self_entries.next(); - } - Ordering::Equal => { - if self_entry.scan_id != other_entry.scan_id { - let entry = self.entry_for_id(self_entry.id).unwrap().into(); - updated_entries.push(entry); - } - - self_entries.next(); - other_entries.next(); - } - Ordering::Greater => { - removed_entries.push(other_entry.id.to_proto()); - other_entries.next(); - } - } - } - (Some(self_entry), None) => { - let entry = self.entry_for_id(self_entry.id).unwrap().into(); - updated_entries.push(entry); - self_entries.next(); - } - (None, Some(other_entry)) => { - removed_entries.push(other_entry.id.to_proto()); - other_entries.next(); - } - (None, None) => break, - } - } - - let mut updated_repositories: Vec = Vec::new(); + let mut updated_repositories = Vec::new(); let mut removed_repositories = Vec::new(); - let mut self_repos = self.snapshot.repository_entries.iter().peekable(); - let mut other_repos = other.snapshot.repository_entries.iter().peekable(); - loop { - match (self_repos.peek(), other_repos.peek()) { - (Some((self_work_dir, self_repo)), Some((other_work_dir, other_repo))) => { - match Ord::cmp(self_work_dir, other_work_dir) { - Ordering::Less => { - updated_repositories.push((*self_repo).into()); - self_repos.next(); - } - Ordering::Equal => { - if self_repo != other_repo { - updated_repositories.push(self_repo.build_update(other_repo)); - } - self_repos.next(); - other_repos.next(); - } - Ordering::Greater => { - removed_repositories.push(other_repo.work_directory.to_proto()); - other_repos.next(); - } - } - } - (Some((_, self_repo)), None) => { - updated_repositories.push((*self_repo).into()); - self_repos.next(); - } - (None, Some((_, other_repo))) => { - removed_repositories.push(other_repo.work_directory.to_proto()); - other_repos.next(); - } - (None, None) => break, + for (_, entry_id, path_change) in entry_changes.iter() { + if let PathChange::Removed = path_change { + removed_entries.push(entry_id.0 as u64); + } else if let Some(entry) = self.entry_for_id(*entry_id) { + updated_entries.push(proto::Entry::from(entry)); } } + for (work_dir_path, old_repo) in repo_changes.iter() { + let new_repo = self + .repository_entries + .get(&RepositoryWorkDirectory(work_dir_path.clone())); + match (old_repo, new_repo) { + (Some(old_repo), Some(new_repo)) => { + updated_repositories.push(new_repo.build_update(old_repo)); + } + (None, Some(new_repo)) => { + updated_repositories.push(proto::RepositoryEntry::from(new_repo)); + } + (Some(old_repo), None) => { + removed_repositories.push(old_repo.work_directory.0.to_proto()); + } + _ => {} + } + } + + removed_entries.sort_unstable(); + updated_entries.sort_unstable_by_key(|e| e.id); + removed_repositories.sort_unstable(); + updated_repositories.sort_unstable_by_key(|e| e.work_directory_id); + + // TODO - optimize, knowing that removed_entries are sorted. + removed_entries.retain(|id| updated_entries.binary_search_by_key(id, |e| e.id).is_err()); proto::UpdateWorktree { project_id, @@ -1946,6 +1930,35 @@ impl LocalSnapshot { } } + fn build_initial_update(&self, project_id: u64, worktree_id: u64) -> proto::UpdateWorktree { + let mut updated_entries = self + .entries_by_path + .iter() + .map(proto::Entry::from) + .collect::>(); + updated_entries.sort_unstable_by_key(|e| e.id); + + let mut updated_repositories = self + .repository_entries + .values() + .map(proto::RepositoryEntry::from) + .collect::>(); + updated_repositories.sort_unstable_by_key(|e| e.work_directory_id); + + proto::UpdateWorktree { + project_id, + worktree_id, + abs_path: self.abs_path().to_string_lossy().into(), + root_name: self.root_name().to_string(), + updated_entries, + removed_entries: Vec::new(), + scan_id: self.scan_id as u64, + is_last_update: self.completed_scan_id == self.scan_id, + updated_repositories, + removed_repositories: Vec::new(), + } + } + fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry { if entry.is_file() && entry.path.file_name() == Some(&GITIGNORE) { let abs_path = self.abs_path.join(&entry.path); @@ -2481,6 +2494,9 @@ pub enum PathChange { Loaded, } +pub type UpdatedEntriesSet = Arc<[(Arc, ProjectEntryId, PathChange)]>; +pub type UpdatedGitRepositoriesSet = Arc<[(Arc, Option)]>; + impl Entry { fn new( path: Arc, @@ -2896,11 +2912,13 @@ impl BackgroundScanner { fn send_status_update(&self, scanning: bool, barrier: Option) -> bool { let mut state = self.state.lock(); + if state.changed_paths.is_empty() && scanning { + return true; + } + let new_snapshot = state.snapshot.clone(); let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone()); - - let changes = - self.build_change_set(&old_snapshot, &new_snapshot.snapshot, &state.changed_paths); + let changes = self.build_change_set(&old_snapshot, &new_snapshot, &state.changed_paths); state.changed_paths.clear(); self.status_updates_tx @@ -3386,9 +3404,20 @@ impl BackgroundScanner { } } - let snapshot = &mut self.state.lock().snapshot; - snapshot.entries_by_path.edit(entries_by_path_edits, &()); - snapshot.entries_by_id.edit(entries_by_id_edits, &()); + let state = &mut self.state.lock(); + for edit in &entries_by_path_edits { + if let Edit::Insert(entry) = edit { + if let Err(ix) = state.changed_paths.binary_search(&entry.path) { + state.changed_paths.insert(ix, entry.path.clone()); + } + } + } + + state + .snapshot + .entries_by_path + .edit(entries_by_path_edits, &()); + state.snapshot.entries_by_id.edit(entries_by_id_edits, &()); } fn build_change_set( @@ -3396,16 +3425,17 @@ impl BackgroundScanner { old_snapshot: &Snapshot, new_snapshot: &Snapshot, event_paths: &[Arc], - ) -> Arc<[(Arc, ProjectEntryId, PathChange)]> { + ) -> UpdatedEntriesSet { use BackgroundScannerPhase::*; use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated}; + // Identify which paths have changed. Use the known set of changed + // parent paths to optimize the search. + let mut changes = Vec::new(); let mut old_paths = old_snapshot.entries_by_path.cursor::(); let mut new_paths = new_snapshot.entries_by_path.cursor::(); old_paths.next(&()); new_paths.next(&()); - - let mut changes = Vec::new(); for path in event_paths { let path = PathKey(path.clone()); if old_paths.item().map_or(false, |e| e.path < path.0) { @@ -3441,7 +3471,10 @@ impl BackgroundScanner { new_entry.id, AddedOrUpdated, )); - } else if old_entry.mtime != new_entry.mtime { + } else if old_entry.id != new_entry.id { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + changes.push((new_entry.path.clone(), new_entry.id, Added)); + } else if old_entry != new_entry { changes.push((new_entry.path.clone(), new_entry.id, Updated)); } old_paths.next(&()); @@ -3543,8 +3576,6 @@ impl WorktreeHandle for ModelHandle { &self, cx: &'a gpui::TestAppContext, ) -> futures::future::LocalBoxFuture<'a, ()> { - use smol::future::FutureExt; - let filename = "fs-event-sentinel"; let tree = self.clone(); let (fs, root_path) = self.read_with(cx, |tree, _| { @@ -4207,7 +4238,18 @@ mod tests { .await .unwrap(); - let mut snapshot1 = tree.update(cx, |tree, _| tree.as_local().unwrap().snapshot()); + let snapshot1 = tree.update(cx, |tree, cx| { + let tree = tree.as_local_mut().unwrap(); + let snapshot = Arc::new(Mutex::new(tree.snapshot())); + let _ = tree.observe_updates(0, cx, { + let snapshot = snapshot.clone(); + move |update| { + snapshot.lock().apply_remote_update(update).unwrap(); + async { true } + } + }); + snapshot + }); let entry = tree .update(cx, |tree, cx| { @@ -4225,9 +4267,10 @@ mod tests { }); let snapshot2 = tree.update(cx, |tree, _| tree.as_local().unwrap().snapshot()); - let update = snapshot2.build_update(&snapshot1, 0, 0, true); - snapshot1.apply_remote_update(update).unwrap(); - assert_eq!(snapshot1.to_vec(true), snapshot2.to_vec(true),); + assert_eq!( + snapshot1.lock().entries(true).collect::>(), + snapshot2.entries(true).collect::>() + ); } #[gpui::test(iterations = 100)] @@ -4262,7 +4305,20 @@ mod tests { .await .unwrap(); - let mut snapshot = worktree.update(cx, |tree, _| tree.as_local().unwrap().snapshot()); + let mut snapshots = + vec![worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot())]; + let updates = Arc::new(Mutex::new(Vec::new())); + worktree.update(cx, |tree, cx| { + check_worktree_change_events(tree, cx); + + let _ = tree.as_local_mut().unwrap().observe_updates(0, cx, { + let updates = updates.clone(); + move |update| { + updates.lock().push(update); + async { true } + } + }); + }); for _ in 0..operations { worktree @@ -4276,35 +4332,39 @@ mod tests { }); if rng.gen_bool(0.6) { - let new_snapshot = - worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot()); - let update = new_snapshot.build_update(&snapshot, 0, 0, true); - snapshot.apply_remote_update(update.clone()).unwrap(); - assert_eq!( - snapshot.to_vec(true), - new_snapshot.to_vec(true), - "incorrect snapshot after update {:?}", - update - ); + snapshots + .push(worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot())); } } worktree .update(cx, |tree, _| tree.as_local_mut().unwrap().scan_complete()) .await; - worktree.read_with(cx, |tree, _| { - tree.as_local().unwrap().snapshot.check_invariants() + + cx.foreground().run_until_parked(); + + let final_snapshot = worktree.read_with(cx, |tree, _| { + let tree = tree.as_local().unwrap(); + tree.snapshot.check_invariants(); + tree.snapshot() }); - let new_snapshot = worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot()); - let update = new_snapshot.build_update(&snapshot, 0, 0, true); - snapshot.apply_remote_update(update.clone()).unwrap(); - assert_eq!( - snapshot.to_vec(true), - new_snapshot.to_vec(true), - "incorrect snapshot after update {:?}", - update - ); + for (i, snapshot) in snapshots.into_iter().enumerate().rev() { + let mut updated_snapshot = snapshot.clone(); + for update in updates.lock().iter() { + if update.scan_id >= updated_snapshot.scan_id() as u64 { + updated_snapshot + .apply_remote_update(update.clone()) + .unwrap(); + } + } + + assert_eq!( + updated_snapshot.entries(true).collect::>(), + final_snapshot.entries(true).collect::>(), + "wrong updates after snapshot {i}: {snapshot:#?} {updates:#?}", + ); + } } #[gpui::test(iterations = 100)] @@ -4336,55 +4396,17 @@ mod tests { .await .unwrap(); - // The worktree's `UpdatedEntries` event can be used to follow along with - // all changes to the worktree's snapshot. + let updates = Arc::new(Mutex::new(Vec::new())); worktree.update(cx, |tree, cx| { - let mut paths = tree - .entries(true) - .map(|e| (e.path.clone(), e.mtime)) - .collect::>(); + check_worktree_change_events(tree, cx); - cx.subscribe(&worktree, move |tree, _, event, _| { - if let Event::UpdatedEntries(changes) = event { - for (path, _, change_type) in changes.iter() { - let mtime = tree.entry_for_path(&path).map(|e| e.mtime); - let path = path.clone(); - let ix = match paths.binary_search_by_key(&&path, |e| &e.0) { - Ok(ix) | Err(ix) => ix, - }; - match change_type { - PathChange::Loaded => { - paths.insert(ix, (path, mtime.unwrap())); - } - PathChange::Added => { - paths.insert(ix, (path, mtime.unwrap())); - } - PathChange::Removed => { - paths.remove(ix); - } - PathChange::Updated => { - let entry = paths.get_mut(ix).unwrap(); - assert_eq!(entry.0, path); - entry.1 = mtime.unwrap(); - } - PathChange::AddedOrUpdated => { - if paths.get(ix).map(|e| &e.0) == Some(&path) { - paths.get_mut(ix).unwrap().1 = mtime.unwrap(); - } else { - paths.insert(ix, (path, mtime.unwrap())); - } - } - } - } - - let new_paths = tree - .entries(true) - .map(|e| (e.path.clone(), e.mtime)) - .collect::>(); - assert_eq!(paths, new_paths, "incorrect changes: {:?}", changes); + let _ = tree.as_local_mut().unwrap().observe_updates(0, cx, { + let updates = updates.clone(); + move |update| { + updates.lock().push(update); + async { true } } - }) - .detach(); + }); }); worktree @@ -4447,38 +4469,64 @@ mod tests { .await; let new_snapshot = new_worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot()); - assert_eq!(snapshot.to_vec(true), new_snapshot.to_vec(true)); - } - - for (i, mut prev_snapshot) in snapshots.into_iter().enumerate() { - let include_ignored = rng.gen::(); - if !include_ignored { - let mut entries_by_path_edits = Vec::new(); - let mut entries_by_id_edits = Vec::new(); - for entry in prev_snapshot - .entries_by_id - .cursor::<()>() - .filter(|e| e.is_ignored) - { - entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone()))); - entries_by_id_edits.push(Edit::Remove(entry.id)); - } - - prev_snapshot - .entries_by_path - .edit(entries_by_path_edits, &()); - prev_snapshot.entries_by_id.edit(entries_by_id_edits, &()); - } - - let update = snapshot.build_update(&prev_snapshot, 0, 0, include_ignored); - prev_snapshot.apply_remote_update(update.clone()).unwrap(); assert_eq!( - prev_snapshot.to_vec(include_ignored), - snapshot.to_vec(include_ignored), - "wrong update for snapshot {i}. update: {:?}", - update + snapshot.entries_without_ids(true), + new_snapshot.entries_without_ids(true) ); } + + for (i, mut prev_snapshot) in snapshots.into_iter().enumerate().rev() { + for update in updates.lock().iter() { + if update.scan_id >= prev_snapshot.scan_id() as u64 { + prev_snapshot.apply_remote_update(update.clone()).unwrap(); + } + } + + assert_eq!( + prev_snapshot.entries(true).collect::>(), + snapshot.entries(true).collect::>(), + "wrong updates after snapshot {i}: {updates:#?}", + ); + } + } + + // The worktree's `UpdatedEntries` event can be used to follow along with + // all changes to the worktree's snapshot. + fn check_worktree_change_events(tree: &mut Worktree, cx: &mut ModelContext) { + let mut entries = tree.entries(true).cloned().collect::>(); + cx.subscribe(&cx.handle(), move |tree, _, event, _| { + if let Event::UpdatedEntries(changes) = event { + for (path, _, change_type) in changes.iter() { + let entry = tree.entry_for_path(&path).cloned(); + let ix = match entries.binary_search_by_key(&path, |e| &e.path) { + Ok(ix) | Err(ix) => ix, + }; + match change_type { + PathChange::Loaded => entries.insert(ix, entry.unwrap()), + PathChange::Added => entries.insert(ix, entry.unwrap()), + PathChange::Removed => drop(entries.remove(ix)), + PathChange::Updated => { + let entry = entry.unwrap(); + let existing_entry = entries.get_mut(ix).unwrap(); + assert_eq!(existing_entry.path, entry.path); + *existing_entry = entry; + } + PathChange::AddedOrUpdated => { + let entry = entry.unwrap(); + if entries.get(ix).map(|e| &e.path) == Some(&entry.path) { + *entries.get_mut(ix).unwrap() = entry; + } else { + entries.insert(ix, entry); + } + } + } + } + + let new_entries = tree.entries(true).cloned().collect::>(); + assert_eq!(entries, new_entries, "incorrect changes: {:?}", changes); + } + }) + .detach(); } fn randomly_mutate_worktree( @@ -4772,7 +4820,7 @@ mod tests { } } - fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> { + fn entries_without_ids(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> { let mut paths = Vec::new(); for entry in self.entries_by_path.cursor::<()>() { if include_ignored || !entry.is_ignored { @@ -4964,8 +5012,8 @@ mod tests { assert_eq!( repo_update_events.lock()[0] - .keys() - .cloned() + .iter() + .map(|e| e.0.clone()) .collect::>>(), vec![Path::new("dir1").into()] );