Ensure worktree is registered/shared synchronously

This commit is contained in:
Antonio Scandurra 2022-03-03 10:10:53 +01:00
parent 530f15b46b
commit 53327e2bf0
3 changed files with 61 additions and 42 deletions

View file

@ -2396,17 +2396,18 @@ impl Project {
}); });
if let Some(project_id) = remote_project_id { if let Some(project_id) = remote_project_id {
worktree
.update(&mut cx, |worktree, cx| {
worktree.as_local_mut().unwrap().register(project_id, cx)
})
.await?;
if is_shared { if is_shared {
worktree worktree
.update(&mut cx, |worktree, cx| { .update(&mut cx, |worktree, cx| {
worktree.as_local_mut().unwrap().share(project_id, cx) worktree.as_local_mut().unwrap().share(project_id, cx)
}) })
.await?; .await?;
} else {
worktree
.update(&mut cx, |worktree, cx| {
worktree.as_local_mut().unwrap().register(project_id, cx)
})
.await?;
} }
} }

View file

@ -741,8 +741,9 @@ impl LocalWorktree {
authorized_logins: self.authorized_logins(), authorized_logins: self.authorized_logins(),
visible: self.visible, visible: self.visible,
}; };
let request = client.request(register_message);
cx.spawn(|this, mut cx| async move { cx.spawn(|this, mut cx| async move {
let response = client.request(register_message).await; let response = request.await;
this.update(&mut cx, |this, _| { this.update(&mut cx, |this, _| {
let worktree = this.as_local_mut().unwrap(); let worktree = this.as_local_mut().unwrap();
match response { match response {
@ -759,44 +760,49 @@ impl LocalWorktree {
}) })
} }
pub fn share( pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
&mut self, let register = self.register(project_id, cx);
project_id: u64,
cx: &mut ModelContext<Worktree>,
) -> impl Future<Output = Result<()>> {
let (mut share_tx, mut share_rx) = oneshot::channel(); let (mut share_tx, mut share_rx) = oneshot::channel();
let (snapshots_to_send_tx, snapshots_to_send_rx) =
smol::channel::unbounded::<LocalSnapshot>();
if self.share.is_some() { if self.share.is_some() {
let _ = share_tx.try_send(Ok(())); let _ = share_tx.try_send(Ok(()));
} else { } else {
let snapshot = self.snapshot(); let snapshot = self.snapshot();
let rpc = self.client.clone(); let rpc = self.client.clone();
let worktree_id = cx.model_id() as u64; let worktree_id = cx.model_id() as u64;
let (snapshots_to_send_tx, snapshots_to_send_rx) =
smol::channel::unbounded::<LocalSnapshot>();
let maintain_remote_snapshot = cx.background().spawn({ let maintain_remote_snapshot = cx.background().spawn({
let rpc = rpc.clone(); let rpc = rpc.clone();
let snapshot = snapshot.clone();
let diagnostic_summaries = self.diagnostic_summaries.clone(); let diagnostic_summaries = self.diagnostic_summaries.clone();
async move { async move {
if let Err(error) = rpc match snapshots_to_send_rx.recv().await {
.request(proto::UpdateWorktree { Ok(snapshot) => {
project_id, if let Err(error) = rpc
worktree_id, .request(proto::UpdateWorktree {
root_name: snapshot.root_name().to_string(), project_id,
updated_entries: snapshot worktree_id,
.entries_by_path root_name: snapshot.root_name().to_string(),
.iter() updated_entries: snapshot
.filter(|e| !e.is_ignored) .entries_by_path
.map(Into::into) .iter()
.collect(), .filter(|e| !e.is_ignored)
removed_entries: Default::default(), .map(Into::into)
}) .collect(),
.await removed_entries: Default::default(),
{ })
let _ = share_tx.try_send(Err(error)); .await
return Err(anyhow!("failed to send initial update worktree")); {
} else { let _ = share_tx.try_send(Err(error));
let _ = share_tx.try_send(Ok(())); return Err(anyhow!("failed to send initial update worktree"));
} else {
let _ = share_tx.try_send(Ok(()));
}
}
Err(error) => {
let _ = share_tx.try_send(Err(error.into()));
return Err(anyhow!("failed to send initial update worktree"));
}
} }
for (path, summary) in diagnostic_summaries.iter() { for (path, summary) in diagnostic_summaries.iter() {
@ -821,17 +827,24 @@ impl LocalWorktree {
}); });
self.share = Some(ShareState { self.share = Some(ShareState {
project_id, project_id,
snapshots_tx: snapshots_to_send_tx, snapshots_tx: snapshots_to_send_tx.clone(),
_maintain_remote_snapshot: Some(maintain_remote_snapshot), _maintain_remote_snapshot: Some(maintain_remote_snapshot),
}); });
} }
async move { cx.spawn_weak(|this, cx| async move {
register.await?;
if let Some(this) = this.upgrade(&cx) {
this.read_with(&cx, |this, _| {
let this = this.as_local().unwrap();
let _ = snapshots_to_send_tx.try_send(this.snapshot());
});
}
share_rx share_rx
.next() .next()
.await .await
.unwrap_or_else(|| Err(anyhow!("share ended"))) .unwrap_or_else(|| Err(anyhow!("share ended")))
} })
} }
pub fn unshare(&mut self) { pub fn unshare(&mut self) {

View file

@ -4664,12 +4664,17 @@ mod tests {
} }
log::info!("Host: find/create local worktree {:?}", path); log::info!("Host: find/create local worktree {:?}", path);
project let find_or_create_worktree = project.update(&mut cx, |project, cx| {
.update(&mut cx, |project, cx| { project.find_or_create_local_worktree(path, true, cx)
project.find_or_create_local_worktree(path, true, cx) });
}) let find_or_create_worktree = async move {
.await find_or_create_worktree.await.unwrap();
.unwrap(); };
if rng.lock().gen() {
cx.background().spawn(find_or_create_worktree).detach();
} else {
find_or_create_worktree.await;
}
} }
10..=80 if !files.lock().is_empty() => { 10..=80 if !files.lock().is_empty() => {
let buffer = if self.buffers.is_empty() || rng.lock().gen() { let buffer = if self.buffers.is_empty() || rng.lock().gen() {