Implement the rest of the worktree pulls (#32269)
Follow-up of https://github.com/zed-industries/zed/pull/19230

Implements workspace diagnostics pulling, and replaces the "pull diagnostics for every open editor's buffer" strategy with "pull the changed buffer's diagnostics" plus "schedule a workspace diagnostics pull" for the rest of the diagnostics. This means that if the server supports neither workspace diagnostics nor returning extra diagnostics for related files, only the currently edited buffer has its diagnostics updated. This is still better than the existing implementation, which triggers a large number of diagnostics pulls instead, and more heuristics for querying additional diagnostics can be layered on top later.

Release Notes:

- N/A
This commit is contained in:
parent 9e5f89dc26
commit 77ead25f8c
8 changed files with 429 additions and 85 deletions
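For context on the changes below: the pull model this commit builds on is LSP 3.17's textDocument/diagnostic and workspace/diagnostic requests, where the client echoes back previously received result ids so the server can answer "unchanged" instead of re-sending every diagnostic. A minimal sketch of the two request payloads, assuming the standalone lsp-types crate (the 0.94/0.95 line, where Url is re-exported) and a made-up result id; Zed's own code goes through its lsp:: re-exports and the GetDocumentDiagnostics wrapper instead:

use lsp_types::{
    DocumentDiagnosticParams, PartialResultParams, PreviousResultId, TextDocumentIdentifier, Url,
    WorkDoneProgressParams, WorkspaceDiagnosticParams,
};

fn main() {
    let uri = Url::parse("file:///tmp/example.rs").unwrap();

    // Per-document pull: only this buffer's diagnostics are requested, and the
    // previously stored result id lets the server reply "unchanged".
    let document_pull = DocumentDiagnosticParams {
        text_document: TextDocumentIdentifier { uri: uri.clone() },
        identifier: None,
        previous_result_id: Some("result-id-1".to_string()), // hypothetical id
        work_done_progress_params: WorkDoneProgressParams::default(),
        partial_result_params: PartialResultParams::default(),
    };

    // Workspace pull: one request covers every file the server knows about,
    // seeded with all result ids the client has cached so far.
    let workspace_pull = WorkspaceDiagnosticParams {
        identifier: None,
        previous_result_ids: vec![PreviousResultId {
            uri,
            value: "result-id-1".to_string(),
        }],
        work_done_progress_params: WorkDoneProgressParams::default(),
        partial_result_params: PartialResultParams::default(),
    };

    println!("{document_pull:?}\n{workspace_pull:?}");
}

The workspace request is what the new background task in this diff issues, seeded from all_result_ids().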
@@ -4,8 +4,8 @@ pub mod rust_analyzer_ext;
 use crate::{
     CodeAction, Completion, CompletionResponse, CompletionSource, CoreCompletion, Hover, InlayHint,
-    LspAction, LspPullDiagnostics, ProjectItem, ProjectPath, ProjectTransaction, ResolveState,
-    Symbol, ToolchainStore,
+    LspAction, LspPullDiagnostics, ProjectItem, ProjectPath, ProjectTransaction, PulledDiagnostics,
+    ResolveState, Symbol, ToolchainStore,
     buffer_store::{BufferStore, BufferStoreEvent},
     environment::ProjectEnvironment,
     lsp_command::{self, *},
@@ -61,7 +61,7 @@ use lsp::{
 };
 use node_runtime::read_package_installed_version;
 use parking_lot::Mutex;
-use postage::watch;
+use postage::{mpsc, sink::Sink, stream::Stream, watch};
 use rand::prelude::*;
 use rpc::{
@@ -90,7 +90,7 @@ use std::{
 use text::{Anchor, BufferId, LineEnding, OffsetRangeExt};
 use url::Url;
 use util::{
-    ResultExt as _, debug_panic, defer, maybe, merge_json_value_into,
+    ConnectionResult, ResultExt as _, debug_panic, defer, maybe, merge_json_value_into,
     paths::{PathExt, SanitizedPath},
     post_inc,
 };
@@ -166,6 +166,7 @@ pub struct LocalLspStore {
     _subscription: gpui::Subscription,
     lsp_tree: Entity<LanguageServerTree>,
     registered_buffers: HashMap<BufferId, usize>,
+    buffer_pull_diagnostics_result_ids: HashMap<BufferId, Option<String>>,
 }

 impl LocalLspStore {
@@ -871,13 +872,17 @@ impl LocalLspStore {
             let this = this.clone();
             let mut cx = cx.clone();
             async move {
-                this.update(&mut cx, |this, cx| {
-                    cx.emit(LspStoreEvent::PullWorkspaceDiagnostics);
-                    this.downstream_client.as_ref().map(|(client, project_id)| {
-                        client.send(proto::PullWorkspaceDiagnostics {
-                            project_id: *project_id,
+                this.update(&mut cx, |lsp_store, _| {
+                    lsp_store.pull_workspace_diagnostics(server_id);
+                    lsp_store
+                        .downstream_client
+                        .as_ref()
+                        .map(|(client, project_id)| {
+                            client.send(proto::PullWorkspaceDiagnostics {
+                                project_id: *project_id,
                                 server_id: server_id.to_proto(),
                             })
                         })
                 })?
                 .transpose()?;
                 Ok(())
@@ -2290,9 +2295,11 @@

         let set = DiagnosticSet::new(sanitized_diagnostics, &snapshot);
         buffer.update(cx, |buffer, cx| {
-            buffer.set_result_id(result_id);
+            self.buffer_pull_diagnostics_result_ids
+                .insert(buffer.remote_id(), result_id);
             buffer.update_diagnostics(server_id, set, cx)
         });

         Ok(())
     }
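The hunk above swaps the per-buffer set_result_id call for an entry in the store-level buffer_pull_diagnostics_result_ids map: per BufferId, the last result id a pull returned, with None meaning a tracked buffer that has not produced one yet. A rough illustration of those cache semantics only, using a plain u64 as a stand-in for Zed's BufferId and free functions in place of the LspStore::result_id / all_result_ids methods added later in this diff:

use std::collections::HashMap;

// Stand-in for Zed's BufferId in this sketch.
type BufferId = u64;

// None means "buffer is tracked but no pull has completed yet".
type ResultIds = HashMap<BufferId, Option<String>>;

fn record_pull(cache: &mut ResultIds, buffer_id: BufferId, result_id: Option<String>) {
    // Overwrite whatever was stored before; an absent result id is remembered too.
    cache.insert(buffer_id, result_id);
}

fn result_id(cache: &ResultIds, buffer_id: BufferId) -> Option<String> {
    // Mirrors the lookup: a missing entry and a stored None both yield None.
    cache.get(&buffer_id).cloned().flatten()
}

fn all_result_ids(cache: &ResultIds) -> HashMap<BufferId, String> {
    // Only buffers with a known id are reported to the workspace pull.
    cache
        .iter()
        .filter_map(|(id, result)| Some((*id, result.clone()?)))
        .collect()
}

fn main() {
    let mut cache = ResultIds::default();
    record_pull(&mut cache, 1, Some("abc".into()));
    record_pull(&mut cache, 2, None);
    assert_eq!(result_id(&cache, 1).as_deref(), Some("abc"));
    assert_eq!(result_id(&cache, 2), None);
    assert_eq!(all_result_ids(&cache).len(), 1);
}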
@@ -3497,7 +3504,6 @@ pub enum LspStoreEvent {
         edits: Vec<(lsp::Range, Snippet)>,
         most_recent_edit: clock::Lamport,
     },
-    PullWorkspaceDiagnostics,
 }

 #[derive(Clone, Debug, Serialize)]
@@ -3680,7 +3686,8 @@ impl LspStore {
                 this.as_local_mut().unwrap().shutdown_language_servers(cx)
             }),
             lsp_tree: LanguageServerTree::new(manifest_tree, languages.clone(), cx),
-            registered_buffers: Default::default(),
+            registered_buffers: HashMap::default(),
+            buffer_pull_diagnostics_result_ids: HashMap::default(),
         }),
         last_formatting_failure: None,
         downstream_client: None,
@@ -3784,6 +3791,11 @@ impl LspStore {
                     }
                 }
             }
+            BufferStoreEvent::BufferDropped(buffer_id) => {
+                if let Some(local) = self.as_local_mut() {
+                    local.buffer_pull_diagnostics_result_ids.remove(buffer_id);
+                }
+            }
             _ => {}
         }
     }
@@ -5733,6 +5745,7 @@ impl LspStore {
     ) -> Task<Result<Vec<LspPullDiagnostics>>> {
         let buffer = buffer_handle.read(cx);
         let buffer_id = buffer.remote_id();
+        let result_id = self.result_id(buffer_id);

         if let Some((client, upstream_project_id)) = self.upstream_client() {
             let request_task = client.request(proto::MultiLspQuery {
@@ -5743,7 +5756,10 @@ impl LspStore {
                     proto::AllLanguageServers {},
                 )),
                 request: Some(proto::multi_lsp_query::Request::GetDocumentDiagnostics(
-                    GetDocumentDiagnostics {}.to_proto(upstream_project_id, buffer_handle.read(cx)),
+                    GetDocumentDiagnostics {
+                        previous_result_id: result_id.clone(),
+                    }
+                    .to_proto(upstream_project_id, buffer_handle.read(cx)),
                 )),
             });
             let buffer = buffer_handle.clone();
@@ -5765,7 +5781,10 @@ impl LspStore {
                     }
                 })
                 .map(|diagnostics_response| {
-                    GetDocumentDiagnostics {}.response_from_proto(
+                    GetDocumentDiagnostics {
+                        previous_result_id: result_id.clone(),
+                    }
+                    .response_from_proto(
                         diagnostics_response,
                         project.clone(),
                         buffer.clone(),
@@ -5786,7 +5805,9 @@ impl LspStore {
             let all_actions_task = self.request_multiple_lsp_locally(
                 &buffer_handle,
                 None::<PointUtf16>,
-                GetDocumentDiagnostics {},
+                GetDocumentDiagnostics {
+                    previous_result_id: result_id,
+                },
                 cx,
            );
            cx.spawn(async move |_, _| Ok(all_actions_task.await.into_iter().flatten().collect()))
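Both the remote path (via proto::MultiLspQuery) and the local path above now carry previous_result_id in GetDocumentDiagnostics, so a server may reply with an "unchanged" report rather than a full one. A hedged sketch of how such a reply is typically folded back into client state, written against lsp-types (0.94/0.95 line) rather than Zed's LspPullDiagnostics/PulledDiagnostics wrappers; apply_full and store_result_id are hypothetical callbacks standing in for the merge logic in this file:

use lsp_types::{
    Diagnostic, DocumentDiagnosticReport, FullDocumentDiagnosticReport,
    RelatedFullDocumentDiagnosticReport,
};

fn handle_document_report(
    report: DocumentDiagnosticReport,
    apply_full: impl FnOnce(Vec<Diagnostic>),
    mut store_result_id: impl FnMut(Option<String>),
) {
    match report {
        // A full report replaces the displayed diagnostics and the cached result id.
        DocumentDiagnosticReport::Full(full) => {
            let report = full.full_document_diagnostic_report;
            store_result_id(report.result_id);
            apply_full(report.items);
        }
        // An unchanged report carries no items; keep what is displayed and only
        // remember the result id to send on the next pull.
        DocumentDiagnosticReport::Unchanged(unchanged) => {
            store_result_id(Some(
                unchanged.unchanged_document_diagnostic_report.result_id,
            ));
        }
    }
}

fn main() {
    let report = DocumentDiagnosticReport::Full(RelatedFullDocumentDiagnosticReport {
        related_documents: None,
        full_document_diagnostic_report: FullDocumentDiagnosticReport {
            result_id: Some("r1".to_string()),
            items: Vec::new(),
        },
    });
    handle_document_report(
        report,
        |items| println!("replace diagnostics with {} item(s)", items.len()),
        |result_id| println!("next previous_result_id: {result_id:?}"),
    );
}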
@@ -6323,6 +6344,7 @@ impl LspStore {
                 },
             )
             .ok();
+            self.pull_workspace_diagnostics(language_server.server_id());
         }

         None
@@ -8172,12 +8194,13 @@ impl LspStore {
     }

     async fn handle_pull_workspace_diagnostics(
-        this: Entity<Self>,
-        _: TypedEnvelope<proto::PullWorkspaceDiagnostics>,
+        lsp_store: Entity<Self>,
+        envelope: TypedEnvelope<proto::PullWorkspaceDiagnostics>,
         mut cx: AsyncApp,
     ) -> Result<proto::Ack> {
-        this.update(&mut cx, |_, cx| {
-            cx.emit(LspStoreEvent::PullWorkspaceDiagnostics);
+        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
+        lsp_store.update(&mut cx, |lsp_store, _| {
+            lsp_store.pull_workspace_diagnostics(server_id);
         })?;
         Ok(proto::Ack {})
     }
@@ -9097,14 +9120,19 @@ impl LspStore {
                     // Update language_servers collection with Running variant of LanguageServerState
                     // indicating that the server is up and running and ready
                     let workspace_folders = workspace_folders.lock().clone();
+                    language_server.set_workspace_folders(workspace_folders);

                     local.language_servers.insert(
                         server_id,
-                        LanguageServerState::running(
-                            workspace_folders,
-                            adapter.clone(),
-                            language_server.clone(),
-                            None,
-                        ),
+                        LanguageServerState::Running {
+                            workspace_refresh_task: lsp_workspace_diagnostics_refresh(
+                                language_server.clone(),
+                                cx,
+                            ),
+                            adapter: adapter.clone(),
+                            server: language_server.clone(),
+                            simulate_disk_based_diagnostics_completion: None,
+                        },
                     );
                     if let Some(file_ops_caps) = language_server
                         .capabilities()
@@ -9675,6 +9703,229 @@ impl LspStore {
             }
         }
     }

+    pub fn result_id(&self, buffer_id: BufferId) -> Option<String> {
+        self.as_local()?
+            .buffer_pull_diagnostics_result_ids
+            .get(&buffer_id)
+            .cloned()
+            .flatten()
+    }
+
+    pub fn all_result_ids(&self) -> HashMap<BufferId, String> {
+        let Some(local) = self.as_local() else {
+            return HashMap::default();
+        };
+        local
+            .buffer_pull_diagnostics_result_ids
+            .iter()
+            .filter_map(|(buffer_id, result_id)| Some((*buffer_id, result_id.clone()?)))
+            .collect()
+    }
+
+    pub fn pull_workspace_diagnostics(&mut self, server_id: LanguageServerId) {
+        if let Some(LanguageServerState::Running {
+            workspace_refresh_task: Some((tx, _)),
+            ..
+        }) = self
+            .as_local_mut()
+            .and_then(|local| local.language_servers.get_mut(&server_id))
+        {
+            tx.try_send(()).ok();
+        }
+    }
+
+    pub fn pull_workspace_diagnostics_for_buffer(&mut self, buffer_id: BufferId, cx: &mut App) {
+        let Some(buffer) = self.buffer_store().read(cx).get_existing(buffer_id).ok() else {
+            return;
+        };
+        let Some(local) = self.as_local_mut() else {
+            return;
+        };
+
+        for server_id in buffer.update(cx, |buffer, cx| {
+            local.language_server_ids_for_buffer(buffer, cx)
+        }) {
+            if let Some(LanguageServerState::Running {
+                workspace_refresh_task: Some((tx, _)),
+                ..
+            }) = local.language_servers.get_mut(&server_id)
+            {
+                tx.try_send(()).ok();
+            }
+        }
+    }
 }

+fn lsp_workspace_diagnostics_refresh(
+    server: Arc<LanguageServer>,
+    cx: &mut Context<'_, LspStore>,
+) -> Option<(mpsc::Sender<()>, Task<()>)> {
+    let identifier = match server.capabilities().diagnostic_provider? {
+        lsp::DiagnosticServerCapabilities::Options(diagnostic_options) => {
+            if !diagnostic_options.workspace_diagnostics {
+                return None;
+            }
+            diagnostic_options.identifier
+        }
+        lsp::DiagnosticServerCapabilities::RegistrationOptions(registration_options) => {
+            let diagnostic_options = registration_options.diagnostic_options;
+            if !diagnostic_options.workspace_diagnostics {
+                return None;
+            }
+            diagnostic_options.identifier
+        }
+    };
+
+    let (mut tx, mut rx) = mpsc::channel(1);
+    tx.try_send(()).ok();
+
+    let workspace_query_language_server = cx.spawn(async move |lsp_store, cx| {
+        let mut attempts = 0;
+        let max_attempts = 50;
+
+        loop {
+            let Some(()) = rx.recv().await else {
+                return;
+            };
+
+            'request: loop {
+                if attempts > max_attempts {
+                    log::error!(
+                        "Failed to pull workspace diagnostics {max_attempts} times, aborting"
+                    );
+                    return;
+                }
+                let backoff_millis = (50 * (1 << attempts)).clamp(30, 1000);
+                cx.background_executor()
+                    .timer(Duration::from_millis(backoff_millis))
+                    .await;
+                attempts += 1;
+
+                let Ok(previous_result_ids) = lsp_store.update(cx, |lsp_store, cx| {
+                    lsp_store
+                        .all_result_ids()
+                        .into_iter()
+                        .filter_map(|(buffer_id, result_id)| {
+                            let buffer = lsp_store
+                                .buffer_store()
+                                .read(cx)
+                                .get_existing(buffer_id)
+                                .ok()?;
+                            let abs_path = buffer.read(cx).file()?.as_local()?.abs_path(cx);
+                            let uri = file_path_to_lsp_url(&abs_path).ok()?;
+                            Some(lsp::PreviousResultId {
+                                uri,
+                                value: result_id,
+                            })
+                        })
+                        .collect()
+                }) else {
+                    return;
+                };
+
+                let response_result = server
+                    .request::<lsp::WorkspaceDiagnosticRequest>(lsp::WorkspaceDiagnosticParams {
+                        previous_result_ids,
+                        identifier: identifier.clone(),
+                        work_done_progress_params: Default::default(),
+                        partial_result_params: Default::default(),
+                    })
+                    .await;
+                // https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#diagnostic_refresh
+                // > If a server closes a workspace diagnostic pull request the client should re-trigger the request.
+                match response_result {
+                    ConnectionResult::Timeout => {
+                        log::error!("Timeout during workspace diagnostics pull");
+                        continue 'request;
+                    }
+                    ConnectionResult::ConnectionReset => {
+                        log::error!("Server closed a workspace diagnostics pull request");
+                        continue 'request;
+                    }
+                    ConnectionResult::Result(Err(e)) => {
+                        log::error!("Error during workspace diagnostics pull: {e:#}");
+                        break 'request;
+                    }
+                    ConnectionResult::Result(Ok(pulled_diagnostics)) => {
+                        attempts = 0;
+                        if lsp_store
+                            .update(cx, |lsp_store, cx| {
+                                let workspace_diagnostics =
+                                    GetDocumentDiagnostics::deserialize_workspace_diagnostics_report(pulled_diagnostics, server.server_id());
+                                for workspace_diagnostics in workspace_diagnostics {
+                                    let LspPullDiagnostics::Response {
+                                        server_id,
+                                        uri,
+                                        diagnostics,
+                                    } = workspace_diagnostics.diagnostics
+                                    else {
+                                        continue;
+                                    };
+
+                                    let adapter = lsp_store.language_server_adapter_for_id(server_id);
+                                    let disk_based_sources = adapter
+                                        .as_ref()
+                                        .map(|adapter| adapter.disk_based_diagnostic_sources.as_slice())
+                                        .unwrap_or(&[]);
+
+                                    match diagnostics {
+                                        PulledDiagnostics::Unchanged { result_id } => {
+                                            lsp_store
+                                                .merge_diagnostics(
+                                                    server_id,
+                                                    lsp::PublishDiagnosticsParams {
+                                                        uri: uri.clone(),
+                                                        diagnostics: Vec::new(),
+                                                        version: None,
+                                                    },
+                                                    Some(result_id),
+                                                    DiagnosticSourceKind::Pulled,
+                                                    disk_based_sources,
+                                                    |_, _| true,
+                                                    cx,
+                                                )
+                                                .log_err();
+                                        }
+                                        PulledDiagnostics::Changed {
+                                            diagnostics,
+                                            result_id,
+                                        } => {
+                                            lsp_store
+                                                .merge_diagnostics(
+                                                    server_id,
+                                                    lsp::PublishDiagnosticsParams {
+                                                        uri: uri.clone(),
+                                                        diagnostics,
+                                                        version: workspace_diagnostics.version,
+                                                    },
+                                                    result_id,
+                                                    DiagnosticSourceKind::Pulled,
+                                                    disk_based_sources,
+                                                    |old_diagnostic, _| match old_diagnostic.source_kind {
+                                                        DiagnosticSourceKind::Pulled => false,
+                                                        DiagnosticSourceKind::Other
+                                                        | DiagnosticSourceKind::Pushed => true,
+                                                    },
+                                                    cx,
+                                                )
+                                                .log_err();
+                                        }
+                                    }
+                                }
+                            })
+                            .is_err()
+                        {
+                            return;
+                        }
+                        break 'request;
+                    }
+                }
+            }
+        }
+    });
+
+    Some((tx, workspace_query_language_server))
+}

 fn resolve_word_completion(snapshot: &BufferSnapshot, completion: &mut Completion) {
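Two small patterns carry the refresh task above: a bounded channel of capacity 1 used as a coalescing "nudge" (repeated try_send(()) calls while a pull is in flight collapse into at most one queued wake-up), and a capped exponential backoff of (50 * (1 << attempts)).clamp(30, 1000) milliseconds between attempts, reset after a successful pull. A standalone sketch of just those two pieces, using std::sync::mpsc::sync_channel and a thread in place of postage channels and GPUI tasks:

use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;
use std::time::Duration;

fn backoff(attempts: u32) -> Duration {
    // Same shape as the diff: exponential growth clamped to 30..=1000 ms.
    Duration::from_millis((50u64 * (1u64 << attempts)).clamp(30, 1000))
}

fn main() {
    // Capacity 1: nudges sent while a pull is already queued are dropped, so a
    // burst of edits results in at most one additional workspace pull.
    let (tx, rx) = sync_channel::<()>(1);

    let worker = thread::spawn(move || {
        let mut attempts: u32 = 0;
        while rx.recv().is_ok() {
            // Pretend to pull; a real loop would retry on timeout (incrementing
            // `attempts`) and reset the counter only after a successful response.
            thread::sleep(backoff(attempts));
            attempts = 0;
            println!("workspace diagnostics pulled");
        }
    });

    // Schedule a pull three times in a row; extra nudges coalesce.
    for _ in 0..3 {
        match tx.try_send(()) {
            Ok(()) | Err(TrySendError::Full(())) => {}
            Err(TrySendError::Disconnected(())) => break,
        }
    }
    drop(tx); // closing the channel lets the worker drain and exit
    worker.join().unwrap();
}

The capacity-1 channel is what keeps a burst of edits cheap: however many pulls get scheduled, at most one extra workspace request queues up behind the one already in flight.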
@@ -10055,6 +10306,7 @@ pub enum LanguageServerState {
         adapter: Arc<CachedLspAdapter>,
         server: Arc<LanguageServer>,
         simulate_disk_based_diagnostics_completion: Option<Task<()>>,
+        workspace_refresh_task: Option<(mpsc::Sender<()>, Task<()>)>,
     },
 }
@@ -10083,19 +10335,6 @@ impl LanguageServerState {
             LanguageServerState::Running { server, .. } => server.remove_workspace_folder(uri),
         }
     }
-    fn running(
-        workspace_folders: BTreeSet<Url>,
-        adapter: Arc<CachedLspAdapter>,
-        server: Arc<LanguageServer>,
-        simulate_disk_based_diagnostics_completion: Option<Task<()>>,
-    ) -> Self {
-        server.set_workspace_folders(workspace_folders);
-        Self::Running {
-            adapter,
-            server,
-            simulate_disk_based_diagnostics_completion,
-        }
-    }
 }

 impl std::fmt::Debug for LanguageServerState {