From ce34bf62fe57d7c1303cb5231c7a40ed51d13aef Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Mon, 24 Apr 2023 13:18:37 -0700 Subject: [PATCH 1/3] Add failing test for diagnostic message ordering Co-authored-by: Julia Risley --- crates/collab/src/tests/integration_tests.rs | 140 ++++++++++++++++++- 1 file changed, 139 insertions(+), 1 deletion(-) diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index d42c4b3ebc..5c19226960 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -32,7 +32,10 @@ use std::{ env, future, mem, path::{Path, PathBuf}, rc::Rc, - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }, }; use unindent::Unindent as _; use workspace::{ @@ -3636,6 +3639,141 @@ async fn test_collaborating_with_diagnostics( }); } +#[gpui::test(iterations = 10)] +async fn test_collaborating_with_lsp_progress_updates_and_diagnostics_ordering( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + .await; + + // Set up a fake language server. + let mut language = Language::new( + LanguageConfig { + name: "Rust".into(), + path_suffixes: vec!["rs".to_string()], + ..Default::default() + }, + Some(tree_sitter_rust::language()), + ); + let mut fake_language_servers = language + .set_fake_lsp_adapter(Arc::new(FakeLspAdapter { + disk_based_diagnostics_progress_token: Some("the-disk-based-token".into()), + disk_based_diagnostics_sources: vec!["the-disk-based-diagnostics-source".into()], + ..Default::default() + })) + .await; + client_a.language_registry.add(Arc::new(language)); + + let file_names = &["one.rs", "two.rs", "three.rs", "four.rs", "five.rs"]; + client_a + .fs + .insert_tree( + "/test", + json!({ + "one.rs": "const ONE: usize = 1;", + "two.rs": "const TWO: usize = 2;", + "three.rs": "const THREE: usize = 3;", + "four.rs": "const FOUR: usize = 3;", + "five.rs": "const FIVE: usize = 3;", + }), + ) + .await; + + let (project_a, worktree_id) = client_a.build_local_project("/test", cx_a).await; + + // Share a project as client A + let active_call_a = cx_a.read(ActiveCall::global); + let project_id = active_call_a + .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx)) + .await + .unwrap(); + + // Join the project as client B and open all three files. + let project_b = client_b.build_remote_project(project_id, cx_b).await; + let guest_buffers = futures::future::try_join_all(file_names.iter().map(|file_name| { + project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, file_name), cx)) + })) + .await + .unwrap(); + + // Simulate a language server reporting errors for a file. + let fake_language_server = fake_language_servers.next().await.unwrap(); + fake_language_server + .request::(lsp::WorkDoneProgressCreateParams { + token: lsp::NumberOrString::String("the-disk-based-token".to_string()), + }) + .await + .unwrap(); + fake_language_server.notify::(lsp::ProgressParams { + token: lsp::NumberOrString::String("the-disk-based-token".to_string()), + value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::Begin( + lsp::WorkDoneProgressBegin { + title: "Progress Began".into(), + ..Default::default() + }, + )), + }); + for file_name in file_names { + fake_language_server.notify::( + lsp::PublishDiagnosticsParams { + uri: lsp::Url::from_file_path(Path::new("/test").join(file_name)).unwrap(), + version: None, + diagnostics: vec![lsp::Diagnostic { + severity: Some(lsp::DiagnosticSeverity::WARNING), + source: Some("the-disk-based-diagnostics-source".into()), + range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)), + message: "message one".to_string(), + ..Default::default() + }], + }, + ); + } + fake_language_server.notify::(lsp::ProgressParams { + token: lsp::NumberOrString::String("the-disk-based-token".to_string()), + value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::End( + lsp::WorkDoneProgressEnd { message: None }, + )), + }); + + // When the "disk base diagnostics finished" message is received, the buffers' + // diagnostics are expected to be present. + let disk_based_diagnostics_finished = Arc::new(AtomicBool::new(false)); + project_b.update(cx_b, { + let project_b = project_b.clone(); + let disk_based_diagnostics_finished = disk_based_diagnostics_finished.clone(); + move |_, cx| { + cx.subscribe(&project_b, move |_, _, event, cx| { + if let project::Event::DiskBasedDiagnosticsFinished { .. } = event { + disk_based_diagnostics_finished.store(true, SeqCst); + for buffer in &guest_buffers { + assert_eq!( + buffer + .read(cx) + .snapshot() + .diagnostics_in_range::<_, usize>(0..5, false) + .count(), + 1, + "expected a diagnostic for buffer {:?}", + buffer.read(cx).file().unwrap().path(), + ); + } + } + }) + .detach(); + } + }); + + deterministic.run_until_parked(); + assert!(disk_based_diagnostics_finished.load(SeqCst)); +} + #[gpui::test(iterations = 10)] async fn test_collaborating_with_completion( deterministic: Arc, From a8ddba55d8728591ae1da62ce16673c2d7c9cb81 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Mon, 24 Apr 2023 13:52:03 -0700 Subject: [PATCH 2/3] Send language server updates via the same task that sends buffer operations Co-authored-by: Julia Risley --- crates/project/src/project.rs | 225 +++++++++++++++++++++------------- 1 file changed, 137 insertions(+), 88 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index aad9a50856..976554eab4 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -142,6 +142,10 @@ enum BufferMessage { buffer_id: u64, operation: proto::Operation, }, + LanguageServerUpdate { + language_server_id: LanguageServerId, + message: proto::update_language_server::Variant, + }, Resync, } @@ -1791,9 +1795,35 @@ impl Project { ) -> Option<()> { const MAX_BATCH_SIZE: usize = 128; - let mut needs_resync_with_host = false; let mut operations_by_buffer_id = HashMap::default(); + async fn flush_operations( + this: &ModelHandle, + operations_by_buffer_id: &mut HashMap>, + needs_resync_with_host: &mut bool, + is_local: bool, + cx: &AsyncAppContext, + ) { + for (buffer_id, operations) in operations_by_buffer_id.drain() { + let request = this.read_with(cx, |this, _| { + let project_id = this.remote_id()?; + Some(this.client.request(proto::UpdateBuffer { + buffer_id, + project_id, + operations, + })) + }); + if let Some(request) = request { + if request.await.is_err() && !is_local { + *needs_resync_with_host = true; + break; + } + } + } + } + + let mut needs_resync_with_host = false; let mut changes = rx.ready_chunks(MAX_BATCH_SIZE); + while let Some(changes) = changes.next().await { let this = this.upgrade(&mut cx)?; let is_local = this.read_with(&cx, |this, _| this.is_local()); @@ -1813,6 +1843,7 @@ impl Project { .or_insert(Vec::new()) .push(operation); } + BufferMessage::Resync => { operations_by_buffer_id.clear(); if this @@ -1823,25 +1854,43 @@ impl Project { needs_resync_with_host = false; } } - } - } - for (buffer_id, operations) in operations_by_buffer_id.drain() { - let request = this.read_with(&cx, |this, _| { - let project_id = this.remote_id()?; - Some(this.client.request(proto::UpdateBuffer { - buffer_id, - project_id, - operations, - })) - }); - if let Some(request) = request { - if request.await.is_err() && !is_local { - needs_resync_with_host = true; - break; + BufferMessage::LanguageServerUpdate { + language_server_id, + message, + } => { + flush_operations( + &this, + &mut operations_by_buffer_id, + &mut needs_resync_with_host, + is_local, + &cx, + ) + .await; + + this.read_with(&cx, |this, _| { + if let Some(project_id) = this.remote_id() { + this.client + .send(proto::UpdateLanguageServer { + project_id, + language_server_id: language_server_id.0 as u64, + variant: Some(message), + }) + .log_err(); + } + }); } } } + + flush_operations( + &this, + &mut operations_by_buffer_id, + &mut needs_resync_with_host, + is_local, + &cx, + ) + .await; } None @@ -1962,19 +2011,24 @@ impl Project { Duration::from_secs(1); let task = cx.spawn_weak(|this, mut cx| async move { - cx.background().timer(DISK_BASED_DIAGNOSTICS_DEBOUNCE).await; - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx | { - this.disk_based_diagnostics_finished(language_server_id, cx); - this.broadcast_language_server_update( - language_server_id, - proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated( - proto::LspDiskBasedDiagnosticsUpdated {}, - ), - ); - }); - } - }); + cx.background().timer(DISK_BASED_DIAGNOSTICS_DEBOUNCE).await; + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, cx| { + this.disk_based_diagnostics_finished( + language_server_id, + cx, + ); + this.buffer_changes_tx + .unbounded_send( + BufferMessage::LanguageServerUpdate { + language_server_id, + message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default()) + }, + ) + .ok(); + }); + } + }); *simulate_disk_based_diagnostics_completion = Some(task); } } @@ -2609,7 +2663,7 @@ impl Project { fn on_lsp_progress( &mut self, progress: lsp::ProgressParams, - server_id: LanguageServerId, + language_server_id: LanguageServerId, disk_based_diagnostics_progress_token: Option, cx: &mut ModelContext, ) { @@ -2622,7 +2676,7 @@ impl Project { }; let lsp::ProgressParamsValue::WorkDone(progress) = progress.value; let language_server_status = - if let Some(status) = self.language_server_statuses.get_mut(&server_id) { + if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) { status } else { return; @@ -2642,16 +2696,16 @@ impl Project { lsp::WorkDoneProgress::Begin(report) => { if is_disk_based_diagnostics_progress { language_server_status.has_pending_diagnostic_updates = true; - self.disk_based_diagnostics_started(server_id, cx); - self.broadcast_language_server_update( - server_id, - proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating( - proto::LspDiskBasedDiagnosticsUpdating {}, - ), - ); + self.disk_based_diagnostics_started(language_server_id, cx); + self.buffer_changes_tx + .unbounded_send(BufferMessage::LanguageServerUpdate { + language_server_id, + message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default()) + }) + .ok(); } else { self.on_lsp_work_start( - server_id, + language_server_id, token.clone(), LanguageServerProgress { message: report.message.clone(), @@ -2660,20 +2714,24 @@ impl Project { }, cx, ); - self.broadcast_language_server_update( - server_id, - proto::update_language_server::Variant::WorkStart(proto::LspWorkStart { - token, - message: report.message, - percentage: report.percentage.map(|p| p as u32), - }), - ); + self.buffer_changes_tx + .unbounded_send(BufferMessage::LanguageServerUpdate { + language_server_id, + message: proto::update_language_server::Variant::WorkStart( + proto::LspWorkStart { + token, + message: report.message, + percentage: report.percentage.map(|p| p as u32), + }, + ), + }) + .ok(); } } lsp::WorkDoneProgress::Report(report) => { if !is_disk_based_diagnostics_progress { self.on_lsp_work_progress( - server_id, + language_server_id, token.clone(), LanguageServerProgress { message: report.message.clone(), @@ -2682,16 +2740,18 @@ impl Project { }, cx, ); - self.broadcast_language_server_update( - server_id, - proto::update_language_server::Variant::WorkProgress( - proto::LspWorkProgress { - token, - message: report.message, - percentage: report.percentage.map(|p| p as u32), - }, - ), - ); + self.buffer_changes_tx + .unbounded_send(BufferMessage::LanguageServerUpdate { + language_server_id, + message: proto::update_language_server::Variant::WorkProgress( + proto::LspWorkProgress { + token, + message: report.message, + percentage: report.percentage.map(|p| p as u32), + }, + ), + }) + .ok(); } } lsp::WorkDoneProgress::End(_) => { @@ -2699,21 +2759,26 @@ impl Project { if is_disk_based_diagnostics_progress { language_server_status.has_pending_diagnostic_updates = false; - self.disk_based_diagnostics_finished(server_id, cx); - self.broadcast_language_server_update( - server_id, - proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated( - proto::LspDiskBasedDiagnosticsUpdated {}, - ), - ); + self.disk_based_diagnostics_finished(language_server_id, cx); + self.buffer_changes_tx + .unbounded_send(BufferMessage::LanguageServerUpdate { + language_server_id, + message: + proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated( + Default::default(), + ), + }) + .ok(); } else { - self.on_lsp_work_end(server_id, token.clone(), cx); - self.broadcast_language_server_update( - server_id, - proto::update_language_server::Variant::WorkEnd(proto::LspWorkEnd { - token, - }), - ); + self.on_lsp_work_end(language_server_id, token.clone(), cx); + self.buffer_changes_tx + .unbounded_send(BufferMessage::LanguageServerUpdate { + language_server_id, + message: proto::update_language_server::Variant::WorkEnd( + proto::LspWorkEnd { token }, + ), + }) + .ok(); } } } @@ -2822,22 +2887,6 @@ impl Project { }) } - fn broadcast_language_server_update( - &self, - language_server_id: LanguageServerId, - event: proto::update_language_server::Variant, - ) { - if let Some(project_id) = self.remote_id() { - self.client - .send(proto::UpdateLanguageServer { - project_id, - language_server_id: language_server_id.0 as u64, - variant: Some(event), - }) - .log_err(); - } - } - pub fn language_server_statuses( &self, ) -> impl DoubleEndedIterator { From 7bd51851c2bce6cee5d63b09a26c157b92d2d00c Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Mon, 24 Apr 2023 13:54:47 -0700 Subject: [PATCH 3/3] :art: Co-authored-by: Julia Risley --- crates/project/src/project.rs | 59 ++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 976554eab4..c82855b03c 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -93,7 +93,7 @@ pub trait Item { pub struct Project { worktrees: Vec, active_entry: Option, - buffer_changes_tx: mpsc::UnboundedSender, + buffer_ordered_messages_tx: mpsc::UnboundedSender, languages: Arc, language_servers: HashMap, language_server_ids: HashMap<(WorktreeId, LanguageServerName), LanguageServerId>, @@ -137,7 +137,8 @@ struct LspBufferSnapshot { snapshot: TextBufferSnapshot, } -enum BufferMessage { +/// Message ordered with respect to buffer operations +enum BufferOrderedMessage { Operation { buffer_id: u64, operation: proto::Operation, @@ -447,11 +448,11 @@ impl Project { ) -> ModelHandle { cx.add_model(|cx: &mut ModelContext| { let (tx, rx) = mpsc::unbounded(); - cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx)) + cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx)) .detach(); Self { worktrees: Default::default(), - buffer_changes_tx: tx, + buffer_ordered_messages_tx: tx, collaborators: Default::default(), opened_buffers: Default::default(), shared_buffers: Default::default(), @@ -515,11 +516,11 @@ impl Project { } let (tx, rx) = mpsc::unbounded(); - cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx)) + cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx)) .detach(); let mut this = Self { worktrees: Vec::new(), - buffer_changes_tx: tx, + buffer_ordered_messages_tx: tx, loading_buffers_by_path: Default::default(), opened_buffer: watch::channel(), shared_buffers: Default::default(), @@ -1172,8 +1173,8 @@ impl Project { ) }) .collect(); - self.buffer_changes_tx - .unbounded_send(BufferMessage::Resync) + self.buffer_ordered_messages_tx + .unbounded_send(BufferOrderedMessage::Resync) .unwrap(); cx.notify(); Ok(()) @@ -1788,9 +1789,9 @@ impl Project { } } - async fn send_buffer_messages( + async fn send_buffer_ordered_messages( this: WeakModelHandle, - rx: UnboundedReceiver, + rx: UnboundedReceiver, mut cx: AsyncAppContext, ) -> Option<()> { const MAX_BATCH_SIZE: usize = 128; @@ -1830,7 +1831,7 @@ impl Project { for change in changes { match change { - BufferMessage::Operation { + BufferOrderedMessage::Operation { buffer_id, operation, } => { @@ -1844,7 +1845,7 @@ impl Project { .push(operation); } - BufferMessage::Resync => { + BufferOrderedMessage::Resync => { operations_by_buffer_id.clear(); if this .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx)) @@ -1855,7 +1856,7 @@ impl Project { } } - BufferMessage::LanguageServerUpdate { + BufferOrderedMessage::LanguageServerUpdate { language_server_id, message, } => { @@ -1904,8 +1905,8 @@ impl Project { ) -> Option<()> { match event { BufferEvent::Operation(operation) => { - self.buffer_changes_tx - .unbounded_send(BufferMessage::Operation { + self.buffer_ordered_messages_tx + .unbounded_send(BufferOrderedMessage::Operation { buffer_id: buffer.read(cx).remote_id(), operation: language::proto::serialize_operation(operation), }) @@ -2018,9 +2019,9 @@ impl Project { language_server_id, cx, ); - this.buffer_changes_tx + this.buffer_ordered_messages_tx .unbounded_send( - BufferMessage::LanguageServerUpdate { + BufferOrderedMessage::LanguageServerUpdate { language_server_id, message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default()) }, @@ -2697,8 +2698,8 @@ impl Project { if is_disk_based_diagnostics_progress { language_server_status.has_pending_diagnostic_updates = true; self.disk_based_diagnostics_started(language_server_id, cx); - self.buffer_changes_tx - .unbounded_send(BufferMessage::LanguageServerUpdate { + self.buffer_ordered_messages_tx + .unbounded_send(BufferOrderedMessage::LanguageServerUpdate { language_server_id, message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default()) }) @@ -2714,8 +2715,8 @@ impl Project { }, cx, ); - self.buffer_changes_tx - .unbounded_send(BufferMessage::LanguageServerUpdate { + self.buffer_ordered_messages_tx + .unbounded_send(BufferOrderedMessage::LanguageServerUpdate { language_server_id, message: proto::update_language_server::Variant::WorkStart( proto::LspWorkStart { @@ -2740,8 +2741,8 @@ impl Project { }, cx, ); - self.buffer_changes_tx - .unbounded_send(BufferMessage::LanguageServerUpdate { + self.buffer_ordered_messages_tx + .unbounded_send(BufferOrderedMessage::LanguageServerUpdate { language_server_id, message: proto::update_language_server::Variant::WorkProgress( proto::LspWorkProgress { @@ -2760,8 +2761,8 @@ impl Project { if is_disk_based_diagnostics_progress { language_server_status.has_pending_diagnostic_updates = false; self.disk_based_diagnostics_finished(language_server_id, cx); - self.buffer_changes_tx - .unbounded_send(BufferMessage::LanguageServerUpdate { + self.buffer_ordered_messages_tx + .unbounded_send(BufferOrderedMessage::LanguageServerUpdate { language_server_id, message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated( @@ -2771,8 +2772,8 @@ impl Project { .ok(); } else { self.on_lsp_work_end(language_server_id, token.clone(), cx); - self.buffer_changes_tx - .unbounded_send(BufferMessage::LanguageServerUpdate { + self.buffer_ordered_messages_tx + .unbounded_send(BufferOrderedMessage::LanguageServerUpdate { language_server_id, message: proto::update_language_server::Variant::WorkEnd( proto::LspWorkEnd { token }, @@ -4915,8 +4916,8 @@ impl Project { if is_host { this.opened_buffers .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_))); - this.buffer_changes_tx - .unbounded_send(BufferMessage::Resync) + this.buffer_ordered_messages_tx + .unbounded_send(BufferOrderedMessage::Resync) .unwrap(); }