This commit is contained in:
Antonio Scandurra 2023-10-23 16:23:38 +02:00
parent cc445f7cef
commit efbf0c828d
8 changed files with 336 additions and 289 deletions

View file

@ -2,7 +2,7 @@ use crate::{
copy_recursive, ignore::IgnoreStack, DiagnosticSummary, ProjectEntryId, RemoveOptions,
};
use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, Context as _, Result};
use client2::{proto, Client};
use clock::ReplicaId;
use collections::{HashMap, HashSet, VecDeque};
@ -21,7 +21,9 @@ use futures::{
};
use fuzzy2::CharBag;
use git::{DOT_GIT, GITIGNORE};
use gpui2::{AppContext, AsyncAppContext, EventEmitter, Executor, Handle, ModelContext, Task};
use gpui2::{
AppContext, AsyncAppContext, Context, EventEmitter, Executor, Handle, ModelContext, Task,
};
use language2::{
proto::{
deserialize_fingerprint, deserialize_version, serialize_fingerprint, serialize_line_ending,
@ -299,7 +301,7 @@ impl Worktree {
.await
.context("failed to stat worktree path")?;
Ok(cx.add_model(move |cx: &mut ModelContext<Worktree>| {
cx.entity(move |cx: &mut ModelContext<Worktree>| {
let root_name = abs_path
.file_name()
.map_or(String::new(), |f| f.to_string_lossy().to_string());
@ -308,7 +310,7 @@ impl Worktree {
ignores_by_parent_abs_path: Default::default(),
git_repositories: Default::default(),
snapshot: Snapshot {
id: WorktreeId::from_usize(cx.model_id()),
id: WorktreeId::from_usize(cx.entity_id()),
abs_path: abs_path.clone(),
root_name: root_name.clone(),
root_char_bag: root_name.chars().map(|c| c.to_ascii_lowercase()).collect(),
@ -336,8 +338,8 @@ impl Worktree {
let (path_prefixes_to_scan_tx, path_prefixes_to_scan_rx) = channel::unbounded();
let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded();
cx.spawn_weak(|this, mut cx| async move {
while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade(&cx)) {
cx.spawn(|this, mut cx| async move {
while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade()) {
this.update(&mut cx, |this, cx| {
let this = this.as_local_mut().unwrap();
match state {
@ -361,10 +363,10 @@ impl Worktree {
})
.detach();
let background_scanner_task = cx.background().spawn({
let background_scanner_task = cx.executor().spawn({
let fs = fs.clone();
let snapshot = snapshot.clone();
let background = cx.background().clone();
let background = cx.executor().clone();
async move {
let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
BackgroundScanner::new(
@ -394,10 +396,9 @@ impl Worktree {
fs,
visible,
})
}))
})
}
// abcdefghi
pub fn remote(
project_remote_id: u64,
replica_id: ReplicaId,
@ -426,7 +427,7 @@ impl Worktree {
let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel();
cx.background()
cx.executor()
.spawn({
let background_snapshot = background_snapshot.clone();
async move {
@ -442,27 +443,24 @@ impl Worktree {
})
.detach();
cx.spawn_weak(|this, mut cx| async move {
cx.spawn(|this, mut cx| async move {
while (snapshot_updated_rx.recv().await).is_some() {
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
let this = this.as_remote_mut().unwrap();
this.snapshot = this.background_snapshot.lock().clone();
cx.emit(Event::UpdatedEntries(Arc::from([])));
cx.notify();
while let Some((scan_id, _)) = this.snapshot_subscriptions.front() {
if this.observed_snapshot(*scan_id) {
let (_, tx) = this.snapshot_subscriptions.pop_front().unwrap();
let _ = tx.send(());
} else {
break;
}
this.update(&mut cx, |this, cx| {
let this = this.as_remote_mut().unwrap();
this.snapshot = this.background_snapshot.lock().clone();
cx.emit(Event::UpdatedEntries(Arc::from([])));
cx.notify();
while let Some((scan_id, _)) = this.snapshot_subscriptions.front() {
if this.observed_snapshot(*scan_id) {
let (_, tx) = this.snapshot_subscriptions.pop_front().unwrap();
let _ = tx.send(());
} else {
break;
}
});
} else {
break;
}
}
})?;
}
anyhow::Ok(())
})
.detach();
@ -598,13 +596,13 @@ impl LocalWorktree {
let path = Arc::from(path);
cx.spawn(move |this, mut cx| async move {
let (file, contents, diff_base) = this
.update(&mut cx, |t, cx| t.as_local().unwrap().load(&path, cx))
.update(&mut cx, |t, cx| t.as_local().unwrap().load(&path, cx))?
.await?;
let text_buffer = cx
.background()
.executor()
.spawn(async move { text::Buffer::new(0, id, contents) })
.await;
Ok(cx.add_model(|_| Buffer::build(text_buffer, diff_base, Some(Arc::new(file)))))
cx.entity(|_| Buffer::build(text_buffer, diff_base, Some(Arc::new(file))))
})
}
@ -878,18 +876,18 @@ impl LocalWorktree {
let fs = self.fs.clone();
let entry = self.refresh_entry(path.clone(), None, cx);
cx.spawn(|this, cx| async move {
cx.spawn(|this, mut cx| async move {
let text = fs.load(&abs_path).await?;
let entry = entry.await?;
let mut index_task = None;
let snapshot = this.read_with(&cx, |this, _| this.as_local().unwrap().snapshot());
let snapshot = this.update(&mut cx, |this, _| this.as_local().unwrap().snapshot())?;
if let Some(repo) = snapshot.repository_for_path(&path) {
let repo_path = repo.work_directory.relativize(&snapshot, &path).unwrap();
if let Some(repo) = snapshot.git_repositories.get(&*repo.work_directory) {
let repo = repo.repo_ptr.clone();
index_task = Some(
cx.background()
cx.executor()
.spawn(async move { repo.lock().load_index_text(&repo_path) }),
);
}
@ -901,10 +899,13 @@ impl LocalWorktree {
None
};
let worktree = this
.upgrade()
.ok_or_else(|| anyhow!("worktree was dropped"))?;
Ok((
File {
entry_id: entry.id,
worktree: this,
worktree,
path: entry.path,
mtime: entry.mtime,
is_local: true,
@ -923,7 +924,6 @@ impl LocalWorktree {
has_changed_file: bool,
cx: &mut ModelContext<Worktree>,
) -> Task<Result<()>> {
let handle = cx.handle();
let buffer = buffer_handle.read(cx);
let rpc = self.client.clone();
@ -935,13 +935,14 @@ impl LocalWorktree {
let version = buffer.version();
let save = self.write_file(path, text, buffer.line_ending(), cx);
cx.as_mut().spawn(|mut cx| async move {
cx.spawn(|this, mut cx| async move {
let entry = save.await?;
let this = this.upgrade().context("worktree dropped")?;
if has_changed_file {
let new_file = Arc::new(File {
entry_id: entry.id,
worktree: handle,
worktree: this,
path: entry.path,
mtime: entry.mtime,
is_local: true,
@ -1005,7 +1006,7 @@ impl LocalWorktree {
let lowest_ancestor = self.lowest_ancestor(&path);
let abs_path = self.absolutize(&path);
let fs = self.fs.clone();
let write = cx.background().spawn(async move {
let write = cx.executor().spawn(async move {
if is_dir {
fs.create_dir(&abs_path).await
} else {
@ -1035,7 +1036,7 @@ impl LocalWorktree {
this.as_local_mut().unwrap().refresh_entry(path, None, cx),
refreshes,
)
});
})?;
for refresh in refreshes {
refresh.await.log_err();
}
@ -1055,14 +1056,14 @@ impl LocalWorktree {
let abs_path = self.absolutize(&path);
let fs = self.fs.clone();
let write = cx
.background()
.executor()
.spawn(async move { fs.save(&abs_path, &text, line_ending).await });
cx.spawn(|this, mut cx| async move {
write.await?;
this.update(&mut cx, |this, cx| {
this.as_local_mut().unwrap().refresh_entry(path, None, cx)
})
})?
.await
})
}
@ -1076,7 +1077,7 @@ impl LocalWorktree {
let abs_path = self.absolutize(&entry.path);
let fs = self.fs.clone();
let delete = cx.background().spawn(async move {
let delete = cx.executor().spawn(async move {
if entry.is_file() {
fs.remove_file(&abs_path, Default::default()).await?;
} else {
@ -1098,7 +1099,7 @@ impl LocalWorktree {
this.as_local_mut()
.unwrap()
.refresh_entries_for_paths(vec![path])
})
})?
.recv()
.await;
Ok(())
@ -1116,7 +1117,7 @@ impl LocalWorktree {
let abs_old_path = self.absolutize(&old_path);
let abs_new_path = self.absolutize(&new_path);
let fs = self.fs.clone();
let rename = cx.background().spawn(async move {
let rename = cx.executor().spawn(async move {
fs.rename(&abs_old_path, &abs_new_path, Default::default())
.await
});
@ -1127,7 +1128,7 @@ impl LocalWorktree {
this.as_local_mut()
.unwrap()
.refresh_entry(new_path.clone(), Some(old_path), cx)
})
})?
.await
}))
}
@ -1143,7 +1144,7 @@ impl LocalWorktree {
let abs_old_path = self.absolutize(&old_path);
let abs_new_path = self.absolutize(&new_path);
let fs = self.fs.clone();
let copy = cx.background().spawn(async move {
let copy = cx.executor().spawn(async move {
copy_recursive(
fs.as_ref(),
&abs_old_path,
@ -1159,7 +1160,7 @@ impl LocalWorktree {
this.as_local_mut()
.unwrap()
.refresh_entry(new_path.clone(), None, cx)
})
})?
.await
}))
}
@ -1171,7 +1172,7 @@ impl LocalWorktree {
) -> Option<Task<Result<()>>> {
let path = self.entry_for_id(entry_id)?.path.clone();
let mut refresh = self.refresh_entries_for_paths(vec![path]);
Some(cx.background().spawn(async move {
Some(cx.executor().spawn(async move {
refresh.next().await;
Ok(())
}))
@ -1204,15 +1205,13 @@ impl LocalWorktree {
vec![path.clone()]
};
let mut refresh = self.refresh_entries_for_paths(paths);
cx.spawn_weak(move |this, mut cx| async move {
cx.spawn(move |this, mut cx| async move {
refresh.recv().await;
this.upgrade(&cx)
.ok_or_else(|| anyhow!("worktree was dropped"))?
.update(&mut cx, |this, _| {
this.entry_for_path(path)
.cloned()
.ok_or_else(|| anyhow!("failed to read path after update"))
})
this.update(&mut cx, |this, _| {
this.entry_for_path(path)
.cloned()
.ok_or_else(|| anyhow!("failed to read path after update"))
})?
})
}
@ -1246,8 +1245,8 @@ impl LocalWorktree {
.unbounded_send((self.snapshot(), Arc::from([]), Arc::from([])))
.ok();
let worktree_id = cx.model_id() as u64;
let _maintain_remote_snapshot = cx.background().spawn(async move {
let worktree_id = cx.entity_id().as_u64();
let _maintain_remote_snapshot = cx.executor().spawn(async move {
let mut is_first = true;
while let Some((snapshot, entry_changes, repo_changes)) = snapshots_rx.next().await {
let update;
@ -1294,7 +1293,7 @@ impl LocalWorktree {
for (&server_id, summary) in summaries {
if let Err(e) = self.client.send(proto::UpdateDiagnosticSummary {
project_id,
worktree_id: cx.model_id() as u64,
worktree_id: cx.entity_id().as_u64(),
summary: Some(summary.to_proto(server_id, &path)),
}) {
return Task::ready(Err(e));
@ -1305,7 +1304,7 @@ impl LocalWorktree {
let rx = self.observe_updates(project_id, cx, move |update| {
client.request(update).map(|result| result.is_ok())
});
cx.foreground()
cx.executor()
.spawn(async move { rx.await.map_err(|_| anyhow!("share ended")) })
}
@ -1339,7 +1338,7 @@ impl RemoteWorktree {
let version = buffer.version();
let rpc = self.client.clone();
let project_id = self.project_id;
cx.as_mut().spawn(|mut cx| async move {
cx.spawn(|_, mut cx| async move {
let response = rpc
.request(proto::SaveBuffer {
project_id,
@ -1356,7 +1355,7 @@ impl RemoteWorktree {
buffer_handle.update(&mut cx, |buffer, cx| {
buffer.did_save(version.clone(), fingerprint, mtime, cx);
});
})?;
Ok(())
})
@ -1436,7 +1435,7 @@ impl RemoteWorktree {
let entry = snapshot.insert_entry(entry);
worktree.snapshot = snapshot.clone();
entry
})
})?
})
}
@ -2634,7 +2633,7 @@ impl language2::File for File {
}
fn worktree_id(&self) -> usize {
self.worktree.id()
self.worktree.entity_id().as_u64() as usize
}
fn is_deleted(&self) -> bool {
@ -2647,7 +2646,7 @@ impl language2::File for File {
fn to_proto(&self) -> rpc::proto::File {
rpc::proto::File {
worktree_id: self.worktree.id() as u64,
worktree_id: self.worktree.entity_id().as_u64(),
entry_id: self.entry_id.to_proto(),
path: self.path.to_string_lossy().into(),
mtime: Some(self.mtime.into()),
@ -2670,8 +2669,7 @@ impl language2::LocalFile for File {
let worktree = self.worktree.read(cx).as_local().unwrap();
let abs_path = worktree.absolutize(&self.path);
let fs = worktree.fs.clone();
cx.background()
.spawn(async move { fs.load(&abs_path).await })
cx.executor().spawn(async move { fs.load(&abs_path).await })
}
fn buffer_reloaded(