Wait for acknowledgment before sending the next project update

This commit is contained in:
Antonio Scandurra 2022-11-14 15:32:49 +01:00
parent 65c5adff05
commit 40073f6100
11 changed files with 94 additions and 208 deletions

View file

@ -70,10 +70,6 @@ pub trait Item: Entity {
fn entry_id(&self, cx: &AppContext) -> Option<ProjectEntryId>;
}
pub struct ProjectStore {
projects: Vec<WeakModelHandle<Project>>,
}
// Language server state is stored across 3 collections:
// language_servers =>
// a mapping from unique server id to LanguageServerState which can either be a task for a
@ -102,7 +98,6 @@ pub struct Project {
next_entry_id: Arc<AtomicUsize>,
next_diagnostic_group_id: usize,
user_store: ModelHandle<UserStore>,
project_store: ModelHandle<ProjectStore>,
fs: Arc<dyn Fs>,
client_state: Option<ProjectClientState>,
collaborators: HashMap<PeerId, Collaborator>,
@ -152,6 +147,8 @@ enum WorktreeHandle {
enum ProjectClientState {
Local {
remote_id: u64,
metadata_changed: watch::Sender<()>,
_maintain_metadata: Task<()>,
_detect_unshare: Task<Option<()>>,
},
Remote {
@ -376,7 +373,7 @@ impl Project {
client.add_model_message_handler(Self::handle_start_language_server);
client.add_model_message_handler(Self::handle_update_language_server);
client.add_model_message_handler(Self::handle_remove_collaborator);
client.add_model_message_handler(Self::handle_update_project);
client.add_model_message_handler(Self::handle_project_updated);
client.add_model_message_handler(Self::handle_unshare_project);
client.add_model_message_handler(Self::handle_create_buffer_for_peer);
client.add_model_message_handler(Self::handle_update_buffer_file);
@ -412,46 +409,39 @@ impl Project {
pub fn local(
client: Arc<Client>,
user_store: ModelHandle<UserStore>,
project_store: ModelHandle<ProjectStore>,
languages: Arc<LanguageRegistry>,
fs: Arc<dyn Fs>,
cx: &mut MutableAppContext,
) -> ModelHandle<Self> {
cx.add_model(|cx: &mut ModelContext<Self>| {
let handle = cx.weak_handle();
project_store.update(cx, |store, cx| store.add_project(handle, cx));
Self {
worktrees: Default::default(),
collaborators: Default::default(),
opened_buffers: Default::default(),
shared_buffers: Default::default(),
incomplete_buffers: Default::default(),
loading_buffers: Default::default(),
loading_local_worktrees: Default::default(),
buffer_snapshots: Default::default(),
client_state: None,
opened_buffer: watch::channel(),
client_subscriptions: Vec::new(),
_subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
_maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
active_entry: None,
languages,
client,
user_store,
project_store,
fs,
next_entry_id: Default::default(),
next_diagnostic_group_id: Default::default(),
language_servers: Default::default(),
language_server_ids: Default::default(),
language_server_statuses: Default::default(),
last_workspace_edits_by_language_server: Default::default(),
language_server_settings: Default::default(),
buffers_being_formatted: Default::default(),
next_language_server_id: 0,
nonce: StdRng::from_entropy().gen(),
}
cx.add_model(|cx: &mut ModelContext<Self>| Self {
worktrees: Default::default(),
collaborators: Default::default(),
opened_buffers: Default::default(),
shared_buffers: Default::default(),
incomplete_buffers: Default::default(),
loading_buffers: Default::default(),
loading_local_worktrees: Default::default(),
buffer_snapshots: Default::default(),
client_state: None,
opened_buffer: watch::channel(),
client_subscriptions: Vec::new(),
_subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
_maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
active_entry: None,
languages,
client,
user_store,
fs,
next_entry_id: Default::default(),
next_diagnostic_group_id: Default::default(),
language_servers: Default::default(),
language_server_ids: Default::default(),
language_server_statuses: Default::default(),
last_workspace_edits_by_language_server: Default::default(),
language_server_settings: Default::default(),
buffers_being_formatted: Default::default(),
next_language_server_id: 0,
nonce: StdRng::from_entropy().gen(),
})
}
@ -459,7 +449,6 @@ impl Project {
remote_id: u64,
client: Arc<Client>,
user_store: ModelHandle<UserStore>,
project_store: ModelHandle<ProjectStore>,
languages: Arc<LanguageRegistry>,
fs: Arc<dyn Fs>,
mut cx: AsyncAppContext,
@ -482,9 +471,6 @@ impl Project {
}
let this = cx.add_model(|cx: &mut ModelContext<Self>| {
let handle = cx.weak_handle();
project_store.update(cx, |store, cx| store.add_project(handle, cx));
let mut this = Self {
worktrees: Vec::new(),
loading_buffers: Default::default(),
@ -497,7 +483,6 @@ impl Project {
_maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx),
languages,
user_store: user_store.clone(),
project_store,
fs,
next_entry_id: Default::default(),
next_diagnostic_group_id: Default::default(),
@ -593,9 +578,7 @@ impl Project {
let http_client = client::test::FakeHttpClient::with_404_response();
let client = cx.update(|cx| client::Client::new(http_client.clone(), cx));
let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
let project_store = cx.add_model(|_| ProjectStore::new());
let project =
cx.update(|cx| Project::local(client, user_store, project_store, languages, fs, cx));
let project = cx.update(|cx| Project::local(client, user_store, languages, fs, cx));
for path in root_paths {
let (tree, _) = project
.update(cx, |project, cx| {
@ -676,10 +659,6 @@ impl Project {
self.user_store.clone()
}
pub fn project_store(&self) -> ModelHandle<ProjectStore> {
self.project_store.clone()
}
#[cfg(any(test, feature = "test-support"))]
pub fn check_invariants(&self, cx: &AppContext) {
if self.is_local() {
@ -752,51 +731,12 @@ impl Project {
}
fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
if let Some(ProjectClientState::Local { remote_id, .. }) = &self.client_state {
let project_id = *remote_id;
// Broadcast worktrees only if the project is online.
let worktrees = self
.worktrees
.iter()
.filter_map(|worktree| {
worktree
.upgrade(cx)
.map(|worktree| worktree.read(cx).as_local().unwrap().metadata_proto())
})
.collect();
self.client
.send(proto::UpdateProject {
project_id,
worktrees,
})
.log_err();
let worktrees = self.visible_worktrees(cx).collect::<Vec<_>>();
let scans_complete = futures::future::join_all(
worktrees
.iter()
.filter_map(|worktree| Some(worktree.read(cx).as_local()?.scan_complete())),
);
let worktrees = worktrees.into_iter().map(|handle| handle.downgrade());
cx.spawn_weak(move |_, cx| async move {
scans_complete.await;
cx.read(|cx| {
for worktree in worktrees {
if let Some(worktree) = worktree
.upgrade(cx)
.and_then(|worktree| worktree.read(cx).as_local())
{
worktree.send_extension_counts(project_id);
}
}
})
})
.detach();
if let Some(ProjectClientState::Local {
metadata_changed, ..
}) = &mut self.client_state
{
*metadata_changed.borrow_mut() = ();
}
self.project_store.update(cx, |_, cx| cx.notify());
cx.notify();
}
@ -1092,8 +1032,32 @@ impl Project {
cx.notify();
let mut status = self.client.status();
let (metadata_changed_tx, mut metadata_changed_rx) = watch::channel();
self.client_state = Some(ProjectClientState::Local {
remote_id: project_id,
metadata_changed: metadata_changed_tx,
_maintain_metadata: cx.spawn_weak(move |this, cx| async move {
while let Some(()) = metadata_changed_rx.next().await {
let Some(this) = this.upgrade(&cx) else { break };
this.read_with(&cx, |this, cx| {
let worktrees = this
.worktrees
.iter()
.filter_map(|worktree| {
worktree.upgrade(cx).map(|worktree| {
worktree.read(cx).as_local().unwrap().metadata_proto()
})
})
.collect();
this.client.request(proto::UpdateProject {
project_id,
worktrees,
})
})
.await
.log_err();
}
}),
_detect_unshare: cx.spawn_weak(move |this, mut cx| {
async move {
let is_connected = status.next().await.map_or(false, |s| s.is_connected());
@ -1632,10 +1596,6 @@ impl Project {
operations: vec![language::proto::serialize_operation(operation)],
});
cx.background().spawn(request).detach_and_log_err(cx);
} else if let Some(project_id) = self.remote_id() {
let _ = self
.client
.send(proto::RegisterProjectActivity { project_id });
}
}
BufferEvent::Edited { .. } => {
@ -4573,9 +4533,9 @@ impl Project {
})
}
async fn handle_update_project(
async fn handle_project_updated(
this: ModelHandle<Self>,
envelope: TypedEnvelope<proto::UpdateProject>,
envelope: TypedEnvelope<proto::ProjectUpdated>,
client: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
@ -5832,48 +5792,6 @@ impl Project {
}
}
impl ProjectStore {
pub fn new() -> Self {
Self {
projects: Default::default(),
}
}
pub fn projects<'a>(
&'a self,
cx: &'a AppContext,
) -> impl 'a + Iterator<Item = ModelHandle<Project>> {
self.projects
.iter()
.filter_map(|project| project.upgrade(cx))
}
fn add_project(&mut self, project: WeakModelHandle<Project>, cx: &mut ModelContext<Self>) {
if let Err(ix) = self
.projects
.binary_search_by_key(&project.id(), WeakModelHandle::id)
{
self.projects.insert(ix, project);
}
cx.notify();
}
fn prune_projects(&mut self, cx: &mut ModelContext<Self>) {
let mut did_change = false;
self.projects.retain(|project| {
if project.is_upgradable(cx) {
true
} else {
did_change = true;
false
}
});
if did_change {
cx.notify();
}
}
}
impl WorktreeHandle {
pub fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Worktree>> {
match self {
@ -5952,16 +5870,10 @@ impl<'a> Iterator for PathMatchCandidateSetIter<'a> {
}
}
impl Entity for ProjectStore {
type Event = ();
}
impl Entity for Project {
type Event = Event;
fn release(&mut self, cx: &mut gpui::MutableAppContext) {
self.project_store.update(cx, ProjectStore::prune_projects);
fn release(&mut self, _: &mut gpui::MutableAppContext) {
match &self.client_state {
Some(ProjectClientState::Local { remote_id, .. }) => {
self.client