Restructure git diff state management to allow viewing buffers with different diff bases (#21258)

This is a pure refactor of our Git diff state management. Buffers are no
longer associated with a single diff (the unstaged changes).
Instead, there is an explicit project API for retrieving a buffer's
unstaged changes, and the `Editor` view layer is responsible for
choosing what diff to associate with a buffer.

The reason for this change is that we'll soon want to add multiple "git
diff views" to Zed, one of which will show the *uncommitted* changes for
a buffer. But that view will need to co-exist with other views of the
same buffer, which may want to show the unstaged changes.
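
As a rough illustration of the new split, here is a minimal sketch of how a
view-layer object might ask the project for a buffer's unstaged changes. Only
`Project::open_unstaged_changes` comes from this change; the `DiffView` type,
its method name, and the import paths are assumptions made for the example:

```rust
use anyhow::Result;
use gpui::{Model, ModelContext, Task};
use language::Buffer;
use project::{buffer_store::BufferChangeSet, Project};

/// Hypothetical view-layer type, standing in for something like the `Editor`.
struct DiffView {
    project: Model<Project>,
}

impl DiffView {
    /// The view chooses which diff it wants for a buffer; the project only
    /// exposes APIs for loading each kind (here, the unstaged changes).
    fn load_unstaged_changes(
        &self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<BufferChangeSet>>> {
        self.project.update(cx, |project, cx| {
            project.open_unstaged_changes(buffer, cx)
        })
    }
}
```

Because the buffer store deduplicates in-flight loads (see `loading_change_sets`
in the diff below), multiple views asking for the same buffer's unstaged changes
end up sharing a single `BufferChangeSet` model.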

### Todo

* [x] Get git gutter and git hunks working with new structure
* [x] Update editor tests to use new APIs
* [x] Update buffer tests
* [x] Restructure remoting/collab protocol
* [x] Update assertions about staged text in
`random_project_collaboration_tests`
* [x] Move buffer tests for git diff management to a new spot, using the
new APIs

Release Notes:

- N/A

---------

Co-authored-by: Richard <richard@zed.dev>
Co-authored-by: Cole <cole@zed.dev>
Co-authored-by: Conrad <conrad@zed.dev>
Max Brunsfeld 2024-12-04 15:02:33 -08:00 committed by GitHub
parent 31796171de
commit a2115e7242
29 changed files with 1832 additions and 1651 deletions

@ -8,8 +8,8 @@ use anyhow::{anyhow, Context as _, Result};
use client::Client;
use collections::{hash_map, HashMap, HashSet};
use fs::Fs;
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use git::blame::Blame;
use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
use git::{blame::Blame, diff::BufferDiff};
use gpui::{
AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
Task, WeakModel,
@ -25,7 +25,7 @@ use language::{
use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
use smol::channel::Receiver;
use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
use text::BufferId;
use text::{BufferId, LineEnding, Rope};
use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
@ -33,14 +33,29 @@ use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Work
pub struct BufferStore {
state: BufferStoreState,
#[allow(clippy::type_complexity)]
loading_buffers_by_path: HashMap<
ProjectPath,
postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
>,
loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Model<Buffer>, Arc<anyhow::Error>>>>>,
#[allow(clippy::type_complexity)]
loading_change_sets:
HashMap<BufferId, Shared<Task<Result<Model<BufferChangeSet>, Arc<anyhow::Error>>>>>,
worktree_store: Model<WorktreeStore>,
opened_buffers: HashMap<BufferId, OpenBuffer>,
downstream_client: Option<(AnyProtoClient, u64)>,
shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
}
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
buffer: Model<Buffer>,
unstaged_changes: Option<Model<BufferChangeSet>>,
}
pub struct BufferChangeSet {
pub buffer_id: BufferId,
pub base_text: Option<Model<Buffer>>,
pub diff_to_buffer: git::diff::BufferDiff,
pub recalculate_diff_task: Option<Task<Result<()>>>,
pub diff_updated_futures: Vec<oneshot::Sender<()>>,
pub base_text_version: usize,
}
enum BufferStoreState {
@ -66,7 +81,10 @@ struct LocalBufferStore {
}
enum OpenBuffer {
Buffer(WeakModel<Buffer>),
Complete {
buffer: WeakModel<Buffer>,
unstaged_changes: Option<WeakModel<BufferChangeSet>>,
},
Operations(Vec<Operation>),
}
@ -85,6 +103,23 @@ pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>)
impl EventEmitter<BufferStoreEvent> for BufferStore {}
impl RemoteBufferStore {
fn load_staged_text(
&self,
buffer_id: BufferId,
cx: &AppContext,
) -> Task<Result<Option<String>>> {
let project_id = self.project_id;
let client = self.upstream_client.clone();
cx.background_executor().spawn(async move {
Ok(client
.request(proto::GetStagedText {
project_id,
buffer_id: buffer_id.to_proto(),
})
.await?
.staged_text)
})
}
pub fn wait_for_remote_buffer(
&mut self,
id: BufferId,
@ -352,6 +387,27 @@ impl RemoteBufferStore {
}
impl LocalBufferStore {
fn load_staged_text(
&self,
buffer: &Model<Buffer>,
cx: &AppContext,
) -> Task<Result<Option<String>>> {
let Some(file) = buffer.read(cx).file() else {
return Task::ready(Err(anyhow!("buffer has no file")));
};
let worktree_id = file.worktree_id(cx);
let path = file.path().clone();
let Some(worktree) = self
.worktree_store
.read(cx)
.worktree_for_id(worktree_id, cx)
else {
return Task::ready(Err(anyhow!("no such worktree")));
};
worktree.read(cx).load_staged_file(path.as_ref(), cx)
}
fn save_local_buffer(
&self,
buffer_handle: Model<Buffer>,
@ -463,94 +519,71 @@ impl LocalBufferStore {
) {
debug_assert!(worktree_handle.read(cx).is_local());
// Identify the loading buffers whose containing repository that has changed.
let future_buffers = this
.loading_buffers()
.filter_map(|(project_path, receiver)| {
if project_path.worktree_id != worktree_handle.read(cx).id() {
return None;
}
let path = &project_path.path;
changed_repos
.iter()
.find(|(work_dir, _)| path.starts_with(work_dir))?;
let path = path.clone();
Some(async move {
BufferStore::wait_for_loading_buffer(receiver)
.await
.ok()
.map(|buffer| (buffer, path))
})
})
.collect::<FuturesUnordered<_>>();
// Identify the current buffers whose containing repository has changed.
let current_buffers = this
.buffers()
let buffer_change_sets = this
.opened_buffers
.values()
.filter_map(|buffer| {
let file = File::from_dyn(buffer.read(cx).file())?;
if file.worktree != worktree_handle {
return None;
if let OpenBuffer::Complete {
buffer,
unstaged_changes,
} = buffer
{
let buffer = buffer.upgrade()?.read(cx);
let file = File::from_dyn(buffer.file())?;
if file.worktree != worktree_handle {
return None;
}
changed_repos
.iter()
.find(|(work_dir, _)| file.path.starts_with(work_dir))?;
let unstaged_changes = unstaged_changes.as_ref()?.upgrade()?;
let snapshot = buffer.text_snapshot();
Some((unstaged_changes, snapshot, file.path.clone()))
} else {
None
}
changed_repos
.iter()
.find(|(work_dir, _)| file.path.starts_with(work_dir))?;
Some((buffer, file.path.clone()))
})
.collect::<Vec<_>>();
if future_buffers.len() + current_buffers.len() == 0 {
if buffer_change_sets.is_empty() {
return;
}
cx.spawn(move |this, mut cx| async move {
// Wait for all of the buffers to load.
let future_buffers = future_buffers.collect::<Vec<_>>().await;
// Reload the diff base for every buffer whose containing git repository has changed.
let snapshot =
worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
let diff_bases_by_buffer = cx
.background_executor()
.spawn(async move {
let mut diff_base_tasks = future_buffers
buffer_change_sets
.into_iter()
.flatten()
.chain(current_buffers)
.filter_map(|(buffer, path)| {
.filter_map(|(change_set, buffer_snapshot, path)| {
let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
Some(async move {
let base_text =
local_repo_entry.repo().load_index_text(&relative_path);
Some((buffer, base_text))
})
let base_text = local_repo_entry.repo().load_index_text(&relative_path);
Some((change_set, buffer_snapshot, base_text))
})
.collect::<FuturesUnordered<_>>();
let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
while let Some(diff_base) = diff_base_tasks.next().await {
if let Some(diff_base) = diff_base {
diff_bases.push(diff_base);
}
}
diff_bases
.collect::<Vec<_>>()
})
.await;
this.update(&mut cx, |this, cx| {
// Assign the new diff bases on all of the buffers.
for (buffer, diff_base) in diff_bases_by_buffer {
let buffer_id = buffer.update(cx, |buffer, cx| {
buffer.set_diff_base(diff_base.clone(), cx);
buffer.remote_id().to_proto()
for (change_set, buffer_snapshot, staged_text) in diff_bases_by_buffer {
change_set.update(cx, |change_set, cx| {
if let Some(staged_text) = staged_text.clone() {
let _ =
change_set.set_base_text(staged_text, buffer_snapshot.clone(), cx);
} else {
change_set.unset_base_text(buffer_snapshot.clone(), cx);
}
});
if let Some((client, project_id)) = &this.downstream_client.clone() {
client
.send(proto::UpdateDiffBase {
project_id: *project_id,
buffer_id,
diff_base,
buffer_id: buffer_snapshot.remote_id().to_proto(),
staged_text,
})
.log_err();
}
@ -759,12 +792,7 @@ impl LocalBufferStore {
.spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
.await;
cx.insert_model(reservation, |_| {
Buffer::build(
text_buffer,
loaded.diff_base,
Some(loaded.file),
Capability::ReadWrite,
)
Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
})
})
});
@ -777,7 +805,6 @@ impl LocalBufferStore {
let text_buffer = text::Buffer::new(0, buffer_id, "".into());
Buffer::build(
text_buffer,
None,
Some(Arc::new(File {
worktree,
path,
@ -861,11 +888,12 @@ impl BufferStore {
client.add_model_message_handler(Self::handle_buffer_reloaded);
client.add_model_message_handler(Self::handle_buffer_saved);
client.add_model_message_handler(Self::handle_update_buffer_file);
client.add_model_message_handler(Self::handle_update_diff_base);
client.add_model_request_handler(Self::handle_save_buffer);
client.add_model_request_handler(Self::handle_blame_buffer);
client.add_model_request_handler(Self::handle_reload_buffers);
client.add_model_request_handler(Self::handle_get_permalink_to_line);
client.add_model_request_handler(Self::handle_get_staged_text);
client.add_model_message_handler(Self::handle_update_diff_base);
}
/// Creates a buffer store, optionally retaining its buffers.
@ -885,7 +913,8 @@ impl BufferStore {
downstream_client: None,
opened_buffers: Default::default(),
shared_buffers: Default::default(),
loading_buffers_by_path: Default::default(),
loading_buffers: Default::default(),
loading_change_sets: Default::default(),
worktree_store,
}
}
@ -907,7 +936,8 @@ impl BufferStore {
}),
downstream_client: None,
opened_buffers: Default::default(),
loading_buffers_by_path: Default::default(),
loading_buffers: Default::default(),
loading_change_sets: Default::default(),
shared_buffers: Default::default(),
worktree_store,
}
@ -939,55 +969,125 @@ impl BufferStore {
project_path: ProjectPath,
cx: &mut ModelContext<Self>,
) -> Task<Result<Model<Buffer>>> {
let existing_buffer = self.get_by_path(&project_path, cx);
if let Some(existing_buffer) = existing_buffer {
return Task::ready(Ok(existing_buffer));
if let Some(buffer) = self.get_by_path(&project_path, cx) {
return Task::ready(Ok(buffer));
}
let Some(worktree) = self
.worktree_store
.read(cx)
.worktree_for_id(project_path.worktree_id, cx)
else {
return Task::ready(Err(anyhow!("no such worktree")));
};
let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
// If the given path is already being loaded, then wait for that existing
// task to complete and return the same buffer.
let task = match self.loading_buffers.entry(project_path.clone()) {
hash_map::Entry::Occupied(e) => e.get().clone(),
// Otherwise, record the fact that this path is now being loaded.
hash_map::Entry::Vacant(entry) => {
let (mut tx, rx) = postage::watch::channel();
entry.insert(rx.clone());
let path = project_path.path.clone();
let Some(worktree) = self
.worktree_store
.read(cx)
.worktree_for_id(project_path.worktree_id, cx)
else {
return Task::ready(Err(anyhow!("no such worktree")));
};
let load_buffer = match &self.state {
BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
};
cx.spawn(move |this, mut cx| async move {
let load_result = load_buffer.await;
*tx.borrow_mut() = Some(this.update(&mut cx, |this, _cx| {
// Record the fact that the buffer is no longer loading.
this.loading_buffers_by_path.remove(&project_path);
let buffer = load_result.map_err(Arc::new)?;
Ok(buffer)
})?);
anyhow::Ok(())
})
.detach();
rx
entry
.insert(
cx.spawn(move |this, mut cx| async move {
let load_result = load_buffer.await;
this.update(&mut cx, |this, _cx| {
// Record the fact that the buffer is no longer loading.
this.loading_buffers.remove(&project_path);
})
.ok();
load_result.map_err(Arc::new)
})
.shared(),
)
.clone()
}
};
cx.background_executor().spawn(async move {
Self::wait_for_loading_buffer(loading_watch)
cx.background_executor()
.spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
}
pub fn open_unstaged_changes(
&mut self,
buffer: Model<Buffer>,
cx: &mut ModelContext<Self>,
) -> Task<Result<Model<BufferChangeSet>>> {
let buffer_id = buffer.read(cx).remote_id();
if let Some(change_set) = self.get_unstaged_changes(buffer_id) {
return Task::ready(Ok(change_set));
}
let task = match self.loading_change_sets.entry(buffer_id) {
hash_map::Entry::Occupied(e) => e.get().clone(),
hash_map::Entry::Vacant(entry) => {
let load = match &self.state {
BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
BufferStoreState::Remote(this) => this.load_staged_text(buffer_id, cx),
};
entry
.insert(
cx.spawn(move |this, cx| async move {
Self::open_unstaged_changes_internal(this, load.await, buffer, cx)
.await
.map_err(Arc::new)
})
.shared(),
)
.clone()
}
};
cx.background_executor()
.spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
}
pub async fn open_unstaged_changes_internal(
this: WeakModel<Self>,
text: Result<Option<String>>,
buffer: Model<Buffer>,
mut cx: AsyncAppContext,
) -> Result<Model<BufferChangeSet>> {
let text = match text {
Err(e) => {
this.update(&mut cx, |this, cx| {
let buffer_id = buffer.read(cx).remote_id();
this.loading_change_sets.remove(&buffer_id);
})?;
return Err(e);
}
Ok(text) => text,
};
let change_set = buffer.update(&mut cx, |buffer, cx| {
cx.new_model(|_| BufferChangeSet::new(buffer))
})?;
if let Some(text) = text {
change_set
.update(&mut cx, |change_set, cx| {
let snapshot = buffer.read(cx).text_snapshot();
change_set.set_base_text(text, snapshot, cx)
})?
.await
.map_err(|e| e.cloned())
})
.ok();
}
this.update(&mut cx, |this, cx| {
let buffer_id = buffer.read(cx).remote_id();
this.loading_change_sets.remove(&buffer_id);
if let Some(OpenBuffer::Complete {
unstaged_changes, ..
}) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
{
*unstaged_changes = Some(change_set.downgrade());
}
})?;
Ok(change_set)
}
pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
@ -1166,7 +1266,10 @@ impl BufferStore {
fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
let remote_id = buffer.read(cx).remote_id();
let is_remote = buffer.read(cx).replica_id() != 0;
let open_buffer = OpenBuffer::Buffer(buffer.downgrade());
let open_buffer = OpenBuffer::Complete {
buffer: buffer.downgrade(),
unstaged_changes: None,
};
let handle = cx.handle().downgrade();
buffer.update(cx, move |_, cx| {
@ -1212,15 +1315,11 @@ impl BufferStore {
pub fn loading_buffers(
&self,
) -> impl Iterator<
Item = (
&ProjectPath,
postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
),
> {
self.loading_buffers_by_path
.iter()
.map(|(path, rx)| (path, rx.clone()))
) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Model<Buffer>>>)> {
self.loading_buffers.iter().map(|(path, task)| {
let task = task.clone();
(path, async move { task.await.map_err(|e| anyhow!("{e}")) })
})
}
pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
@ -1235,9 +1334,7 @@ impl BufferStore {
}
pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
self.opened_buffers
.get(&buffer_id)
.and_then(|buffer| buffer.upgrade())
self.opened_buffers.get(&buffer_id)?.upgrade()
}
pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
@ -1252,6 +1349,17 @@ impl BufferStore {
})
}
pub fn get_unstaged_changes(&self, buffer_id: BufferId) -> Option<Model<BufferChangeSet>> {
if let OpenBuffer::Complete {
unstaged_changes, ..
} = self.opened_buffers.get(&buffer_id)?
{
unstaged_changes.as_ref()?.upgrade()
} else {
None
}
}
pub fn buffer_version_info(
&self,
cx: &AppContext,
@ -1366,6 +1474,35 @@ impl BufferStore {
rx
}
pub fn recalculate_buffer_diffs(
&mut self,
buffers: Vec<Model<Buffer>>,
cx: &mut ModelContext<Self>,
) -> impl Future<Output = ()> {
let mut futures = Vec::new();
for buffer in buffers {
let buffer = buffer.read(cx).text_snapshot();
if let Some(OpenBuffer::Complete {
unstaged_changes, ..
}) = self.opened_buffers.get_mut(&buffer.remote_id())
{
if let Some(unstaged_changes) = unstaged_changes
.as_ref()
.and_then(|changes| changes.upgrade())
{
unstaged_changes.update(cx, |unstaged_changes, cx| {
futures.push(unstaged_changes.recalculate_diff(buffer.clone(), cx));
});
} else {
unstaged_changes.take();
}
}
}
async move {
futures::future::join_all(futures).await;
}
}
fn on_buffer_event(
&mut self,
buffer: Model<Buffer>,
@ -1413,7 +1550,7 @@ impl BufferStore {
match this.opened_buffers.entry(buffer_id) {
hash_map::Entry::Occupied(mut e) => match e.get_mut() {
OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
OpenBuffer::Buffer(buffer) => {
OpenBuffer::Complete { buffer, .. } => {
if let Some(buffer) = buffer.upgrade() {
buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
}
@ -1449,7 +1586,11 @@ impl BufferStore {
self.shared_buffers
.entry(guest_id)
.or_default()
.insert(buffer.clone());
.entry(buffer_id)
.or_insert_with(|| SharedBuffer {
buffer: buffer.clone(),
unstaged_changes: None,
});
let buffer = buffer.read(cx);
response.buffers.push(proto::BufferVersion {
@ -1469,13 +1610,14 @@ impl BufferStore {
.log_err();
}
client
.send(proto::UpdateDiffBase {
project_id,
buffer_id: buffer_id.into(),
diff_base: buffer.diff_base().map(ToString::to_string),
})
.log_err();
// todo!(max): do something
// client
// .send(proto::UpdateStagedText {
// project_id,
// buffer_id: buffer_id.into(),
// diff_base: buffer.diff_base().map(ToString::to_string),
// })
// .log_err();
client
.send(proto::BufferReloaded {
@ -1579,32 +1721,6 @@ impl BufferStore {
})?
}
pub async fn handle_update_diff_base(
this: Model<Self>,
envelope: TypedEnvelope<proto::UpdateDiffBase>,
mut cx: AsyncAppContext,
) -> Result<()> {
this.update(&mut cx, |this, cx| {
let buffer_id = envelope.payload.buffer_id;
let buffer_id = BufferId::new(buffer_id)?;
if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
buffer.update(cx, |buffer, cx| {
buffer.set_diff_base(envelope.payload.diff_base.clone(), cx)
});
}
if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
downstream_client
.send(proto::UpdateDiffBase {
project_id: *project_id,
buffer_id: buffer_id.into(),
diff_base: envelope.payload.diff_base,
})
.log_err();
}
Ok(())
})?
}
pub async fn handle_save_buffer(
this: Model<Self>,
envelope: TypedEnvelope<proto::SaveBuffer>,
@ -1654,16 +1770,14 @@ impl BufferStore {
let peer_id = envelope.sender_id;
let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
this.update(&mut cx, |this, _| {
if let Some(buffer) = this.get(buffer_id) {
if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
if shared.remove(&buffer) {
if shared.is_empty() {
this.shared_buffers.remove(&peer_id);
}
return;
if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
if shared.remove(&buffer_id).is_some() {
if shared.is_empty() {
this.shared_buffers.remove(&peer_id);
}
return;
}
};
}
debug_panic!(
"peer_id {} closed buffer_id {} which was either not open or already closed",
peer_id,
@ -1779,18 +1893,66 @@ impl BufferStore {
})
}
pub async fn wait_for_loading_buffer(
mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
loop {
if let Some(result) = receiver.borrow().as_ref() {
match result {
Ok(buffer) => return Ok(buffer.to_owned()),
Err(e) => return Err(e.to_owned()),
}
pub async fn handle_get_staged_text(
this: Model<Self>,
request: TypedEnvelope<proto::GetStagedText>,
mut cx: AsyncAppContext,
) -> Result<proto::GetStagedTextResponse> {
let buffer_id = BufferId::new(request.payload.buffer_id)?;
let change_set = this
.update(&mut cx, |this, cx| {
let buffer = this.get(buffer_id)?;
Some(this.open_unstaged_changes(buffer, cx))
})?
.ok_or_else(|| anyhow!("no such buffer"))?
.await?;
this.update(&mut cx, |this, _| {
let shared_buffers = this
.shared_buffers
.entry(request.original_sender_id.unwrap_or(request.sender_id))
.or_default();
debug_assert!(shared_buffers.contains_key(&buffer_id));
if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
shared.unstaged_changes = Some(change_set.clone());
}
receiver.next().await;
}
})?;
let staged_text = change_set.read_with(&cx, |change_set, cx| {
change_set
.base_text
.as_ref()
.map(|buffer| buffer.read(cx).text())
})?;
Ok(proto::GetStagedTextResponse { staged_text })
}
pub async fn handle_update_diff_base(
this: Model<Self>,
request: TypedEnvelope<proto::UpdateDiffBase>,
mut cx: AsyncAppContext,
) -> Result<()> {
let buffer_id = BufferId::new(request.payload.buffer_id)?;
let Some((buffer, change_set)) = this.update(&mut cx, |this, _| {
if let OpenBuffer::Complete {
unstaged_changes,
buffer,
} = this.opened_buffers.get(&buffer_id)?
{
Some((buffer.upgrade()?, unstaged_changes.as_ref()?.upgrade()?))
} else {
None
}
})?
else {
return Ok(());
};
change_set.update(&mut cx, |change_set, cx| {
if let Some(staged_text) = request.payload.staged_text {
let _ = change_set.set_base_text(staged_text, buffer.read(cx).text_snapshot(), cx);
} else {
change_set.unset_base_text(buffer.read(cx).text_snapshot(), cx)
}
})?;
Ok(())
}
pub fn reload_buffers(
@ -1839,14 +2001,17 @@ impl BufferStore {
cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
let buffer_id = buffer.read(cx).remote_id();
if !self
.shared_buffers
.entry(peer_id)
.or_default()
.insert(buffer.clone())
{
let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
if shared_buffers.contains_key(&buffer_id) {
return Task::ready(Ok(()));
}
shared_buffers.insert(
buffer_id,
SharedBuffer {
buffer: buffer.clone(),
unstaged_changes: None,
},
);
let Some((client, project_id)) = self.downstream_client.clone() else {
return Task::ready(Ok(()));
@ -1909,8 +2074,8 @@ impl BufferStore {
}
}
pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
&self.shared_buffers
pub fn has_shared_buffers(&self) -> bool {
!self.shared_buffers.is_empty()
}
pub fn create_local_buffer(
@ -1998,10 +2163,129 @@ impl BufferStore {
}
}
impl BufferChangeSet {
pub fn new(buffer: &text::BufferSnapshot) -> Self {
Self {
buffer_id: buffer.remote_id(),
base_text: None,
diff_to_buffer: git::diff::BufferDiff::new(buffer),
recalculate_diff_task: None,
diff_updated_futures: Vec::new(),
base_text_version: 0,
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn new_with_base_text(
base_text: String,
buffer: text::BufferSnapshot,
cx: &mut ModelContext<Self>,
) -> Self {
let mut this = Self::new(&buffer);
let _ = this.set_base_text(base_text, buffer, cx);
this
}
pub fn diff_hunks_intersecting_range<'a>(
&'a self,
range: Range<text::Anchor>,
buffer_snapshot: &'a text::BufferSnapshot,
) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
self.diff_to_buffer
.hunks_intersecting_range(range, buffer_snapshot)
}
pub fn diff_hunks_intersecting_range_rev<'a>(
&'a self,
range: Range<text::Anchor>,
buffer_snapshot: &'a text::BufferSnapshot,
) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
self.diff_to_buffer
.hunks_intersecting_range_rev(range, buffer_snapshot)
}
#[cfg(any(test, feature = "test-support"))]
pub fn base_text_string(&self, cx: &AppContext) -> Option<String> {
self.base_text.as_ref().map(|buffer| buffer.read(cx).text())
}
pub fn set_base_text(
&mut self,
mut base_text: String,
buffer_snapshot: text::BufferSnapshot,
cx: &mut ModelContext<Self>,
) -> oneshot::Receiver<()> {
LineEnding::normalize(&mut base_text);
self.recalculate_diff_internal(base_text, buffer_snapshot, true, cx)
}
pub fn unset_base_text(
&mut self,
buffer_snapshot: text::BufferSnapshot,
cx: &mut ModelContext<Self>,
) {
if self.base_text.is_some() {
self.base_text = None;
self.diff_to_buffer = BufferDiff::new(&buffer_snapshot);
self.recalculate_diff_task.take();
self.base_text_version += 1;
cx.notify();
}
}
pub fn recalculate_diff(
&mut self,
buffer_snapshot: text::BufferSnapshot,
cx: &mut ModelContext<Self>,
) -> oneshot::Receiver<()> {
if let Some(base_text) = self.base_text.clone() {
self.recalculate_diff_internal(base_text.read(cx).text(), buffer_snapshot, false, cx)
} else {
oneshot::channel().1
}
}
fn recalculate_diff_internal(
&mut self,
base_text: String,
buffer_snapshot: text::BufferSnapshot,
base_text_changed: bool,
cx: &mut ModelContext<Self>,
) -> oneshot::Receiver<()> {
let (tx, rx) = oneshot::channel();
self.diff_updated_futures.push(tx);
self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
let (base_text, diff) = cx
.background_executor()
.spawn(async move {
let diff = BufferDiff::build(&base_text, &buffer_snapshot).await;
(base_text, diff)
})
.await;
this.update(&mut cx, |this, cx| {
if base_text_changed {
this.base_text_version += 1;
this.base_text = Some(cx.new_model(|cx| {
Buffer::local_normalized(Rope::from(base_text), LineEnding::default(), cx)
}));
}
this.diff_to_buffer = diff;
this.recalculate_diff_task.take();
for tx in this.diff_updated_futures.drain(..) {
tx.send(()).ok();
}
cx.notify();
})?;
Ok(())
}));
rx
}
}
impl OpenBuffer {
fn upgrade(&self) -> Option<Model<Buffer>> {
match self {
OpenBuffer::Buffer(handle) => handle.upgrade(),
OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
OpenBuffer::Operations(_) => None,
}
}
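
For context, and not part of the diff itself: a minimal sketch of how a consumer
holding a `Model<BufferChangeSet>` might query it, using only the methods defined
above. The helper name and import paths are assumptions:

```rust
use gpui::{AppContext, Model};
use language::Buffer;
use project::buffer_store::BufferChangeSet;
use text::Anchor;

/// Count the hunks between a buffer and its unstaged base text, e.g. to decide
/// whether any git gutter marks need to be rendered. Illustrative helper only.
fn unstaged_hunk_count(
    change_set: &Model<BufferChangeSet>,
    buffer: &Model<Buffer>,
    cx: &AppContext,
) -> usize {
    let snapshot = buffer.read(cx).text_snapshot();
    change_set
        .read(cx)
        .diff_hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &snapshot)
        .count()
}
```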

@ -25,7 +25,7 @@ pub mod search_history;
mod yarn;
use anyhow::{anyhow, Context as _, Result};
use buffer_store::{BufferStore, BufferStoreEvent};
use buffer_store::{BufferChangeSet, BufferStore, BufferStoreEvent};
use client::{proto, Client, Collaborator, PendingEntitySubscription, TypedEnvelope, UserStore};
use clock::ReplicaId;
use collections::{BTreeSet, HashMap, HashSet};
@ -1821,6 +1821,20 @@ impl Project {
})
}
pub fn open_unstaged_changes(
&mut self,
buffer: Model<Buffer>,
cx: &mut ModelContext<Self>,
) -> Task<Result<Model<BufferChangeSet>>> {
if self.is_disconnected(cx) {
return Task::ready(Err(anyhow!(ErrorCode::Disconnected)));
}
self.buffer_store.update(cx, |buffer_store, cx| {
buffer_store.open_unstaged_changes(buffer, cx)
})
}
pub fn open_buffer_by_id(
&mut self,
id: BufferId,
@ -2269,10 +2283,7 @@ impl Project {
event: &BufferEvent,
cx: &mut ModelContext<Self>,
) -> Option<()> {
if matches!(
event,
BufferEvent::Edited { .. } | BufferEvent::Reloaded | BufferEvent::DiffBaseChanged
) {
if matches!(event, BufferEvent::Edited { .. } | BufferEvent::Reloaded) {
self.request_buffer_diff_recalculation(&buffer, cx);
}
@ -2369,34 +2380,32 @@ impl Project {
}
fn recalculate_buffer_diffs(&mut self, cx: &mut ModelContext<Self>) -> Task<()> {
let buffers = self.buffers_needing_diff.drain().collect::<Vec<_>>();
cx.spawn(move |this, mut cx| async move {
let tasks: Vec<_> = buffers
.iter()
.filter_map(|buffer| {
let buffer = buffer.upgrade()?;
buffer
.update(&mut cx, |buffer, cx| buffer.recalculate_diff(cx))
.ok()
.flatten()
})
.collect();
futures::future::join_all(tasks).await;
this.update(&mut cx, |this, cx| {
if this.buffers_needing_diff.is_empty() {
// TODO: Would a `ModelContext<Project>.notify()` suffice here?
for buffer in buffers {
if let Some(buffer) = buffer.upgrade() {
buffer.update(cx, |_, cx| cx.notify());
loop {
let task = this
.update(&mut cx, |this, cx| {
let buffers = this
.buffers_needing_diff
.drain()
.filter_map(|buffer| buffer.upgrade())
.collect::<Vec<_>>();
if buffers.is_empty() {
None
} else {
Some(this.buffer_store.update(cx, |buffer_store, cx| {
buffer_store.recalculate_buffer_diffs(buffers, cx)
}))
}
}
})
.ok()
.flatten();
if let Some(task) = task {
task.await;
} else {
this.recalculate_buffer_diffs(cx).detach();
break;
}
})
.ok();
}
})
}
@ -4149,6 +4158,10 @@ impl Project {
.read(cx)
.language_servers_for_buffer(buffer, cx)
}
pub fn buffer_store(&self) -> &Model<BufferStore> {
&self.buffer_store
}
}
fn deserialize_code_actions(code_actions: &HashMap<String, bool>) -> Vec<lsp::CodeActionKind> {

@ -1,6 +1,7 @@
use crate::{Event, *};
use fs::FakeFs;
use futures::{future, StreamExt};
use git::diff::assert_hunks;
use gpui::{AppContext, SemanticVersion, UpdateGlobal};
use http_client::Url;
use language::{
@ -5396,6 +5397,98 @@ async fn test_reordering_worktrees(cx: &mut gpui::TestAppContext) {
});
}
#[gpui::test]
async fn test_unstaged_changes_for_buffer(cx: &mut gpui::TestAppContext) {
init_test(cx);
let staged_contents = r#"
fn main() {
println!("hello world");
}
"#
.unindent();
let file_contents = r#"
// print goodbye
fn main() {
println!("goodbye world");
}
"#
.unindent();
let fs = FakeFs::new(cx.background_executor.clone());
fs.insert_tree(
"/dir",
json!({
".git": {},
"src": {
"main.rs": file_contents,
}
}),
)
.await;
fs.set_index_for_repo(
Path::new("/dir/.git"),
&[(Path::new("src/main.rs"), staged_contents)],
);
let project = Project::test(fs.clone(), ["/dir".as_ref()], cx).await;
let buffer = project
.update(cx, |project, cx| {
project.open_local_buffer("/dir/src/main.rs", cx)
})
.await
.unwrap();
let unstaged_changes = project
.update(cx, |project, cx| {
project.open_unstaged_changes(buffer.clone(), cx)
})
.await
.unwrap();
cx.run_until_parked();
unstaged_changes.update(cx, |unstaged_changes, cx| {
let snapshot = buffer.read(cx).snapshot();
assert_hunks(
unstaged_changes.diff_hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &snapshot),
&snapshot,
&unstaged_changes.base_text.as_ref().unwrap().read(cx).text(),
&[
(0..1, "", "// print goodbye\n"),
(
2..3,
" println!(\"hello world\");\n",
" println!(\"goodbye world\");\n",
),
],
);
});
let staged_contents = r#"
// print goodbye
fn main() {
}
"#
.unindent();
fs.set_index_for_repo(
Path::new("/dir/.git"),
&[(Path::new("src/main.rs"), staged_contents)],
);
cx.run_until_parked();
unstaged_changes.update(cx, |unstaged_changes, cx| {
let snapshot = buffer.read(cx).snapshot();
assert_hunks(
unstaged_changes.diff_hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &snapshot),
&snapshot,
&unstaged_changes.base_text.as_ref().unwrap().read(cx).text(),
&[(2..3, "", " println!(\"goodbye world\");\n")],
);
});
}
async fn search(
project: &Model<Project>,
query: SearchQuery,