From 294a1b63c0956843688d4d16cda1e1c2d52f06a9 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 10 Apr 2025 15:58:41 -0700 Subject: [PATCH] Fix diff recalculation hang (#28377) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes https://github.com/zed-industries/zed/issues/26039 Release Notes: - Fixed an issue where diffs stopped updating closing and reopening them after staging hunks. - Fixed a bug where staging a hunk while the cursor was in a deleted line would move the cursor erroneously. --------- Co-authored-by: Cole Miller Co-authored-by: João Marcos --- crates/assistant_tool/src/action_log.rs | 2 +- crates/buffer_diff/src/buffer_diff.rs | 172 ++++--- crates/git_ui/src/commit_view.rs | 2 +- crates/git_ui/src/project_diff.rs | 6 +- crates/project/src/git_store.rs | 617 ++++++++++++++---------- crates/project/src/project_tests.rs | 186 +++---- 6 files changed, 544 insertions(+), 441 deletions(-) diff --git a/crates/assistant_tool/src/action_log.rs b/crates/assistant_tool/src/action_log.rs index f839570f94..1c0911c189 100644 --- a/crates/assistant_tool/src/action_log.rs +++ b/crates/assistant_tool/src/action_log.rs @@ -235,7 +235,7 @@ impl ActionLog { .await; diff.update(cx, |diff, cx| { - diff.set_snapshot(diff_snapshot, &buffer_snapshot, None, cx) + diff.set_snapshot(diff_snapshot, &buffer_snapshot, cx) })?; } this.update(cx, |this, cx| { diff --git a/crates/buffer_diff/src/buffer_diff.rs b/crates/buffer_diff/src/buffer_diff.rs index 308502b7ce..ad01af6f56 100644 --- a/crates/buffer_diff/src/buffer_diff.rs +++ b/crates/buffer_diff/src/buffer_diff.rs @@ -1,13 +1,21 @@ use futures::channel::oneshot; use git2::{DiffLineType as GitDiffLineType, DiffOptions as GitOptions, Patch as GitPatch}; -use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task}; +use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, TaskLabel}; use language::{Language, LanguageRegistry}; use rope::Rope; -use std::{cmp::Ordering, future::Future, iter, mem, ops::Range, sync::Arc}; +use std::{ + cmp::Ordering, + future::Future, + iter, + ops::Range, + sync::{Arc, LazyLock}, +}; use sum_tree::SumTree; use text::{Anchor, Bias, BufferId, OffsetRangeExt, Point, ToOffset as _}; use util::ResultExt; +pub static CALCULATE_DIFF_TASK: LazyLock = LazyLock::new(TaskLabel::new); + pub struct BufferDiff { pub buffer_id: BufferId, inner: BufferDiffInner, @@ -181,10 +189,12 @@ impl BufferDiffSnapshot { base_text_exists = false; }; - let hunks = cx.background_spawn({ - let buffer = buffer.clone(); - async move { compute_hunks(base_text_pair, buffer) } - }); + let hunks = cx + .background_executor() + .spawn_labeled(*CALCULATE_DIFF_TASK, { + let buffer = buffer.clone(); + async move { compute_hunks(base_text_pair, buffer) } + }); async move { let (base_text, hunks) = futures::join!(base_text_snapshot, hunks); @@ -208,17 +218,18 @@ impl BufferDiffSnapshot { ) -> impl Future + use<> { let base_text_exists = base_text.is_some(); let base_text_pair = base_text.map(|text| (text, base_text_snapshot.as_rope().clone())); - cx.background_spawn(async move { - Self { - inner: BufferDiffInner { - base_text: base_text_snapshot, - pending_hunks: SumTree::new(&buffer), - hunks: compute_hunks(base_text_pair, buffer), - base_text_exists, - }, - secondary_diff: None, - } - }) + cx.background_executor() + .spawn_labeled(*CALCULATE_DIFF_TASK, async move { + Self { + inner: BufferDiffInner { + base_text: base_text_snapshot, + pending_hunks: SumTree::new(&buffer), + hunks: compute_hunks(base_text_pair, buffer), + base_text_exists, + }, + secondary_diff: None, + } + }) } #[cfg(test)] @@ -381,6 +392,7 @@ impl BufferDiffInner { while let Some(PendingHunk { buffer_range, diff_base_byte_range, + new_status, .. }) = pending_hunks_iter.next() { @@ -439,16 +451,23 @@ impl BufferDiffInner { let index_end = prev_unstaged_hunk_base_text_end + end_overshoot; let index_byte_range = index_start..index_end; - let replacement_text = if stage { - log::debug!("stage hunk {:?}", buffer_offset_range); - buffer - .text_for_range(buffer_offset_range) - .collect::() - } else { - log::debug!("unstage hunk {:?}", buffer_offset_range); - head_text - .chunks_in_range(diff_base_byte_range.clone()) - .collect::() + let replacement_text = match new_status { + DiffHunkSecondaryStatus::SecondaryHunkRemovalPending => { + log::debug!("staging hunk {:?}", buffer_offset_range); + buffer + .text_for_range(buffer_offset_range) + .collect::() + } + DiffHunkSecondaryStatus::SecondaryHunkAdditionPending => { + log::debug!("unstaging hunk {:?}", buffer_offset_range); + head_text + .chunks_in_range(diff_base_byte_range.clone()) + .collect::() + } + _ => { + debug_assert!(false); + continue; + } }; edits.push((index_byte_range, replacement_text)); @@ -631,28 +650,6 @@ impl BufferDiffInner { }) } - fn set_state( - &mut self, - new_state: Self, - buffer: &text::BufferSnapshot, - ) -> Option> { - let (base_text_changed, changed_range) = - match (self.base_text_exists, new_state.base_text_exists) { - (false, false) => (true, None), - (true, true) if self.base_text.remote_id() == new_state.base_text.remote_id() => { - (false, new_state.compare(&self, buffer)) - } - _ => (true, Some(text::Anchor::MIN..text::Anchor::MAX)), - }; - - let pending_hunks = mem::replace(&mut self.pending_hunks, SumTree::new(buffer)); - *self = new_state; - if !base_text_changed { - self.pending_hunks = pending_hunks; - } - changed_range - } - fn compare(&self, old: &Self, new_snapshot: &text::BufferSnapshot) -> Option> { let mut new_cursor = self.hunks.cursor::<()>(new_snapshot); let mut old_cursor = old.hunks.cursor::<()>(new_snapshot); @@ -1011,26 +1008,61 @@ impl BufferDiff { &mut self, new_snapshot: BufferDiffSnapshot, buffer: &text::BufferSnapshot, - secondary_changed_range: Option>, cx: &mut Context, ) -> Option> { - let changed_range = self.inner.set_state(new_snapshot.inner, buffer); + self.set_snapshot_with_secondary(new_snapshot, buffer, None, false, cx) + } - let changed_range = match (secondary_changed_range, changed_range) { - (None, None) => None, - (Some(unstaged_range), None) => self.range_to_hunk_range(unstaged_range, &buffer, cx), - (None, Some(uncommitted_range)) => Some(uncommitted_range), - (Some(unstaged_range), Some(uncommitted_range)) => { - let mut start = uncommitted_range.start; - let mut end = uncommitted_range.end; - if let Some(unstaged_range) = self.range_to_hunk_range(unstaged_range, &buffer, cx) - { - start = unstaged_range.start.min(&uncommitted_range.start, &buffer); - end = unstaged_range.end.max(&uncommitted_range.end, &buffer); + pub fn set_snapshot_with_secondary( + &mut self, + new_snapshot: BufferDiffSnapshot, + buffer: &text::BufferSnapshot, + secondary_diff_change: Option>, + clear_pending_hunks: bool, + cx: &mut Context, + ) -> Option> { + log::debug!("set snapshot with secondary {secondary_diff_change:?}"); + + let state = &mut self.inner; + let new_state = new_snapshot.inner; + let (base_text_changed, mut changed_range) = + match (state.base_text_exists, new_state.base_text_exists) { + (false, false) => (true, None), + (true, true) if state.base_text.remote_id() == new_state.base_text.remote_id() => { + (false, new_state.compare(&state, buffer)) + } + _ => (true, Some(text::Anchor::MIN..text::Anchor::MAX)), + }; + + if let Some(secondary_changed_range) = secondary_diff_change { + if let Some(secondary_hunk_range) = + self.range_to_hunk_range(secondary_changed_range, &buffer, cx) + { + if let Some(range) = &mut changed_range { + range.start = secondary_hunk_range.start.min(&range.start, &buffer); + range.end = secondary_hunk_range.end.max(&range.end, &buffer); + } else { + changed_range = Some(secondary_hunk_range); } - Some(start..end) } - }; + } + + let state = &mut self.inner; + state.base_text_exists = new_state.base_text_exists; + state.base_text = new_state.base_text; + state.hunks = new_state.hunks; + if base_text_changed || clear_pending_hunks { + if let Some((first, last)) = state.pending_hunks.first().zip(state.pending_hunks.last()) + { + if let Some(range) = &mut changed_range { + range.start = range.start.min(&first.buffer_range.start, &buffer); + range.end = range.end.max(&last.buffer_range.end, &buffer); + } else { + changed_range = Some(first.buffer_range.start..last.buffer_range.end); + } + } + state.pending_hunks = SumTree::new(buffer); + } cx.emit(BufferDiffEvent::DiffChanged { changed_range: changed_range.clone(), @@ -1138,7 +1170,7 @@ impl BufferDiff { return; }; this.update(cx, |this, cx| { - this.set_snapshot(snapshot, &buffer, None, cx); + this.set_snapshot(snapshot, &buffer, cx); }) .log_err(); drop(complete_on_drop) @@ -1163,7 +1195,7 @@ impl BufferDiff { cx, ); let snapshot = cx.background_executor().block(snapshot); - self.set_snapshot(snapshot, &buffer, None, cx); + self.set_snapshot(snapshot, &buffer, cx); } } @@ -1752,13 +1784,13 @@ mod tests { let unstaged_diff = cx.new(|cx| { let mut diff = BufferDiff::new(&buffer, cx); - diff.set_snapshot(unstaged, &buffer, None, cx); + diff.set_snapshot(unstaged, &buffer, cx); diff }); let uncommitted_diff = cx.new(|cx| { let mut diff = BufferDiff::new(&buffer, cx); - diff.set_snapshot(uncommitted, &buffer, None, cx); + diff.set_snapshot(uncommitted, &buffer, cx); diff.set_secondary_diff(unstaged_diff); diff }); @@ -1819,12 +1851,12 @@ mod tests { let uncommitted = BufferDiffSnapshot::new_sync(buffer.clone(), head_text.clone(), cx); let unstaged_diff = cx.new(|cx| { let mut diff = BufferDiff::new(&buffer, cx); - diff.set_snapshot(unstaged, &buffer, None, cx); + diff.set_snapshot(unstaged, &buffer, cx); diff }); let uncommitted_diff = cx.new(|cx| { let mut diff = BufferDiff::new(&buffer, cx); - diff.set_snapshot(uncommitted, &buffer, None, cx); + diff.set_snapshot(uncommitted, &buffer, cx); diff.set_secondary_diff(unstaged_diff.clone()); diff }); diff --git a/crates/git_ui/src/commit_view.rs b/crates/git_ui/src/commit_view.rs index 06e4c92460..d7ec189028 100644 --- a/crates/git_ui/src/commit_view.rs +++ b/crates/git_ui/src/commit_view.rs @@ -356,7 +356,7 @@ async fn build_buffer_diff( cx.new(|cx| { let mut diff = BufferDiff::new(&buffer.text, cx); - diff.set_snapshot(diff_snapshot, &buffer.text, None, cx); + diff.set_snapshot(diff_snapshot, &buffer.text, cx); diff }) } diff --git a/crates/git_ui/src/project_diff.rs b/crates/git_ui/src/project_diff.rs index cc770055f3..7975d11960 100644 --- a/crates/git_ui/src/project_diff.rs +++ b/crates/git_ui/src/project_diff.rs @@ -1501,7 +1501,6 @@ mod tests { .unindent(), ); - eprintln!(">>>>>>>> git restore"); let prev_buffer_hunks = cx.update_window_entity(&buffer_editor, |buffer_editor, window, cx| { let snapshot = buffer_editor.snapshot(window, cx); @@ -1525,7 +1524,6 @@ mod tests { }); assert_eq!(new_buffer_hunks.as_slice(), &[]); - eprintln!(">>>>>>>> modify"); cx.update_window_entity(&buffer_editor, |buffer_editor, window, cx| { buffer_editor.set_text("different\n", window, cx); buffer_editor.save(false, project.clone(), window, cx) @@ -1554,8 +1552,8 @@ mod tests { cx, &" - original - + ˇdifferent - " + + different + ˇ" .unindent(), ); } diff --git a/crates/project/src/git_store.rs b/crates/project/src/git_store.rs index 53e2f1a10a..72aafd7c09 100644 --- a/crates/project/src/git_store.rs +++ b/crates/project/src/git_store.rs @@ -38,6 +38,7 @@ use language::{ proto::{deserialize_version, serialize_version}, }; use parking_lot::Mutex; +use postage::stream::Stream as _; use rpc::{ AnyProtoClient, TypedEnvelope, proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update}, @@ -83,15 +84,28 @@ struct SharedDiffs { uncommitted: Option>, } -#[derive(Default)] struct BufferDiffState { unstaged_diff: Option>, uncommitted_diff: Option>, recalculate_diff_task: Option>>, language: Option>, language_registry: Option>, - diff_updated_futures: Vec>, + recalculating_tx: postage::watch::Sender, + + /// These operation counts are used to ensure that head and index text + /// values read from the git repository are up-to-date with any hunk staging + /// operations that have been performed on the BufferDiff. + /// + /// The operation_count is incremented immediately when the user initiates a + /// hunk stage/unstage operation. Then, upon writing the new index text do + /// disk, the `operation_count_as_of_write` is updated to reflect the + /// operation_count that prompted the write. Finally, when reloading + /// index/head text from disk in response to a filesystem event, the + /// `operation_count_as_of_read` is updated to reflect the latest previous + /// write. hunk_staging_operation_count: usize, + hunk_staging_operation_count_as_of_write: usize, + hunk_staging_operation_count_as_of_read: usize, head_text: Option>, index_text: Option>, @@ -299,7 +313,7 @@ pub struct GitJob { #[derive(PartialEq, Eq)] enum GitJobKey { WriteIndex(RepoPath), - BatchReadIndex, + ReloadBufferDiffBases, RefreshStatuses, ReloadGitState, } @@ -551,7 +565,7 @@ impl GitStore { diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation()) { return cx.background_executor().spawn(async move { - task.await?; + task.await; Ok(unstaged_diff) }); } @@ -608,7 +622,7 @@ impl GitStore { diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation()) { return cx.background_executor().spawn(async move { - task.await?; + task.await; Ok(uncommitted_diff) }); } @@ -697,10 +711,13 @@ impl GitStore { } } - let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, 0, cx); + diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx); + let rx = diff_state.wait_for_recalculation(); anyhow::Ok(async move { - rx.await.ok(); + if let Some(rx) = rx { + rx.await; + } Ok(diff) }) }) @@ -1223,13 +1240,10 @@ impl GitStore { for buffer in buffers { if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) { let buffer = buffer.read(cx).text_snapshot(); - futures.push(diff_state.update(cx, |diff_state, cx| { - diff_state.recalculate_diffs( - buffer, - diff_state.hunk_staging_operation_count, - cx, - ) - })); + diff_state.update(cx, |diff_state, cx| { + diff_state.recalculate_diffs(buffer, cx); + futures.extend(diff_state.wait_for_recalculation()); + }); } } async move { @@ -1246,31 +1260,33 @@ impl GitStore { if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event { let buffer_id = diff.read(cx).buffer_id; if let Some(diff_state) = self.diffs.get(&buffer_id) { - diff_state.update(cx, |diff_state, _| { + let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| { diff_state.hunk_staging_operation_count += 1; + diff_state.hunk_staging_operation_count }); - } - if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) { - let recv = repo.update(cx, |repo, cx| { - log::debug!("updating index text for buffer {}", path.display()); - repo.spawn_set_index_text_job( - path, - new_index_text.as_ref().map(|rope| rope.to_string()), - cx, - ) - }); - let diff = diff.downgrade(); - cx.spawn(async move |this, cx| { - if let Ok(Err(error)) = cx.background_spawn(recv).await { - diff.update(cx, |diff, cx| { - diff.clear_pending_hunks(cx); - }) - .ok(); - this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error))) + if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) { + let recv = repo.update(cx, |repo, cx| { + log::debug!("hunks changed for {}", path.display()); + repo.spawn_set_index_text_job( + path, + new_index_text.as_ref().map(|rope| rope.to_string()), + hunk_staging_operation_count, + cx, + ) + }); + let diff = diff.downgrade(); + cx.spawn(async move |this, cx| { + if let Ok(Err(error)) = cx.background_spawn(recv).await { + diff.update(cx, |diff, cx| { + diff.clear_pending_hunks(cx); + }) .ok(); - } - }) - .detach(); + this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error))) + .ok(); + } + }) + .detach(); + } } } } @@ -1284,178 +1300,15 @@ impl GitStore { log::debug!("local worktree repos changed"); debug_assert!(worktree.read(cx).is_local()); - let mut diff_state_updates = HashMap::, Vec<_>>::default(); - for (buffer_id, diff_state) in &self.diffs { - let Some(buffer) = self.buffer_store.read(cx).get(*buffer_id) else { - continue; - }; - let Some(file) = File::from_dyn(buffer.read(cx).file()) else { - continue; - }; - if file.worktree != worktree { - continue; - } - let Some((repo, repo_path)) = - self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx) - else { - continue; - }; - if !changed_repos.iter().any(|update| { - update.old_work_directory_abs_path.as_ref() - == Some(&repo.read(cx).work_directory_abs_path) - || update.new_work_directory_abs_path.as_ref() - == Some(&repo.read(cx).work_directory_abs_path) - }) { - continue; - } - - let diff_state = diff_state.read(cx); - let has_unstaged_diff = diff_state - .unstaged_diff - .as_ref() - .is_some_and(|diff| diff.is_upgradable()); - let has_uncommitted_diff = diff_state - .uncommitted_diff - .as_ref() - .is_some_and(|set| set.is_upgradable()); - - let update = ( - buffer, - repo_path, - has_unstaged_diff.then(|| diff_state.index_text.clone()), - has_uncommitted_diff.then(|| diff_state.head_text.clone()), - diff_state.hunk_staging_operation_count, - ); - diff_state_updates.entry(repo).or_default().push(update); - } - - if diff_state_updates.is_empty() { - return; - } - - for (repo, repo_diff_state_updates) in diff_state_updates.into_iter() { - let git_store = cx.weak_entity(); - - let _ = repo.update(cx, |repo, _| { - repo.send_keyed_job( - Some(GitJobKey::BatchReadIndex), - None, - |state, mut cx| async move { - let RepositoryState::Local { backend, .. } = state else { - log::error!("tried to recompute diffs for a non-local repository"); - return; - }; - let mut diff_bases_changes_by_buffer = Vec::new(); - for ( - buffer, - repo_path, - current_index_text, - current_head_text, - hunk_staging_operation_count, - ) in &repo_diff_state_updates - { - let index_text = if current_index_text.is_some() { - backend.load_index_text(repo_path.clone()).await - } else { - None - }; - let head_text = if current_head_text.is_some() { - backend.load_committed_text(repo_path.clone()).await - } else { - None - }; - - // Avoid triggering a diff update if the base text has not changed. - if let Some((current_index, current_head)) = - current_index_text.as_ref().zip(current_head_text.as_ref()) - { - if current_index.as_deref() == index_text.as_ref() - && current_head.as_deref() == head_text.as_ref() - { - continue; - } - } - - let diff_bases_change = - match (current_index_text.is_some(), current_head_text.is_some()) { - (true, true) => Some(if index_text == head_text { - DiffBasesChange::SetBoth(head_text) - } else { - DiffBasesChange::SetEach { - index: index_text, - head: head_text, - } - }), - (true, false) => Some(DiffBasesChange::SetIndex(index_text)), - (false, true) => Some(DiffBasesChange::SetHead(head_text)), - (false, false) => None, - }; - - diff_bases_changes_by_buffer.push(( - buffer, - diff_bases_change, - *hunk_staging_operation_count, - )) - } - - git_store - .update(&mut cx, |git_store, cx| { - for (buffer, diff_bases_change, hunk_staging_operation_count) in - diff_bases_changes_by_buffer - { - let Some(diff_state) = - git_store.diffs.get(&buffer.read(cx).remote_id()) - else { - continue; - }; - let Some(diff_bases_change) = diff_bases_change else { - continue; - }; - - let downstream_client = git_store.downstream_client(); - diff_state.update(cx, |diff_state, cx| { - use proto::update_diff_bases::Mode; - - let buffer = buffer.read(cx); - if let Some((client, project_id)) = downstream_client { - let (staged_text, committed_text, mode) = - match diff_bases_change.clone() { - DiffBasesChange::SetIndex(index) => { - (index, None, Mode::IndexOnly) - } - DiffBasesChange::SetHead(head) => { - (None, head, Mode::HeadOnly) - } - DiffBasesChange::SetEach { index, head } => { - (index, head, Mode::IndexAndHead) - } - DiffBasesChange::SetBoth(text) => { - (None, text, Mode::IndexMatchesHead) - } - }; - let message = proto::UpdateDiffBases { - project_id: project_id.to_proto(), - buffer_id: buffer.remote_id().to_proto(), - staged_text, - committed_text, - mode: mode as i32, - }; - - client.send(message).log_err(); - } - - let _ = diff_state.diff_bases_changed( - buffer.text_snapshot(), - diff_bases_change, - hunk_staging_operation_count, - cx, - ); - }); - } - }) - .ok(); - }, - ) + for repository in self.repositories.values() { + repository.update(cx, |repository, cx| { + let repo_abs_path = &repository.work_directory_abs_path; + if changed_repos.iter().any(|update| { + update.old_work_directory_abs_path.as_ref() == Some(&repo_abs_path) + || update.new_work_directory_abs_path.as_ref() == Some(&repo_abs_path) + }) { + repository.reload_buffer_diff_bases(cx); + } }); } } @@ -1775,12 +1628,28 @@ impl GitStore { ) -> Result { let repository_id = RepositoryId::from_proto(envelope.payload.repository_id); let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?; + let repo_path = RepoPath::from_str(&envelope.payload.path); + + let hunk_staging_operation_count = this + .read_with(&cx, |this, cx| { + let project_path = repository_handle + .read(cx) + .repo_path_to_project_path(&repo_path, cx)?; + let buffer_id = this + .buffer_store + .read(cx) + .buffer_id_for_project_path(&project_path)?; + let diff_state = this.diffs.get(buffer_id)?; + Some(diff_state.read(cx).hunk_staging_operation_count) + })? + .ok_or_else(|| anyhow!("unknown buffer"))?; repository_handle .update(&mut cx, |repository_handle, cx| { repository_handle.spawn_set_index_text_job( - RepoPath::from_str(&envelope.payload.path), + repo_path, envelope.payload.text, + hunk_staging_operation_count, cx, ) })? @@ -2176,6 +2045,8 @@ impl GitStore { if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) { let buffer = buffer.read(cx).text_snapshot(); diff_state.update(cx, |diff_state, cx| { + diff_state.hunk_staging_operation_count_as_of_read = + diff_state.hunk_staging_operation_count_as_of_write; diff_state.handle_base_texts_updated(buffer, request.payload, cx); }) } @@ -2258,11 +2129,7 @@ impl BufferDiffState { fn buffer_language_changed(&mut self, buffer: Entity, cx: &mut Context) { self.language = buffer.read(cx).language().cloned(); self.language_changed = true; - let _ = self.recalculate_diffs( - buffer.read(cx).text_snapshot(), - self.hunk_staging_operation_count, - cx, - ); + let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx); } fn unstaged_diff(&self) -> Option> { @@ -2295,46 +2162,47 @@ impl BufferDiffState { }, }; - let _ = self.diff_bases_changed( - buffer, - diff_bases_change, - self.hunk_staging_operation_count, - cx, - ); + self.diff_bases_changed(buffer, Some(diff_bases_change), cx); } - pub fn wait_for_recalculation(&mut self) -> Option> { - if self.diff_updated_futures.is_empty() { - return None; + pub fn wait_for_recalculation(&mut self) -> Option + use<>> { + if *self.recalculating_tx.borrow() { + let mut rx = self.recalculating_tx.subscribe(); + return Some(async move { + loop { + let is_recalculating = rx.recv().await; + if is_recalculating != Some(true) { + break; + } + } + }); + } else { + None } - let (tx, rx) = oneshot::channel(); - self.diff_updated_futures.push(tx); - Some(rx) } fn diff_bases_changed( &mut self, buffer: text::BufferSnapshot, - diff_bases_change: DiffBasesChange, - prev_hunk_staging_operation_count: usize, + diff_bases_change: Option, cx: &mut Context, - ) -> oneshot::Receiver<()> { + ) { match diff_bases_change { - DiffBasesChange::SetIndex(index) => { + Some(DiffBasesChange::SetIndex(index)) => { self.index_text = index.map(|mut index| { text::LineEnding::normalize(&mut index); Arc::new(index) }); self.index_changed = true; } - DiffBasesChange::SetHead(head) => { + Some(DiffBasesChange::SetHead(head)) => { self.head_text = head.map(|mut head| { text::LineEnding::normalize(&mut head); Arc::new(head) }); self.head_changed = true; } - DiffBasesChange::SetBoth(text) => { + Some(DiffBasesChange::SetBoth(text)) => { let text = text.map(|mut text| { text::LineEnding::normalize(&mut text); Arc::new(text) @@ -2344,7 +2212,7 @@ impl BufferDiffState { self.head_changed = true; self.index_changed = true; } - DiffBasesChange::SetEach { index, head } => { + Some(DiffBasesChange::SetEach { index, head }) => { self.index_text = index.map(|mut index| { text::LineEnding::normalize(&mut index); Arc::new(index) @@ -2356,20 +2224,14 @@ impl BufferDiffState { }); self.head_changed = true; } + None => {} } - self.recalculate_diffs(buffer, prev_hunk_staging_operation_count, cx) + self.recalculate_diffs(buffer, cx) } - fn recalculate_diffs( - &mut self, - buffer: text::BufferSnapshot, - prev_hunk_staging_operation_count: usize, - cx: &mut Context, - ) -> oneshot::Receiver<()> { - log::debug!("recalculate diffs"); - let (tx, rx) = oneshot::channel(); - self.diff_updated_futures.push(tx); + fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context) { + *self.recalculating_tx.borrow_mut() = true; let language = self.language.clone(); let language_registry = self.language_registry.clone(); @@ -2380,12 +2242,18 @@ impl BufferDiffState { let index_changed = self.index_changed; let head_changed = self.head_changed; let language_changed = self.language_changed; + let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_read; let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) { (Some(index), Some(head)) => Arc::ptr_eq(index, head), (None, None) => true, _ => false, }; self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| { + log::debug!( + "start recalculating diffs for buffer {}", + buffer.remote_id() + ); + let mut new_unstaged_diff = None; if let Some(unstaged_diff) = &unstaged_diff { new_unstaged_diff = Some( @@ -2424,10 +2292,28 @@ impl BufferDiffState { } } - if this.update(cx, |this, _| { - this.hunk_staging_operation_count > prev_hunk_staging_operation_count - })? { - eprintln!("early return"); + let cancel = this.update(cx, |this, _| { + // This checks whether all pending stage/unstage operations + // have quiesced (i.e. both the corresponding write and the + // read of that write have completed). If not, then we cancel + // this recalculation attempt to avoid invalidating pending + // state too quickly; another recalculation will come along + // later and clear the pending state once the state of the index has settled. + if this.hunk_staging_operation_count > prev_hunk_staging_operation_count { + *this.recalculating_tx.borrow_mut() = false; + true + } else { + false + } + })?; + if cancel { + log::debug!( + concat!( + "aborting recalculating diffs for buffer {}", + "due to subsequent hunk operations", + ), + buffer.remote_id() + ); return Ok(()); } @@ -2438,7 +2324,7 @@ impl BufferDiffState { if language_changed { diff.language_changed(cx); } - diff.set_snapshot(new_unstaged_diff, &buffer, None, cx) + diff.set_snapshot(new_unstaged_diff, &buffer, cx) })? } else { None @@ -2451,25 +2337,53 @@ impl BufferDiffState { if language_changed { diff.language_changed(cx); } - diff.set_snapshot(new_uncommitted_diff, &buffer, unstaged_changed_range, cx); + diff.set_snapshot_with_secondary( + new_uncommitted_diff, + &buffer, + unstaged_changed_range, + true, + cx, + ); })?; } + log::debug!( + "finished recalculating diffs for buffer {}", + buffer.remote_id() + ); + if let Some(this) = this.upgrade() { this.update(cx, |this, _| { this.index_changed = false; this.head_changed = false; this.language_changed = false; - for tx in this.diff_updated_futures.drain(..) { - tx.send(()).ok(); - } + *this.recalculating_tx.borrow_mut() = false; })?; } Ok(()) })); + } +} - rx +impl Default for BufferDiffState { + fn default() -> Self { + Self { + unstaged_diff: Default::default(), + uncommitted_diff: Default::default(), + recalculate_diff_task: Default::default(), + language: Default::default(), + language_registry: Default::default(), + recalculating_tx: postage::watch::channel_with(false).0, + hunk_staging_operation_count: 0, + hunk_staging_operation_count_as_of_read: 0, + hunk_staging_operation_count_as_of_write: 0, + head_text: Default::default(), + index_text: Default::default(), + head_changed: Default::default(), + index_changed: Default::default(), + language_changed: Default::default(), + } } } @@ -2702,6 +2616,170 @@ impl Repository { self.git_store.upgrade() } + fn reload_buffer_diff_bases(&mut self, cx: &mut Context) { + let this = cx.weak_entity(); + let git_store = self.git_store.clone(); + let _ = self.send_keyed_job( + Some(GitJobKey::ReloadBufferDiffBases), + None, + |state, mut cx| async move { + let RepositoryState::Local { backend, .. } = state else { + log::error!("tried to recompute diffs for a non-local repository"); + return Ok(()); + }; + + let Some(this) = this.upgrade() else { + return Ok(()); + }; + + let repo_diff_state_updates = this.update(&mut cx, |this, cx| { + git_store.update(cx, |git_store, cx| { + git_store + .diffs + .iter() + .filter_map(|(buffer_id, diff_state)| { + let buffer_store = git_store.buffer_store.read(cx); + let buffer = buffer_store.get(*buffer_id)?; + let file = File::from_dyn(buffer.read(cx).file())?; + let abs_path = + file.worktree.read(cx).absolutize(&file.path).ok()?; + let repo_path = this.abs_path_to_repo_path(&abs_path)?; + log::debug!( + "start reload diff bases for repo path {}", + repo_path.0.display() + ); + diff_state.update(cx, |diff_state, _| { + diff_state.hunk_staging_operation_count_as_of_read = + diff_state.hunk_staging_operation_count_as_of_write; + + let has_unstaged_diff = diff_state + .unstaged_diff + .as_ref() + .is_some_and(|diff| diff.is_upgradable()); + let has_uncommitted_diff = diff_state + .uncommitted_diff + .as_ref() + .is_some_and(|set| set.is_upgradable()); + + Some(( + buffer, + repo_path, + has_unstaged_diff.then(|| diff_state.index_text.clone()), + has_uncommitted_diff.then(|| diff_state.head_text.clone()), + )) + }) + }) + .collect::>() + }) + })??; + + let buffer_diff_base_changes = cx + .background_spawn(async move { + let mut changes = Vec::new(); + for (buffer, repo_path, current_index_text, current_head_text) in + &repo_diff_state_updates + { + let index_text = if current_index_text.is_some() { + backend.load_index_text(repo_path.clone()).await + } else { + None + }; + let head_text = if current_head_text.is_some() { + backend.load_committed_text(repo_path.clone()).await + } else { + None + }; + + let change = + match (current_index_text.as_ref(), current_head_text.as_ref()) { + (Some(current_index), Some(current_head)) => { + let index_changed = + index_text.as_ref() != current_index.as_deref(); + let head_changed = + head_text.as_ref() != current_head.as_deref(); + if index_changed && head_changed { + if index_text == head_text { + Some(DiffBasesChange::SetBoth(head_text)) + } else { + Some(DiffBasesChange::SetEach { + index: index_text, + head: head_text, + }) + } + } else if index_changed { + Some(DiffBasesChange::SetIndex(index_text)) + } else if head_changed { + Some(DiffBasesChange::SetHead(head_text)) + } else { + None + } + } + (Some(current_index), None) => { + let index_changed = + index_text.as_ref() != current_index.as_deref(); + index_changed + .then_some(DiffBasesChange::SetIndex(index_text)) + } + (None, Some(current_head)) => { + let head_changed = + head_text.as_ref() != current_head.as_deref(); + head_changed.then_some(DiffBasesChange::SetHead(head_text)) + } + (None, None) => None, + }; + + changes.push((buffer.clone(), change)) + } + changes + }) + .await; + + git_store.update(&mut cx, |git_store, cx| { + for (buffer, diff_bases_change) in buffer_diff_base_changes { + let buffer_snapshot = buffer.read(cx).text_snapshot(); + let buffer_id = buffer_snapshot.remote_id(); + let Some(diff_state) = git_store.diffs.get(&buffer_id) else { + continue; + }; + + let downstream_client = git_store.downstream_client(); + diff_state.update(cx, |diff_state, cx| { + use proto::update_diff_bases::Mode; + + if let Some((diff_bases_change, (client, project_id))) = + diff_bases_change.clone().zip(downstream_client) + { + let (staged_text, committed_text, mode) = match diff_bases_change { + DiffBasesChange::SetIndex(index) => { + (index, None, Mode::IndexOnly) + } + DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly), + DiffBasesChange::SetEach { index, head } => { + (index, head, Mode::IndexAndHead) + } + DiffBasesChange::SetBoth(text) => { + (None, text, Mode::IndexMatchesHead) + } + }; + client + .send(proto::UpdateDiffBases { + project_id: project_id.to_proto(), + buffer_id: buffer_id.to_proto(), + staged_text, + committed_text, + mode: mode as i32, + }) + .log_err(); + } + + diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx); + }); + } + }) + }, + ); + } + pub fn send_job( &mut self, status: Option, @@ -3416,14 +3494,17 @@ impl Repository { &mut self, path: RepoPath, content: Option, - _cx: &mut App, + hunk_staging_operation_count: usize, + cx: &mut Context, ) -> oneshot::Receiver> { let id = self.id; - + let this = cx.weak_entity(); + let git_store = self.git_store.clone(); self.send_keyed_job( Some(GitJobKey::WriteIndex(path.clone())), None, - move |git_repo, _cx| async move { + move |git_repo, mut cx| async move { + log::debug!("start updating index text for buffer {}", path.display()); match git_repo { RepositoryState::Local { backend, @@ -3431,8 +3512,8 @@ impl Repository { .. } => { backend - .set_index_text(path, content, environment.clone()) - .await + .set_index_text(path.clone(), content, environment.clone()) + .await?; } RepositoryState::Remote { project_id, client } => { client @@ -3443,9 +3524,27 @@ impl Repository { text: content, }) .await?; - Ok(()) } } + log::debug!("finish updating index text for buffer {}", path.display()); + + let project_path = this + .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx)) + .ok() + .flatten(); + git_store.update(&mut cx, |git_store, cx| { + let buffer_id = git_store + .buffer_store + .read(cx) + .buffer_id_for_project_path(&project_path?)?; + let diff_state = git_store.diffs.get(buffer_id)?; + diff_state.update(cx, |diff_state, _| { + diff_state.hunk_staging_operation_count_as_of_write = + hunk_staging_operation_count; + }); + Some(()) + })?; + Ok(()) }, ) } diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index b499bec710..38088ba684 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -5,7 +5,8 @@ use crate::{ *, }; use buffer_diff::{ - BufferDiffEvent, DiffHunkSecondaryStatus, DiffHunkStatus, DiffHunkStatusKind, assert_hunks, + BufferDiffEvent, CALCULATE_DIFF_TASK, DiffHunkSecondaryStatus, DiffHunkStatus, + DiffHunkStatusKind, assert_hunks, }; use fs::FakeFs; use futures::{StreamExt, future}; @@ -30,10 +31,11 @@ use parking_lot::Mutex; use paths::{config_dir, tasks_file}; use postage::stream::Stream as _; use pretty_assertions::{assert_eq, assert_matches}; +use rand::{Rng as _, rngs::StdRng}; use serde_json::json; #[cfg(not(windows))] use std::os; -use std::{mem, num::NonZeroU32, ops::Range, str::FromStr, sync::OnceLock, task::Poll}; +use std::{env, mem, num::NonZeroU32, ops::Range, str::FromStr, sync::OnceLock, task::Poll}; use task::{ResolvedTask, TaskContext}; use unindent::Unindent as _; use util::{ @@ -6605,7 +6607,7 @@ async fn test_staging_hunks(cx: &mut gpui::TestAppContext) { } = event { let changed_range = changed_range.to_point(&snapshot); - assert_eq!(changed_range, Point::new(0, 0)..Point::new(5, 0)); + assert_eq!(changed_range, Point::new(0, 0)..Point::new(4, 0)); } else { panic!("Unexpected event {event:?}"); } @@ -6966,49 +6968,55 @@ async fn test_staging_hunks_with_delayed_fs_event(cx: &mut gpui::TestAppContext) } #[gpui::test] -async fn test_staging_lots_of_hunks_fast(cx: &mut gpui::TestAppContext) { +async fn test_staging_random_hunks( + mut rng: StdRng, + executor: BackgroundExecutor, + cx: &mut gpui::TestAppContext, +) { + let operations = env::var("OPERATIONS") + .map(|i| i.parse().expect("invalid `OPERATIONS` variable")) + .unwrap_or(20); + + // Try to induce races between diff recalculation and index writes. + if rng.gen_bool(0.5) { + executor.deprioritize(*CALCULATE_DIFF_TASK); + } + use DiffHunkSecondaryStatus::*; init_test(cx); - let different_lines = (0..500) - .step_by(5) - .map(|i| format!("diff {}\n", i)) - .collect::>(); - let committed_contents = (0..500).map(|i| format!("{}\n", i)).collect::(); - let file_contents = (0..500) - .map(|i| { - if i % 5 == 0 { - different_lines[i / 5].clone() - } else { - format!("{}\n", i) - } + let committed_text = (0..30).map(|i| format!("line {i}\n")).collect::(); + let index_text = committed_text.clone(); + let buffer_text = (0..30) + .map(|i| match i % 5 { + 0 => format!("line {i} (modified)\n"), + _ => format!("line {i}\n"), }) .collect::(); let fs = FakeFs::new(cx.background_executor.clone()); fs.insert_tree( - "/dir", + path!("/dir"), json!({ ".git": {}, - "file.txt": file_contents.clone() + "file.txt": buffer_text.clone() }), ) .await; - fs.set_head_for_repo( - "/dir/.git".as_ref(), - &[("file.txt".into(), committed_contents.clone())], + path!("/dir/.git").as_ref(), + &[("file.txt".into(), committed_text.clone())], ); fs.set_index_for_repo( - "/dir/.git".as_ref(), - &[("file.txt".into(), committed_contents.clone())], + path!("/dir/.git").as_ref(), + &[("file.txt".into(), index_text.clone())], ); + let repo = fs.open_repo(path!("/dir/.git").as_ref()).unwrap(); - let project = Project::test(fs.clone(), ["/dir".as_ref()], cx).await; - + let project = Project::test(fs.clone(), [path!("/dir").as_ref()], cx).await; let buffer = project .update(cx, |project, cx| { - project.open_local_buffer("/dir/file.txt", cx) + project.open_local_buffer(path!("/dir/file.txt"), cx) }) .await .unwrap(); @@ -7020,94 +7028,60 @@ async fn test_staging_lots_of_hunks_fast(cx: &mut gpui::TestAppContext) { .await .unwrap(); - let mut expected_hunks: Vec<(Range, String, String, DiffHunkStatus)> = (0..500) - .step_by(5) - .map(|i| { - ( - i as u32..i as u32 + 1, - format!("{}\n", i), - different_lines[i / 5].clone(), - DiffHunkStatus::modified(HasSecondaryHunk), - ) - }) - .collect(); + let mut hunks = + uncommitted_diff.update(cx, |diff, cx| diff.hunks(&snapshot, cx).collect::>()); + assert_eq!(hunks.len(), 6); - // The hunks are initially unstaged - uncommitted_diff.read_with(cx, |diff, cx| { - assert_hunks( - diff.hunks(&snapshot, cx), - &snapshot, - &diff.base_text_string().unwrap(), - &expected_hunks, - ); - }); + for _i in 0..operations { + let hunk_ix = rng.gen_range(0..hunks.len()); + let hunk = &mut hunks[hunk_ix]; + let row = hunk.range.start.row; - for (_, _, _, status) in expected_hunks.iter_mut() { - *status = DiffHunkStatus::modified(SecondaryHunkRemovalPending); - } - - // Stage every hunk with a different call - uncommitted_diff.update(cx, |diff, cx| { - let hunks = diff.hunks(&snapshot, cx).collect::>(); - for hunk in hunks { - diff.stage_or_unstage_hunks(true, &[hunk], &snapshot, true, cx); + if hunk.status().has_secondary_hunk() { + log::info!("staging hunk at {row}"); + uncommitted_diff.update(cx, |diff, cx| { + diff.stage_or_unstage_hunks(true, &[hunk.clone()], &snapshot, true, cx); + }); + hunk.secondary_status = SecondaryHunkRemovalPending; + } else { + log::info!("unstaging hunk at {row}"); + uncommitted_diff.update(cx, |diff, cx| { + diff.stage_or_unstage_hunks(false, &[hunk.clone()], &snapshot, true, cx); + }); + hunk.secondary_status = SecondaryHunkAdditionPending; } - assert_hunks( - diff.hunks(&snapshot, cx), - &snapshot, - &diff.base_text_string().unwrap(), - &expected_hunks, - ); - }); - - // If we wait, we'll have no pending hunks - cx.run_until_parked(); - for (_, _, _, status) in expected_hunks.iter_mut() { - *status = DiffHunkStatus::modified(NoSecondaryHunk); - } - - uncommitted_diff.update(cx, |diff, cx| { - assert_hunks( - diff.hunks(&snapshot, cx), - &snapshot, - &diff.base_text_string().unwrap(), - &expected_hunks, - ); - }); - - for (_, _, _, status) in expected_hunks.iter_mut() { - *status = DiffHunkStatus::modified(SecondaryHunkAdditionPending); - } - - // Unstage every hunk with a different call - uncommitted_diff.update(cx, |diff, cx| { - let hunks = diff.hunks(&snapshot, cx).collect::>(); - for hunk in hunks { - diff.stage_or_unstage_hunks(false, &[hunk], &snapshot, true, cx); + for _ in 0..rng.gen_range(0..10) { + log::info!("yielding"); + cx.executor().simulate_random_delay().await; } - - assert_hunks( - diff.hunks(&snapshot, cx), - &snapshot, - &diff.base_text_string().unwrap(), - &expected_hunks, - ); - }); - - // If we wait, we'll have no pending hunks, again - cx.run_until_parked(); - for (_, _, _, status) in expected_hunks.iter_mut() { - *status = DiffHunkStatus::modified(HasSecondaryHunk); } + cx.executor().run_until_parked(); + + for hunk in &mut hunks { + if hunk.secondary_status == SecondaryHunkRemovalPending { + hunk.secondary_status = NoSecondaryHunk; + } else if hunk.secondary_status == SecondaryHunkAdditionPending { + hunk.secondary_status = HasSecondaryHunk; + } + } + + log::info!( + "index text:\n{}", + repo.load_index_text("file.txt".into()).await.unwrap() + ); + uncommitted_diff.update(cx, |diff, cx| { - assert_hunks( - diff.hunks(&snapshot, cx), - &snapshot, - &diff.base_text_string().unwrap(), - &expected_hunks, - ); + let expected_hunks = hunks + .iter() + .map(|hunk| (hunk.range.start.row, hunk.secondary_status)) + .collect::>(); + let actual_hunks = diff + .hunks(&snapshot, cx) + .map(|hunk| (hunk.range.start.row, hunk.secondary_status)) + .collect::>(); + assert_eq!(actual_hunks, expected_hunks); }); }