From 1ccf174388151c2997fdec620fae7d10c08c4b12 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 4 Apr 2023 18:34:39 -0700 Subject: [PATCH] Avoid applying outdated UpdateProject messages Co-authored-by: Nathan Sobo --- crates/client/src/client.rs | 15 +++++++++++++-- crates/project/src/project.rs | 18 +++++++++++++----- crates/rpc/src/peer.rs | 25 +++++++++++++++++++++---- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 76004f14a4..ae8cf8bf56 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -10,7 +10,10 @@ use async_tungstenite::tungstenite::{ error::Error as WebsocketError, http::{Request, StatusCode}, }; -use futures::{future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryStreamExt}; +use futures::{ + future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryFutureExt as _, + TryStreamExt, +}; use gpui::{ actions, serde_json::{self, Value}, @@ -1187,6 +1190,14 @@ impl Client { &self, request: T, ) -> impl Future> { + self.request_envelope(request) + .map_ok(|envelope| envelope.payload) + } + + pub fn request_envelope( + &self, + request: T, + ) -> impl Future>> { let client_id = self.id; log::debug!( "rpc request start. client_id:{}. name:{}", @@ -1195,7 +1206,7 @@ impl Client { ); let response = self .connection_id() - .map(|conn_id| self.peer.request(conn_id, request)); + .map(|conn_id| self.peer.request_envelope(conn_id, request)); async move { let response = response?.await; log::debug!( diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 2755f281f3..1e9721339f 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -100,6 +100,7 @@ pub struct Project { next_language_server_id: usize, client: Arc, next_entry_id: Arc, + join_project_response_message_id: u32, next_diagnostic_group_id: usize, user_store: ModelHandle, fs: Arc, @@ -425,6 +426,7 @@ impl Project { loading_buffers_by_path: Default::default(), loading_local_worktrees: Default::default(), buffer_snapshots: Default::default(), + join_project_response_message_id: 0, client_state: None, opened_buffer: watch::channel(), client_subscriptions: Vec::new(), @@ -463,15 +465,15 @@ impl Project { let subscription = client.subscribe_to_entity(remote_id); let response = client - .request(proto::JoinProject { + .request_envelope(proto::JoinProject { project_id: remote_id, }) .await?; let this = cx.add_model(|cx| { - let replica_id = response.replica_id as ReplicaId; + let replica_id = response.payload.replica_id as ReplicaId; let mut worktrees = Vec::new(); - for worktree in response.worktrees { + for worktree in response.payload.worktrees { let worktree = cx.update(|cx| { Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx) }); @@ -487,6 +489,7 @@ impl Project { loading_local_worktrees: Default::default(), active_entry: None, collaborators: Default::default(), + join_project_response_message_id: response.message_id, _maintain_buffer_languages: Self::maintain_buffer_languages(&languages, cx), _maintain_workspace_config: Self::maintain_workspace_config(languages.clone(), cx), languages, @@ -505,6 +508,7 @@ impl Project { language_servers: Default::default(), language_server_ids: Default::default(), language_server_statuses: response + .payload .language_servers .into_iter() .map(|server| { @@ -537,6 +541,7 @@ impl Project { let subscription = subscription.set_model(&this, &mut cx); let user_ids = response + .payload .collaborators .iter() .map(|peer| peer.user_id) @@ -546,7 +551,7 @@ impl Project { .await?; this.update(&mut cx, |this, cx| { - this.set_collaborators_from_proto(response.collaborators, cx)?; + this.set_collaborators_from_proto(response.payload.collaborators, cx)?; this.client_subscriptions.push(subscription); anyhow::Ok(()) })?; @@ -4930,7 +4935,10 @@ impl Project { mut cx: AsyncAppContext, ) -> Result<()> { this.update(&mut cx, |this, cx| { - this.set_worktrees_from_proto(envelope.payload.worktrees, cx)?; + // Don't handle messages that were sent before the response to us joining the project + if envelope.message_id > this.join_project_response_message_id { + this.set_worktrees_from_proto(envelope.payload.worktrees, cx)?; + } Ok(()) }) } diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 0df87fd92d..72ddfa567b 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -7,7 +7,7 @@ use collections::HashMap; use futures::{ channel::{mpsc, oneshot}, stream::BoxStream, - FutureExt, SinkExt, StreamExt, + FutureExt, SinkExt, StreamExt, TryFutureExt, }; use parking_lot::{Mutex, RwLock}; use serde::{ser::SerializeStruct, Serialize}; @@ -71,6 +71,7 @@ impl Clone for Receipt { impl Copy for Receipt {} +#[derive(Clone, Debug)] pub struct TypedEnvelope { pub sender_id: ConnectionId, pub original_sender_id: Option, @@ -370,6 +371,15 @@ impl Peer { receiver_id: ConnectionId, request: T, ) -> impl Future> { + self.request_internal(None, receiver_id, request) + .map_ok(|envelope| envelope.payload) + } + + pub fn request_envelope( + &self, + receiver_id: ConnectionId, + request: T, + ) -> impl Future>> { self.request_internal(None, receiver_id, request) } @@ -380,6 +390,7 @@ impl Peer { request: T, ) -> impl Future> { self.request_internal(Some(sender_id), receiver_id, request) + .map_ok(|envelope| envelope.payload) } pub fn request_internal( @@ -387,7 +398,7 @@ impl Peer { original_sender_id: Option, receiver_id: ConnectionId, request: T, - ) -> impl Future> { + ) -> impl Future>> { let (tx, rx) = oneshot::channel(); let send = self.connection_state(receiver_id).and_then(|connection| { let message_id = connection.next_message_id.fetch_add(1, SeqCst); @@ -410,6 +421,7 @@ impl Peer { async move { send?; let (response, _barrier) = rx.await.map_err(|_| anyhow!("connection was closed"))?; + if let Some(proto::envelope::Payload::Error(error)) = &response.payload { Err(anyhow!( "RPC request {} failed - {}", @@ -417,8 +429,13 @@ impl Peer { error.message )) } else { - T::Response::from_envelope(response) - .ok_or_else(|| anyhow!("received response of the wrong type")) + Ok(TypedEnvelope { + message_id: response.id, + sender_id: receiver_id, + original_sender_id: response.original_sender_id, + payload: T::Response::from_envelope(response) + .ok_or_else(|| anyhow!("received response of the wrong type"))?, + }) } } }