diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 09286300d9..a8387f7c5a 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1621,6 +1621,10 @@ impl ProtoClient for Client { fn message_handler_set(&self) -> &parking_lot::Mutex { &self.handler_set } + + fn goes_via_collab(&self) -> bool { + true + } } #[derive(Serialize, Deserialize)] diff --git a/crates/project/src/lsp_store.rs b/crates/project/src/lsp_store.rs index 58d9ba8926..5c32c9030d 100644 --- a/crates/project/src/lsp_store.rs +++ b/crates/project/src/lsp_store.rs @@ -534,6 +534,9 @@ impl LspStore { } WorktreeStoreEvent::WorktreeRemoved(_, id) => self.remove_worktree(*id, cx), WorktreeStoreEvent::WorktreeOrderChanged => {} + WorktreeStoreEvent::WorktreeUpdateSent(worktree) => { + worktree.update(cx, |worktree, _cx| self.send_diagnostic_summaries(worktree)); + } } } @@ -764,24 +767,22 @@ impl LspStore { self.active_entry = active_entry; } - pub(crate) fn send_diagnostic_summaries( - &self, - worktree: &mut Worktree, - ) -> Result<(), anyhow::Error> { + pub(crate) fn send_diagnostic_summaries(&self, worktree: &mut Worktree) { if let Some(client) = self.downstream_client.clone() { if let Some(summaries) = self.diagnostic_summaries.get(&worktree.id()) { for (path, summaries) in summaries { for (&server_id, summary) in summaries { - client.send(proto::UpdateDiagnosticSummary { - project_id: self.project_id, - worktree_id: worktree.id().to_proto(), - summary: Some(summary.to_proto(server_id, path)), - })?; + client + .send(proto::UpdateDiagnosticSummary { + project_id: self.project_id, + worktree_id: worktree.id().to_proto(), + summary: Some(summary.to_proto(server_id, path)), + }) + .log_err(); } } } } - Ok(()) } pub fn request_lsp( diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index f4816cf0cd..fcf10d11c2 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -31,7 +31,7 @@ pub use environment::ProjectEnvironment; use futures::{ channel::mpsc::{self, UnboundedReceiver}, future::try_join_all, - AsyncWriteExt, FutureExt, StreamExt, + AsyncWriteExt, StreamExt, }; use git::{blame::Blame, repository::GitRepository}; @@ -152,7 +152,7 @@ pub struct Project { _subscriptions: Vec, buffers_needing_diff: HashSet>, git_diff_debouncer: DebouncedDelay, - remotely_created_buffers: Arc>, + remotely_created_models: Arc>, terminals: Terminals, node: Option>, tasks: Model, @@ -169,26 +169,28 @@ pub struct Project { } #[derive(Default)] -struct RemotelyCreatedBuffers { +struct RemotelyCreatedModels { + worktrees: Vec>, buffers: Vec>, retain_count: usize, } -struct RemotelyCreatedBufferGuard { - remote_buffers: std::sync::Weak>, +struct RemotelyCreatedModelGuard { + remote_models: std::sync::Weak>, } -impl Drop for RemotelyCreatedBufferGuard { +impl Drop for RemotelyCreatedModelGuard { fn drop(&mut self) { - if let Some(remote_buffers) = self.remote_buffers.upgrade() { - let mut remote_buffers = remote_buffers.lock(); + if let Some(remote_models) = self.remote_models.upgrade() { + let mut remote_models = remote_models.lock(); assert!( - remote_buffers.retain_count > 0, - "RemotelyCreatedBufferGuard dropped too many times" + remote_models.retain_count > 0, + "RemotelyCreatedModelGuard dropped too many times" ); - remote_buffers.retain_count -= 1; - if remote_buffers.retain_count == 0 { - remote_buffers.buffers.clear(); + remote_models.retain_count -= 1; + if remote_models.retain_count == 0 { + remote_models.buffers.clear(); + remote_models.worktrees.clear(); } } } @@ -620,7 +622,7 @@ impl Project { let snippets = SnippetProvider::new(fs.clone(), BTreeSet::from_iter([global_snippets_dir]), cx); - let worktree_store = cx.new_model(|_| WorktreeStore::new(false, fs.clone())); + let worktree_store = cx.new_model(|_| WorktreeStore::new(None, false, fs.clone())); cx.subscribe(&worktree_store, Self::on_worktree_store_event) .detach(); @@ -687,7 +689,7 @@ impl Project { dev_server_project_id: None, search_history: Self::new_search_history(), environment, - remotely_created_buffers: Default::default(), + remotely_created_models: Default::default(), last_formatting_failure: None, buffers_being_formatted: Default::default(), search_included_history: Self::new_search_history(), @@ -714,11 +716,8 @@ impl Project { let snippets = SnippetProvider::new(fs.clone(), BTreeSet::from_iter([global_snippets_dir]), cx); - let worktree_store = cx.new_model(|_| { - let mut worktree_store = WorktreeStore::new(false, fs.clone()); - worktree_store.set_upstream_client(ssh.clone().into()); - worktree_store - }); + let worktree_store = + cx.new_model(|_| WorktreeStore::new(Some(ssh.clone().into()), false, fs.clone())); cx.subscribe(&worktree_store, Self::on_worktree_store_event) .detach(); @@ -773,7 +772,7 @@ impl Project { dev_server_project_id: None, search_history: Self::new_search_history(), environment, - remotely_created_buffers: Default::default(), + remotely_created_models: Default::default(), last_formatting_failure: None, buffers_being_formatted: Default::default(), search_included_history: Self::new_search_history(), @@ -787,8 +786,9 @@ impl Project { ssh.subscribe_to_entity(SSH_PROJECT_ID, &this.worktree_store); ssh.subscribe_to_entity(SSH_PROJECT_ID, &this.lsp_store); ssh.subscribe_to_entity(SSH_PROJECT_ID, &this.settings_observer); - client.add_model_message_handler(Self::handle_update_worktree); client.add_model_message_handler(Self::handle_create_buffer_for_peer); + client.add_model_message_handler(Self::handle_update_worktree); + client.add_model_message_handler(Self::handle_update_project); client.add_model_request_handler(BufferStore::handle_update_buffer); BufferStore::init(&client); LspStore::init(&client); @@ -867,8 +867,7 @@ impl Project { let role = response.payload.role(); let worktree_store = cx.new_model(|_| { - let mut store = WorktreeStore::new(true, fs.clone()); - store.set_upstream_client(client.clone().into()); + let mut store = WorktreeStore::new(Some(client.clone().into()), true, fs.clone()); if let Some(dev_server_project_id) = response.payload.dev_server_project_id { store.set_dev_server_project_id(DevServerProjectId(dev_server_project_id)); } @@ -955,7 +954,7 @@ impl Project { search_included_history: Self::new_search_history(), search_excluded_history: Self::new_search_history(), environment: ProjectEnvironment::new(&worktree_store, None, cx), - remotely_created_buffers: Arc::new(Mutex::new(RemotelyCreatedBuffers::default())), + remotely_created_models: Arc::new(Mutex::new(RemotelyCreatedModels::default())), last_formatting_failure: None, buffers_being_formatted: Default::default(), }; @@ -1259,43 +1258,6 @@ impl Project { } } - fn metadata_changed(&mut self, cx: &mut ModelContext) { - cx.notify(); - - let ProjectClientState::Shared { remote_id } = self.client_state else { - return; - }; - let project_id = remote_id; - - let update_project = self.client.request(proto::UpdateProject { - project_id, - worktrees: self.worktree_metadata_protos(cx), - }); - cx.spawn(|this, mut cx| async move { - update_project.await?; - this.update(&mut cx, |this, cx| { - let client = this.client.clone(); - let worktrees = this.worktree_store.read(cx).worktrees().collect::>(); - - for worktree in worktrees { - worktree.update(cx, |worktree, cx| { - let client = client.clone(); - worktree.observe_updates(project_id, cx, { - move |update| client.request(update).map(|result| result.is_ok()) - }); - - this.lsp_store.update(cx, |lsp_store, _| { - lsp_store.send_diagnostic_summaries(worktree) - }) - })?; - } - - anyhow::Ok(()) - }) - }) - .detach_and_log_err(cx); - } - pub fn task_inventory(&self) -> &Model { &self.tasks } @@ -1513,7 +1475,7 @@ impl Project { buffer_store.shared(project_id, self.client.clone().into(), cx) }); self.worktree_store.update(cx, |worktree_store, cx| { - worktree_store.set_shared(true, cx); + worktree_store.shared(project_id, self.client.clone().into(), cx); }); self.lsp_store.update(cx, |lsp_store, cx| { lsp_store.shared(project_id, self.client.clone().into(), cx) @@ -1526,7 +1488,6 @@ impl Project { remote_id: project_id, }; - self.metadata_changed(cx); cx.emit(Event::RemoteIdChanged(Some(project_id))); cx.notify(); Ok(()) @@ -1540,7 +1501,11 @@ impl Project { self.buffer_store .update(cx, |buffer_store, _| buffer_store.forget_shared_buffers()); self.set_collaborators_from_proto(message.collaborators, cx)?; - self.metadata_changed(cx); + + self.worktree_store.update(cx, |worktree_store, cx| { + worktree_store.send_project_updates(cx); + }); + cx.notify(); cx.emit(Event::Reshared); Ok(()) } @@ -1576,7 +1541,6 @@ impl Project { pub fn unshare(&mut self, cx: &mut ModelContext) -> Result<()> { self.unshare_internal(cx)?; - self.metadata_changed(cx); cx.notify(); Ok(()) } @@ -1598,7 +1562,7 @@ impl Project { self.collaborators.clear(); self.client_subscriptions.clear(); self.worktree_store.update(cx, |store, cx| { - store.set_shared(false, cx); + store.unshared(cx); }); self.buffer_store.update(cx, |buffer_store, cx| { buffer_store.forget_shared_buffers(); @@ -1867,9 +1831,9 @@ impl Project { cx: &mut ModelContext, ) -> Result<()> { { - let mut remotely_created_buffers = self.remotely_created_buffers.lock(); - if remotely_created_buffers.retain_count > 0 { - remotely_created_buffers.buffers.push(buffer.clone()) + let mut remotely_created_models = self.remotely_created_models.lock(); + if remotely_created_models.retain_count > 0 { + remotely_created_models.buffers.push(buffer.clone()) } } @@ -2110,10 +2074,17 @@ impl Project { cx.emit(Event::WorktreeRemoved(*id)); } WorktreeStoreEvent::WorktreeOrderChanged => cx.emit(Event::WorktreeOrderChanged), + WorktreeStoreEvent::WorktreeUpdateSent(_) => {} } } fn on_worktree_added(&mut self, worktree: &Model, cx: &mut ModelContext) { + { + let mut remotely_created_models = self.remotely_created_models.lock(); + if remotely_created_models.retain_count > 0 { + remotely_created_models.worktrees.push(worktree.clone()) + } + } cx.observe(worktree, |_, _, cx| cx.notify()).detach(); cx.subscribe(worktree, |this, worktree, event, cx| { let is_local = worktree.read(cx).is_local(); @@ -2140,7 +2111,7 @@ impl Project { } }) .detach(); - self.metadata_changed(cx); + cx.notify(); } fn on_worktree_removed(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext) { @@ -2171,7 +2142,7 @@ impl Project { inventory.remove_worktree_sources(id_to_remove); }); - self.metadata_changed(cx); + cx.notify(); } fn on_buffer_event( @@ -3012,7 +2983,7 @@ impl Project { #[inline(never)] fn definition_impl( - &self, + &mut self, buffer: &Model, position: PointUtf16, cx: &mut ModelContext, @@ -3025,7 +2996,7 @@ impl Project { ) } pub fn definition( - &self, + &mut self, buffer: &Model, position: T, cx: &mut ModelContext, @@ -3035,7 +3006,7 @@ impl Project { } fn declaration_impl( - &self, + &mut self, buffer: &Model, position: PointUtf16, cx: &mut ModelContext, @@ -3049,7 +3020,7 @@ impl Project { } pub fn declaration( - &self, + &mut self, buffer: &Model, position: T, cx: &mut ModelContext, @@ -3059,7 +3030,7 @@ impl Project { } fn type_definition_impl( - &self, + &mut self, buffer: &Model, position: PointUtf16, cx: &mut ModelContext, @@ -3073,7 +3044,7 @@ impl Project { } pub fn type_definition( - &self, + &mut self, buffer: &Model, position: T, cx: &mut ModelContext, @@ -3083,7 +3054,7 @@ impl Project { } pub fn implementation( - &self, + &mut self, buffer: &Model, position: T, cx: &mut ModelContext, @@ -3098,7 +3069,7 @@ impl Project { } pub fn references( - &self, + &mut self, buffer: &Model, position: T, cx: &mut ModelContext, @@ -3113,7 +3084,7 @@ impl Project { } fn document_highlights_impl( - &self, + &mut self, buffer: &Model, position: PointUtf16, cx: &mut ModelContext, @@ -3127,7 +3098,7 @@ impl Project { } pub fn document_highlights( - &self, + &mut self, buffer: &Model, position: T, cx: &mut ModelContext, @@ -3514,7 +3485,7 @@ impl Project { query: Some(query.to_proto()), limit: limit as _, }); - let guard = self.retain_remotely_created_buffers(cx); + let guard = self.retain_remotely_created_models(cx); cx.spawn(move |this, mut cx| async move { let response = request.await?; @@ -3536,7 +3507,7 @@ impl Project { } pub fn request_lsp( - &self, + &mut self, buffer_handle: Model, server: LanguageServerToQuery, request: R, @@ -3546,8 +3517,14 @@ impl Project { ::Result: Send, ::Params: Send, { - self.lsp_store.update(cx, |lsp_store, cx| { + let guard = self.retain_remotely_created_models(cx); + let task = self.lsp_store.update(cx, |lsp_store, cx| { lsp_store.request_lsp(buffer_handle, server, request, cx) + }); + cx.spawn(|_, _| async move { + let result = task.await; + drop(guard); + result }) } @@ -4095,6 +4072,7 @@ impl Project { })? } + // Collab sends UpdateWorktree protos as messages async fn handle_update_worktree( this: Model, envelope: TypedEnvelope, @@ -4130,19 +4108,21 @@ impl Project { BufferStore::handle_update_buffer(buffer_store, envelope, cx).await } - fn retain_remotely_created_buffers( + fn retain_remotely_created_models( &mut self, cx: &mut ModelContext, - ) -> RemotelyCreatedBufferGuard { + ) -> RemotelyCreatedModelGuard { { - let mut remotely_created_buffers = self.remotely_created_buffers.lock(); - if remotely_created_buffers.retain_count == 0 { - remotely_created_buffers.buffers = self.buffer_store.read(cx).buffers().collect(); + let mut remotely_create_models = self.remotely_created_models.lock(); + if remotely_create_models.retain_count == 0 { + remotely_create_models.buffers = self.buffer_store.read(cx).buffers().collect(); + remotely_create_models.worktrees = + self.worktree_store.read(cx).worktrees().collect(); } - remotely_created_buffers.retain_count += 1; + remotely_create_models.retain_count += 1; } - RemotelyCreatedBufferGuard { - remote_buffers: Arc::downgrade(&self.remotely_created_buffers), + RemotelyCreatedModelGuard { + remote_models: Arc::downgrade(&self.remotely_created_models), } } @@ -4637,16 +4617,11 @@ impl Project { worktrees: Vec, cx: &mut ModelContext, ) -> Result<()> { - self.metadata_changed(cx); - self.worktree_store.update(cx, |worktree_store, cx| { - worktree_store.set_worktrees_from_proto( - worktrees, - self.replica_id(), - self.remote_id().ok_or_else(|| anyhow!("invalid project"))?, - self.client.clone().into(), - cx, - ) - }) + cx.notify(); + let result = self.worktree_store.update(cx, |worktree_store, cx| { + worktree_store.set_worktrees_from_proto(worktrees, self.replica_id(), cx) + }); + result } fn set_collaborators_from_proto( diff --git a/crates/project/src/worktree_store.rs b/crates/project/src/worktree_store.rs index 07764d4a05..7fae8b9e1d 100644 --- a/crates/project/src/worktree_store.rs +++ b/crates/project/src/worktree_store.rs @@ -39,8 +39,10 @@ struct MatchingEntry { pub struct WorktreeStore { next_entry_id: Arc, upstream_client: Option, + downstream_client: Option, + remote_id: u64, dev_server_project_id: Option, - is_shared: bool, + retain_worktrees: bool, worktrees: Vec, worktrees_reordered: bool, #[allow(clippy::type_complexity)] @@ -53,6 +55,7 @@ pub enum WorktreeStoreEvent { WorktreeAdded(Model), WorktreeRemoved(EntityId, WorktreeId), WorktreeOrderChanged, + WorktreeUpdateSent(Model), } impl EventEmitter for WorktreeStore {} @@ -66,23 +69,25 @@ impl WorktreeStore { client.add_model_request_handler(Self::handle_expand_project_entry); } - pub fn new(retain_worktrees: bool, fs: Arc) -> Self { + pub fn new( + upstream_client: Option, + retain_worktrees: bool, + fs: Arc, + ) -> Self { Self { next_entry_id: Default::default(), loading_worktrees: Default::default(), - upstream_client: None, dev_server_project_id: None, - is_shared: retain_worktrees, + downstream_client: None, worktrees: Vec::new(), worktrees_reordered: false, + retain_worktrees, + remote_id: 0, + upstream_client, fs, } } - pub fn set_upstream_client(&mut self, client: AnyProtoClient) { - self.upstream_client = Some(client); - } - pub fn set_dev_server_project_id(&mut self, id: DevServerProjectId) { self.dev_server_project_id = Some(id); } @@ -201,6 +206,13 @@ impl WorktreeStore { path: abs_path.clone(), }) .await?; + + if let Some(existing_worktree) = this.read_with(&cx, |this, cx| { + this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx) + })? { + return Ok(existing_worktree); + } + let worktree = cx.update(|cx| { Worktree::remote( 0, @@ -302,7 +314,10 @@ impl WorktreeStore { } pub fn add(&mut self, worktree: &Model, cx: &mut ModelContext) { - let push_strong_handle = self.is_shared || worktree.read(cx).is_visible(); + let worktree_id = worktree.read(cx).id(); + debug_assert!(!self.worktrees().any(|w| w.read(cx).id() == worktree_id)); + + let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible(); let handle = if push_strong_handle { WorktreeHandle::Strong(worktree.clone()) } else { @@ -322,13 +337,15 @@ impl WorktreeStore { } cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone())); + self.send_project_updates(cx); let handle_id = worktree.entity_id(); - cx.observe_release(worktree, move |_, worktree, cx| { + cx.observe_release(worktree, move |this, worktree, cx| { cx.emit(WorktreeStoreEvent::WorktreeRemoved( handle_id, worktree.id(), )); + this.send_project_updates(cx); }) .detach(); } @@ -349,6 +366,7 @@ impl WorktreeStore { false } }); + self.send_project_updates(cx); } pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) { @@ -359,8 +377,6 @@ impl WorktreeStore { &mut self, worktrees: Vec, replica_id: ReplicaId, - remote_id: u64, - client: AnyProtoClient, cx: &mut ModelContext, ) -> Result<()> { let mut old_worktrees_by_id = self @@ -372,18 +388,31 @@ impl WorktreeStore { }) .collect::>(); + let client = self + .upstream_client + .clone() + .ok_or_else(|| anyhow!("invalid project"))?; + for worktree in worktrees { if let Some(old_worktree) = old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id)) { - self.worktrees.push(WorktreeHandle::Strong(old_worktree)); + let push_strong_handle = + self.retain_worktrees || old_worktree.read(cx).is_visible(); + let handle = if push_strong_handle { + WorktreeHandle::Strong(old_worktree.clone()) + } else { + WorktreeHandle::Weak(old_worktree.downgrade()) + }; + self.worktrees.push(handle); } else { self.add( - &Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx), + &Worktree::remote(self.remote_id, replica_id, worktree, client.clone(), cx), cx, ); } } + self.send_project_updates(cx); Ok(()) } @@ -446,33 +475,109 @@ impl WorktreeStore { } } - pub fn set_shared(&mut self, is_shared: bool, cx: &mut ModelContext) { - self.is_shared = is_shared; + pub fn send_project_updates(&mut self, cx: &mut ModelContext) { + let Some(downstream_client) = self.downstream_client.clone() else { + return; + }; + let project_id = self.remote_id; + + let update = proto::UpdateProject { + project_id, + worktrees: self.worktree_metadata_protos(cx), + }; + + // collab has bad concurrency guarantees, so we send requests in serial. + let update_project = if downstream_client.goes_via_collab() { + Some(downstream_client.request(update)) + } else { + downstream_client.send(update).log_err(); + None + }; + cx.spawn(|this, mut cx| async move { + if let Some(update_project) = update_project { + update_project.await?; + } + + this.update(&mut cx, |this, cx| { + let worktrees = this.worktrees().collect::>(); + + for worktree in worktrees { + worktree.update(cx, |worktree, cx| { + let client = downstream_client.clone(); + worktree.observe_updates(project_id, cx, { + move |update| { + let client = client.clone(); + async move { + if client.goes_via_collab() { + client.request(update).map(|result| result.is_ok()).await + } else { + client.send(update).is_ok() + } + } + } + }); + }); + + cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone())) + } + + anyhow::Ok(()) + }) + }) + .detach_and_log_err(cx); + } + + pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec { + self.worktrees() + .map(|worktree| { + let worktree = worktree.read(cx); + proto::WorktreeMetadata { + id: worktree.id().to_proto(), + root_name: worktree.root_name().into(), + visible: worktree.is_visible(), + abs_path: worktree.abs_path().to_string_lossy().into(), + } + }) + .collect() + } + + pub fn shared( + &mut self, + remote_id: u64, + downsteam_client: AnyProtoClient, + cx: &mut ModelContext, + ) { + self.retain_worktrees = true; + self.remote_id = remote_id; + self.downstream_client = Some(downsteam_client); // When shared, retain all worktrees - if is_shared { - for worktree_handle in self.worktrees.iter_mut() { - match worktree_handle { - WorktreeHandle::Strong(_) => {} - WorktreeHandle::Weak(worktree) => { - if let Some(worktree) = worktree.upgrade() { - *worktree_handle = WorktreeHandle::Strong(worktree); - } + for worktree_handle in self.worktrees.iter_mut() { + match worktree_handle { + WorktreeHandle::Strong(_) => {} + WorktreeHandle::Weak(worktree) => { + if let Some(worktree) = worktree.upgrade() { + *worktree_handle = WorktreeHandle::Strong(worktree); } } } } + self.send_project_updates(cx); + } + + pub fn unshared(&mut self, cx: &mut ModelContext) { + self.retain_worktrees = false; + self.downstream_client.take(); + // When not shared, only retain the visible worktrees - else { - for worktree_handle in self.worktrees.iter_mut() { - if let WorktreeHandle::Strong(worktree) = worktree_handle { - let is_visible = worktree.update(cx, |worktree, _| { - worktree.stop_observing_updates(); - worktree.is_visible() - }); - if !is_visible { - *worktree_handle = WorktreeHandle::Weak(worktree.downgrade()); - } + for worktree_handle in self.worktrees.iter_mut() { + if let WorktreeHandle::Strong(worktree) = worktree_handle { + let is_visible = worktree.update(cx, |worktree, _| { + worktree.stop_observing_updates(); + worktree.is_visible() + }); + if !is_visible { + *worktree_handle = WorktreeHandle::Weak(worktree.downgrade()); } } } diff --git a/crates/remote/src/ssh_session.rs b/crates/remote/src/ssh_session.rs index 4aab731e64..10608b74f3 100644 --- a/crates/remote/src/ssh_session.rs +++ b/crates/remote/src/ssh_session.rs @@ -247,7 +247,8 @@ impl SshSession { let line_ix = start_ix + ix; let content = &stderr_buffer[start_ix..line_ix]; start_ix = line_ix + 1; - if let Ok(record) = serde_json::from_slice::(content) { + if let Ok(mut record) = serde_json::from_slice::(content) { + record.message = format!("(remote) {}", record.message); record.log(log::logger()) } else { eprintln!("(remote) {}", String::from_utf8_lossy(content)); @@ -469,6 +470,10 @@ impl ProtoClient for SshSession { fn message_handler_set(&self) -> &Mutex { &self.state } + + fn goes_via_collab(&self) -> bool { + false + } } impl SshClientState { diff --git a/crates/remote_server/src/headless_project.rs b/crates/remote_server/src/headless_project.rs index bbd82281d8..54f48e3626 100644 --- a/crates/remote_server/src/headless_project.rs +++ b/crates/remote_server/src/headless_project.rs @@ -44,7 +44,11 @@ impl HeadlessProject { pub fn new(session: Arc, fs: Arc, cx: &mut ModelContext) -> Self { let languages = Arc::new(LanguageRegistry::new(cx.background_executor().clone())); - let worktree_store = cx.new_model(|_| WorktreeStore::new(true, fs.clone())); + let worktree_store = cx.new_model(|cx| { + let mut store = WorktreeStore::new(None, true, fs.clone()); + store.shared(SSH_PROJECT_ID, session.clone().into(), cx); + store + }); let buffer_store = cx.new_model(|cx| { let mut buffer_store = BufferStore::new(worktree_store.clone(), Some(SSH_PROJECT_ID), cx); @@ -196,18 +200,11 @@ impl HeadlessProject { .await?; this.update(&mut cx, |this, cx| { - let session = this.session.clone(); this.worktree_store.update(cx, |worktree_store, cx| { worktree_store.add(&worktree, cx); }); - worktree.update(cx, |worktree, cx| { - worktree.observe_updates(0, cx, move |update| { - session.send(update).ok(); - futures::future::ready(true) - }); - proto::AddWorktreeResponse { - worktree_id: worktree.id().to_proto(), - } + worktree.update(cx, |worktree, _| proto::AddWorktreeResponse { + worktree_id: worktree.id().to_proto(), }) }) } diff --git a/crates/rpc/src/proto_client.rs b/crates/rpc/src/proto_client.rs index 4a990a8433..89ef580cdf 100644 --- a/crates/rpc/src/proto_client.rs +++ b/crates/rpc/src/proto_client.rs @@ -27,6 +27,8 @@ pub trait ProtoClient: Send + Sync { fn send_response(&self, envelope: Envelope, message_type: &'static str) -> anyhow::Result<()>; fn message_handler_set(&self) -> &parking_lot::Mutex; + + fn goes_via_collab(&self) -> bool; } #[derive(Default)] @@ -139,6 +141,10 @@ impl AnyProtoClient { Self(client) } + pub fn goes_via_collab(&self) -> bool { + self.0.goes_via_collab() + } + pub fn request( &self, request: T,