Separate repository state synchronization from worktree synchronization (#27140)

This PR updates our DB schemas and wire protocol to separate the
synchronization of git statuses and other repository state from the
synchronization of worktrees. This paves the way for moving the code
that executes git status updates out of the `worktree` crate and onto
the new `GitStore`. That end goal is motivated by two (related) points:

- Disentangling git status updates from the worktree's
`BackgroundScanner` will allow us to implement a simpler concurrency
story for those updates, hopefully fixing some known but elusive bugs
(upstream state not updating after push; statuses getting out of sync in
remote projects).
- By moving git repository state to the project-scoped `GitStore`, we
can get rid of the duplication that currently happens when two worktrees
are associated with the same git repository.

Co-authored-by: Max <max@zed.dev>

Release Notes:

- N/A

---------

Co-authored-by: Max <max@zed.dev>
Co-authored-by: Max Brunsfeld <maxbrunsfeld@gmail.com>
This commit is contained in:
Cole Miller 2025-03-20 18:07:03 -04:00 committed by GitHub
parent 700af63c45
commit bc1c0a2297
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 1147 additions and 535 deletions

View file

@ -41,7 +41,7 @@ use postage::{
watch,
};
use rpc::{
proto::{self, split_worktree_update, FromProto, ToProto},
proto::{self, split_worktree_related_message, FromProto, ToProto, WorktreeRelatedMessage},
AnyProtoClient,
};
pub use settings::WorktreeId;
@ -140,12 +140,12 @@ struct ScanRequest {
pub struct RemoteWorktree {
snapshot: Snapshot,
background_snapshot: Arc<Mutex<(Snapshot, Vec<proto::UpdateWorktree>)>>,
background_snapshot: Arc<Mutex<(Snapshot, Vec<WorktreeRelatedMessage>)>>,
project_id: u64,
client: AnyProtoClient,
file_scan_inclusions: PathMatcher,
updates_tx: Option<UnboundedSender<proto::UpdateWorktree>>,
update_observer: Option<mpsc::UnboundedSender<proto::UpdateWorktree>>,
updates_tx: Option<UnboundedSender<WorktreeRelatedMessage>>,
update_observer: Option<mpsc::UnboundedSender<WorktreeRelatedMessage>>,
snapshot_subscriptions: VecDeque<(usize, oneshot::Sender<()>)>,
replica_id: ReplicaId,
visible: bool,
@ -200,6 +200,7 @@ pub struct RepositoryEntry {
pub(crate) statuses_by_path: SumTree<StatusEntry>,
work_directory_id: ProjectEntryId,
pub work_directory: WorkDirectory,
work_directory_abs_path: PathBuf,
pub(crate) current_branch: Option<Branch>,
pub current_merge_conflicts: TreeSet<RepoPath>,
}
@ -247,13 +248,12 @@ impl RepositoryEntry {
.cloned()
}
pub fn initial_update(&self) -> proto::RepositoryEntry {
proto::RepositoryEntry {
work_directory_id: self.work_directory_id.to_proto(),
branch: self
.current_branch
.as_ref()
.map(|branch| branch.name.to_string()),
pub fn initial_update(
&self,
project_id: u64,
worktree_scan_id: usize,
) -> proto::UpdateRepository {
proto::UpdateRepository {
branch_summary: self.current_branch.as_ref().map(branch_to_proto),
updated_statuses: self
.statuses_by_path
@ -266,10 +266,26 @@ impl RepositoryEntry {
.iter()
.map(|repo_path| repo_path.to_proto())
.collect(),
project_id,
// This is semantically wrong---we want to move to having separate IDs for repositories.
// But for the moment, RepositoryEntry isn't set up to provide that at this level, so we
// shim it using the work directory's project entry ID. The pair of this + project ID will
// be globally unique.
id: self.work_directory_id().to_proto(),
abs_path: self.work_directory_abs_path.as_path().to_proto(),
entry_ids: vec![self.work_directory_id().to_proto()],
// This is also semantically wrong, and should be replaced once we separate git repo updates
// from worktree scans.
scan_id: worktree_scan_id as u64,
}
}
pub fn build_update(&self, old: &Self) -> proto::RepositoryEntry {
pub fn build_update(
&self,
old: &Self,
project_id: u64,
scan_id: usize,
) -> proto::UpdateRepository {
let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
let mut removed_statuses: Vec<String> = Vec::new();
@ -311,12 +327,7 @@ impl RepositoryEntry {
}
}
proto::RepositoryEntry {
work_directory_id: self.work_directory_id.to_proto(),
branch: self
.current_branch
.as_ref()
.map(|branch| branch.name.to_string()),
proto::UpdateRepository {
branch_summary: self.current_branch.as_ref().map(branch_to_proto),
updated_statuses,
removed_statuses,
@ -325,6 +336,11 @@ impl RepositoryEntry {
.iter()
.map(|path| path.as_ref().to_proto())
.collect(),
project_id,
id: self.work_directory_id.to_proto(),
abs_path: self.work_directory_abs_path.as_path().to_proto(),
entry_ids: vec![self.work_directory_id.to_proto()],
scan_id: scan_id as u64,
}
}
}
@ -808,8 +824,12 @@ impl Worktree {
Arc::<Path>::from_proto(worktree.abs_path),
);
let background_snapshot = Arc::new(Mutex::new((snapshot.clone(), Vec::new())));
let (background_updates_tx, mut background_updates_rx) = mpsc::unbounded();
let background_snapshot = Arc::new(Mutex::new((
snapshot.clone(),
Vec::<WorktreeRelatedMessage>::new(),
)));
let (background_updates_tx, mut background_updates_rx) =
mpsc::unbounded::<WorktreeRelatedMessage>();
let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel();
let worktree_id = snapshot.id();
@ -839,12 +859,9 @@ impl Worktree {
while let Some(update) = background_updates_rx.next().await {
{
let mut lock = background_snapshot.lock();
if let Err(error) = lock
.0
lock.0
.apply_remote_update(update.clone(), &settings.file_scan_inclusions)
{
log::error!("error applying worktree update: {}", error);
}
.log_err();
lock.1.push(update);
}
snapshot_updated_tx.send(()).await.ok();
@ -864,16 +881,18 @@ impl Worktree {
let mut lock = this.background_snapshot.lock();
this.snapshot = lock.0.clone();
for update in lock.1.drain(..) {
if !update.updated_entries.is_empty()
|| !update.removed_entries.is_empty()
{
entries_changed = true;
}
if !update.updated_repositories.is_empty()
|| !update.removed_repositories.is_empty()
{
git_repos_changed = true;
}
entries_changed |= match &update {
WorktreeRelatedMessage::UpdateWorktree(update_worktree) => {
!update_worktree.updated_entries.is_empty()
|| !update_worktree.removed_entries.is_empty()
}
_ => false,
};
git_repos_changed |= matches!(
update,
WorktreeRelatedMessage::UpdateRepository(_)
| WorktreeRelatedMessage::RemoveRepository(_)
);
if let Some(tx) = &this.update_observer {
tx.unbounded_send(update).ok();
}
@ -1010,7 +1029,7 @@ impl Worktree {
pub fn observe_updates<F, Fut>(&mut self, project_id: u64, cx: &Context<Worktree>, callback: F)
where
F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut,
F: 'static + Send + Fn(WorktreeRelatedMessage) -> Fut,
Fut: 'static + Send + Future<Output = bool>,
{
match self {
@ -2289,8 +2308,8 @@ impl LocalWorktree {
fn observe_updates<F, Fut>(&mut self, project_id: u64, cx: &Context<Worktree>, callback: F)
where
F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut,
Fut: Send + Future<Output = bool>,
F: 'static + Send + Fn(WorktreeRelatedMessage) -> Fut,
Fut: 'static + Send + Future<Output = bool>,
{
if let Some(observer) = self.update_observer.as_mut() {
*observer.resume_updates.borrow_mut() = ();
@ -2308,14 +2327,17 @@ impl LocalWorktree {
let _maintain_remote_snapshot = cx.background_spawn(async move {
let mut is_first = true;
while let Some((snapshot, entry_changes, repo_changes)) = snapshots_rx.next().await {
let update = if is_first {
let updates = if is_first {
is_first = false;
snapshot.build_initial_update(project_id, worktree_id)
} else {
snapshot.build_update(project_id, worktree_id, entry_changes, repo_changes)
};
for update in proto::split_worktree_update(update) {
for update in updates
.into_iter()
.flat_map(proto::split_worktree_related_message)
{
let _ = resume_updates_rx.try_recv();
loop {
let result = callback(update.clone());
@ -2378,7 +2400,7 @@ impl RemoteWorktree {
self.disconnected = true;
}
pub fn update_from_remote(&self, update: proto::UpdateWorktree) {
pub fn update_from_remote(&self, update: WorktreeRelatedMessage) {
if let Some(updates_tx) = &self.updates_tx {
updates_tx
.unbounded_send(update)
@ -2388,29 +2410,41 @@ impl RemoteWorktree {
fn observe_updates<F, Fut>(&mut self, project_id: u64, cx: &Context<Worktree>, callback: F)
where
F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut,
F: 'static + Send + Fn(WorktreeRelatedMessage) -> Fut,
Fut: 'static + Send + Future<Output = bool>,
{
let (tx, mut rx) = mpsc::unbounded();
let initial_update = self
let initial_updates = self
.snapshot
.build_initial_update(project_id, self.id().to_proto());
self.update_observer = Some(tx);
cx.spawn(async move |this, cx| {
let mut update = initial_update;
let mut updates = initial_updates;
'outer: loop {
// SSH projects use a special project ID of 0, and we need to
// remap it to the correct one here.
update.project_id = project_id;
for mut update in updates {
// SSH projects use a special project ID of 0, and we need to
// remap it to the correct one here.
match &mut update {
WorktreeRelatedMessage::UpdateWorktree(update_worktree) => {
update_worktree.project_id = project_id;
}
WorktreeRelatedMessage::UpdateRepository(update_repository) => {
update_repository.project_id = project_id;
}
WorktreeRelatedMessage::RemoveRepository(remove_repository) => {
remove_repository.project_id = project_id;
}
};
for chunk in split_worktree_update(update) {
if !callback(chunk).await {
break 'outer;
for chunk in split_worktree_related_message(update) {
if !callback(chunk).await {
break 'outer;
}
}
}
if let Some(next_update) = rx.next().await {
update = next_update;
updates = vec![next_update];
} else {
break;
}
@ -2570,7 +2604,11 @@ impl Snapshot {
self.abs_path.as_path()
}
fn build_initial_update(&self, project_id: u64, worktree_id: u64) -> proto::UpdateWorktree {
fn build_initial_update(
&self,
project_id: u64,
worktree_id: u64,
) -> Vec<WorktreeRelatedMessage> {
let mut updated_entries = self
.entries_by_path
.iter()
@ -2578,14 +2616,7 @@ impl Snapshot {
.collect::<Vec<_>>();
updated_entries.sort_unstable_by_key(|e| e.id);
let mut updated_repositories = self
.repositories
.iter()
.map(|repository| repository.initial_update())
.collect::<Vec<_>>();
updated_repositories.sort_unstable_by_key(|e| e.work_directory_id);
proto::UpdateWorktree {
[proto::UpdateWorktree {
project_id,
worktree_id,
abs_path: self.abs_path().to_proto(),
@ -2594,9 +2625,18 @@ impl Snapshot {
removed_entries: Vec::new(),
scan_id: self.scan_id as u64,
is_last_update: self.completed_scan_id == self.scan_id,
updated_repositories,
// Sent in separate messages.
updated_repositories: Vec::new(),
removed_repositories: Vec::new(),
}
.into()]
.into_iter()
.chain(
self.repositories
.iter()
.map(|repository| repository.initial_update(project_id, self.scan_id).into()),
)
.collect()
}
pub fn absolutize(&self, path: &Path) -> Result<PathBuf> {
@ -2678,9 +2718,97 @@ impl Snapshot {
}
}
pub(crate) fn apply_remote_update(
pub(crate) fn apply_update_repository(
&mut self,
mut update: proto::UpdateWorktree,
update: proto::UpdateRepository,
) -> Result<()> {
// NOTE: this is practically but not semantically correct. For now we're using the
// ID field to store the work directory ID, but eventually it will be a different
// kind of ID.
let work_directory_id = ProjectEntryId::from_proto(update.id);
if let Some(work_dir_entry) = self.entry_for_id(work_directory_id) {
let conflicted_paths = TreeSet::from_ordered_entries(
update
.current_merge_conflicts
.into_iter()
.map(|path| RepoPath(Path::new(&path).into())),
);
if self
.repositories
.contains(&PathKey(work_dir_entry.path.clone()), &())
{
let edits = update
.removed_statuses
.into_iter()
.map(|path| Edit::Remove(PathKey(FromProto::from_proto(path))))
.chain(
update
.updated_statuses
.into_iter()
.filter_map(|updated_status| {
Some(Edit::Insert(updated_status.try_into().log_err()?))
}),
)
.collect::<Vec<_>>();
self.repositories
.update(&PathKey(work_dir_entry.path.clone()), &(), |repo| {
repo.current_branch = update.branch_summary.as_ref().map(proto_to_branch);
repo.statuses_by_path.edit(edits, &());
repo.current_merge_conflicts = conflicted_paths
});
} else {
let statuses = SumTree::from_iter(
update
.updated_statuses
.into_iter()
.filter_map(|updated_status| updated_status.try_into().log_err()),
&(),
);
self.repositories.insert_or_replace(
RepositoryEntry {
work_directory_id,
// When syncing repository entries from a peer, we don't need
// the location_in_repo field, since git operations don't happen locally
// anyway.
work_directory: WorkDirectory::InProject {
relative_path: work_dir_entry.path.clone(),
},
current_branch: update.branch_summary.as_ref().map(proto_to_branch),
statuses_by_path: statuses,
current_merge_conflicts: conflicted_paths,
work_directory_abs_path: update.abs_path.into(),
},
&(),
);
}
} else {
log::error!("no work directory entry for repository {:?}", update.id)
}
Ok(())
}
pub(crate) fn apply_remove_repository(
&mut self,
update: proto::RemoveRepository,
) -> Result<()> {
// NOTE: this is practically but not semantically correct. For now we're using the
// ID field to store the work directory ID, but eventually it will be a different
// kind of ID.
let work_directory_id = ProjectEntryId::from_proto(update.id);
self.repositories.retain(&(), |entry: &RepositoryEntry| {
entry.work_directory_id != work_directory_id
});
Ok(())
}
pub(crate) fn apply_update_worktree(
&mut self,
update: proto::UpdateWorktree,
always_included_paths: &PathMatcher,
) -> Result<()> {
log::debug!(
@ -2726,79 +2854,6 @@ impl Snapshot {
self.entries_by_path.edit(entries_by_path_edits, &());
self.entries_by_id.edit(entries_by_id_edits, &());
update.removed_repositories.sort_unstable();
self.repositories.retain(&(), |entry: &RepositoryEntry| {
update
.removed_repositories
.binary_search(&entry.work_directory_id.to_proto())
.is_err()
});
for repository in update.updated_repositories {
let work_directory_id = ProjectEntryId::from_proto(repository.work_directory_id);
if let Some(work_dir_entry) = self.entry_for_id(work_directory_id) {
let conflicted_paths = TreeSet::from_ordered_entries(
repository
.current_merge_conflicts
.into_iter()
.map(|path| RepoPath(Path::new(&path).into())),
);
if self
.repositories
.contains(&PathKey(work_dir_entry.path.clone()), &())
{
let edits = repository
.removed_statuses
.into_iter()
.map(|path| Edit::Remove(PathKey(FromProto::from_proto(path))))
.chain(repository.updated_statuses.into_iter().filter_map(
|updated_status| {
Some(Edit::Insert(updated_status.try_into().log_err()?))
},
))
.collect::<Vec<_>>();
self.repositories
.update(&PathKey(work_dir_entry.path.clone()), &(), |repo| {
repo.current_branch =
repository.branch_summary.as_ref().map(proto_to_branch);
repo.statuses_by_path.edit(edits, &());
repo.current_merge_conflicts = conflicted_paths
});
} else {
let statuses = SumTree::from_iter(
repository
.updated_statuses
.into_iter()
.filter_map(|updated_status| updated_status.try_into().log_err()),
&(),
);
self.repositories.insert_or_replace(
RepositoryEntry {
work_directory_id,
// When syncing repository entries from a peer, we don't need
// the location_in_repo field, since git operations don't happen locally
// anyway.
work_directory: WorkDirectory::InProject {
relative_path: work_dir_entry.path.clone(),
},
current_branch: repository.branch_summary.as_ref().map(proto_to_branch),
statuses_by_path: statuses,
current_merge_conflicts: conflicted_paths,
},
&(),
);
}
} else {
log::error!(
"no work directory entry for repository {:?}",
repository.work_directory_id
)
}
}
self.scan_id = update.scan_id as usize;
if update.is_last_update {
self.completed_scan_id = update.scan_id as usize;
@ -2807,6 +2862,24 @@ impl Snapshot {
Ok(())
}
pub(crate) fn apply_remote_update(
&mut self,
update: WorktreeRelatedMessage,
always_included_paths: &PathMatcher,
) -> Result<()> {
match update {
WorktreeRelatedMessage::UpdateWorktree(update) => {
self.apply_update_worktree(update, always_included_paths)
}
WorktreeRelatedMessage::UpdateRepository(update) => {
self.apply_update_repository(update)
}
WorktreeRelatedMessage::RemoveRepository(update) => {
self.apply_remove_repository(update)
}
}
}
pub fn entry_count(&self) -> usize {
self.entries_by_path.summary().count
}
@ -3046,11 +3119,10 @@ impl LocalSnapshot {
worktree_id: u64,
entry_changes: UpdatedEntriesSet,
repo_changes: UpdatedGitRepositoriesSet,
) -> proto::UpdateWorktree {
) -> Vec<WorktreeRelatedMessage> {
let mut updated_entries = Vec::new();
let mut removed_entries = Vec::new();
let mut updated_repositories = Vec::new();
let mut removed_repositories = Vec::new();
let mut updates = Vec::new();
for (_, entry_id, path_change) in entry_changes.iter() {
if let PathChange::Removed = path_change {
@ -3064,13 +3136,23 @@ impl LocalSnapshot {
let new_repo = self.repositories.get(&PathKey(entry.path.clone()), &());
match (&change.old_repository, new_repo) {
(Some(old_repo), Some(new_repo)) => {
updated_repositories.push(new_repo.build_update(old_repo));
updates.push(
new_repo
.build_update(old_repo, project_id, self.scan_id)
.into(),
);
}
(None, Some(new_repo)) => {
updated_repositories.push(new_repo.initial_update());
updates.push(new_repo.initial_update(project_id, self.scan_id).into());
}
(Some(old_repo), None) => {
removed_repositories.push(old_repo.work_directory_id.to_proto());
updates.push(
proto::RemoveRepository {
project_id,
id: old_repo.work_directory_id.to_proto(),
}
.into(),
);
}
_ => {}
}
@ -3078,24 +3160,27 @@ impl LocalSnapshot {
removed_entries.sort_unstable();
updated_entries.sort_unstable_by_key(|e| e.id);
removed_repositories.sort_unstable();
updated_repositories.sort_unstable_by_key(|e| e.work_directory_id);
// TODO - optimize, knowing that removed_entries are sorted.
removed_entries.retain(|id| updated_entries.binary_search_by_key(id, |e| e.id).is_err());
proto::UpdateWorktree {
project_id,
worktree_id,
abs_path: self.abs_path().to_proto(),
root_name: self.root_name().to_string(),
updated_entries,
removed_entries,
scan_id: self.scan_id as u64,
is_last_update: self.completed_scan_id == self.scan_id,
updated_repositories,
removed_repositories,
}
updates.push(
proto::UpdateWorktree {
project_id,
worktree_id,
abs_path: self.abs_path().to_proto(),
root_name: self.root_name().to_string(),
updated_entries,
removed_entries,
scan_id: self.scan_id as u64,
is_last_update: self.completed_scan_id == self.scan_id,
// Sent in separate messages.
updated_repositories: Vec::new(),
removed_repositories: Vec::new(),
}
.into(),
);
updates
}
fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
@ -3547,12 +3632,15 @@ impl BackgroundScannerState {
watcher: &dyn Watcher,
) -> Option<LocalRepositoryEntry> {
log::info!("insert git repository for {dot_git_path:?}");
let work_dir_id = self
.snapshot
.entry_for_path(work_directory.path_key().0)
.map(|entry| entry.id)?;
let work_dir_entry = self.snapshot.entry_for_path(work_directory.path_key().0)?;
let work_directory_abs_path = self.snapshot.absolutize(&work_dir_entry.path).log_err()?;
if self.snapshot.git_repositories.get(&work_dir_id).is_some() {
if self
.snapshot
.git_repositories
.get(&work_dir_entry.id)
.is_some()
{
log::info!("existing git repository for {work_directory:?}");
return None;
}
@ -3593,10 +3681,12 @@ impl BackgroundScannerState {
);
}
let work_directory_id = work_dir_entry.id;
self.snapshot.repositories.insert_or_replace(
RepositoryEntry {
work_directory_id: work_dir_id,
work_directory_id,
work_directory: work_directory.clone(),
work_directory_abs_path,
current_branch: None,
statuses_by_path: Default::default(),
current_merge_conflicts: Default::default(),
@ -3605,7 +3695,7 @@ impl BackgroundScannerState {
);
let local_repository = LocalRepositoryEntry {
work_directory_id: work_dir_id,
work_directory_id,
work_directory: work_directory.clone(),
git_dir_scan_id: 0,
status_scan_id: 0,
@ -3618,7 +3708,7 @@ impl BackgroundScannerState {
self.snapshot
.git_repositories
.insert(work_dir_id, local_repository.clone());
.insert(work_directory_id, local_repository.clone());
log::info!("inserting new local git repository");
Some(local_repository)

View file

@ -18,6 +18,7 @@ use parking_lot::Mutex;
use postage::stream::Stream;
use pretty_assertions::assert_eq;
use rand::prelude::*;
use rpc::proto::WorktreeRelatedMessage;
use serde_json::json;
use settings::{Settings, SettingsStore};
use std::{
@ -1748,7 +1749,12 @@ async fn test_random_worktree_operations_during_initial_scan(
for (i, snapshot) in snapshots.into_iter().enumerate().rev() {
let mut updated_snapshot = snapshot.clone();
for update in updates.lock().iter() {
if update.scan_id >= updated_snapshot.scan_id() as u64 {
let scan_id = match update {
WorktreeRelatedMessage::UpdateWorktree(update) => update.scan_id,
WorktreeRelatedMessage::UpdateRepository(update) => update.scan_id,
WorktreeRelatedMessage::RemoveRepository(_) => u64::MAX,
};
if scan_id >= updated_snapshot.scan_id() as u64 {
updated_snapshot
.apply_remote_update(update.clone(), &settings.file_scan_inclusions)
.unwrap();
@ -1885,7 +1891,12 @@ async fn test_random_worktree_changes(cx: &mut TestAppContext, mut rng: StdRng)
for (i, mut prev_snapshot) in snapshots.into_iter().enumerate().rev() {
for update in updates.lock().iter() {
if update.scan_id >= prev_snapshot.scan_id() as u64 {
let scan_id = match update {
WorktreeRelatedMessage::UpdateWorktree(update) => update.scan_id,
WorktreeRelatedMessage::UpdateRepository(update) => update.scan_id,
WorktreeRelatedMessage::RemoveRepository(_) => u64::MAX,
};
if scan_id >= prev_snapshot.scan_id() as u64 {
prev_snapshot
.apply_remote_update(update.clone(), &settings.file_scan_inclusions)
.unwrap();