Wait for remote worktree to catch up with host before mutating entries
This ensures that entries don't randomly re-appear on remote worktrees due to observing an update too late. In fact, it ensures that the remote worktree has the same starting state of the host before preemptively applying the fs operation locally.
This commit is contained in:
parent
ecb847a027
commit
6212f2fe30
6 changed files with 171 additions and 94 deletions
|
@ -29,7 +29,6 @@ use language::{
|
|||
use lazy_static::lazy_static;
|
||||
use parking_lot::Mutex;
|
||||
use postage::{
|
||||
barrier,
|
||||
prelude::{Sink as _, Stream as _},
|
||||
watch,
|
||||
};
|
||||
|
@ -84,17 +83,13 @@ pub struct RemoteWorktree {
|
|||
pub(crate) background_snapshot: Arc<Mutex<Snapshot>>,
|
||||
project_id: u64,
|
||||
client: Arc<Client>,
|
||||
updates_tx: UnboundedSender<BackgroundUpdate>,
|
||||
updates_tx: UnboundedSender<proto::UpdateWorktree>,
|
||||
last_scan_id_rx: watch::Receiver<usize>,
|
||||
replica_id: ReplicaId,
|
||||
diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
|
||||
visible: bool,
|
||||
}
|
||||
|
||||
enum BackgroundUpdate {
|
||||
Update(proto::UpdateWorktree),
|
||||
Barrier(barrier::Sender),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Snapshot {
|
||||
id: WorktreeId,
|
||||
|
@ -102,12 +97,12 @@ pub struct Snapshot {
|
|||
root_char_bag: CharBag,
|
||||
entries_by_path: SumTree<Entry>,
|
||||
entries_by_id: SumTree<PathEntry>,
|
||||
scan_id: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LocalSnapshot {
|
||||
abs_path: Arc<Path>,
|
||||
scan_id: usize,
|
||||
ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
|
||||
removed_entry_ids: HashMap<u64, ProjectEntryId>,
|
||||
next_entry_id: Arc<AtomicUsize>,
|
||||
|
@ -221,11 +216,13 @@ impl Worktree {
|
|||
root_char_bag,
|
||||
entries_by_path: Default::default(),
|
||||
entries_by_id: Default::default(),
|
||||
scan_id: worktree.scan_id as usize,
|
||||
};
|
||||
|
||||
let (updates_tx, mut updates_rx) = mpsc::unbounded();
|
||||
let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
|
||||
let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel();
|
||||
let (mut last_scan_id_tx, last_scan_id_rx) = watch::channel_with(worktree.scan_id as usize);
|
||||
let worktree_handle = cx.add_model(|_: &mut ModelContext<Worktree>| {
|
||||
Worktree::Remote(RemoteWorktree {
|
||||
project_id: project_remote_id,
|
||||
|
@ -233,6 +230,7 @@ impl Worktree {
|
|||
snapshot: snapshot.clone(),
|
||||
background_snapshot: background_snapshot.clone(),
|
||||
updates_tx,
|
||||
last_scan_id_rx,
|
||||
client: client.clone(),
|
||||
diagnostic_summaries: TreeMap::from_ordered_entries(
|
||||
worktree.diagnostic_summaries.into_iter().map(|summary| {
|
||||
|
@ -291,14 +289,12 @@ impl Worktree {
|
|||
cx.background()
|
||||
.spawn(async move {
|
||||
while let Some(update) = updates_rx.next().await {
|
||||
if let BackgroundUpdate::Update(update) = update {
|
||||
if let Err(error) =
|
||||
background_snapshot.lock().apply_remote_update(update)
|
||||
{
|
||||
log::error!("error applying worktree update: {}", error);
|
||||
}
|
||||
snapshot_updated_tx.send(()).await.ok();
|
||||
if let Err(error) =
|
||||
background_snapshot.lock().apply_remote_update(update)
|
||||
{
|
||||
log::error!("error applying worktree update: {}", error);
|
||||
}
|
||||
snapshot_updated_tx.send(()).await.ok();
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
|
@ -308,7 +304,11 @@ impl Worktree {
|
|||
async move {
|
||||
while let Some(_) = snapshot_updated_rx.recv().await {
|
||||
if let Some(this) = this.upgrade(&cx) {
|
||||
this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.poll_snapshot(cx);
|
||||
let this = this.as_remote_mut().unwrap();
|
||||
*last_scan_id_tx.borrow_mut() = this.snapshot.scan_id;
|
||||
});
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
@ -368,6 +368,13 @@ impl Worktree {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn scan_id(&self) -> usize {
|
||||
match self {
|
||||
Worktree::Local(worktree) => worktree.snapshot.scan_id,
|
||||
Worktree::Remote(worktree) => worktree.snapshot.scan_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_visible(&self) -> bool {
|
||||
match self {
|
||||
Worktree::Local(worktree) => worktree.visible,
|
||||
|
@ -465,7 +472,6 @@ impl LocalWorktree {
|
|||
let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
|
||||
let mut snapshot = LocalSnapshot {
|
||||
abs_path,
|
||||
scan_id: 0,
|
||||
ignores: Default::default(),
|
||||
removed_entry_ids: Default::default(),
|
||||
next_entry_id,
|
||||
|
@ -475,6 +481,7 @@ impl LocalWorktree {
|
|||
root_char_bag,
|
||||
entries_by_path: Default::default(),
|
||||
entries_by_id: Default::default(),
|
||||
scan_id: 0,
|
||||
},
|
||||
};
|
||||
if let Some(metadata) = metadata {
|
||||
|
@ -505,24 +512,13 @@ impl LocalWorktree {
|
|||
|
||||
cx.spawn_weak(|this, mut cx| async move {
|
||||
while let Some(scan_state) = scan_states_rx.next().await {
|
||||
if let Some(handle) = this.upgrade(&cx) {
|
||||
let to_send = handle.update(&mut cx, |this, cx| {
|
||||
last_scan_state_tx.blocking_send(scan_state).ok();
|
||||
if let Some(this) = this.upgrade(&cx) {
|
||||
last_scan_state_tx.blocking_send(scan_state).ok();
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.poll_snapshot(cx);
|
||||
let tree = this.as_local_mut().unwrap();
|
||||
if !tree.is_scanning() {
|
||||
if let Some(share) = tree.share.as_ref() {
|
||||
return Some((tree.snapshot(), share.snapshots_tx.clone()));
|
||||
}
|
||||
}
|
||||
None
|
||||
});
|
||||
|
||||
if let Some((snapshot, snapshots_to_send_tx)) = to_send {
|
||||
if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
|
||||
log::error!("error submitting snapshot to send {}", err);
|
||||
}
|
||||
}
|
||||
this.as_local().unwrap().broadcast_snapshot()
|
||||
})
|
||||
.await;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
@ -745,7 +741,11 @@ impl LocalWorktree {
|
|||
let mut snapshot = this.background_snapshot.lock();
|
||||
snapshot.delete_entry(entry_id);
|
||||
});
|
||||
this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.poll_snapshot(cx);
|
||||
this.as_local().unwrap().broadcast_snapshot()
|
||||
})
|
||||
.await;
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
@ -780,7 +780,11 @@ impl LocalWorktree {
|
|||
)
|
||||
})
|
||||
.await?;
|
||||
this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.poll_snapshot(cx);
|
||||
this.as_local().unwrap().broadcast_snapshot()
|
||||
})
|
||||
.await;
|
||||
Ok(entry)
|
||||
}))
|
||||
}
|
||||
|
@ -814,7 +818,11 @@ impl LocalWorktree {
|
|||
.refresh_entry(path, abs_path, None)
|
||||
})
|
||||
.await?;
|
||||
this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.poll_snapshot(cx);
|
||||
this.as_local().unwrap().broadcast_snapshot()
|
||||
})
|
||||
.await;
|
||||
Ok(entry)
|
||||
})
|
||||
}
|
||||
|
@ -923,6 +931,7 @@ impl LocalWorktree {
|
|||
.map(Into::into)
|
||||
.collect(),
|
||||
removed_entries: Default::default(),
|
||||
scan_id: snapshot.scan_id as u64,
|
||||
})
|
||||
.await
|
||||
{
|
||||
|
@ -991,6 +1000,23 @@ impl LocalWorktree {
|
|||
pub fn is_shared(&self) -> bool {
|
||||
self.share.is_some()
|
||||
}
|
||||
|
||||
fn broadcast_snapshot(&self) -> impl Future<Output = ()> {
|
||||
let mut to_send = None;
|
||||
if !self.is_scanning() {
|
||||
if let Some(share) = self.share.as_ref() {
|
||||
to_send = Some((self.snapshot(), share.snapshots_tx.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
async move {
|
||||
if let Some((snapshot, snapshots_to_send_tx)) = to_send {
|
||||
if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
|
||||
log::error!("error submitting snapshot to send {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteWorktree {
|
||||
|
@ -1003,18 +1029,19 @@ impl RemoteWorktree {
|
|||
envelope: TypedEnvelope<proto::UpdateWorktree>,
|
||||
) -> Result<()> {
|
||||
self.updates_tx
|
||||
.unbounded_send(BackgroundUpdate::Update(envelope.payload))
|
||||
.unbounded_send(envelope.payload)
|
||||
.expect("consumer runs to completion");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn finish_pending_remote_updates(&self) -> impl Future<Output = ()> {
|
||||
let (tx, mut rx) = barrier::channel();
|
||||
self.updates_tx
|
||||
.unbounded_send(BackgroundUpdate::Barrier(tx))
|
||||
.expect("consumer runs to completion");
|
||||
fn wait_for_snapshot(&self, scan_id: usize) -> impl Future<Output = ()> {
|
||||
let mut rx = self.last_scan_id_rx.clone();
|
||||
async move {
|
||||
rx.recv().await;
|
||||
while let Some(applied_scan_id) = rx.next().await {
|
||||
if applied_scan_id >= scan_id {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1038,16 +1065,12 @@ impl RemoteWorktree {
|
|||
pub fn insert_entry(
|
||||
&self,
|
||||
entry: proto::Entry,
|
||||
scan_id: usize,
|
||||
cx: &mut ModelContext<Worktree>,
|
||||
) -> Task<Result<Entry>> {
|
||||
let wait_for_snapshot = self.wait_for_snapshot(scan_id);
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
this.update(&mut cx, |worktree, _| {
|
||||
worktree
|
||||
.as_remote_mut()
|
||||
.unwrap()
|
||||
.finish_pending_remote_updates()
|
||||
})
|
||||
.await;
|
||||
wait_for_snapshot.await;
|
||||
this.update(&mut cx, |worktree, _| {
|
||||
let worktree = worktree.as_remote_mut().unwrap();
|
||||
let mut snapshot = worktree.background_snapshot.lock();
|
||||
|
@ -1061,16 +1084,12 @@ impl RemoteWorktree {
|
|||
pub(crate) fn delete_entry(
|
||||
&self,
|
||||
id: ProjectEntryId,
|
||||
scan_id: usize,
|
||||
cx: &mut ModelContext<Worktree>,
|
||||
) -> Task<Result<()>> {
|
||||
let wait_for_snapshot = self.wait_for_snapshot(scan_id);
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
this.update(&mut cx, |worktree, _| {
|
||||
worktree
|
||||
.as_remote_mut()
|
||||
.unwrap()
|
||||
.finish_pending_remote_updates()
|
||||
})
|
||||
.await;
|
||||
wait_for_snapshot.await;
|
||||
this.update(&mut cx, |worktree, _| {
|
||||
let worktree = worktree.as_remote_mut().unwrap();
|
||||
let mut snapshot = worktree.background_snapshot.lock();
|
||||
|
@ -1145,6 +1164,7 @@ impl Snapshot {
|
|||
|
||||
self.entries_by_path.edit(entries_by_path_edits, &());
|
||||
self.entries_by_id.edit(entries_by_id_edits, &());
|
||||
self.scan_id = update.scan_id as usize;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -1233,6 +1253,10 @@ impl Snapshot {
|
|||
&self.root_name
|
||||
}
|
||||
|
||||
pub fn scan_id(&self) -> usize {
|
||||
self.scan_id
|
||||
}
|
||||
|
||||
pub fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
|
||||
let path = path.as_ref();
|
||||
self.traverse_from_path(true, true, path)
|
||||
|
@ -1282,6 +1306,7 @@ impl LocalSnapshot {
|
|||
.map(|(path, summary)| summary.to_proto(&path.0))
|
||||
.collect(),
|
||||
visible,
|
||||
scan_id: self.scan_id as u64,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1347,6 +1372,7 @@ impl LocalSnapshot {
|
|||
root_name: self.root_name().to_string(),
|
||||
updated_entries,
|
||||
removed_entries,
|
||||
scan_id: self.scan_id as u64,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1390,11 +1416,18 @@ impl LocalSnapshot {
|
|||
entries: impl IntoIterator<Item = Entry>,
|
||||
ignore: Option<Arc<Gitignore>>,
|
||||
) {
|
||||
let mut parent_entry = self
|
||||
.entries_by_path
|
||||
.get(&PathKey(parent_path.clone()), &())
|
||||
.unwrap()
|
||||
.clone();
|
||||
let mut parent_entry = if let Some(parent_entry) =
|
||||
self.entries_by_path.get(&PathKey(parent_path.clone()), &())
|
||||
{
|
||||
parent_entry.clone()
|
||||
} else {
|
||||
log::warn!(
|
||||
"populating a directory {:?} that has been removed",
|
||||
parent_path
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(ignore) = ignore {
|
||||
self.ignores.insert(parent_path, (ignore, self.scan_id));
|
||||
}
|
||||
|
@ -1454,7 +1487,7 @@ impl LocalSnapshot {
|
|||
|
||||
if path.file_name() == Some(&GITIGNORE) {
|
||||
if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
|
||||
*scan_id = self.scan_id;
|
||||
*scan_id = self.snapshot.scan_id;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2773,7 +2806,6 @@ mod tests {
|
|||
let next_entry_id = Arc::new(AtomicUsize::new(0));
|
||||
let mut initial_snapshot = LocalSnapshot {
|
||||
abs_path: root_dir.path().into(),
|
||||
scan_id: 0,
|
||||
removed_entry_ids: Default::default(),
|
||||
ignores: Default::default(),
|
||||
next_entry_id: next_entry_id.clone(),
|
||||
|
@ -2783,6 +2815,7 @@ mod tests {
|
|||
entries_by_id: Default::default(),
|
||||
root_name: Default::default(),
|
||||
root_char_bag: Default::default(),
|
||||
scan_id: 0,
|
||||
},
|
||||
};
|
||||
initial_snapshot.insert_entry(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue