Handle buffer diff base updates and file renames properly for SSH projects (#14989)

Release Notes:

- N/A

---------

Co-authored-by: Conrad <conrad@zed.dev>
This commit is contained in:
Max Brunsfeld 2024-07-23 11:32:37 -07:00 committed by GitHub
parent ec093c390f
commit 38e3182bef
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 1021 additions and 811 deletions

View file

@ -1,13 +1,16 @@
use crate::ProjectPath;
use anyhow::{anyhow, Context as _, Result};
use crate::{
worktree_store::{WorktreeStore, WorktreeStoreEvent},
ProjectPath,
};
use anyhow::{anyhow, Result};
use collections::{hash_map, HashMap};
use futures::{channel::oneshot, StreamExt as _};
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _};
use gpui::{
AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
};
use language::{
proto::{deserialize_version, serialize_version, split_operations},
Buffer, Capability, Language, Operation,
proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
Buffer, Capability, Event as BufferEvent, Language, Operation,
};
use rpc::{
proto::{self, AnyProtoClient, PeerId},
@ -16,11 +19,15 @@ use rpc::{
use std::{io, path::Path, sync::Arc};
use text::BufferId;
use util::{debug_panic, maybe, ResultExt as _};
use worktree::{File, ProjectEntryId, RemoteWorktree, Worktree};
use worktree::{
File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
};
/// A set of open buffers.
pub struct BufferStore {
retain_buffers: bool,
#[allow(unused)]
worktree_store: Model<WorktreeStore>,
opened_buffers: HashMap<BufferId, OpenBuffer>,
local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
@ -51,6 +58,12 @@ pub enum BufferStoreEvent {
has_changed_file: bool,
saved_version: clock::Global,
},
LocalBufferUpdated {
buffer: Model<Buffer>,
},
DiffBaseUpdated {
buffer: Model<Buffer>,
},
}
// `BufferStore` broadcasts `BufferStoreEvent`s to observers via gpui's event system.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
@ -62,9 +75,22 @@ impl BufferStore {
/// and won't be released unless they are explicitly removed, or `retain_buffers`
/// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
/// weak handles.
pub fn new(retain_buffers: bool) -> Self {
pub fn new(
worktree_store: Model<WorktreeStore>,
retain_buffers: bool,
cx: &mut ModelContext<Self>,
) -> Self {
cx.subscribe(&worktree_store, |this, _, event, cx| match event {
WorktreeStoreEvent::WorktreeAdded(worktree) => {
this.subscribe_to_worktree(worktree, cx);
}
_ => {}
})
.detach();
Self {
retain_buffers,
worktree_store,
opened_buffers: Default::default(),
remote_buffer_listeners: Default::default(),
loading_remote_buffers_by_id: Default::default(),
@ -77,7 +103,6 @@ impl BufferStore {
pub fn open_buffer(
&mut self,
project_path: ProjectPath,
worktree: Model<Worktree>,
cx: &mut ModelContext<Self>,
) -> Task<Result<Model<Buffer>>> {
let existing_buffer = self.get_by_path(&project_path, cx);
@ -85,6 +110,14 @@ impl BufferStore {
return Task::ready(Ok(existing_buffer));
}
let Some(worktree) = self
.worktree_store
.read(cx)
.worktree_for_id(project_path.worktree_id, cx)
else {
return Task::ready(Err(anyhow!("no such worktree")));
};
let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
// If the given path is already being loaded, then wait for that existing
// task to complete and return the same buffer.
@ -127,6 +160,131 @@ impl BufferStore {
})
}
/// Begins observing a worktree so that open buffers can be kept in sync
/// with on-disk entry changes and git repository changes.
fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
    let subscription = cx.subscribe(worktree, |this, worktree, event, cx| {
        // Remote worktrees are synchronized over RPC; only local ones
        // produce filesystem/git events we need to react to here.
        if !worktree.read(cx).is_local() {
            return;
        }
        match event {
            worktree::Event::UpdatedEntries(changes) => {
                this.local_worktree_entries_changed(&worktree, changes, cx);
            }
            worktree::Event::UpdatedGitRepositories(updated_repos) => {
                this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
            }
            _ => {}
        }
    });
    subscription.detach();
}
/// Routes a batch of filesystem entry changes from a local worktree to the
/// per-entry handler, sharing one snapshot across the whole batch.
fn local_worktree_entries_changed(
    &mut self,
    worktree_handle: &Model<Worktree>,
    changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
    cx: &mut ModelContext<Self>,
) {
    // Capture the snapshot once; it is stable for the duration of this batch.
    let snapshot = worktree_handle.read(cx).snapshot();
    for change in changes {
        let (path, entry_id, _kind) = change;
        self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
    }
}
/// Recomputes the git diff base for every open (or still-loading) buffer whose
/// containing git repository was reported as changed, then assigns the new
/// diff bases and emits `DiffBaseUpdated` for each affected buffer.
fn local_worktree_git_repos_changed(
&mut self,
worktree_handle: Model<Worktree>,
changed_repos: &UpdatedGitRepositoriesSet,
cx: &mut ModelContext<Self>,
) {
// Only local worktrees have on-disk repositories to read from.
debug_assert!(worktree_handle.read(cx).is_local());
// Identify the loading buffers whose containing repository has changed.
let future_buffers = self
.loading_buffers()
.filter_map(|(project_path, receiver)| {
if project_path.worktree_id != worktree_handle.read(cx).id() {
return None;
}
let path = &project_path.path;
changed_repos
.iter()
.find(|(work_dir, _)| path.starts_with(work_dir))?;
let path = path.clone();
Some(async move {
Self::wait_for_loading_buffer(receiver)
.await
.ok()
.map(|buffer| (buffer, path))
})
})
.collect::<FuturesUnordered<_>>();
// Identify the current buffers whose containing repository has changed.
let current_buffers = self
.buffers()
.filter_map(|buffer| {
let file = File::from_dyn(buffer.read(cx).file())?;
if file.worktree != worktree_handle {
return None;
}
changed_repos
.iter()
.find(|(work_dir, _)| file.path.starts_with(work_dir))?;
Some((buffer, file.path.clone()))
})
.collect::<Vec<_>>();
// Nothing to do if no open or loading buffer is affected.
if future_buffers.len() + current_buffers.len() == 0 {
return;
}
cx.spawn(move |this, mut cx| async move {
// Wait for all of the buffers to load.
let future_buffers = future_buffers.collect::<Vec<_>>().await;
// Reload the diff base for every buffer whose containing git repository has changed.
let snapshot =
worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
// Reading index text does blocking git I/O, so do it on the
// background executor rather than the foreground thread.
let diff_bases_by_buffer = cx
.background_executor()
.spawn(async move {
let mut diff_base_tasks = future_buffers
.into_iter()
.flatten()
.chain(current_buffers)
.filter_map(|(buffer, path)| {
let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
Some(async move {
let base_text =
local_repo_entry.repo().load_index_text(&relative_path);
Some((buffer, base_text))
})
})
.collect::<FuturesUnordered<_>>();
let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
while let Some(diff_base) = diff_base_tasks.next().await {
if let Some(diff_base) = diff_base {
diff_bases.push(diff_base);
}
}
diff_bases
})
.await;
this.update(&mut cx, |_, cx| {
// Assign the new diff bases on all of the buffers.
for (buffer, diff_base) in diff_bases_by_buffer {
buffer.update(cx, |buffer, cx| {
buffer.set_diff_base(diff_base.clone(), cx);
});
cx.emit(BufferStoreEvent::DiffBaseUpdated { buffer })
}
})
})
.detach_and_log_err(cx);
}
fn open_local_buffer_internal(
&mut self,
path: Arc<Path>,
@ -265,9 +423,16 @@ impl BufferStore {
&mut self,
buffer: Model<Buffer>,
path: ProjectPath,
worktree: Model<Worktree>,
cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
let Some(worktree) = self
.worktree_store
.read(cx)
.worktree_for_id(path.worktree_id, cx)
else {
return Task::ready(Err(anyhow!("no such worktree")));
};
let old_file = File::from_dyn(buffer.read(cx).file())
.cloned()
.map(Arc::new);
@ -411,6 +576,7 @@ impl BufferStore {
}
}
cx.subscribe(&buffer, Self::on_buffer_event).detach();
cx.emit(BufferStoreEvent::BufferAdded(buffer));
Ok(())
}
@ -461,31 +627,6 @@ impl BufferStore {
.or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
}
/// Looks up the open buffer for `entry_id`, falling back to `project_path`,
/// and prunes every index entry when the buffer handle has already been dropped.
fn get_or_remove_by_path(
    &mut self,
    entry_id: ProjectEntryId,
    project_path: &ProjectPath,
) -> Option<(BufferId, Model<Buffer>)> {
    // Prefer the entry-id index; fall back to the path index for buffers
    // whose entry ids were not known at open time.
    let buffer_id = self
        .local_buffer_ids_by_entry_id
        .get(&entry_id)
        .copied()
        .or_else(|| self.local_buffer_ids_by_path.get(project_path).copied())?;
    match self.get(buffer_id) {
        Some(buffer) => Some((buffer_id, buffer)),
        None => {
            // The buffer is gone; clean up every index that referenced it.
            self.opened_buffers.remove(&buffer_id);
            self.local_buffer_ids_by_path.remove(project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            None
        }
    }
}
pub fn wait_for_remote_buffer(
&mut self,
id: BufferId,
@ -561,25 +702,48 @@ impl BufferStore {
.retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
}
pub fn file_changed(
/// Reacts to events emitted by an individual buffer. Only file-handle
/// changes are of interest here; every other event is ignored.
fn on_buffer_event(
    &mut self,
    buffer: Model<Buffer>,
    event: &BufferEvent,
    cx: &mut ModelContext<Self>,
) {
    if let BufferEvent::FileHandleChanged = event {
        self.buffer_changed_file(buffer, cx);
    }
}
fn local_worktree_entry_changed(
&mut self,
path: Arc<Path>,
entry_id: ProjectEntryId,
worktree_handle: &Model<worktree::Worktree>,
path: &Arc<Path>,
worktree: &Model<worktree::Worktree>,
snapshot: &worktree::Snapshot,
cx: &mut ModelContext<Self>,
) -> Option<(Model<Buffer>, Arc<File>, Arc<File>)> {
let (buffer_id, buffer) = self.get_or_remove_by_path(
entry_id,
&ProjectPath {
worktree_id: snapshot.id(),
path,
},
)?;
) -> Option<()> {
let project_path = ProjectPath {
worktree_id: snapshot.id(),
path: path.clone(),
};
let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
Some(&buffer_id) => buffer_id,
None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
};
let buffer = if let Some(buffer) = self.get(buffer_id) {
buffer
} else {
self.opened_buffers.remove(&buffer_id);
self.local_buffer_ids_by_path.remove(&project_path);
self.local_buffer_ids_by_entry_id.remove(&entry_id);
return None;
};
let result = buffer.update(cx, |buffer, cx| {
let (old_file, new_file) = buffer.update(cx, |buffer, cx| {
let old_file = File::from_dyn(buffer.file())?;
if old_file.worktree != *worktree_handle {
if old_file.worktree != *worktree {
return None;
}
@ -592,7 +756,7 @@ impl BufferStore {
entry_id: Some(entry.id),
mtime: entry.mtime,
path: entry.path.clone(),
worktree: worktree_handle.clone(),
worktree: worktree.clone(),
is_deleted: false,
is_private: entry.is_private,
}
@ -602,7 +766,7 @@ impl BufferStore {
entry_id: Some(entry.id),
mtime: entry.mtime,
path: entry.path.clone(),
worktree: worktree_handle.clone(),
worktree: worktree.clone(),
is_deleted: false,
is_private: entry.is_private,
}
@ -612,7 +776,7 @@ impl BufferStore {
entry_id: old_file.entry_id,
path: old_file.path.clone(),
mtime: old_file.mtime,
worktree: worktree_handle.clone(),
worktree: worktree.clone(),
is_deleted: true,
is_private: old_file.is_private,
}
@ -625,47 +789,42 @@ impl BufferStore {
let old_file = Arc::new(old_file.clone());
let new_file = Arc::new(new_file);
buffer.file_updated(new_file.clone(), cx);
Some((cx.handle(), old_file, new_file))
});
Some((old_file, new_file))
})?;
if let Some((buffer, old_file, new_file)) = &result {
if new_file.path != old_file.path {
self.local_buffer_ids_by_path.remove(&ProjectPath {
path: old_file.path.clone(),
worktree_id: old_file.worktree_id(cx),
});
self.local_buffer_ids_by_path.insert(
ProjectPath {
worktree_id: new_file.worktree_id(cx),
path: new_file.path.clone(),
},
buffer_id,
);
cx.emit(BufferStoreEvent::BufferChangedFilePath {
buffer: buffer.clone(),
old_file: Some(old_file.clone()),
});
if new_file.path != old_file.path {
self.local_buffer_ids_by_path.remove(&ProjectPath {
path: old_file.path.clone(),
worktree_id: old_file.worktree_id(cx),
});
self.local_buffer_ids_by_path.insert(
ProjectPath {
worktree_id: new_file.worktree_id(cx),
path: new_file.path.clone(),
},
buffer_id,
);
cx.emit(BufferStoreEvent::BufferChangedFilePath {
buffer: buffer.clone(),
old_file: Some(old_file.clone()),
});
}
if new_file.entry_id != old_file.entry_id {
if let Some(entry_id) = old_file.entry_id {
self.local_buffer_ids_by_entry_id.remove(&entry_id);
}
if new_file.entry_id != old_file.entry_id {
if let Some(entry_id) = old_file.entry_id {
self.local_buffer_ids_by_entry_id.remove(&entry_id);
}
if let Some(entry_id) = new_file.entry_id {
self.local_buffer_ids_by_entry_id
.insert(entry_id, buffer_id);
}
if let Some(entry_id) = new_file.entry_id {
self.local_buffer_ids_by_entry_id
.insert(entry_id, buffer_id);
}
}
result
cx.emit(BufferStoreEvent::LocalBufferUpdated { buffer });
None
}
pub fn buffer_changed_file(
&mut self,
buffer: Model<Buffer>,
cx: &mut AppContext,
) -> Option<()> {
fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
let file = File::from_dyn(buffer.read(cx).file())?;
let remote_id = buffer.read(cx).remote_id();
@ -862,7 +1021,6 @@ impl BufferStore {
pub async fn handle_save_buffer(
this: Model<Self>,
project_id: u64,
worktree: Option<Model<Worktree>>,
envelope: TypedEnvelope<proto::SaveBuffer>,
mut cx: AsyncAppContext,
) -> Result<proto::BufferSaved> {
@ -876,10 +1034,9 @@ impl BufferStore {
let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
if let Some(new_path) = envelope.payload.new_path {
let worktree = worktree.context("no such worktree")?;
let new_path = ProjectPath::from_proto(new_path);
this.update(&mut cx, |this, cx| {
this.save_buffer_as(buffer.clone(), new_path, worktree, cx)
this.save_buffer_as(buffer.clone(), new_path, cx)
})?
.await?;
} else {
@ -895,6 +1052,44 @@ impl BufferStore {
})
}
/// Handles a `BufferSaved` RPC message by recording the saved version and
/// mtime on the corresponding local buffer, if one exists.
pub async fn handle_buffer_saved(
    this: Model<Self>,
    envelope: TypedEnvelope<proto::BufferSaved>,
    mut cx: AsyncAppContext,
) -> Result<()> {
    let payload = envelope.payload;
    let buffer_id = BufferId::new(payload.buffer_id)?;
    let version = deserialize_version(&payload.version);
    let mtime = payload.mtime.map(|time| time.into());
    this.update(&mut cx, move |this, cx| {
        // Buffers that are still being loaded can also receive saved notices.
        if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
            buffer.update(cx, |buffer, cx| buffer.did_save(version, mtime, cx));
        }
    })
}
pub async fn handle_buffer_reloaded(
this: Model<Self>,
envelope: TypedEnvelope<proto::BufferReloaded>,
mut cx: AsyncAppContext,
) -> Result<()> {
let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
let version = deserialize_version(&envelope.payload.version);
let mtime = envelope.payload.mtime.map(|time| time.into());
let line_ending = deserialize_line_ending(
proto::LineEnding::from_i32(envelope.payload.line_ending)
.ok_or_else(|| anyhow!("missing line ending"))?,
);
this.update(&mut cx, |this, cx| {
if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
buffer.update(cx, |buffer, cx| {
buffer.did_reload(version, line_ending, mtime, cx);
});
}
})
}
pub async fn wait_for_loading_buffer(
mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
) -> Result<Model<Buffer>, Arc<anyhow::Error>> {