lsp: Fix workspace diagnostics lag & add streaming support (#34022)

Closes https://github.com/zed-industries/zed/issues/33980
Closes https://github.com/zed-industries/zed/discussions/33979

- Switches to the debounce task pattern for diagnostic summary
computations, which most importantly lets us do them only once when a
large number of DiagnosticUpdated events are received at once.
- Makes workspace diagnostic requests not time out if a partial result
is received.
- Makes diagnostics from workspace diagnostic partial results get
merged.

There might be some related areas where we're not fully complying with
the LSP spec but they may be outside the scope of what this PR should
include.

Release Notes:

- Added support for streaming LSP workspace diagnostics.
- Fixed editor freeze from large LSP workspace diagnostic responses.
This commit is contained in:
teapo 2025-07-15 17:41:45 +02:00 committed by GitHub
parent 5f3e7a5f91
commit d7bb1c1d0e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 460 additions and 114 deletions

View file

@ -29,7 +29,7 @@ use clock::Global;
use collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map};
use futures::{
AsyncWriteExt, Future, FutureExt, StreamExt,
future::{Shared, join_all},
future::{Either, Shared, join_all, pending, select},
select, select_biased,
stream::FuturesUnordered,
};
@ -85,9 +85,11 @@ use std::{
cmp::{Ordering, Reverse},
convert::TryInto,
ffi::OsStr,
future::ready,
iter, mem,
ops::{ControlFlow, Range},
path::{self, Path, PathBuf},
pin::pin,
rc::Rc,
sync::Arc,
time::{Duration, Instant},
@ -7585,7 +7587,8 @@ impl LspStore {
diagnostics,
|_, _, _| false,
cx,
)
)?;
Ok(())
}
pub fn merge_diagnostic_entries(
@ -9130,13 +9133,39 @@ impl LspStore {
}
};
let progress = match progress.value {
lsp::ProgressParamsValue::WorkDone(progress) => progress,
lsp::ProgressParamsValue::WorkspaceDiagnostic(_) => {
return;
match progress.value {
lsp::ProgressParamsValue::WorkDone(progress) => {
self.handle_work_done_progress(
progress,
language_server_id,
disk_based_diagnostics_progress_token,
token,
cx,
);
}
};
lsp::ProgressParamsValue::WorkspaceDiagnostic(report) => {
if let Some(LanguageServerState::Running {
workspace_refresh_task: Some(workspace_refresh_task),
..
}) = self
.as_local_mut()
.and_then(|local| local.language_servers.get_mut(&language_server_id))
{
workspace_refresh_task.progress_tx.try_send(()).ok();
self.apply_workspace_diagnostic_report(language_server_id, report, cx)
}
}
}
}
fn handle_work_done_progress(
&mut self,
progress: lsp::WorkDoneProgress,
language_server_id: LanguageServerId,
disk_based_diagnostics_progress_token: Option<String>,
token: String,
cx: &mut Context<Self>,
) {
let language_server_status =
if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
status
@ -11297,13 +11326,13 @@ impl LspStore {
pub fn pull_workspace_diagnostics(&mut self, server_id: LanguageServerId) {
if let Some(LanguageServerState::Running {
workspace_refresh_task: Some((tx, _)),
workspace_refresh_task: Some(workspace_refresh_task),
..
}) = self
.as_local_mut()
.and_then(|local| local.language_servers.get_mut(&server_id))
{
tx.try_send(()).ok();
workspace_refresh_task.refresh_tx.try_send(()).ok();
}
}
@ -11319,11 +11348,83 @@ impl LspStore {
local.language_server_ids_for_buffer(buffer, cx)
}) {
if let Some(LanguageServerState::Running {
workspace_refresh_task: Some((tx, _)),
workspace_refresh_task: Some(workspace_refresh_task),
..
}) = local.language_servers.get_mut(&server_id)
{
tx.try_send(()).ok();
workspace_refresh_task.refresh_tx.try_send(()).ok();
}
}
}
fn apply_workspace_diagnostic_report(
&mut self,
server_id: LanguageServerId,
report: lsp::WorkspaceDiagnosticReportResult,
cx: &mut Context<Self>,
) {
let workspace_diagnostics =
GetDocumentDiagnostics::deserialize_workspace_diagnostics_report(report, server_id);
for workspace_diagnostics in workspace_diagnostics {
let LspPullDiagnostics::Response {
server_id,
uri,
diagnostics,
} = workspace_diagnostics.diagnostics
else {
continue;
};
let adapter = self.language_server_adapter_for_id(server_id);
let disk_based_sources = adapter
.as_ref()
.map(|adapter| adapter.disk_based_diagnostic_sources.as_slice())
.unwrap_or(&[]);
match diagnostics {
PulledDiagnostics::Unchanged { result_id } => {
self.merge_diagnostics(
server_id,
lsp::PublishDiagnosticsParams {
uri: uri.clone(),
diagnostics: Vec::new(),
version: None,
},
Some(result_id),
DiagnosticSourceKind::Pulled,
disk_based_sources,
|_, _, _| true,
cx,
)
.log_err();
}
PulledDiagnostics::Changed {
diagnostics,
result_id,
} => {
self.merge_diagnostics(
server_id,
lsp::PublishDiagnosticsParams {
uri: uri.clone(),
diagnostics,
version: workspace_diagnostics.version,
},
result_id,
DiagnosticSourceKind::Pulled,
disk_based_sources,
|buffer, old_diagnostic, cx| match old_diagnostic.source_kind {
DiagnosticSourceKind::Pulled => {
let buffer_url = File::from_dyn(buffer.file())
.map(|f| f.abs_path(cx))
.and_then(|abs_path| file_path_to_lsp_url(&abs_path).ok());
buffer_url.is_none_or(|buffer_url| buffer_url != uri)
}
DiagnosticSourceKind::Other | DiagnosticSourceKind::Pushed => true,
},
cx,
)
.log_err();
}
}
}
}
@ -11379,7 +11480,7 @@ fn subscribe_to_binary_statuses(
fn lsp_workspace_diagnostics_refresh(
server: Arc<LanguageServer>,
cx: &mut Context<'_, LspStore>,
) -> Option<(mpsc::Sender<()>, Task<()>)> {
) -> Option<WorkspaceRefreshTask> {
let identifier = match server.capabilities().diagnostic_provider? {
lsp::DiagnosticServerCapabilities::Options(diagnostic_options) => {
if !diagnostic_options.workspace_diagnostics {
@ -11396,19 +11497,22 @@ fn lsp_workspace_diagnostics_refresh(
}
};
let (mut tx, mut rx) = mpsc::channel(1);
tx.try_send(()).ok();
let (progress_tx, mut progress_rx) = mpsc::channel(1);
let (mut refresh_tx, mut refresh_rx) = mpsc::channel(1);
refresh_tx.try_send(()).ok();
let workspace_query_language_server = cx.spawn(async move |lsp_store, cx| {
let mut attempts = 0;
let max_attempts = 50;
let mut requests = 0;
loop {
let Some(()) = rx.recv().await else {
let Some(()) = refresh_rx.recv().await else {
return;
};
'request: loop {
requests += 1;
if attempts > max_attempts {
log::error!(
"Failed to pull workspace diagnostics {max_attempts} times, aborting"
@ -11437,14 +11541,29 @@ fn lsp_workspace_diagnostics_refresh(
return;
};
let token = format!("workspace/diagnostic-{}-{}", server.server_id(), requests);
progress_rx.try_recv().ok();
let timer =
LanguageServer::default_request_timer(cx.background_executor().clone()).fuse();
let progress = pin!(progress_rx.recv().fuse());
let response_result = server
.request::<lsp::WorkspaceDiagnosticRequest>(lsp::WorkspaceDiagnosticParams {
previous_result_ids,
identifier: identifier.clone(),
work_done_progress_params: Default::default(),
partial_result_params: Default::default(),
})
.request_with_timer::<lsp::WorkspaceDiagnosticRequest, _>(
lsp::WorkspaceDiagnosticParams {
previous_result_ids,
identifier: identifier.clone(),
work_done_progress_params: Default::default(),
partial_result_params: lsp::PartialResultParams {
partial_result_token: Some(lsp::ProgressToken::String(token)),
},
},
select(timer, progress).then(|either| match either {
Either::Left((message, ..)) => ready(message).left_future(),
Either::Right(..) => pending::<String>().right_future(),
}),
)
.await;
// https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#diagnostic_refresh
// > If a server closes a workspace diagnostic pull request the client should re-trigger the request.
match response_result {
@ -11464,72 +11583,11 @@ fn lsp_workspace_diagnostics_refresh(
attempts = 0;
if lsp_store
.update(cx, |lsp_store, cx| {
let workspace_diagnostics =
GetDocumentDiagnostics::deserialize_workspace_diagnostics_report(pulled_diagnostics, server.server_id());
for workspace_diagnostics in workspace_diagnostics {
let LspPullDiagnostics::Response {
server_id,
uri,
diagnostics,
} = workspace_diagnostics.diagnostics
else {
continue;
};
let adapter = lsp_store.language_server_adapter_for_id(server_id);
let disk_based_sources = adapter
.as_ref()
.map(|adapter| adapter.disk_based_diagnostic_sources.as_slice())
.unwrap_or(&[]);
match diagnostics {
PulledDiagnostics::Unchanged { result_id } => {
lsp_store
.merge_diagnostics(
server_id,
lsp::PublishDiagnosticsParams {
uri: uri.clone(),
diagnostics: Vec::new(),
version: None,
},
Some(result_id),
DiagnosticSourceKind::Pulled,
disk_based_sources,
|_, _, _| true,
cx,
)
.log_err();
}
PulledDiagnostics::Changed {
diagnostics,
result_id,
} => {
lsp_store
.merge_diagnostics(
server_id,
lsp::PublishDiagnosticsParams {
uri: uri.clone(),
diagnostics,
version: workspace_diagnostics.version,
},
result_id,
DiagnosticSourceKind::Pulled,
disk_based_sources,
|buffer, old_diagnostic, cx| match old_diagnostic.source_kind {
DiagnosticSourceKind::Pulled => {
let buffer_url = File::from_dyn(buffer.file()).map(|f| f.abs_path(cx))
.and_then(|abs_path| file_path_to_lsp_url(&abs_path).ok());
buffer_url.is_none_or(|buffer_url| buffer_url != uri)
},
DiagnosticSourceKind::Other
| DiagnosticSourceKind::Pushed => true,
},
cx,
)
.log_err();
}
}
}
lsp_store.apply_workspace_diagnostic_report(
server.server_id(),
pulled_diagnostics,
cx,
)
})
.is_err()
{
@ -11542,7 +11600,11 @@ fn lsp_workspace_diagnostics_refresh(
}
});
Some((tx, workspace_query_language_server))
Some(WorkspaceRefreshTask {
refresh_tx,
progress_tx,
task: workspace_query_language_server,
})
}
fn resolve_word_completion(snapshot: &BufferSnapshot, completion: &mut Completion) {
@ -11912,6 +11974,13 @@ impl LanguageServerLogType {
}
}
pub struct WorkspaceRefreshTask {
refresh_tx: mpsc::Sender<()>,
progress_tx: mpsc::Sender<()>,
#[allow(dead_code)]
task: Task<()>,
}
pub enum LanguageServerState {
Starting {
startup: Task<Option<Arc<LanguageServer>>>,
@ -11923,7 +11992,7 @@ pub enum LanguageServerState {
adapter: Arc<CachedLspAdapter>,
server: Arc<LanguageServer>,
simulate_disk_based_diagnostics_completion: Option<Task<()>>,
workspace_refresh_task: Option<(mpsc::Sender<()>, Task<()>)>,
workspace_refresh_task: Option<WorkspaceRefreshTask>,
},
}