From 011f823f334af0cae6d9c8ea958dc45601653a72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Marcos?= Date: Mon, 17 Mar 2025 14:02:32 -0300 Subject: [PATCH] Move buffer diff storage from `BufferStore` to `GitStore` (#26795) Release Notes: - N/A --------- Co-authored-by: Max Brunsfeld Co-authored-by: max --- crates/client/src/user.rs | 6 + .../random_project_collaboration_tests.rs | 4 +- crates/fs/src/fs.rs | 6 +- crates/git/Cargo.toml | 1 + crates/git/src/fake_repository.rs | 294 +++++ crates/git/src/git.rs | 15 +- crates/git/src/repository.rs | 337 +----- crates/project/src/buffer_store.rs | 884 +------------- crates/project/src/git.rs | 1074 +++++++++++++++-- crates/project/src/project.rs | 35 +- crates/project/src/project_tests.rs | 10 +- crates/remote_server/src/headless_project.rs | 7 +- crates/text/src/text.rs | 1 + 13 files changed, 1395 insertions(+), 1279 deletions(-) create mode 100644 crates/git/src/fake_repository.rs diff --git a/crates/client/src/user.rs b/crates/client/src/user.rs index c3f21e88c5..9a6cbc5542 100644 --- a/crates/client/src/user.rs +++ b/crates/client/src/user.rs @@ -29,6 +29,12 @@ impl std::fmt::Display for ChannelId { #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] pub struct ProjectId(pub u64); +impl ProjectId { + pub fn to_proto(&self) -> u64 { + self.0 + } +} + #[derive( Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize, )] diff --git a/crates/collab/src/tests/random_project_collaboration_tests.rs b/crates/collab/src/tests/random_project_collaboration_tests.rs index 655def73b9..aacacf9cca 100644 --- a/crates/collab/src/tests/random_project_collaboration_tests.rs +++ b/crates/collab/src/tests/random_project_collaboration_tests.rs @@ -1337,7 +1337,7 @@ impl RandomizedTest for ProjectCollaborationTest { let host_diff_base = host_project.read_with(host_cx, |project, cx| { project - .buffer_store() + .git_store() .read(cx) .get_unstaged_diff(host_buffer.read(cx).remote_id(), cx) .unwrap() @@ -1346,7 +1346,7 @@ impl RandomizedTest for ProjectCollaborationTest { }); let guest_diff_base = guest_project.read_with(client_cx, |project, cx| { project - .buffer_store() + .git_store() .read(cx) .get_unstaged_diff(guest_buffer.read(cx).remote_id(), cx) .unwrap() diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs index 46df19722c..fe27c007c0 100644 --- a/crates/fs/src/fs.rs +++ b/crates/fs/src/fs.rs @@ -52,7 +52,7 @@ use util::ResultExt; #[cfg(any(test, feature = "test-support"))] use collections::{btree_map, BTreeMap}; #[cfg(any(test, feature = "test-support"))] -use git::repository::FakeGitRepositoryState; +use git::FakeGitRepositoryState; #[cfg(any(test, feature = "test-support"))] use parking_lot::Mutex; #[cfg(any(test, feature = "test-support"))] @@ -885,7 +885,7 @@ enum FakeFsEntry { mtime: MTime, len: u64, entries: BTreeMap>>, - git_repo_state: Option>>, + git_repo_state: Option>>, }, Symlink { target: PathBuf, @@ -2095,7 +2095,7 @@ impl Fs for FakeFs { ))) }) .clone(); - Some(git::repository::FakeGitRepository::open(state)) + Some(git::FakeGitRepository::open(state)) } else { None } diff --git a/crates/git/Cargo.toml b/crates/git/Cargo.toml index 6458a796bc..c32fe3491f 100644 --- a/crates/git/Cargo.toml +++ b/crates/git/Cargo.toml @@ -42,3 +42,4 @@ pretty_assertions.workspace = true serde_json.workspace = true text = { workspace = true, features = ["test-support"] } unindent.workspace = true +gpui = { workspace = true, features = ["test-support"] } diff --git a/crates/git/src/fake_repository.rs b/crates/git/src/fake_repository.rs new file mode 100644 index 0000000000..39ec7a2532 --- /dev/null +++ b/crates/git/src/fake_repository.rs @@ -0,0 +1,294 @@ +use crate::{ + blame::Blame, + repository::{ + Branch, CommitDetails, DiffType, GitRepository, PushOptions, Remote, RemoteCommandOutput, + RepoPath, ResetMode, + }, + status::{FileStatus, GitStatus}, +}; +use anyhow::{Context, Result}; +use askpass::AskPassSession; +use collections::{HashMap, HashSet}; +use futures::{future::BoxFuture, FutureExt as _}; +use gpui::{AsyncApp, SharedString}; +use parking_lot::Mutex; +use rope::Rope; +use std::{path::PathBuf, sync::Arc}; + +#[derive(Debug, Clone)] +pub struct FakeGitRepository { + state: Arc>, +} + +#[derive(Debug, Clone)] +pub struct FakeGitRepositoryState { + pub path: PathBuf, + pub event_emitter: smol::channel::Sender, + pub head_contents: HashMap, + pub index_contents: HashMap, + pub blames: HashMap, + pub statuses: HashMap, + pub current_branch_name: Option, + pub branches: HashSet, + pub simulated_index_write_error_message: Option, +} + +impl FakeGitRepository { + pub fn open(state: Arc>) -> Arc { + Arc::new(FakeGitRepository { state }) + } +} + +impl FakeGitRepositoryState { + pub fn new(path: PathBuf, event_emitter: smol::channel::Sender) -> Self { + FakeGitRepositoryState { + path, + event_emitter, + head_contents: Default::default(), + index_contents: Default::default(), + blames: Default::default(), + statuses: Default::default(), + current_branch_name: Default::default(), + branches: Default::default(), + simulated_index_write_error_message: None, + } + } +} + +impl GitRepository for FakeGitRepository { + fn reload_index(&self) {} + + fn load_index_text(&self, path: RepoPath, _: AsyncApp) -> BoxFuture> { + let state = self.state.lock(); + let content = state.index_contents.get(path.as_ref()).cloned(); + async { content }.boxed() + } + + fn load_committed_text(&self, path: RepoPath, _: AsyncApp) -> BoxFuture> { + let state = self.state.lock(); + let content = state.head_contents.get(path.as_ref()).cloned(); + async { content }.boxed() + } + + fn set_index_text( + &self, + path: RepoPath, + content: Option, + _env: HashMap, + cx: AsyncApp, + ) -> BoxFuture> { + let state = self.state.clone(); + let executor = cx.background_executor().clone(); + async move { + executor.simulate_random_delay().await; + + let mut state = state.lock(); + if let Some(message) = state.simulated_index_write_error_message.clone() { + return Err(anyhow::anyhow!(message)); + } + + if let Some(content) = content { + state.index_contents.insert(path.clone(), content); + } else { + state.index_contents.remove(&path); + } + state + .event_emitter + .try_send(state.path.clone()) + .expect("Dropped repo change event"); + + Ok(()) + } + .boxed() + } + + fn remote_url(&self, _name: &str) -> Option { + None + } + + fn head_sha(&self) -> Option { + None + } + + fn merge_head_shas(&self) -> Vec { + vec![] + } + + fn show(&self, _: String, _: AsyncApp) -> BoxFuture> { + unimplemented!() + } + + fn reset(&self, _: String, _: ResetMode, _: HashMap) -> BoxFuture> { + unimplemented!() + } + + fn checkout_files( + &self, + _: String, + _: Vec, + _: HashMap, + ) -> BoxFuture> { + unimplemented!() + } + + fn path(&self) -> PathBuf { + let state = self.state.lock(); + state.path.clone() + } + + fn main_repository_path(&self) -> PathBuf { + self.path() + } + + fn status(&self, path_prefixes: &[RepoPath]) -> Result { + let state = self.state.lock(); + + let mut entries = state + .statuses + .iter() + .filter_map(|(repo_path, status)| { + if path_prefixes + .iter() + .any(|path_prefix| repo_path.0.starts_with(path_prefix)) + { + Some((repo_path.to_owned(), *status)) + } else { + None + } + }) + .collect::>(); + entries.sort_unstable_by(|(a, _), (b, _)| a.cmp(&b)); + + Ok(GitStatus { + entries: entries.into(), + }) + } + + fn branches(&self) -> BoxFuture>> { + let state = self.state.lock(); + let current_branch = &state.current_branch_name; + let result = Ok(state + .branches + .iter() + .map(|branch_name| Branch { + is_head: Some(branch_name) == current_branch.as_ref(), + name: branch_name.into(), + most_recent_commit: None, + upstream: None, + }) + .collect()); + + async { result }.boxed() + } + + fn change_branch(&self, name: String, _: AsyncApp) -> BoxFuture> { + let mut state = self.state.lock(); + state.current_branch_name = Some(name.to_owned()); + state + .event_emitter + .try_send(state.path.clone()) + .expect("Dropped repo change event"); + async { Ok(()) }.boxed() + } + + fn create_branch(&self, name: String, _: AsyncApp) -> BoxFuture> { + let mut state = self.state.lock(); + state.branches.insert(name.to_owned()); + state + .event_emitter + .try_send(state.path.clone()) + .expect("Dropped repo change event"); + async { Ok(()) }.boxed() + } + + fn blame( + &self, + path: RepoPath, + _content: Rope, + _cx: AsyncApp, + ) -> BoxFuture> { + let state = self.state.lock(); + let result = state + .blames + .get(&path) + .with_context(|| format!("failed to get blame for {:?}", path.0)) + .cloned(); + async { result }.boxed() + } + + fn stage_paths( + &self, + _paths: Vec, + _env: HashMap, + _cx: AsyncApp, + ) -> BoxFuture> { + unimplemented!() + } + + fn unstage_paths( + &self, + _paths: Vec, + _env: HashMap, + _cx: AsyncApp, + ) -> BoxFuture> { + unimplemented!() + } + + fn commit( + &self, + _message: SharedString, + _name_and_email: Option<(SharedString, SharedString)>, + _env: HashMap, + _: AsyncApp, + ) -> BoxFuture> { + unimplemented!() + } + + fn push( + &self, + _branch: String, + _remote: String, + _options: Option, + _ask_pass: AskPassSession, + _env: HashMap, + _cx: AsyncApp, + ) -> BoxFuture> { + unimplemented!() + } + + fn pull( + &self, + _branch: String, + _remote: String, + _ask_pass: AskPassSession, + _env: HashMap, + _cx: AsyncApp, + ) -> BoxFuture> { + unimplemented!() + } + + fn fetch( + &self, + _ask_pass: AskPassSession, + _env: HashMap, + _cx: AsyncApp, + ) -> BoxFuture> { + unimplemented!() + } + + fn get_remotes( + &self, + _branch: Option, + _cx: AsyncApp, + ) -> BoxFuture>> { + unimplemented!() + } + + fn check_for_pushed_commit(&self, _cx: AsyncApp) -> BoxFuture>> { + unimplemented!() + } + + fn diff(&self, _diff: DiffType, _cx: AsyncApp) -> BoxFuture> { + unimplemented!() + } +} diff --git a/crates/git/src/git.rs b/crates/git/src/git.rs index d108e6719c..4eb1c09250 100644 --- a/crates/git/src/git.rs +++ b/crates/git/src/git.rs @@ -5,20 +5,25 @@ mod remote; pub mod repository; pub mod status; +#[cfg(any(test, feature = "test-support"))] +mod fake_repository; + +#[cfg(any(test, feature = "test-support"))] +pub use fake_repository::*; + +pub use crate::hosting_provider::*; +pub use crate::remote::*; use anyhow::{anyhow, Context as _, Result}; +pub use git2 as libgit; use gpui::action_with_deprecated_aliases; use gpui::actions; +pub use repository::WORK_DIRECTORY_REPO_PATH; use serde::{Deserialize, Serialize}; use std::ffi::OsStr; use std::fmt; use std::str::FromStr; use std::sync::LazyLock; -pub use crate::hosting_provider::*; -pub use crate::remote::*; -pub use git2 as libgit; -pub use repository::WORK_DIRECTORY_REPO_PATH; - pub static DOT_GIT: LazyLock<&'static OsStr> = LazyLock::new(|| OsStr::new(".git")); pub static GITIGNORE: LazyLock<&'static OsStr> = LazyLock::new(|| OsStr::new(".gitignore")); pub static FSMONITOR_DAEMON: LazyLock<&'static OsStr> = diff --git a/crates/git/src/repository.rs b/crates/git/src/repository.rs index bec3822285..ce0df6ab42 100644 --- a/crates/git/src/repository.rs +++ b/crates/git/src/repository.rs @@ -1,9 +1,8 @@ -use crate::status::FileStatus; +use crate::status::GitStatus; use crate::SHORT_SHA_LENGTH; -use crate::{blame::Blame, status::GitStatus}; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, Context as _, Result}; use askpass::{AskPassResult, AskPassSession}; -use collections::{HashMap, HashSet}; +use collections::HashMap; use futures::future::BoxFuture; use futures::{select_biased, AsyncWriteExt, FutureExt as _}; use git2::BranchType; @@ -13,11 +12,12 @@ use rope::Rope; use schemars::JsonSchema; use serde::Deserialize; use std::borrow::Borrow; +use std::path::Component; use std::process::Stdio; use std::sync::LazyLock; use std::{ cmp::Ordering, - path::{Component, Path, PathBuf}, + path::{Path, PathBuf}, sync::Arc, }; use sum_tree::MapSeekTarget; @@ -1056,304 +1056,6 @@ async fn run_remote_command( } } -#[derive(Debug, Clone)] -pub struct FakeGitRepository { - state: Arc>, -} - -#[derive(Debug, Clone)] -pub struct FakeGitRepositoryState { - pub path: PathBuf, - pub event_emitter: smol::channel::Sender, - pub head_contents: HashMap, - pub index_contents: HashMap, - pub blames: HashMap, - pub statuses: HashMap, - pub current_branch_name: Option, - pub branches: HashSet, - pub simulated_index_write_error_message: Option, -} - -impl FakeGitRepository { - pub fn open(state: Arc>) -> Arc { - Arc::new(FakeGitRepository { state }) - } -} - -impl FakeGitRepositoryState { - pub fn new(path: PathBuf, event_emitter: smol::channel::Sender) -> Self { - FakeGitRepositoryState { - path, - event_emitter, - head_contents: Default::default(), - index_contents: Default::default(), - blames: Default::default(), - statuses: Default::default(), - current_branch_name: Default::default(), - branches: Default::default(), - simulated_index_write_error_message: None, - } - } -} - -impl GitRepository for FakeGitRepository { - fn reload_index(&self) {} - - fn load_index_text(&self, path: RepoPath, _: AsyncApp) -> BoxFuture> { - let state = self.state.lock(); - let content = state.index_contents.get(path.as_ref()).cloned(); - async { content }.boxed() - } - - fn load_committed_text(&self, path: RepoPath, _: AsyncApp) -> BoxFuture> { - let state = self.state.lock(); - let content = state.head_contents.get(path.as_ref()).cloned(); - async { content }.boxed() - } - - fn set_index_text( - &self, - path: RepoPath, - content: Option, - _env: HashMap, - _cx: AsyncApp, - ) -> BoxFuture> { - let mut state = self.state.lock(); - if let Some(message) = state.simulated_index_write_error_message.clone() { - return async { Err(anyhow::anyhow!(message)) }.boxed(); - } - if let Some(content) = content { - state.index_contents.insert(path.clone(), content); - } else { - state.index_contents.remove(&path); - } - state - .event_emitter - .try_send(state.path.clone()) - .expect("Dropped repo change event"); - async { Ok(()) }.boxed() - } - - fn remote_url(&self, _name: &str) -> Option { - None - } - - fn head_sha(&self) -> Option { - None - } - - fn merge_head_shas(&self) -> Vec { - vec![] - } - - fn show(&self, _: String, _: AsyncApp) -> BoxFuture> { - unimplemented!() - } - - fn reset(&self, _: String, _: ResetMode, _: HashMap) -> BoxFuture> { - unimplemented!() - } - - fn checkout_files( - &self, - _: String, - _: Vec, - _: HashMap, - ) -> BoxFuture> { - unimplemented!() - } - - fn path(&self) -> PathBuf { - let state = self.state.lock(); - state.path.clone() - } - - fn main_repository_path(&self) -> PathBuf { - self.path() - } - - fn status(&self, path_prefixes: &[RepoPath]) -> Result { - let state = self.state.lock(); - - let mut entries = state - .statuses - .iter() - .filter_map(|(repo_path, status)| { - if path_prefixes - .iter() - .any(|path_prefix| repo_path.0.starts_with(path_prefix)) - { - Some((repo_path.to_owned(), *status)) - } else { - None - } - }) - .collect::>(); - entries.sort_unstable_by(|(a, _), (b, _)| a.cmp(&b)); - - Ok(GitStatus { - entries: entries.into(), - }) - } - - fn branches(&self) -> BoxFuture>> { - let state = self.state.lock(); - let current_branch = &state.current_branch_name; - let result = Ok(state - .branches - .iter() - .map(|branch_name| Branch { - is_head: Some(branch_name) == current_branch.as_ref(), - name: branch_name.into(), - most_recent_commit: None, - upstream: None, - }) - .collect()); - - async { result }.boxed() - } - - fn change_branch(&self, name: String, _: AsyncApp) -> BoxFuture> { - let mut state = self.state.lock(); - state.current_branch_name = Some(name.to_owned()); - state - .event_emitter - .try_send(state.path.clone()) - .expect("Dropped repo change event"); - async { Ok(()) }.boxed() - } - - fn create_branch(&self, name: String, _: AsyncApp) -> BoxFuture> { - let mut state = self.state.lock(); - state.branches.insert(name.to_owned()); - state - .event_emitter - .try_send(state.path.clone()) - .expect("Dropped repo change event"); - async { Ok(()) }.boxed() - } - - fn blame( - &self, - path: RepoPath, - _content: Rope, - _cx: AsyncApp, - ) -> BoxFuture> { - let state = self.state.lock(); - let result = state - .blames - .get(&path) - .with_context(|| format!("failed to get blame for {:?}", path.0)) - .cloned(); - async { result }.boxed() - } - - fn stage_paths( - &self, - _paths: Vec, - _env: HashMap, - _cx: AsyncApp, - ) -> BoxFuture> { - unimplemented!() - } - - fn unstage_paths( - &self, - _paths: Vec, - _env: HashMap, - _cx: AsyncApp, - ) -> BoxFuture> { - unimplemented!() - } - - fn commit( - &self, - _message: SharedString, - _name_and_email: Option<(SharedString, SharedString)>, - _env: HashMap, - _: AsyncApp, - ) -> BoxFuture> { - unimplemented!() - } - - fn push( - &self, - _branch: String, - _remote: String, - _options: Option, - _ask_pass: AskPassSession, - _env: HashMap, - _cx: AsyncApp, - ) -> BoxFuture> { - unimplemented!() - } - - fn pull( - &self, - _branch: String, - _remote: String, - _ask_pass: AskPassSession, - _env: HashMap, - _cx: AsyncApp, - ) -> BoxFuture> { - unimplemented!() - } - - fn fetch( - &self, - _ask_pass: AskPassSession, - _env: HashMap, - _cx: AsyncApp, - ) -> BoxFuture> { - unimplemented!() - } - - fn get_remotes( - &self, - _branch: Option, - _cx: AsyncApp, - ) -> BoxFuture>> { - unimplemented!() - } - - fn check_for_pushed_commit(&self, _cx: AsyncApp) -> BoxFuture>> { - unimplemented!() - } - - fn diff(&self, _diff: DiffType, _cx: AsyncApp) -> BoxFuture> { - unimplemented!() - } -} - -fn check_path_to_repo_path_errors(relative_file_path: &Path) -> Result<()> { - match relative_file_path.components().next() { - None => anyhow::bail!("repo path should not be empty"), - Some(Component::Prefix(_)) => anyhow::bail!( - "repo path `{}` should be relative, not a windows prefix", - relative_file_path.to_string_lossy() - ), - Some(Component::RootDir) => { - anyhow::bail!( - "repo path `{}` should be relative", - relative_file_path.to_string_lossy() - ) - } - Some(Component::CurDir) => { - anyhow::bail!( - "repo path `{}` should not start with `.`", - relative_file_path.to_string_lossy() - ) - } - Some(Component::ParentDir) => { - anyhow::bail!( - "repo path `{}` should not start with `..`", - relative_file_path.to_string_lossy() - ) - } - _ => Ok(()), - } -} - pub static WORK_DIRECTORY_REPO_PATH: LazyLock = LazyLock::new(|| RepoPath(Path::new("").into())); @@ -1526,6 +1228,35 @@ fn parse_upstream_track(upstream_track: &str) -> Result { })) } +fn check_path_to_repo_path_errors(relative_file_path: &Path) -> Result<()> { + match relative_file_path.components().next() { + None => anyhow::bail!("repo path should not be empty"), + Some(Component::Prefix(_)) => anyhow::bail!( + "repo path `{}` should be relative, not a windows prefix", + relative_file_path.to_string_lossy() + ), + Some(Component::RootDir) => { + anyhow::bail!( + "repo path `{}` should be relative", + relative_file_path.to_string_lossy() + ) + } + Some(Component::CurDir) => { + anyhow::bail!( + "repo path `{}` should not start with `.`", + relative_file_path.to_string_lossy() + ) + } + Some(Component::ParentDir) => { + anyhow::bail!( + "repo path `{}` should not start with `..`", + relative_file_path.to_string_lossy() + ) + } + _ => Ok(()), + } +} + #[test] fn test_branches_parsing() { // suppress "help: octal escapes are not supported, `\0` is always null" diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index 8e50012296..d9f775bafd 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -6,7 +6,6 @@ use crate::{ }; use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry}; use anyhow::{anyhow, bail, Context as _, Result}; -use buffer_diff::BufferDiff; use client::Client; use collections::{hash_map, HashMap, HashSet}; use fs::Fs; @@ -20,7 +19,7 @@ use language::{ deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version, split_operations, }, - Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation, + Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation, }; use rpc::{ proto::{self, ToProto}, @@ -38,22 +37,13 @@ use std::{ }; use text::BufferId; use util::{debug_panic, maybe, ResultExt as _, TryFutureExt}; -use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId}; - -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -enum DiffKind { - Unstaged, - Uncommitted, -} +use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId}; /// A set of open buffers. pub struct BufferStore { state: BufferStoreState, #[allow(clippy::type_complexity)] loading_buffers: HashMap, Arc>>>>, - #[allow(clippy::type_complexity)] - loading_diffs: - HashMap<(BufferId, DiffKind), Shared, Arc>>>>, worktree_store: Entity, opened_buffers: HashMap, downstream_client: Option<(AnyProtoClient, u64)>, @@ -63,238 +53,9 @@ pub struct BufferStore { #[derive(Hash, Eq, PartialEq, Clone)] struct SharedBuffer { buffer: Entity, - diff: Option>, lsp_handle: Option, } -#[derive(Default)] -struct BufferDiffState { - unstaged_diff: Option>, - uncommitted_diff: Option>, - recalculate_diff_task: Option>>, - language: Option>, - language_registry: Option>, - diff_updated_futures: Vec>, - - head_text: Option>, - index_text: Option>, - head_changed: bool, - index_changed: bool, - language_changed: bool, -} - -#[derive(Clone, Debug)] -enum DiffBasesChange { - SetIndex(Option), - SetHead(Option), - SetEach { - index: Option, - head: Option, - }, - SetBoth(Option), -} - -impl BufferDiffState { - fn buffer_language_changed(&mut self, buffer: Entity, cx: &mut Context) { - self.language = buffer.read(cx).language().cloned(); - self.language_changed = true; - let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx); - } - - fn unstaged_diff(&self) -> Option> { - self.unstaged_diff.as_ref().and_then(|set| set.upgrade()) - } - - fn uncommitted_diff(&self) -> Option> { - self.uncommitted_diff.as_ref().and_then(|set| set.upgrade()) - } - - fn handle_base_texts_updated( - &mut self, - buffer: text::BufferSnapshot, - message: proto::UpdateDiffBases, - cx: &mut Context, - ) { - use proto::update_diff_bases::Mode; - - let Some(mode) = Mode::from_i32(message.mode) else { - return; - }; - - let diff_bases_change = match mode { - Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text), - Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text), - Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text), - Mode::IndexAndHead => DiffBasesChange::SetEach { - index: message.staged_text, - head: message.committed_text, - }, - }; - - let _ = self.diff_bases_changed(buffer, diff_bases_change, cx); - } - - pub fn wait_for_recalculation(&mut self) -> Option> { - if self.diff_updated_futures.is_empty() { - return None; - } - let (tx, rx) = oneshot::channel(); - self.diff_updated_futures.push(tx); - Some(rx) - } - - fn diff_bases_changed( - &mut self, - buffer: text::BufferSnapshot, - diff_bases_change: DiffBasesChange, - cx: &mut Context, - ) -> oneshot::Receiver<()> { - match diff_bases_change { - DiffBasesChange::SetIndex(index) => { - self.index_text = index.map(|mut index| { - text::LineEnding::normalize(&mut index); - Arc::new(index) - }); - self.index_changed = true; - } - DiffBasesChange::SetHead(head) => { - self.head_text = head.map(|mut head| { - text::LineEnding::normalize(&mut head); - Arc::new(head) - }); - self.head_changed = true; - } - DiffBasesChange::SetBoth(text) => { - let text = text.map(|mut text| { - text::LineEnding::normalize(&mut text); - Arc::new(text) - }); - self.head_text = text.clone(); - self.index_text = text; - self.head_changed = true; - self.index_changed = true; - } - DiffBasesChange::SetEach { index, head } => { - self.index_text = index.map(|mut index| { - text::LineEnding::normalize(&mut index); - Arc::new(index) - }); - self.index_changed = true; - self.head_text = head.map(|mut head| { - text::LineEnding::normalize(&mut head); - Arc::new(head) - }); - self.head_changed = true; - } - } - - self.recalculate_diffs(buffer, cx) - } - - fn recalculate_diffs( - &mut self, - buffer: text::BufferSnapshot, - cx: &mut Context, - ) -> oneshot::Receiver<()> { - log::debug!("recalculate diffs"); - let (tx, rx) = oneshot::channel(); - self.diff_updated_futures.push(tx); - - let language = self.language.clone(); - let language_registry = self.language_registry.clone(); - let unstaged_diff = self.unstaged_diff(); - let uncommitted_diff = self.uncommitted_diff(); - let head = self.head_text.clone(); - let index = self.index_text.clone(); - let index_changed = self.index_changed; - let head_changed = self.head_changed; - let language_changed = self.language_changed; - let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) { - (Some(index), Some(head)) => Arc::ptr_eq(index, head), - (None, None) => true, - _ => false, - }; - self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move { - let mut new_unstaged_diff = None; - if let Some(unstaged_diff) = &unstaged_diff { - new_unstaged_diff = Some( - BufferDiff::update_diff( - unstaged_diff.clone(), - buffer.clone(), - index, - index_changed, - language_changed, - language.clone(), - language_registry.clone(), - &mut cx, - ) - .await?, - ); - } - - let mut new_uncommitted_diff = None; - if let Some(uncommitted_diff) = &uncommitted_diff { - new_uncommitted_diff = if index_matches_head { - new_unstaged_diff.clone() - } else { - Some( - BufferDiff::update_diff( - uncommitted_diff.clone(), - buffer.clone(), - head, - head_changed, - language_changed, - language.clone(), - language_registry.clone(), - &mut cx, - ) - .await?, - ) - } - } - - let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) = - unstaged_diff.as_ref().zip(new_unstaged_diff.clone()) - { - unstaged_diff.update(&mut cx, |diff, cx| { - diff.set_snapshot(&buffer, new_unstaged_diff, language_changed, None, cx) - })? - } else { - None - }; - - if let Some((uncommitted_diff, new_uncommitted_diff)) = - uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone()) - { - uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| { - uncommitted_diff.set_snapshot( - &buffer, - new_uncommitted_diff, - language_changed, - unstaged_changed_range, - cx, - ); - })?; - } - - if let Some(this) = this.upgrade() { - this.update(&mut cx, |this, _| { - this.index_changed = false; - this.head_changed = false; - this.language_changed = false; - for tx in this.diff_updated_futures.drain(..) { - tx.send(()).ok(); - } - })?; - } - - Ok(()) - })); - - rx - } -} - enum BufferStoreState { Local(LocalBufferStore), Remote(RemoteBufferStore), @@ -318,16 +79,13 @@ struct LocalBufferStore { } enum OpenBuffer { - Complete { - buffer: WeakEntity, - diff_state: Entity, - }, + Complete { buffer: WeakEntity }, Operations(Vec), } pub enum BufferStoreEvent { BufferAdded(Entity), - BufferDiffAdded(Entity), + SharedBufferClosed(proto::PeerId, BufferId), BufferDropped(BufferId), BufferChangedFilePath { buffer: Entity, @@ -341,48 +99,6 @@ pub struct ProjectTransaction(pub HashMap, language::Transaction> impl EventEmitter for BufferStore {} impl RemoteBufferStore { - fn open_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Task>> { - let project_id = self.project_id; - let client = self.upstream_client.clone(); - cx.background_spawn(async move { - let response = client - .request(proto::OpenUnstagedDiff { - project_id, - buffer_id: buffer_id.to_proto(), - }) - .await?; - Ok(response.staged_text) - }) - } - - fn open_uncommitted_diff( - &self, - buffer_id: BufferId, - cx: &App, - ) -> Task> { - use proto::open_uncommitted_diff_response::Mode; - - let project_id = self.project_id; - let client = self.upstream_client.clone(); - cx.background_spawn(async move { - let response = client - .request(proto::OpenUncommittedDiff { - project_id, - buffer_id: buffer_id.to_proto(), - }) - .await?; - let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?; - let bases = match mode { - Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text), - Mode::IndexAndHead => DiffBasesChange::SetEach { - head: response.committed_text, - index: response.staged_text, - }, - }; - Ok(bases) - }) - } - pub fn wait_for_remote_buffer( &mut self, id: BufferId, @@ -647,41 +363,6 @@ impl RemoteBufferStore { } impl LocalBufferStore { - fn worktree_for_buffer( - &self, - buffer: &Entity, - cx: &App, - ) -> Option<(Entity, Arc)> { - let file = buffer.read(cx).file()?; - let worktree_id = file.worktree_id(cx); - let path = file.path().clone(); - let worktree = self - .worktree_store - .read(cx) - .worktree_for_id(worktree_id, cx)?; - Some((worktree, path)) - } - - fn load_staged_text(&self, buffer: &Entity, cx: &App) -> Task>> { - if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) { - worktree.read(cx).load_staged_file(path.as_ref(), cx) - } else { - return Task::ready(Err(anyhow!("no such worktree"))); - } - } - - fn load_committed_text( - &self, - buffer: &Entity, - cx: &App, - ) -> Task>> { - if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) { - worktree.read(cx).load_committed_file(path.as_ref(), cx) - } else { - Task::ready(Err(anyhow!("no such worktree"))) - } - } - fn save_local_buffer( &self, buffer_handle: Entity, @@ -751,14 +432,6 @@ impl LocalBufferStore { worktree::Event::UpdatedEntries(changes) => { Self::local_worktree_entries_changed(this, &worktree, changes, cx); } - worktree::Event::UpdatedGitRepositories(updated_repos) => { - Self::local_worktree_git_repos_changed( - this, - worktree.clone(), - updated_repos, - cx, - ) - } _ => {} } } @@ -785,170 +458,6 @@ impl LocalBufferStore { } } - fn local_worktree_git_repos_changed( - this: &mut BufferStore, - worktree_handle: Entity, - changed_repos: &UpdatedGitRepositoriesSet, - cx: &mut Context, - ) { - debug_assert!(worktree_handle.read(cx).is_local()); - - let mut diff_state_updates = Vec::new(); - for buffer in this.opened_buffers.values() { - let OpenBuffer::Complete { buffer, diff_state } = buffer else { - continue; - }; - let Some(buffer) = buffer.upgrade() else { - continue; - }; - let Some(file) = File::from_dyn(buffer.read(cx).file()) else { - continue; - }; - if file.worktree != worktree_handle { - continue; - } - let diff_state = diff_state.read(cx); - if changed_repos - .iter() - .any(|(work_dir, _)| file.path.starts_with(work_dir)) - { - let has_unstaged_diff = diff_state - .unstaged_diff - .as_ref() - .is_some_and(|diff| diff.is_upgradable()); - let has_uncommitted_diff = diff_state - .uncommitted_diff - .as_ref() - .is_some_and(|set| set.is_upgradable()); - diff_state_updates.push(( - buffer, - file.path.clone(), - has_unstaged_diff.then(|| diff_state.index_text.clone()), - has_uncommitted_diff.then(|| diff_state.head_text.clone()), - )); - } - } - - if diff_state_updates.is_empty() { - return; - } - - cx.spawn(move |this, mut cx| async move { - let snapshot = - worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?; - let diff_bases_changes_by_buffer = cx - .spawn(async move |cx| { - let mut results = Vec::new(); - for (buffer, path, current_index_text, current_head_text) in diff_state_updates - { - let Some(local_repo) = snapshot.local_repo_for_path(&path) else { - continue; - }; - let Some(relative_path) = local_repo.relativize(&path).ok() else { - continue; - }; - let index_text = if current_index_text.is_some() { - local_repo - .repo() - .load_index_text(relative_path.clone(), cx.clone()) - .await - } else { - None - }; - let head_text = if current_head_text.is_some() { - local_repo - .repo() - .load_committed_text(relative_path, cx.clone()) - .await - } else { - None - }; - - // Avoid triggering a diff update if the base text has not changed. - if let Some((current_index, current_head)) = - current_index_text.as_ref().zip(current_head_text.as_ref()) - { - if current_index.as_deref() == index_text.as_ref() - && current_head.as_deref() == head_text.as_ref() - { - continue; - } - } - - let diff_bases_change = - match (current_index_text.is_some(), current_head_text.is_some()) { - (true, true) => Some(if index_text == head_text { - DiffBasesChange::SetBoth(head_text) - } else { - DiffBasesChange::SetEach { - index: index_text, - head: head_text, - } - }), - (true, false) => Some(DiffBasesChange::SetIndex(index_text)), - (false, true) => Some(DiffBasesChange::SetHead(head_text)), - (false, false) => None, - }; - - results.push((buffer, diff_bases_change)) - } - - results - }) - .await; - - this.update(&mut cx, |this, cx| { - for (buffer, diff_bases_change) in diff_bases_changes_by_buffer { - let Some(OpenBuffer::Complete { diff_state, .. }) = - this.opened_buffers.get_mut(&buffer.read(cx).remote_id()) - else { - continue; - }; - let Some(diff_bases_change) = diff_bases_change else { - continue; - }; - - diff_state.update(cx, |diff_state, cx| { - use proto::update_diff_bases::Mode; - - let buffer = buffer.read(cx); - if let Some((client, project_id)) = this.downstream_client.as_ref() { - let buffer_id = buffer.remote_id().to_proto(); - let (staged_text, committed_text, mode) = match diff_bases_change - .clone() - { - DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly), - DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly), - DiffBasesChange::SetEach { index, head } => { - (index, head, Mode::IndexAndHead) - } - DiffBasesChange::SetBoth(text) => { - (None, text, Mode::IndexMatchesHead) - } - }; - let message = proto::UpdateDiffBases { - project_id: *project_id, - buffer_id, - staged_text, - committed_text, - mode: mode as i32, - }; - - client.send(message).log_err(); - } - - let _ = diff_state.diff_bases_changed( - buffer.text_snapshot(), - diff_bases_change, - cx, - ); - }); - } - }) - }) - .detach_and_log_err(cx); - } - fn local_worktree_entry_changed( this: &mut BufferStore, entry_id: ProjectEntryId, @@ -1246,9 +755,6 @@ impl BufferStore { client.add_entity_request_handler(Self::handle_blame_buffer); client.add_entity_request_handler(Self::handle_reload_buffers); client.add_entity_request_handler(Self::handle_get_permalink_to_line); - client.add_entity_request_handler(Self::handle_open_unstaged_diff); - client.add_entity_request_handler(Self::handle_open_uncommitted_diff); - client.add_entity_message_handler(Self::handle_update_diff_bases); } /// Creates a buffer store, optionally retaining its buffers. @@ -1269,7 +775,6 @@ impl BufferStore { opened_buffers: Default::default(), shared_buffers: Default::default(), loading_buffers: Default::default(), - loading_diffs: Default::default(), worktree_store, } } @@ -1292,7 +797,6 @@ impl BufferStore { downstream_client: None, opened_buffers: Default::default(), loading_buffers: Default::default(), - loading_diffs: Default::default(), shared_buffers: Default::default(), worktree_store, } @@ -1364,198 +868,19 @@ impl BufferStore { cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) }) } - pub fn open_unstaged_diff( - &mut self, - buffer: Entity, - cx: &mut Context, - ) -> Task>> { - let buffer_id = buffer.read(cx).remote_id(); - if let Some(OpenBuffer::Complete { diff_state, .. }) = self.opened_buffers.get(&buffer_id) { - if let Some(unstaged_diff) = diff_state - .read(cx) - .unstaged_diff - .as_ref() - .and_then(|weak| weak.upgrade()) - { - if let Some(task) = - diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation()) - { - return cx.background_executor().spawn(async move { - task.await?; - Ok(unstaged_diff) - }); - } - return Task::ready(Ok(unstaged_diff)); - } - } - - let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) { - hash_map::Entry::Occupied(e) => e.get().clone(), - hash_map::Entry::Vacant(entry) => { - let staged_text = match &self.state { - BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx), - BufferStoreState::Remote(this) => this.open_unstaged_diff(buffer_id, cx), - }; - - entry - .insert( - cx.spawn(move |this, cx| async move { - Self::open_diff_internal( - this, - DiffKind::Unstaged, - staged_text.await.map(DiffBasesChange::SetIndex), - buffer, - cx, - ) - .await - .map_err(Arc::new) - }) - .shared(), - ) - .clone() - } - }; - - cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) }) - } - - pub fn open_uncommitted_diff( - &mut self, - buffer: Entity, - cx: &mut Context, - ) -> Task>> { - let buffer_id = buffer.read(cx).remote_id(); - - if let Some(OpenBuffer::Complete { diff_state, .. }) = self.opened_buffers.get(&buffer_id) { - if let Some(uncommitted_diff) = diff_state - .read(cx) - .uncommitted_diff - .as_ref() - .and_then(|weak| weak.upgrade()) - { - if let Some(task) = - diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation()) - { - return cx.background_executor().spawn(async move { - task.await?; - Ok(uncommitted_diff) - }); - } - return Task::ready(Ok(uncommitted_diff)); - } - } - - let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) { - hash_map::Entry::Occupied(e) => e.get().clone(), - hash_map::Entry::Vacant(entry) => { - let changes = match &self.state { - BufferStoreState::Local(this) => { - let committed_text = this.load_committed_text(&buffer, cx); - let staged_text = this.load_staged_text(&buffer, cx); - cx.background_spawn(async move { - let committed_text = committed_text.await?; - let staged_text = staged_text.await?; - let diff_bases_change = if committed_text == staged_text { - DiffBasesChange::SetBoth(committed_text) - } else { - DiffBasesChange::SetEach { - index: staged_text, - head: committed_text, - } - }; - Ok(diff_bases_change) - }) - } - BufferStoreState::Remote(this) => this.open_uncommitted_diff(buffer_id, cx), - }; - - entry - .insert( - cx.spawn(move |this, cx| async move { - Self::open_diff_internal( - this, - DiffKind::Uncommitted, - changes.await, - buffer, - cx, - ) - .await - .map_err(Arc::new) - }) - .shared(), - ) - .clone() - } - }; - - cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) }) - } - - async fn open_diff_internal( - this: WeakEntity, - kind: DiffKind, - texts: Result, - buffer_entity: Entity, - mut cx: AsyncApp, - ) -> Result> { - let diff_bases_change = match texts { - Err(e) => { - this.update(&mut cx, |this, cx| { - let buffer = buffer_entity.read(cx); - let buffer_id = buffer.remote_id(); - this.loading_diffs.remove(&(buffer_id, kind)); - })?; - return Err(e); - } - Ok(change) => change, - }; - - this.update(&mut cx, |this, cx| { - let buffer = buffer_entity.read(cx); - let buffer_id = buffer.remote_id(); - let language = buffer.language().cloned(); - let language_registry = buffer.language_registry(); - let text_snapshot = buffer.text_snapshot(); - this.loading_diffs.remove(&(buffer_id, kind)); - - if let Some(OpenBuffer::Complete { diff_state, .. }) = - this.opened_buffers.get_mut(&buffer_id) - { - let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx)); - cx.emit(BufferStoreEvent::BufferDiffAdded(diff.clone())); - diff_state.update(cx, |diff_state, cx| { - diff_state.language = language; - diff_state.language_registry = language_registry; - - match kind { - DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()), - DiffKind::Uncommitted => { - let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() { - diff - } else { - let unstaged_diff = - cx.new(|cx| BufferDiff::new(&text_snapshot, cx)); - diff_state.unstaged_diff = Some(unstaged_diff.downgrade()); - unstaged_diff - }; - - diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff)); - diff_state.uncommitted_diff = Some(diff.downgrade()) - } - }; - - let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx); - - Ok(async move { - rx.await.ok(); - Ok(diff) - }) - }) - } else { - Err(anyhow!("buffer was closed")) - } - })?? - .await + pub(crate) fn worktree_for_buffer( + &self, + buffer: &Entity, + cx: &App, + ) -> Option<(Entity, Arc)> { + let file = buffer.read(cx).file()?; + let worktree_id = file.worktree_id(cx); + let path = file.path().clone(); + let worktree = self + .worktree_store + .read(cx) + .worktree_for_id(worktree_id, cx)?; + Some((worktree, path)) } pub fn create_buffer(&mut self, cx: &mut Context) -> Task>> { @@ -1765,17 +1090,10 @@ impl BufferStore { fn add_buffer(&mut self, buffer_entity: Entity, cx: &mut Context) -> Result<()> { let buffer = buffer_entity.read(cx); - let language = buffer.language().cloned(); - let language_registry = buffer.language_registry(); let remote_id = buffer.remote_id(); let is_remote = buffer.replica_id() != 0; let open_buffer = OpenBuffer::Complete { buffer: buffer_entity.downgrade(), - diff_state: cx.new(|_| BufferDiffState { - language, - language_registry, - ..Default::default() - }), }; let handle = cx.entity().downgrade(); @@ -1856,26 +1174,6 @@ impl BufferStore { }) } - pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option> { - if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? { - diff_state.read(cx).unstaged_diff.as_ref()?.upgrade() - } else { - None - } - } - - pub fn get_uncommitted_diff( - &self, - buffer_id: BufferId, - cx: &App, - ) -> Option> { - if let OpenBuffer::Complete { diff_state, .. } = self.opened_buffers.get(&buffer_id)? { - diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade() - } else { - None - } - } - pub fn buffer_version_info(&self, cx: &App) -> (Vec, Vec) { let buffers = self .buffers() @@ -1983,27 +1281,6 @@ impl BufferStore { rx } - pub fn recalculate_buffer_diffs( - &mut self, - buffers: Vec>, - cx: &mut Context, - ) -> impl Future { - let mut futures = Vec::new(); - for buffer in buffers { - if let Some(OpenBuffer::Complete { diff_state, .. }) = - self.opened_buffers.get_mut(&buffer.read(cx).remote_id()) - { - let buffer = buffer.read(cx).text_snapshot(); - futures.push(diff_state.update(cx, |diff_state, cx| { - diff_state.recalculate_diffs(buffer, cx) - })); - } - } - async move { - futures::future::join_all(futures).await; - } - } - fn on_buffer_event( &mut self, buffer: Entity, @@ -2031,16 +1308,7 @@ impl BufferStore { }) .log_err(); } - BufferEvent::LanguageChanged => { - let buffer_id = buffer.read(cx).remote_id(); - if let Some(OpenBuffer::Complete { diff_state, .. }) = - self.opened_buffers.get(&buffer_id) - { - diff_state.update(cx, |diff_state, cx| { - diff_state.buffer_language_changed(buffer, cx); - }); - } - } + BufferEvent::LanguageChanged => {} _ => {} } } @@ -2115,7 +1383,6 @@ impl BufferStore { .entry(buffer_id) .or_insert_with(|| SharedBuffer { buffer: buffer.clone(), - diff: None, lsp_handle: None, }); @@ -2295,9 +1562,10 @@ impl BufferStore { ) -> Result<()> { let peer_id = envelope.sender_id; let buffer_id = BufferId::new(envelope.payload.buffer_id)?; - this.update(&mut cx, |this, _| { + this.update(&mut cx, |this, cx| { if let Some(shared) = this.shared_buffers.get_mut(&peer_id) { if shared.remove(&buffer_id).is_some() { + cx.emit(BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id)); if shared.is_empty() { this.shared_buffers.remove(&peer_id); } @@ -2419,117 +1687,6 @@ impl BufferStore { }) } - pub async fn handle_open_unstaged_diff( - this: Entity, - request: TypedEnvelope, - mut cx: AsyncApp, - ) -> Result { - let buffer_id = BufferId::new(request.payload.buffer_id)?; - let diff = this - .update(&mut cx, |this, cx| { - let buffer = this.get(buffer_id)?; - Some(this.open_unstaged_diff(buffer, cx)) - })? - .ok_or_else(|| anyhow!("no such buffer"))? - .await?; - this.update(&mut cx, |this, _| { - let shared_buffers = this - .shared_buffers - .entry(request.original_sender_id.unwrap_or(request.sender_id)) - .or_default(); - debug_assert!(shared_buffers.contains_key(&buffer_id)); - if let Some(shared) = shared_buffers.get_mut(&buffer_id) { - shared.diff = Some(diff.clone()); - } - })?; - let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?; - Ok(proto::OpenUnstagedDiffResponse { staged_text }) - } - - pub async fn handle_open_uncommitted_diff( - this: Entity, - request: TypedEnvelope, - mut cx: AsyncApp, - ) -> Result { - let buffer_id = BufferId::new(request.payload.buffer_id)?; - let diff = this - .update(&mut cx, |this, cx| { - let buffer = this.get(buffer_id)?; - Some(this.open_uncommitted_diff(buffer, cx)) - })? - .ok_or_else(|| anyhow!("no such buffer"))? - .await?; - this.update(&mut cx, |this, _| { - let shared_buffers = this - .shared_buffers - .entry(request.original_sender_id.unwrap_or(request.sender_id)) - .or_default(); - debug_assert!(shared_buffers.contains_key(&buffer_id)); - if let Some(shared) = shared_buffers.get_mut(&buffer_id) { - shared.diff = Some(diff.clone()); - } - })?; - diff.read_with(&cx, |diff, cx| { - use proto::open_uncommitted_diff_response::Mode; - - let unstaged_diff = diff.secondary_diff(); - let index_snapshot = unstaged_diff.and_then(|diff| { - let diff = diff.read(cx); - diff.base_text_exists().then(|| diff.base_text()) - }); - - let mode; - let staged_text; - let committed_text; - if diff.base_text_exists() { - let committed_snapshot = diff.base_text(); - committed_text = Some(committed_snapshot.text()); - if let Some(index_text) = index_snapshot { - if index_text.remote_id() == committed_snapshot.remote_id() { - mode = Mode::IndexMatchesHead; - staged_text = None; - } else { - mode = Mode::IndexAndHead; - staged_text = Some(index_text.text()); - } - } else { - mode = Mode::IndexAndHead; - staged_text = None; - } - } else { - mode = Mode::IndexAndHead; - committed_text = None; - staged_text = index_snapshot.as_ref().map(|buffer| buffer.text()); - } - - proto::OpenUncommittedDiffResponse { - committed_text, - staged_text, - mode: mode.into(), - } - }) - } - - pub async fn handle_update_diff_bases( - this: Entity, - request: TypedEnvelope, - mut cx: AsyncApp, - ) -> Result<()> { - let buffer_id = BufferId::new(request.payload.buffer_id)?; - this.update(&mut cx, |this, cx| { - if let Some(OpenBuffer::Complete { diff_state, buffer }) = - this.opened_buffers.get_mut(&buffer_id) - { - if let Some(buffer) = buffer.upgrade() { - let buffer = buffer.read(cx).text_snapshot(); - diff_state.update(cx, |diff_state, cx| { - diff_state.handle_base_texts_updated(buffer, request.payload, cx); - }) - } - } - }) - } - pub fn reload_buffers( &self, buffers: HashSet>, @@ -2584,7 +1741,6 @@ impl BufferStore { buffer_id, SharedBuffer { buffer: buffer.clone(), - diff: None, lsp_handle: None, }, ); diff --git a/crates/project/src/git.rs b/crates/project/src/git.rs index f459e2c5c4..f0b4ca10e5 100644 --- a/crates/project/src/git.rs +++ b/crates/project/src/git.rs @@ -3,16 +3,16 @@ use crate::{ worktree_store::{WorktreeStore, WorktreeStoreEvent}, Project, ProjectEnvironment, ProjectItem, ProjectPath, }; -use anyhow::{Context as _, Result}; +use anyhow::{anyhow, Context as _, Result}; use askpass::{AskPassDelegate, AskPassSession}; -use buffer_diff::BufferDiffEvent; +use buffer_diff::{BufferDiff, BufferDiffEvent}; use client::ProjectId; use collections::HashMap; use fs::Fs; use futures::{ channel::{mpsc, oneshot}, - future::OptionFuture, - StreamExt as _, + future::{OptionFuture, Shared}, + FutureExt as _, StreamExt as _, }; use git::repository::DiffType; use git::{ @@ -26,15 +26,15 @@ use gpui::{ App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task, WeakEntity, }; -use language::{Buffer, LanguageRegistry}; +use language::{Buffer, BufferEvent, Language, LanguageRegistry}; use parking_lot::Mutex; use rpc::{ - proto::{self, git_reset, ToProto}, + proto::{self, git_reset, ToProto, SSH_PROJECT_ID}, AnyProtoClient, TypedEnvelope, }; use settings::WorktreeId; use std::{ - collections::VecDeque, + collections::{hash_map, VecDeque}, future::Future, path::{Path, PathBuf}, sync::Arc, @@ -42,27 +42,75 @@ use std::{ use text::BufferId; use util::{debug_panic, maybe, ResultExt}; -use worktree::{ProjectEntryId, RepositoryEntry, StatusEntry, WorkDirectory}; +use worktree::{ + File, ProjectEntryId, RepositoryEntry, StatusEntry, UpdatedGitRepositoriesSet, WorkDirectory, + Worktree, +}; pub struct GitStore { state: GitStoreState, buffer_store: Entity, repositories: Vec>, + #[allow(clippy::type_complexity)] + loading_diffs: + HashMap<(BufferId, DiffKind), Shared, Arc>>>>, + diffs: HashMap>, active_index: Option, update_sender: mpsc::UnboundedSender, + shared_diffs: HashMap>, _subscriptions: [Subscription; 2], } +#[derive(Default)] +struct SharedDiffs { + unstaged: Option>, + uncommitted: Option>, +} + +#[derive(Default)] +struct BufferDiffState { + unstaged_diff: Option>, + uncommitted_diff: Option>, + recalculate_diff_task: Option>>, + language: Option>, + language_registry: Option>, + diff_updated_futures: Vec>, + + head_text: Option>, + index_text: Option>, + head_changed: bool, + index_changed: bool, + language_changed: bool, +} + +#[derive(Clone, Debug)] +enum DiffBasesChange { + SetIndex(Option), + SetHead(Option), + SetEach { + index: Option, + head: Option, + }, + SetBoth(Option), +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +enum DiffKind { + Unstaged, + Uncommitted, +} + enum GitStoreState { Local { - client: AnyProtoClient, + downstream_client: Option<(AnyProtoClient, ProjectId)>, environment: Entity, fs: Arc, }, Ssh { - environment: Entity, upstream_client: AnyProtoClient, - project_id: ProjectId, + upstream_project_id: ProjectId, + downstream_client: Option<(AnyProtoClient, ProjectId)>, + environment: Entity, }, Remote { upstream_client: AnyProtoClient, @@ -123,29 +171,18 @@ impl GitStore { buffer_store: Entity, environment: Entity, fs: Arc, - client: AnyProtoClient, - cx: &mut Context<'_, Self>, + cx: &mut Context, ) -> Self { - let update_sender = Self::spawn_git_worker(cx); - let _subscriptions = [ - cx.subscribe(worktree_store, Self::on_worktree_store_event), - cx.subscribe(&buffer_store, Self::on_buffer_store_event), - ]; - - let state = GitStoreState::Local { - client, - environment, - fs, - }; - - GitStore { - state, + Self::new( + worktree_store, buffer_store, - repositories: Vec::new(), - active_index: None, - update_sender, - _subscriptions, - } + GitStoreState::Local { + downstream_client: None, + environment, + fs, + }, + cx, + ) } pub fn remote( @@ -153,27 +190,17 @@ impl GitStore { buffer_store: Entity, upstream_client: AnyProtoClient, project_id: ProjectId, - cx: &mut Context<'_, Self>, + cx: &mut Context, ) -> Self { - let update_sender = Self::spawn_git_worker(cx); - let _subscriptions = [ - cx.subscribe(worktree_store, Self::on_worktree_store_event), - cx.subscribe(&buffer_store, Self::on_buffer_store_event), - ]; - - let state = GitStoreState::Remote { - upstream_client, - project_id, - }; - - GitStore { - state, + Self::new( + worktree_store, buffer_store, - repositories: Vec::new(), - active_index: None, - update_sender, - _subscriptions, - } + GitStoreState::Remote { + upstream_client, + project_id, + }, + cx, + ) } pub fn ssh( @@ -181,8 +208,26 @@ impl GitStore { buffer_store: Entity, environment: Entity, upstream_client: AnyProtoClient, - project_id: ProjectId, - cx: &mut Context<'_, Self>, + cx: &mut Context, + ) -> Self { + Self::new( + worktree_store, + buffer_store, + GitStoreState::Ssh { + upstream_client, + upstream_project_id: ProjectId(SSH_PROJECT_ID), + downstream_client: None, + environment, + }, + cx, + ) + } + + fn new( + worktree_store: &Entity, + buffer_store: Entity, + state: GitStoreState, + cx: &mut Context, ) -> Self { let update_sender = Self::spawn_git_worker(cx); let _subscriptions = [ @@ -190,12 +235,6 @@ impl GitStore { cx.subscribe(&buffer_store, Self::on_buffer_store_event), ]; - let state = GitStoreState::Ssh { - upstream_client, - project_id, - environment, - }; - GitStore { state, buffer_store, @@ -203,6 +242,9 @@ impl GitStore { active_index: None, update_sender, _subscriptions, + loading_diffs: HashMap::default(), + shared_diffs: HashMap::default(), + diffs: HashMap::default(), } } @@ -226,6 +268,50 @@ impl GitStore { client.add_entity_request_handler(Self::handle_askpass); client.add_entity_request_handler(Self::handle_check_for_pushed_commits); client.add_entity_request_handler(Self::handle_git_diff); + client.add_entity_request_handler(Self::handle_open_unstaged_diff); + client.add_entity_request_handler(Self::handle_open_uncommitted_diff); + client.add_entity_message_handler(Self::handle_update_diff_bases); + } + + pub fn is_local(&self) -> bool { + matches!(self.state, GitStoreState::Local { .. }) + } + + pub fn shared(&mut self, remote_id: u64, client: AnyProtoClient, _cx: &mut App) { + match &mut self.state { + GitStoreState::Local { + downstream_client, .. + } + | GitStoreState::Ssh { + downstream_client, .. + } => { + *downstream_client = Some((client, ProjectId(remote_id))); + } + GitStoreState::Remote { .. } => { + debug_panic!("shared called on remote store"); + } + } + } + + pub fn unshared(&mut self, _cx: &mut Context) { + match &mut self.state { + GitStoreState::Local { + downstream_client, .. + } + | GitStoreState::Ssh { + downstream_client, .. + } => { + downstream_client.take(); + } + GitStoreState::Remote { .. } => { + debug_panic!("unshared called on remote store"); + } + } + self.shared_diffs.clear(); + } + + pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) { + self.shared_diffs.remove(peer_id); } pub fn active_repository(&self) -> Option> { @@ -233,15 +319,213 @@ impl GitStore { .map(|index| self.repositories[index].clone()) } - fn client(&self) -> AnyProtoClient { + pub fn open_unstaged_diff( + &mut self, + buffer: Entity, + cx: &mut Context, + ) -> Task>> { + let buffer_id = buffer.read(cx).remote_id(); + if let Some(diff_state) = self.diffs.get(&buffer_id) { + if let Some(unstaged_diff) = diff_state + .read(cx) + .unstaged_diff + .as_ref() + .and_then(|weak| weak.upgrade()) + { + if let Some(task) = + diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation()) + { + return cx.background_executor().spawn(async move { + task.await?; + Ok(unstaged_diff) + }); + } + return Task::ready(Ok(unstaged_diff)); + } + } + + let task = match self.loading_diffs.entry((buffer_id, DiffKind::Unstaged)) { + hash_map::Entry::Occupied(e) => e.get().clone(), + hash_map::Entry::Vacant(entry) => { + let staged_text = self.state.load_staged_text(&buffer, &self.buffer_store, cx); + entry + .insert( + cx.spawn(move |this, cx| async move { + Self::open_diff_internal( + this, + DiffKind::Unstaged, + staged_text.await.map(DiffBasesChange::SetIndex), + buffer, + cx, + ) + .await + .map_err(Arc::new) + }) + .shared(), + ) + .clone() + } + }; + + cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) }) + } + + pub fn open_uncommitted_diff( + &mut self, + buffer: Entity, + cx: &mut Context, + ) -> Task>> { + let buffer_id = buffer.read(cx).remote_id(); + + if let Some(diff_state) = self.diffs.get(&buffer_id) { + if let Some(uncommitted_diff) = diff_state + .read(cx) + .uncommitted_diff + .as_ref() + .and_then(|weak| weak.upgrade()) + { + if let Some(task) = + diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation()) + { + return cx.background_executor().spawn(async move { + task.await?; + Ok(uncommitted_diff) + }); + } + return Task::ready(Ok(uncommitted_diff)); + } + } + + let task = match self.loading_diffs.entry((buffer_id, DiffKind::Uncommitted)) { + hash_map::Entry::Occupied(e) => e.get().clone(), + hash_map::Entry::Vacant(entry) => { + let changes = self + .state + .load_committed_text(&buffer, &self.buffer_store, cx); + + entry + .insert( + cx.spawn(move |this, cx| async move { + Self::open_diff_internal( + this, + DiffKind::Uncommitted, + changes.await, + buffer, + cx, + ) + .await + .map_err(Arc::new) + }) + .shared(), + ) + .clone() + } + }; + + cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) }) + } + + async fn open_diff_internal( + this: WeakEntity, + kind: DiffKind, + texts: Result, + buffer_entity: Entity, + mut cx: AsyncApp, + ) -> Result> { + let diff_bases_change = match texts { + Err(e) => { + this.update(&mut cx, |this, cx| { + let buffer = buffer_entity.read(cx); + let buffer_id = buffer.remote_id(); + this.loading_diffs.remove(&(buffer_id, kind)); + })?; + return Err(e); + } + Ok(change) => change, + }; + + this.update(&mut cx, |this, cx| { + let buffer = buffer_entity.read(cx); + let buffer_id = buffer.remote_id(); + let language = buffer.language().cloned(); + let language_registry = buffer.language_registry(); + let text_snapshot = buffer.text_snapshot(); + this.loading_diffs.remove(&(buffer_id, kind)); + + let diff_state = this + .diffs + .entry(buffer_id) + .or_insert_with(|| cx.new(|_| BufferDiffState::default())); + + let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx)); + + cx.subscribe(&diff, Self::on_buffer_diff_event).detach(); + diff_state.update(cx, |diff_state, cx| { + diff_state.language = language; + diff_state.language_registry = language_registry; + + match kind { + DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()), + DiffKind::Uncommitted => { + let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() { + diff + } else { + let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx)); + diff_state.unstaged_diff = Some(unstaged_diff.downgrade()); + unstaged_diff + }; + + diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff)); + diff_state.uncommitted_diff = Some(diff.downgrade()) + } + } + + let rx = diff_state.diff_bases_changed(text_snapshot, diff_bases_change, cx); + + anyhow::Ok(async move { + rx.await.ok(); + Ok(diff) + }) + }) + })?? + .await + } + + pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option> { + let diff_state = self.diffs.get(&buffer_id)?; + diff_state.read(cx).unstaged_diff.as_ref()?.upgrade() + } + + pub fn get_uncommitted_diff( + &self, + buffer_id: BufferId, + cx: &App, + ) -> Option> { + let diff_state = self.diffs.get(&buffer_id)?; + diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade() + } + + fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> { match &self.state { - GitStoreState::Local { client, .. } => client.clone(), + GitStoreState::Local { + downstream_client, .. + } + | GitStoreState::Ssh { + downstream_client, .. + } => downstream_client.clone(), + GitStoreState::Remote { .. } => None, + } + } + + fn upstream_client(&self) -> Option { + match &self.state { + GitStoreState::Local { .. } => None, GitStoreState::Ssh { upstream_client, .. - } => upstream_client.clone(), - GitStoreState::Remote { + } + | GitStoreState::Remote { upstream_client, .. - } => upstream_client.clone(), + } => Some(upstream_client.clone()), } } @@ -256,7 +540,7 @@ impl GitStore { fn project_id(&self) -> Option { match &self.state { GitStoreState::Local { .. } => None, - GitStoreState::Ssh { project_id, .. } => Some(*project_id), + GitStoreState::Ssh { .. } => Some(ProjectId(proto::SSH_PROJECT_ID)), GitStoreState::Remote { project_id, .. } => Some(*project_id), } } @@ -265,12 +549,12 @@ impl GitStore { &mut self, worktree_store: Entity, event: &WorktreeStoreEvent, - cx: &mut Context<'_, Self>, + cx: &mut Context, ) { let mut new_repositories = Vec::new(); let mut new_active_index = None; let this = cx.weak_entity(); - let client = self.client(); + let upstream_client = self.upstream_client(); let project_id = self.project_id(); worktree_store.update(cx, |worktree_store, cx| { @@ -288,7 +572,10 @@ impl GitStore { ) }) .or_else(|| { - let client = client.clone(); + let client = upstream_client + .clone() + .context("no upstream client") + .log_err()?; let project_id = project_id?; Some(( GitRepo::Remote { @@ -373,33 +660,94 @@ impl GitStore { WorktreeStoreEvent::WorktreeUpdatedGitRepositories(_) => { cx.emit(GitEvent::GitStateUpdated); } + WorktreeStoreEvent::WorktreeAdded(worktree) => { + if self.is_local() { + cx.subscribe(worktree, Self::on_worktree_event).detach(); + } + } _ => { cx.emit(GitEvent::FileSystemUpdated); } } } + fn on_worktree_event( + &mut self, + worktree: Entity, + event: &worktree::Event, + cx: &mut Context, + ) { + if let worktree::Event::UpdatedGitRepositories(changed_repos) = event { + self.local_worktree_git_repos_changed(worktree, changed_repos, cx); + } + } + fn on_buffer_store_event( &mut self, _: Entity, event: &BufferStoreEvent, - cx: &mut Context<'_, Self>, + cx: &mut Context, ) { - if let BufferStoreEvent::BufferDiffAdded(diff) = event { - cx.subscribe(diff, Self::on_buffer_diff_event).detach(); + match event { + BufferStoreEvent::BufferAdded(buffer) => { + cx.subscribe(&buffer, |this, buffer, event, cx| { + if let BufferEvent::LanguageChanged = event { + let buffer_id = buffer.read(cx).remote_id(); + if let Some(diff_state) = this.diffs.get(&buffer_id) { + diff_state.update(cx, |diff_state, cx| { + diff_state.buffer_language_changed(buffer, cx); + }); + } + } + }) + .detach(); + } + BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => { + if let Some(diffs) = self.shared_diffs.get_mut(peer_id) { + diffs.remove(buffer_id); + } + } + BufferStoreEvent::BufferDropped(buffer_id) => { + self.diffs.remove(&buffer_id); + for diffs in self.shared_diffs.values_mut() { + diffs.remove(buffer_id); + } + } + + _ => {} + } + } + + pub fn recalculate_buffer_diffs( + &mut self, + buffers: Vec>, + cx: &mut Context, + ) -> impl Future { + let mut futures = Vec::new(); + for buffer in buffers { + if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) { + let buffer = buffer.read(cx).text_snapshot(); + futures.push(diff_state.update(cx, |diff_state, cx| { + diff_state.recalculate_diffs(buffer, cx) + })); + } + } + async move { + futures::future::join_all(futures).await; } } fn on_buffer_diff_event( - this: &mut GitStore, + &mut self, diff: Entity, event: &BufferDiffEvent, - cx: &mut Context<'_, GitStore>, + cx: &mut Context, ) { if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event { let buffer_id = diff.read(cx).buffer_id; - if let Some((repo, path)) = this.repository_and_path_for_buffer_id(buffer_id, cx) { + if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) { let recv = repo.update(cx, |repo, cx| { + log::debug!("updating index text for buffer {}", path.display()); repo.set_index_text( path, new_index_text.as_ref().map(|rope| rope.to_string()), @@ -425,6 +773,160 @@ impl GitStore { } } + fn local_worktree_git_repos_changed( + &mut self, + worktree: Entity, + changed_repos: &UpdatedGitRepositoriesSet, + cx: &mut Context, + ) { + debug_assert!(worktree.read(cx).is_local()); + + let mut diff_state_updates = Vec::new(); + for (buffer_id, diff_state) in &self.diffs { + let Some(buffer) = self.buffer_store.read(cx).get(*buffer_id) else { + continue; + }; + let Some(file) = File::from_dyn(buffer.read(cx).file()) else { + continue; + }; + if file.worktree != worktree + || !changed_repos + .iter() + .any(|(work_dir, _)| file.path.starts_with(work_dir)) + { + continue; + } + + let diff_state = diff_state.read(cx); + let has_unstaged_diff = diff_state + .unstaged_diff + .as_ref() + .is_some_and(|diff| diff.is_upgradable()); + let has_uncommitted_diff = diff_state + .uncommitted_diff + .as_ref() + .is_some_and(|set| set.is_upgradable()); + diff_state_updates.push(( + buffer, + file.path.clone(), + has_unstaged_diff.then(|| diff_state.index_text.clone()), + has_uncommitted_diff.then(|| diff_state.head_text.clone()), + )); + } + + if diff_state_updates.is_empty() { + return; + } + + cx.spawn(move |this, mut cx| async move { + let snapshot = + worktree.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?; + + let mut diff_bases_changes_by_buffer = Vec::new(); + for (buffer, path, current_index_text, current_head_text) in diff_state_updates { + log::debug!("reloading git state for buffer {}", path.display()); + let Some(local_repo) = snapshot.local_repo_for_path(&path) else { + continue; + }; + let Some(relative_path) = local_repo.relativize(&path).ok() else { + continue; + }; + let index_text = if current_index_text.is_some() { + local_repo + .repo() + .load_index_text(relative_path.clone(), cx.clone()) + .await + } else { + None + }; + let head_text = if current_head_text.is_some() { + local_repo + .repo() + .load_committed_text(relative_path, cx.clone()) + .await + } else { + None + }; + + // Avoid triggering a diff update if the base text has not changed. + if let Some((current_index, current_head)) = + current_index_text.as_ref().zip(current_head_text.as_ref()) + { + if current_index.as_deref() == index_text.as_ref() + && current_head.as_deref() == head_text.as_ref() + { + continue; + } + } + + let diff_bases_change = + match (current_index_text.is_some(), current_head_text.is_some()) { + (true, true) => Some(if index_text == head_text { + DiffBasesChange::SetBoth(head_text) + } else { + DiffBasesChange::SetEach { + index: index_text, + head: head_text, + } + }), + (true, false) => Some(DiffBasesChange::SetIndex(index_text)), + (false, true) => Some(DiffBasesChange::SetHead(head_text)), + (false, false) => None, + }; + + diff_bases_changes_by_buffer.push((buffer, diff_bases_change)) + } + + this.update(&mut cx, |this, cx| { + for (buffer, diff_bases_change) in diff_bases_changes_by_buffer { + let Some(diff_state) = this.diffs.get(&buffer.read(cx).remote_id()) else { + continue; + }; + let Some(diff_bases_change) = diff_bases_change else { + continue; + }; + + let downstream_client = this.downstream_client(); + diff_state.update(cx, |diff_state, cx| { + use proto::update_diff_bases::Mode; + + let buffer = buffer.read(cx); + if let Some((client, project_id)) = downstream_client { + let (staged_text, committed_text, mode) = match diff_bases_change + .clone() + { + DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly), + DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly), + DiffBasesChange::SetEach { index, head } => { + (index, head, Mode::IndexAndHead) + } + DiffBasesChange::SetBoth(text) => { + (None, text, Mode::IndexMatchesHead) + } + }; + let message = proto::UpdateDiffBases { + project_id: project_id.to_proto(), + buffer_id: buffer.remote_id().to_proto(), + staged_text, + committed_text, + mode: mode as i32, + }; + + client.send(message).log_err(); + } + + let _ = diff_state.diff_bases_changed( + buffer.text_snapshot(), + diff_bases_change, + cx, + ); + }); + } + }) + }) + .detach_and_log_err(cx); + } + pub fn all_repositories(&self) -> Vec> { self.repositories.clone() } @@ -459,7 +961,7 @@ impl GitStore { result } - fn spawn_git_worker(cx: &mut Context<'_, GitStore>) -> mpsc::UnboundedSender { + fn spawn_git_worker(cx: &mut Context) -> mpsc::UnboundedSender { let (job_tx, mut job_rx) = mpsc::unbounded::(); cx.spawn(|_, mut cx| async move { @@ -504,12 +1006,13 @@ impl GitStore { } GitStoreState::Ssh { upstream_client, - project_id, + upstream_project_id: project_id, .. } | GitStoreState::Remote { upstream_client, project_id, + .. } => { let client = upstream_client.clone(); let project_id = *project_id; @@ -1014,6 +1517,109 @@ impl GitStore { Ok(proto::GitDiffResponse { diff }) } + pub async fn handle_open_unstaged_diff( + this: Entity, + request: TypedEnvelope, + mut cx: AsyncApp, + ) -> Result { + let buffer_id = BufferId::new(request.payload.buffer_id)?; + let diff = this + .update(&mut cx, |this, cx| { + let buffer = this.buffer_store.read(cx).get(buffer_id)?; + Some(this.open_unstaged_diff(buffer, cx)) + })? + .ok_or_else(|| anyhow!("no such buffer"))? + .await?; + this.update(&mut cx, |this, _| { + let shared_diffs = this + .shared_diffs + .entry(request.original_sender_id.unwrap_or(request.sender_id)) + .or_default(); + shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone()); + })?; + let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?; + Ok(proto::OpenUnstagedDiffResponse { staged_text }) + } + + pub async fn handle_open_uncommitted_diff( + this: Entity, + request: TypedEnvelope, + mut cx: AsyncApp, + ) -> Result { + let buffer_id = BufferId::new(request.payload.buffer_id)?; + let diff = this + .update(&mut cx, |this, cx| { + let buffer = this.buffer_store.read(cx).get(buffer_id)?; + Some(this.open_uncommitted_diff(buffer, cx)) + })? + .ok_or_else(|| anyhow!("no such buffer"))? + .await?; + this.update(&mut cx, |this, _| { + let shared_diffs = this + .shared_diffs + .entry(request.original_sender_id.unwrap_or(request.sender_id)) + .or_default(); + shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone()); + })?; + diff.read_with(&cx, |diff, cx| { + use proto::open_uncommitted_diff_response::Mode; + + let unstaged_diff = diff.secondary_diff(); + let index_snapshot = unstaged_diff.and_then(|diff| { + let diff = diff.read(cx); + diff.base_text_exists().then(|| diff.base_text()) + }); + + let mode; + let staged_text; + let committed_text; + if diff.base_text_exists() { + let committed_snapshot = diff.base_text(); + committed_text = Some(committed_snapshot.text()); + if let Some(index_text) = index_snapshot { + if index_text.remote_id() == committed_snapshot.remote_id() { + mode = Mode::IndexMatchesHead; + staged_text = None; + } else { + mode = Mode::IndexAndHead; + staged_text = Some(index_text.text()); + } + } else { + mode = Mode::IndexAndHead; + staged_text = None; + } + } else { + mode = Mode::IndexAndHead; + committed_text = None; + staged_text = index_snapshot.as_ref().map(|buffer| buffer.text()); + } + + proto::OpenUncommittedDiffResponse { + committed_text, + staged_text, + mode: mode.into(), + } + }) + } + + pub async fn handle_update_diff_bases( + this: Entity, + request: TypedEnvelope, + mut cx: AsyncApp, + ) -> Result<()> { + let buffer_id = BufferId::new(request.payload.buffer_id)?; + this.update(&mut cx, |this, cx| { + if let Some(diff_state) = this.diffs.get_mut(&buffer_id) { + if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) { + let buffer = buffer.read(cx).text_snapshot(); + diff_state.update(cx, |diff_state, cx| { + diff_state.handle_base_texts_updated(buffer, request.payload, cx); + }) + } + } + }) + } + fn repository_for_request( this: &Entity, worktree_id: WorktreeId, @@ -1037,6 +1643,207 @@ impl GitStore { } } +impl BufferDiffState { + fn buffer_language_changed(&mut self, buffer: Entity, cx: &mut Context) { + self.language = buffer.read(cx).language().cloned(); + self.language_changed = true; + let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx); + } + + fn unstaged_diff(&self) -> Option> { + self.unstaged_diff.as_ref().and_then(|set| set.upgrade()) + } + + fn uncommitted_diff(&self) -> Option> { + self.uncommitted_diff.as_ref().and_then(|set| set.upgrade()) + } + + fn handle_base_texts_updated( + &mut self, + buffer: text::BufferSnapshot, + message: proto::UpdateDiffBases, + cx: &mut Context, + ) { + use proto::update_diff_bases::Mode; + + let Some(mode) = Mode::from_i32(message.mode) else { + return; + }; + + let diff_bases_change = match mode { + Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text), + Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text), + Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text), + Mode::IndexAndHead => DiffBasesChange::SetEach { + index: message.staged_text, + head: message.committed_text, + }, + }; + + let _ = self.diff_bases_changed(buffer, diff_bases_change, cx); + } + + pub fn wait_for_recalculation(&mut self) -> Option> { + if self.diff_updated_futures.is_empty() { + return None; + } + let (tx, rx) = oneshot::channel(); + self.diff_updated_futures.push(tx); + Some(rx) + } + + fn diff_bases_changed( + &mut self, + buffer: text::BufferSnapshot, + diff_bases_change: DiffBasesChange, + cx: &mut Context, + ) -> oneshot::Receiver<()> { + match diff_bases_change { + DiffBasesChange::SetIndex(index) => { + self.index_text = index.map(|mut index| { + text::LineEnding::normalize(&mut index); + Arc::new(index) + }); + self.index_changed = true; + } + DiffBasesChange::SetHead(head) => { + self.head_text = head.map(|mut head| { + text::LineEnding::normalize(&mut head); + Arc::new(head) + }); + self.head_changed = true; + } + DiffBasesChange::SetBoth(text) => { + let text = text.map(|mut text| { + text::LineEnding::normalize(&mut text); + Arc::new(text) + }); + self.head_text = text.clone(); + self.index_text = text; + self.head_changed = true; + self.index_changed = true; + } + DiffBasesChange::SetEach { index, head } => { + self.index_text = index.map(|mut index| { + text::LineEnding::normalize(&mut index); + Arc::new(index) + }); + self.index_changed = true; + self.head_text = head.map(|mut head| { + text::LineEnding::normalize(&mut head); + Arc::new(head) + }); + self.head_changed = true; + } + } + + self.recalculate_diffs(buffer, cx) + } + + fn recalculate_diffs( + &mut self, + buffer: text::BufferSnapshot, + cx: &mut Context, + ) -> oneshot::Receiver<()> { + log::debug!("recalculate diffs"); + let (tx, rx) = oneshot::channel(); + self.diff_updated_futures.push(tx); + + let language = self.language.clone(); + let language_registry = self.language_registry.clone(); + let unstaged_diff = self.unstaged_diff(); + let uncommitted_diff = self.uncommitted_diff(); + let head = self.head_text.clone(); + let index = self.index_text.clone(); + let index_changed = self.index_changed; + let head_changed = self.head_changed; + let language_changed = self.language_changed; + let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) { + (Some(index), Some(head)) => Arc::ptr_eq(index, head), + (None, None) => true, + _ => false, + }; + self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move { + let mut new_unstaged_diff = None; + if let Some(unstaged_diff) = &unstaged_diff { + new_unstaged_diff = Some( + BufferDiff::update_diff( + unstaged_diff.clone(), + buffer.clone(), + index, + index_changed, + language_changed, + language.clone(), + language_registry.clone(), + &mut cx, + ) + .await?, + ); + } + + let mut new_uncommitted_diff = None; + if let Some(uncommitted_diff) = &uncommitted_diff { + new_uncommitted_diff = if index_matches_head { + new_unstaged_diff.clone() + } else { + Some( + BufferDiff::update_diff( + uncommitted_diff.clone(), + buffer.clone(), + head, + head_changed, + language_changed, + language.clone(), + language_registry.clone(), + &mut cx, + ) + .await?, + ) + } + } + + let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) = + unstaged_diff.as_ref().zip(new_unstaged_diff.clone()) + { + unstaged_diff.update(&mut cx, |diff, cx| { + diff.set_snapshot(&buffer, new_unstaged_diff, language_changed, None, cx) + })? + } else { + None + }; + + if let Some((uncommitted_diff, new_uncommitted_diff)) = + uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone()) + { + uncommitted_diff.update(&mut cx, |uncommitted_diff, cx| { + uncommitted_diff.set_snapshot( + &buffer, + new_uncommitted_diff, + language_changed, + unstaged_changed_range, + cx, + ); + })?; + } + + if let Some(this) = this.upgrade() { + this.update(&mut cx, |this, _| { + this.index_changed = false; + this.head_changed = false; + this.language_changed = false; + for tx in this.diff_updated_futures.drain(..) { + tx.send(()).ok(); + } + })?; + } + + Ok(()) + })); + + rx + } +} + fn make_remote_delegate( this: Entity, project_id: u64, @@ -1047,7 +1854,10 @@ fn make_remote_delegate( ) -> AskPassDelegate { AskPassDelegate::new(cx, move |prompt, tx, cx| { this.update(cx, |this, cx| { - let response = this.client().request(proto::AskPassRequest { + let Some((client, _)) = this.downstream_client() else { + return; + }; + let response = client.request(proto::AskPassRequest { project_id, worktree_id: worktree_id.to_proto(), work_directory_id: work_directory_id.to_proto(), @@ -1064,7 +1874,115 @@ fn make_remote_delegate( }) } -impl GitRepo {} +impl GitStoreState { + fn load_staged_text( + &self, + buffer: &Entity, + buffer_store: &Entity, + cx: &App, + ) -> Task>> { + match self { + GitStoreState::Local { .. } => { + if let Some((worktree, path)) = + buffer_store.read(cx).worktree_for_buffer(buffer, cx) + { + worktree.read(cx).load_staged_file(path.as_ref(), cx) + } else { + return Task::ready(Err(anyhow!("no such worktree"))); + } + } + GitStoreState::Ssh { + upstream_client, + upstream_project_id: project_id, + .. + } + | GitStoreState::Remote { + upstream_client, + project_id, + } => { + let buffer_id = buffer.read(cx).remote_id(); + let project_id = *project_id; + let client = upstream_client.clone(); + cx.background_spawn(async move { + let response = client + .request(proto::OpenUnstagedDiff { + project_id: project_id.to_proto(), + buffer_id: buffer_id.to_proto(), + }) + .await?; + Ok(response.staged_text) + }) + } + } + } + + fn load_committed_text( + &self, + buffer: &Entity, + buffer_store: &Entity, + cx: &App, + ) -> Task> { + match self { + GitStoreState::Local { .. } => { + if let Some((worktree, path)) = + buffer_store.read(cx).worktree_for_buffer(buffer, cx) + { + let worktree = worktree.read(cx); + let committed_text = worktree.load_committed_file(&path, cx); + let staged_text = worktree.load_staged_file(&path, cx); + cx.background_spawn(async move { + let committed_text = committed_text.await?; + let staged_text = staged_text.await?; + let diff_bases_change = if committed_text == staged_text { + DiffBasesChange::SetBoth(committed_text) + } else { + DiffBasesChange::SetEach { + index: staged_text, + head: committed_text, + } + }; + Ok(diff_bases_change) + }) + } else { + Task::ready(Err(anyhow!("no such worktree"))) + } + } + GitStoreState::Ssh { + upstream_client, + upstream_project_id: project_id, + .. + } + | GitStoreState::Remote { + upstream_client, + project_id, + } => { + use proto::open_uncommitted_diff_response::Mode; + + let buffer_id = buffer.read(cx).remote_id(); + let project_id = *project_id; + let client = upstream_client.clone(); + cx.background_spawn(async move { + let response = client + .request(proto::OpenUncommittedDiff { + project_id: project_id.to_proto(), + buffer_id: buffer_id.to_proto(), + }) + .await?; + let mode = + Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?; + let bases = match mode { + Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text), + Mode::IndexAndHead => DiffBasesChange::SetEach { + head: response.committed_text, + index: response.staged_text, + }, + }; + Ok(bases) + }) + } + } + } +} impl Repository { pub fn git_store(&self) -> Option> { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 0479657e55..f39381b44a 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -665,6 +665,7 @@ impl Hover { enum EntitySubscription { Project(PendingEntitySubscription), BufferStore(PendingEntitySubscription), + GitStore(PendingEntitySubscription), WorktreeStore(PendingEntitySubscription), LspStore(PendingEntitySubscription), SettingsObserver(PendingEntitySubscription), @@ -863,7 +864,6 @@ impl Project { buffer_store.clone(), environment.clone(), fs.clone(), - client.clone().into(), cx, ) }); @@ -992,7 +992,6 @@ impl Project { buffer_store.clone(), environment.clone(), ssh_proto.clone(), - ProjectId(SSH_PROJECT_ID), cx, ) }); @@ -1109,6 +1108,7 @@ impl Project { let subscriptions = [ EntitySubscription::Project(client.subscribe_to_entity::(remote_id)?), EntitySubscription::BufferStore(client.subscribe_to_entity::(remote_id)?), + EntitySubscription::GitStore(client.subscribe_to_entity::(remote_id)?), EntitySubscription::WorktreeStore( client.subscribe_to_entity::(remote_id)?, ), @@ -1137,7 +1137,7 @@ impl Project { async fn from_join_project_response( response: TypedEnvelope, - subscriptions: [EntitySubscription; 5], + subscriptions: [EntitySubscription; 6], client: Arc, run_tasks: bool, user_store: Entity, @@ -1254,7 +1254,7 @@ impl Project { remote_id, replica_id, }, - git_store, + git_store: git_store.clone(), buffers_needing_diff: Default::default(), git_diff_debouncer: DebouncedDelay::new(), terminals: Terminals { @@ -1284,6 +1284,9 @@ impl Project { EntitySubscription::WorktreeStore(subscription) => { subscription.set_entity(&worktree_store, &mut cx) } + EntitySubscription::GitStore(subscription) => { + subscription.set_entity(&git_store, &mut cx) + } EntitySubscription::SettingsObserver(subscription) => { subscription.set_entity(&settings_observer, &mut cx) } @@ -1874,6 +1877,9 @@ impl Project { self.settings_observer.update(cx, |settings_observer, cx| { settings_observer.shared(project_id, self.client.clone().into(), cx) }); + self.git_store.update(cx, |git_store, cx| { + git_store.shared(project_id, self.client.clone().into(), cx) + }); self.client_state = ProjectClientState::Shared { remote_id: project_id, @@ -1955,6 +1961,9 @@ impl Project { self.settings_observer.update(cx, |settings_observer, cx| { settings_observer.unshared(cx); }); + self.git_store.update(cx, |git_store, cx| { + git_store.unshared(cx); + }); self.client .send(proto::UnshareProject { @@ -2180,10 +2189,8 @@ impl Project { if self.is_disconnected(cx) { return Task::ready(Err(anyhow!(ErrorCode::Disconnected))); } - - self.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.open_unstaged_diff(buffer, cx) - }) + self.git_store + .update(cx, |git_store, cx| git_store.open_unstaged_diff(buffer, cx)) } pub fn open_uncommitted_diff( @@ -2194,9 +2201,8 @@ impl Project { if self.is_disconnected(cx) { return Task::ready(Err(anyhow!(ErrorCode::Disconnected))); } - - self.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.open_uncommitted_diff(buffer, cx) + self.git_store.update(cx, |git_store, cx| { + git_store.open_uncommitted_diff(buffer, cx) }) } @@ -2755,8 +2761,8 @@ impl Project { if buffers.is_empty() { None } else { - Some(this.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.recalculate_buffer_diffs(buffers, cx) + Some(this.git_store.update(cx, |git_store, cx| { + git_store.recalculate_buffer_diffs(buffers, cx) })) } }) @@ -4008,6 +4014,9 @@ impl Project { buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); } }); + this.git_store.update(cx, |git_store, _| { + git_store.forget_shared_diffs_for(&peer_id); + }); cx.emit(Event::CollaboratorLeft(peer_id)); Ok(()) diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index fe2a49300a..ae99675363 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -6414,8 +6414,6 @@ async fn test_staging_lots_of_hunks_fast(cx: &mut gpui::TestAppContext) { .await .unwrap(); - let range = Anchor::MIN..snapshot.anchor_after(snapshot.max_point()); - let mut expected_hunks: Vec<(Range, String, String, DiffHunkStatus)> = (0..500) .step_by(5) .map(|i| { @@ -6444,9 +6442,7 @@ async fn test_staging_lots_of_hunks_fast(cx: &mut gpui::TestAppContext) { // Stage every hunk with a different call uncommitted_diff.update(cx, |diff, cx| { - let hunks = diff - .hunks_intersecting_range(range.clone(), &snapshot, cx) - .collect::>(); + let hunks = diff.hunks(&snapshot, cx).collect::>(); for hunk in hunks { diff.stage_or_unstage_hunks(true, &[hunk], &snapshot, true, cx); } @@ -6480,9 +6476,7 @@ async fn test_staging_lots_of_hunks_fast(cx: &mut gpui::TestAppContext) { // Unstage every hunk with a different call uncommitted_diff.update(cx, |diff, cx| { - let hunks = diff - .hunks_intersecting_range(range, &snapshot, cx) - .collect::>(); + let hunks = diff.hunks(&snapshot, cx).collect::>(); for hunk in hunks { diff.stage_or_unstage_hunks(false, &[hunk], &snapshot, true, cx); } diff --git a/crates/remote_server/src/headless_project.rs b/crates/remote_server/src/headless_project.rs index 3d32ccbec9..e922fdb5e1 100644 --- a/crates/remote_server/src/headless_project.rs +++ b/crates/remote_server/src/headless_project.rs @@ -89,14 +89,15 @@ impl HeadlessProject { let environment = project::ProjectEnvironment::new(&worktree_store, None, cx); let git_store = cx.new(|cx| { - GitStore::local( + let mut store = GitStore::local( &worktree_store, buffer_store.clone(), environment.clone(), fs.clone(), - session.clone().into(), cx, - ) + ); + store.shared(SSH_PROJECT_ID, session.clone().into(), cx); + store }); let prettier_store = cx.new(|cx| { PrettierStore::new( diff --git a/crates/text/src/text.rs b/crates/text/src/text.rs index 654778b75f..51d47d85d9 100644 --- a/crates/text/src/text.rs +++ b/crates/text/src/text.rs @@ -94,6 +94,7 @@ impl BufferId { self.into() } } + impl From for u64 { fn from(id: BufferId) -> Self { id.0.get()