diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index ae8cf8bf56..f405c14a18 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -473,18 +473,22 @@ impl Client { pub fn subscribe_to_entity( self: &Arc, remote_id: u64, - ) -> PendingEntitySubscription { + ) -> Result> { let id = (TypeId::of::(), remote_id); - self.state - .write() - .entities_by_type_and_remote_id - .insert(id, WeakSubscriber::Pending(Default::default())); - PendingEntitySubscription { - client: self.clone(), - remote_id, - consumed: false, - _entity_type: PhantomData, + let mut state = self.state.write(); + if state.entities_by_type_and_remote_id.contains_key(&id) { + return Err(anyhow!("already subscribed to entity")); + } else { + state + .entities_by_type_and_remote_id + .insert(id, WeakSubscriber::Pending(Default::default())); + Ok(PendingEntitySubscription { + client: self.clone(), + remote_id, + consumed: false, + _entity_type: PhantomData, + }) } } @@ -1605,14 +1609,17 @@ mod tests { let _subscription1 = client .subscribe_to_entity(1) + .unwrap() .set_model(&model1, &mut cx.to_async()); let _subscription2 = client .subscribe_to_entity(2) + .unwrap() .set_model(&model2, &mut cx.to_async()); // Ensure dropping a subscription for the same entity type still allows receiving of // messages for other entity IDs of the same type. let subscription3 = client .subscribe_to_entity(3) + .unwrap() .set_model(&model3, &mut cx.to_async()); drop(subscription3); diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 19078f31d7..72f3d05cae 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -463,7 +463,7 @@ impl Project { ) -> Result> { client.authenticate_and_connect(true, &cx).await?; - let subscription = client.subscribe_to_entity(remote_id); + let subscription = client.subscribe_to_entity(remote_id)?; let response = client .request_envelope(proto::JoinProject { project_id: remote_id, @@ -989,6 +989,11 @@ impl Project { if self.client_state.is_some() { return Err(anyhow!("project was already shared")); } + self.client_subscriptions.push( + self.client + .subscribe_to_entity(project_id)? + .set_model(&cx.handle(), &mut cx.to_async()), + ); for open_buffer in self.opened_buffers.values_mut() { match open_buffer { @@ -1025,12 +1030,6 @@ impl Project { .log_err(); } - self.client_subscriptions.push( - self.client - .subscribe_to_entity(project_id) - .set_model(&cx.handle(), &mut cx.to_async()), - ); - let (metadata_changed_tx, mut metadata_changed_rx) = mpsc::unbounded(); self.client_state = Some(ProjectClientState::Local { remote_id: project_id,