Merge branch 'main' into user-timeline

This commit is contained in:
Antonio Scandurra 2022-07-04 09:23:16 +02:00
commit d3b9eca791
50 changed files with 3076 additions and 1109 deletions

View file

@ -242,7 +242,7 @@ impl LspCommand for PerformRename {
.read_with(&cx, |project, cx| {
project
.language_server_for_buffer(buffer.read(cx), cx)
.cloned()
.map(|(adapter, server)| (adapter.clone(), server.clone()))
})
.ok_or_else(|| anyhow!("no language server found for buffer"))?;
Project::deserialize_workspace_edit(
@ -359,7 +359,7 @@ impl LspCommand for GetDefinition {
.read_with(&cx, |project, cx| {
project
.language_server_for_buffer(buffer.read(cx), cx)
.cloned()
.map(|(adapter, server)| (adapter.clone(), server.clone()))
})
.ok_or_else(|| anyhow!("no language server found for buffer"))?;
@ -388,8 +388,8 @@ impl LspCommand for GetDefinition {
.update(&mut cx, |this, cx| {
this.open_local_buffer_via_lsp(
target_uri,
lsp_adapter.clone(),
language_server.clone(),
language_server.server_id(),
lsp_adapter.name(),
cx,
)
})
@ -599,7 +599,7 @@ impl LspCommand for GetReferences {
.read_with(&cx, |project, cx| {
project
.language_server_for_buffer(buffer.read(cx), cx)
.cloned()
.map(|(adapter, server)| (adapter.clone(), server.clone()))
})
.ok_or_else(|| anyhow!("no language server found for buffer"))?;
@ -609,8 +609,8 @@ impl LspCommand for GetReferences {
.update(&mut cx, |this, cx| {
this.open_local_buffer_via_lsp(
lsp_location.uri,
lsp_adapter.clone(),
language_server.clone(),
language_server.server_id(),
lsp_adapter.name(),
cx,
)
})

File diff suppressed because it is too large Load diff

View file

@ -7,9 +7,9 @@ use super::{
};
use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
use anyhow::{anyhow, Context, Result};
use client::{proto, Client, TypedEnvelope};
use client::{proto, Client};
use clock::ReplicaId;
use collections::HashMap;
use collections::{HashMap, VecDeque};
use futures::{
channel::{
mpsc::{self, UnboundedSender},
@ -40,11 +40,11 @@ use std::{
ffi::{OsStr, OsString},
fmt,
future::Future,
mem,
ops::{Deref, DerefMut},
os::unix::prelude::{OsStrExt, OsStringExt},
path::{Path, PathBuf},
sync::{atomic::AtomicUsize, Arc},
task::Poll,
time::{Duration, SystemTime},
};
use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap};
@ -82,7 +82,7 @@ pub struct RemoteWorktree {
project_id: u64,
client: Arc<Client>,
updates_tx: Option<UnboundedSender<proto::UpdateWorktree>>,
last_scan_id_rx: watch::Receiver<usize>,
snapshot_subscriptions: VecDeque<(usize, oneshot::Sender<()>)>,
replica_id: ReplicaId,
diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
visible: bool,
@ -96,6 +96,7 @@ pub struct Snapshot {
entries_by_path: SumTree<Entry>,
entries_by_id: SumTree<PathEntry>,
scan_id: usize,
is_complete: bool,
}
#[derive(Clone)]
@ -125,13 +126,16 @@ impl DerefMut for LocalSnapshot {
#[derive(Clone, Debug)]
enum ScanState {
Idle,
Scanning,
/// The worktree is performing its initial scan of the filesystem.
Initializing,
/// The worktree is updating in response to filesystem events.
Updating,
Err(Arc<anyhow::Error>),
}
struct ShareState {
project_id: u64,
snapshots_tx: Sender<LocalSnapshot>,
snapshots_tx: watch::Sender<LocalSnapshot>,
_maintain_remote_snapshot: Option<Task<Option<()>>>,
}
@ -172,10 +176,10 @@ impl Worktree {
pub fn remote(
project_remote_id: u64,
replica_id: ReplicaId,
worktree: proto::Worktree,
worktree: proto::WorktreeMetadata,
client: Arc<Client>,
cx: &mut MutableAppContext,
) -> (ModelHandle<Self>, Task<()>) {
) -> ModelHandle<Self> {
let remote_id = worktree.id;
let root_char_bag: CharBag = worktree
.root_name
@ -190,13 +194,13 @@ impl Worktree {
root_char_bag,
entries_by_path: Default::default(),
entries_by_id: Default::default(),
scan_id: worktree.scan_id as usize,
scan_id: 0,
is_complete: false,
};
let (updates_tx, mut updates_rx) = mpsc::unbounded();
let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel();
let (mut last_scan_id_tx, last_scan_id_rx) = watch::channel_with(worktree.scan_id as usize);
let worktree_handle = cx.add_model(|_: &mut ModelContext<Worktree>| {
Worktree::Remote(RemoteWorktree {
project_id: project_remote_id,
@ -204,96 +208,50 @@ impl Worktree {
snapshot: snapshot.clone(),
background_snapshot: background_snapshot.clone(),
updates_tx: Some(updates_tx),
last_scan_id_rx,
snapshot_subscriptions: Default::default(),
client: client.clone(),
diagnostic_summaries: TreeMap::from_ordered_entries(
worktree.diagnostic_summaries.into_iter().map(|summary| {
(
PathKey(PathBuf::from(summary.path).into()),
DiagnosticSummary {
language_server_id: summary.language_server_id as usize,
error_count: summary.error_count as usize,
warning_count: summary.warning_count as usize,
},
)
}),
),
diagnostic_summaries: Default::default(),
visible,
})
});
let deserialize_task = cx.spawn({
let worktree_handle = worktree_handle.clone();
|cx| async move {
let (entries_by_path, entries_by_id) = cx
.background()
.spawn(async move {
let mut entries_by_path_edits = Vec::new();
let mut entries_by_id_edits = Vec::new();
for entry in worktree.entries {
match Entry::try_from((&root_char_bag, entry)) {
Ok(entry) => {
entries_by_id_edits.push(Edit::Insert(PathEntry {
id: entry.id,
path: entry.path.clone(),
is_ignored: entry.is_ignored,
scan_id: 0,
}));
entries_by_path_edits.push(Edit::Insert(entry));
}
Err(err) => log::warn!("error for remote worktree entry {:?}", err),
}
}
let mut entries_by_path = SumTree::new();
let mut entries_by_id = SumTree::new();
entries_by_path.edit(entries_by_path_edits, &());
entries_by_id.edit(entries_by_id_edits, &());
(entries_by_path, entries_by_id)
})
.await;
{
let mut snapshot = background_snapshot.lock();
snapshot.entries_by_path = entries_by_path;
snapshot.entries_by_id = entries_by_id;
cx.background()
.spawn(async move {
while let Some(update) = updates_rx.next().await {
if let Err(error) = background_snapshot.lock().apply_remote_update(update) {
log::error!("error applying worktree update: {}", error);
}
snapshot_updated_tx.send(()).await.ok();
}
})
.detach();
cx.background()
.spawn(async move {
while let Some(update) = updates_rx.next().await {
if let Err(error) =
background_snapshot.lock().apply_remote_update(update)
{
log::error!("error applying worktree update: {}", error);
cx.spawn(|mut cx| {
let this = worktree_handle.downgrade();
async move {
while let Some(_) = snapshot_updated_rx.recv().await {
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
this.poll_snapshot(cx);
let this = this.as_remote_mut().unwrap();
while let Some((scan_id, _)) = this.snapshot_subscriptions.front() {
if this.observed_snapshot(*scan_id) {
let (_, tx) = this.snapshot_subscriptions.pop_front().unwrap();
let _ = tx.send(());
} else {
break;
}
}
snapshot_updated_tx.send(()).await.ok();
}
})
.detach();
cx.spawn(|mut cx| {
let this = worktree_handle.downgrade();
async move {
while let Some(_) = snapshot_updated_rx.recv().await {
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
this.poll_snapshot(cx);
let this = this.as_remote_mut().unwrap();
*last_scan_id_tx.borrow_mut() = this.snapshot.scan_id;
});
} else {
break;
}
}
});
} else {
break;
}
})
.detach();
}
}
});
(worktree_handle, deserialize_task)
})
.detach();
worktree_handle
}
pub fn as_local(&self) -> Option<&LocalWorktree> {
@ -377,38 +335,9 @@ impl Worktree {
fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
match self {
Self::Local(worktree) => {
let is_fake_fs = worktree.fs.is_fake();
worktree.snapshot = worktree.background_snapshot.lock().clone();
if worktree.is_scanning() {
if worktree.poll_task.is_none() {
worktree.poll_task = Some(cx.spawn_weak(|this, mut cx| async move {
if is_fake_fs {
#[cfg(any(test, feature = "test-support"))]
cx.background().simulate_random_delay().await;
} else {
smol::Timer::after(Duration::from_millis(100)).await;
}
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
this.as_local_mut().unwrap().poll_task = None;
this.poll_snapshot(cx);
});
}
}));
}
} else {
worktree.poll_task.take();
cx.emit(Event::UpdatedEntries);
}
}
Self::Remote(worktree) => {
worktree.snapshot = worktree.background_snapshot.lock().clone();
cx.emit(Event::UpdatedEntries);
}
Self::Local(worktree) => worktree.poll_snapshot(false, cx),
Self::Remote(worktree) => worktree.poll_snapshot(cx),
};
cx.notify();
}
}
@ -436,7 +365,8 @@ impl LocalWorktree {
.context("failed to stat worktree path")?;
let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded();
let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
let (mut last_scan_state_tx, last_scan_state_rx) =
watch::channel_with(ScanState::Initializing);
let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
let mut snapshot = LocalSnapshot {
abs_path,
@ -450,6 +380,7 @@ impl LocalWorktree {
entries_by_path: Default::default(),
entries_by_id: Default::default(),
scan_id: 0,
is_complete: true,
},
extension_counts: Default::default(),
};
@ -481,11 +412,7 @@ impl LocalWorktree {
while let Some(scan_state) = scan_states_rx.next().await {
if let Some(this) = this.upgrade(&cx) {
last_scan_state_tx.blocking_send(scan_state).ok();
this.update(&mut cx, |this, cx| {
this.poll_snapshot(cx);
this.as_local().unwrap().broadcast_snapshot()
})
.await;
this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
} else {
break;
}
@ -569,22 +496,53 @@ impl LocalWorktree {
Ok(updated)
}
fn poll_snapshot(&mut self, force: bool, cx: &mut ModelContext<Worktree>) {
self.poll_task.take();
match self.scan_state() {
ScanState::Idle => {
self.snapshot = self.background_snapshot.lock().clone();
if let Some(share) = self.share.as_mut() {
*share.snapshots_tx.borrow_mut() = self.snapshot.clone();
}
cx.emit(Event::UpdatedEntries);
}
ScanState::Initializing => {
let is_fake_fs = self.fs.is_fake();
self.snapshot = self.background_snapshot.lock().clone();
self.poll_task = Some(cx.spawn_weak(|this, mut cx| async move {
if is_fake_fs {
#[cfg(any(test, feature = "test-support"))]
cx.background().simulate_random_delay().await;
} else {
smol::Timer::after(Duration::from_millis(100)).await;
}
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
}
}));
cx.emit(Event::UpdatedEntries);
}
_ => {
if force {
self.snapshot = self.background_snapshot.lock().clone();
}
}
}
cx.notify();
}
pub fn scan_complete(&self) -> impl Future<Output = ()> {
let mut scan_state_rx = self.last_scan_state_rx.clone();
async move {
let mut scan_state = Some(scan_state_rx.borrow().clone());
while let Some(ScanState::Scanning) = scan_state {
while let Some(ScanState::Initializing | ScanState::Updating) = scan_state {
scan_state = scan_state_rx.recv().await;
}
}
}
fn is_scanning(&self) -> bool {
if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
true
} else {
false
}
fn scan_state(&self) -> ScanState {
self.last_scan_state_rx.borrow().clone()
}
pub fn snapshot(&self) -> LocalSnapshot {
@ -614,7 +572,6 @@ impl LocalWorktree {
.refresh_entry(path, abs_path, None, cx)
})
.await?;
this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
Ok((
File {
entry_id: Some(entry.id),
@ -712,16 +669,14 @@ impl LocalWorktree {
Some(cx.spawn(|this, mut cx| async move {
delete.await?;
this.update(&mut cx, |this, _| {
let this = this.as_local_mut().unwrap();
let mut snapshot = this.background_snapshot.lock();
snapshot.delete_entry(entry_id);
});
this.update(&mut cx, |this, cx| {
this.poll_snapshot(cx);
this.as_local().unwrap().broadcast_snapshot()
})
.await;
let this = this.as_local_mut().unwrap();
{
let mut snapshot = this.background_snapshot.lock();
snapshot.delete_entry(entry_id);
}
this.poll_snapshot(true, cx);
});
Ok(())
}))
}
@ -757,11 +712,6 @@ impl LocalWorktree {
)
})
.await?;
this.update(&mut cx, |this, cx| {
this.poll_snapshot(cx);
this.as_local().unwrap().broadcast_snapshot()
})
.await;
Ok(entry)
}))
}
@ -797,11 +747,6 @@ impl LocalWorktree {
)
})
.await?;
this.update(&mut cx, |this, cx| {
this.poll_snapshot(cx);
this.as_local().unwrap().broadcast_snapshot()
})
.await;
Ok(entry)
}))
}
@ -835,11 +780,6 @@ impl LocalWorktree {
.refresh_entry(path, abs_path, None, cx)
})
.await?;
this.update(&mut cx, |this, cx| {
this.poll_snapshot(cx);
this.as_local().unwrap().broadcast_snapshot()
})
.await;
Ok(entry)
})
}
@ -872,61 +812,55 @@ impl LocalWorktree {
let this = this
.upgrade(&cx)
.ok_or_else(|| anyhow!("worktree was dropped"))?;
let (entry, snapshot, snapshots_tx) = this.read_with(&cx, |this, _| {
let this = this.as_local().unwrap();
let mut snapshot = this.background_snapshot.lock();
entry.is_ignored = snapshot
.ignore_stack_for_path(&path, entry.is_dir())
.is_path_ignored(&path, entry.is_dir());
if let Some(old_path) = old_path {
snapshot.remove_path(&old_path);
this.update(&mut cx, |this, cx| {
let this = this.as_local_mut().unwrap();
let inserted_entry;
{
let mut snapshot = this.background_snapshot.lock();
entry.is_ignored = snapshot
.ignore_stack_for_path(&path, entry.is_dir())
.is_path_ignored(&path, entry.is_dir());
if let Some(old_path) = old_path {
snapshot.remove_path(&old_path);
}
inserted_entry = snapshot.insert_entry(entry, fs.as_ref());
snapshot.scan_id += 1;
}
let entry = snapshot.insert_entry(entry, fs.as_ref());
snapshot.scan_id += 1;
let snapshots_tx = this.share.as_ref().map(|s| s.snapshots_tx.clone());
(entry, snapshot.clone(), snapshots_tx)
});
this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
if let Some(snapshots_tx) = snapshots_tx {
snapshots_tx.send(snapshot).await.ok();
}
Ok(entry)
this.poll_snapshot(true, cx);
Ok(inserted_entry)
})
})
}
pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
let (share_tx, share_rx) = oneshot::channel();
let (snapshots_to_send_tx, snapshots_to_send_rx) =
smol::channel::unbounded::<LocalSnapshot>();
if self.share.is_some() {
let _ = share_tx.send(Ok(()));
} else {
let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot());
let rpc = self.client.clone();
let worktree_id = cx.model_id() as u64;
let maintain_remote_snapshot = cx.background().spawn({
let rpc = rpc.clone();
let diagnostic_summaries = self.diagnostic_summaries.clone();
async move {
let mut prev_snapshot = match snapshots_to_send_rx.recv().await {
Ok(snapshot) => {
if let Err(error) = rpc
.request(proto::UpdateWorktree {
project_id,
worktree_id,
root_name: snapshot.root_name().to_string(),
updated_entries: snapshot
.entries_by_path
.iter()
.filter(|e| !e.is_ignored)
.map(Into::into)
.collect(),
removed_entries: Default::default(),
scan_id: snapshot.scan_id as u64,
})
.await
{
let mut prev_snapshot = match snapshots_rx.recv().await {
Some(snapshot) => {
let update = proto::UpdateWorktree {
project_id,
worktree_id,
root_name: snapshot.root_name().to_string(),
updated_entries: snapshot
.entries_by_path
.iter()
.map(Into::into)
.collect(),
removed_entries: Default::default(),
scan_id: snapshot.scan_id as u64,
is_last_update: true,
};
if let Err(error) = send_worktree_update(&rpc, update).await {
let _ = share_tx.send(Err(error));
return Err(anyhow!("failed to send initial update worktree"));
} else {
@ -934,8 +868,10 @@ impl LocalWorktree {
snapshot
}
}
Err(error) => {
let _ = share_tx.send(Err(error.into()));
None => {
share_tx
.send(Err(anyhow!("worktree dropped before share completed")))
.ok();
return Err(anyhow!("failed to send initial update worktree"));
}
};
@ -948,44 +884,12 @@ impl LocalWorktree {
})?;
}
// Stream ignored entries in chunks.
{
let mut ignored_entries = prev_snapshot
.entries_by_path
.iter()
.filter(|e| e.is_ignored);
let mut ignored_entries_to_send = Vec::new();
loop {
#[cfg(any(test, feature = "test-support"))]
const CHUNK_SIZE: usize = 2;
#[cfg(not(any(test, feature = "test-support")))]
const CHUNK_SIZE: usize = 256;
let entry = ignored_entries.next();
if ignored_entries_to_send.len() >= CHUNK_SIZE || entry.is_none() {
rpc.request(proto::UpdateWorktree {
project_id,
worktree_id,
root_name: prev_snapshot.root_name().to_string(),
updated_entries: mem::take(&mut ignored_entries_to_send),
removed_entries: Default::default(),
scan_id: prev_snapshot.scan_id as u64,
})
.await?;
}
if let Some(entry) = entry {
ignored_entries_to_send.push(entry.into());
} else {
break;
}
}
}
while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
let message =
snapshot.build_update(&prev_snapshot, project_id, worktree_id, true);
rpc.request(message).await?;
while let Some(snapshot) = snapshots_rx.recv().await {
send_worktree_update(
&rpc,
snapshot.build_update(&prev_snapshot, project_id, worktree_id, true),
)
.await?;
prev_snapshot = snapshot;
}
@ -995,18 +899,12 @@ impl LocalWorktree {
});
self.share = Some(ShareState {
project_id,
snapshots_tx: snapshots_to_send_tx.clone(),
snapshots_tx,
_maintain_remote_snapshot: Some(maintain_remote_snapshot),
});
}
cx.spawn_weak(|this, cx| async move {
if let Some(this) = this.upgrade(&cx) {
this.read_with(&cx, |this, _| {
let this = this.as_local().unwrap();
let _ = snapshots_to_send_tx.try_send(this.snapshot());
});
}
cx.foreground().spawn(async move {
share_rx
.await
.unwrap_or_else(|_| Err(anyhow!("share ended")))
@ -1021,23 +919,6 @@ impl LocalWorktree {
self.share.is_some()
}
fn broadcast_snapshot(&self) -> impl Future<Output = ()> {
let mut to_send = None;
if !self.is_scanning() {
if let Some(share) = self.share.as_ref() {
to_send = Some((self.snapshot(), share.snapshots_tx.clone()));
}
}
async move {
if let Some((snapshot, snapshots_to_send_tx)) = to_send {
if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
log::error!("error submitting snapshot to send {}", err);
}
}
}
}
pub fn send_extension_counts(&self, project_id: u64) {
let mut extensions = Vec::new();
let mut counts = Vec::new();
@ -1063,31 +944,45 @@ impl RemoteWorktree {
self.snapshot.clone()
}
fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
self.snapshot = self.background_snapshot.lock().clone();
cx.emit(Event::UpdatedEntries);
cx.notify();
}
pub fn disconnected_from_host(&mut self) {
self.updates_tx.take();
self.snapshot_subscriptions.clear();
}
pub fn update_from_remote(
&mut self,
envelope: TypedEnvelope<proto::UpdateWorktree>,
) -> Result<()> {
pub fn update_from_remote(&mut self, update: proto::UpdateWorktree) {
if let Some(updates_tx) = &self.updates_tx {
updates_tx
.unbounded_send(envelope.payload)
.unbounded_send(update)
.expect("consumer runs to completion");
}
Ok(())
}
fn wait_for_snapshot(&self, scan_id: usize) -> impl Future<Output = ()> {
let mut rx = self.last_scan_id_rx.clone();
async move {
while let Some(applied_scan_id) = rx.next().await {
if applied_scan_id >= scan_id {
return;
}
fn observed_snapshot(&self, scan_id: usize) -> bool {
self.scan_id > scan_id || (self.scan_id == scan_id && self.is_complete)
}
fn wait_for_snapshot(&mut self, scan_id: usize) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
if self.observed_snapshot(scan_id) {
let _ = tx.send(());
} else {
match self
.snapshot_subscriptions
.binary_search_by_key(&scan_id, |probe| probe.0)
{
Ok(ix) | Err(ix) => self.snapshot_subscriptions.insert(ix, (scan_id, tx)),
}
}
async move {
let _ = rx.await;
}
}
pub fn update_diagnostic_summary(
@ -1109,7 +1004,7 @@ impl RemoteWorktree {
}
pub fn insert_entry(
&self,
&mut self,
entry: proto::Entry,
scan_id: usize,
cx: &mut ModelContext<Worktree>,
@ -1128,7 +1023,7 @@ impl RemoteWorktree {
}
pub(crate) fn delete_entry(
&self,
&mut self,
id: ProjectEntryId,
scan_id: usize,
cx: &mut ModelContext<Worktree>,
@ -1204,7 +1099,7 @@ impl Snapshot {
for entry_id in update.removed_entries {
let entry = self
.entry_for_id(ProjectEntryId::from_proto(entry_id))
.ok_or_else(|| anyhow!("unknown entry"))?;
.ok_or_else(|| anyhow!("unknown entry {}", entry_id))?;
entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
entries_by_id_edits.push(Edit::Remove(entry.id));
}
@ -1226,6 +1121,7 @@ impl Snapshot {
self.entries_by_path.edit(entries_by_path_edits, &());
self.entries_by_id.edit(entries_by_id_edits, &());
self.scan_id = update.scan_id as usize;
self.is_complete = update.is_last_update;
Ok(())
}
@ -1351,27 +1247,16 @@ impl LocalSnapshot {
}
#[cfg(test)]
pub(crate) fn to_proto(
&self,
diagnostic_summaries: &TreeMap<PathKey, DiagnosticSummary>,
visible: bool,
) -> proto::Worktree {
pub(crate) fn build_initial_update(&self, project_id: u64) -> proto::UpdateWorktree {
let root_name = self.root_name.clone();
proto::Worktree {
id: self.id.0 as u64,
proto::UpdateWorktree {
project_id,
worktree_id: self.id().to_proto(),
root_name,
entries: self
.entries_by_path
.iter()
.filter(|e| !e.is_ignored)
.map(Into::into)
.collect(),
diagnostic_summaries: diagnostic_summaries
.iter()
.map(|(path, summary)| summary.to_proto(&path.0))
.collect(),
visible,
updated_entries: self.entries_by_path.iter().map(Into::into).collect(),
removed_entries: Default::default(),
scan_id: self.scan_id as u64,
is_last_update: true,
}
}
@ -1438,6 +1323,7 @@ impl LocalSnapshot {
updated_entries,
removed_entries,
scan_id: self.scan_id as u64,
is_last_update: true,
}
}
@ -2109,7 +1995,7 @@ impl BackgroundScanner {
}
async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
if self.notify.unbounded_send(ScanState::Scanning).is_err() {
if self.notify.unbounded_send(ScanState::Initializing).is_err() {
return;
}
@ -2128,8 +2014,13 @@ impl BackgroundScanner {
}
futures::pin_mut!(events_rx);
while let Some(events) = events_rx.next().await {
if self.notify.unbounded_send(ScanState::Scanning).is_err() {
while let Some(mut events) = events_rx.next().await {
while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) {
events.extend(additional_events);
}
if self.notify.unbounded_send(ScanState::Updating).is_err() {
break;
}
@ -2781,6 +2672,19 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
}
}
async fn send_worktree_update(client: &Arc<Client>, update: proto::UpdateWorktree) -> Result<()> {
#[cfg(any(test, feature = "test-support"))]
const MAX_CHUNK_SIZE: usize = 2;
#[cfg(not(any(test, feature = "test-support")))]
const MAX_CHUNK_SIZE: usize = 256;
for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) {
client.request(update).await?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
@ -2990,6 +2894,7 @@ mod tests {
root_name: Default::default(),
root_char_bag: Default::default(),
scan_id: 0,
is_complete: true,
},
extension_counts: Default::default(),
};