Stop waiting for part of LSP responses on remote Collab clients' part (#36557)

Instead of holding a connection for potentially long LSP queries (e.g.
rust-analyzer might take minutes to look up a definition), disconnect
right after sending the initial request and handle the follow-up
responses later.

As a bonus, this allows to cancel previously sent request on the local
Collab clients' side due to this, as instead of holding and serving the
old connection, local clients now can stop previous requests, if needed.

Current PR does not convert all LSP requests to the new paradigm, but
the problematic ones, deprecating `MultiLspQuery` and moving all its
requests to the new paradigm.

Release Notes:

- Improved resource usage when querying LSP over Collab

---------

Co-authored-by: David Kleingeld <git@davidsk.dev>
Co-authored-by: Mikayla Maki <mikayla@zed.dev>
Co-authored-by: David Kleingeld <davidsk@zed.dev>
This commit is contained in:
Kirill Bulatov 2025-08-21 09:24:34 +03:00 committed by GitHub
parent c731bb6d91
commit 5dcb90858e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 1395 additions and 681 deletions

View file

@ -1,35 +1,48 @@
use anyhow::Context;
use anyhow::{Context, Result};
use collections::HashMap;
use futures::{
Future, FutureExt as _,
channel::oneshot,
future::{BoxFuture, LocalBoxFuture},
};
use gpui::{AnyEntity, AnyWeakEntity, AsyncApp, Entity};
use gpui::{AnyEntity, AnyWeakEntity, AsyncApp, BackgroundExecutor, Entity, FutureExt as _};
use parking_lot::Mutex;
use proto::{
AnyTypedEnvelope, EntityMessage, Envelope, EnvelopedMessage, RequestMessage, TypedEnvelope,
error::ErrorExt as _,
AnyTypedEnvelope, EntityMessage, Envelope, EnvelopedMessage, LspRequestId, LspRequestMessage,
RequestMessage, TypedEnvelope, error::ErrorExt as _,
};
use std::{
any::{Any, TypeId},
sync::{Arc, Weak},
sync::{
Arc, OnceLock,
atomic::{self, AtomicU64},
},
time::Duration,
};
#[derive(Clone)]
pub struct AnyProtoClient(Arc<dyn ProtoClient>);
pub struct AnyProtoClient(Arc<State>);
impl AnyProtoClient {
pub fn downgrade(&self) -> AnyWeakProtoClient {
AnyWeakProtoClient(Arc::downgrade(&self.0))
}
}
type RequestIds = Arc<
Mutex<
HashMap<
LspRequestId,
oneshot::Sender<
Result<
Option<TypedEnvelope<Vec<proto::ProtoLspResponse<Box<dyn AnyTypedEnvelope>>>>>,
>,
>,
>,
>,
>;
#[derive(Clone)]
pub struct AnyWeakProtoClient(Weak<dyn ProtoClient>);
static NEXT_LSP_REQUEST_ID: OnceLock<Arc<AtomicU64>> = OnceLock::new();
static REQUEST_IDS: OnceLock<RequestIds> = OnceLock::new();
impl AnyWeakProtoClient {
pub fn upgrade(&self) -> Option<AnyProtoClient> {
self.0.upgrade().map(AnyProtoClient)
}
struct State {
client: Arc<dyn ProtoClient>,
next_lsp_request_id: Arc<AtomicU64>,
request_ids: RequestIds,
}
pub trait ProtoClient: Send + Sync {
@ -37,11 +50,11 @@ pub trait ProtoClient: Send + Sync {
&self,
envelope: Envelope,
request_type: &'static str,
) -> BoxFuture<'static, anyhow::Result<Envelope>>;
) -> BoxFuture<'static, Result<Envelope>>;
fn send(&self, envelope: Envelope, message_type: &'static str) -> anyhow::Result<()>;
fn send(&self, envelope: Envelope, message_type: &'static str) -> Result<()>;
fn send_response(&self, envelope: Envelope, message_type: &'static str) -> anyhow::Result<()>;
fn send_response(&self, envelope: Envelope, message_type: &'static str) -> Result<()>;
fn message_handler_set(&self) -> &parking_lot::Mutex<ProtoMessageHandlerSet>;
@ -65,7 +78,7 @@ pub type ProtoMessageHandler = Arc<
Box<dyn AnyTypedEnvelope>,
AnyProtoClient,
AsyncApp,
) -> LocalBoxFuture<'static, anyhow::Result<()>>,
) -> LocalBoxFuture<'static, Result<()>>,
>;
impl ProtoMessageHandlerSet {
@ -113,7 +126,7 @@ impl ProtoMessageHandlerSet {
message: Box<dyn AnyTypedEnvelope>,
client: AnyProtoClient,
cx: AsyncApp,
) -> Option<LocalBoxFuture<'static, anyhow::Result<()>>> {
) -> Option<LocalBoxFuture<'static, Result<()>>> {
let payload_type_id = message.payload_type_id();
let mut this = this.lock();
let handler = this.message_handlers.get(&payload_type_id)?.clone();
@ -169,43 +182,195 @@ where
T: ProtoClient + 'static,
{
fn from(client: Arc<T>) -> Self {
Self(client)
Self::new(client)
}
}
impl AnyProtoClient {
pub fn new<T: ProtoClient + 'static>(client: Arc<T>) -> Self {
Self(client)
Self(Arc::new(State {
client,
next_lsp_request_id: NEXT_LSP_REQUEST_ID
.get_or_init(|| Arc::new(AtomicU64::new(0)))
.clone(),
request_ids: REQUEST_IDS.get_or_init(RequestIds::default).clone(),
}))
}
pub fn is_via_collab(&self) -> bool {
self.0.is_via_collab()
self.0.client.is_via_collab()
}
pub fn request<T: RequestMessage>(
&self,
request: T,
) -> impl Future<Output = anyhow::Result<T::Response>> + use<T> {
) -> impl Future<Output = Result<T::Response>> + use<T> {
let envelope = request.into_envelope(0, None, None);
let response = self.0.request(envelope, T::NAME);
let response = self.0.client.request(envelope, T::NAME);
async move {
T::Response::from_envelope(response.await?)
.context("received response of the wrong type")
}
}
pub fn send<T: EnvelopedMessage>(&self, request: T) -> anyhow::Result<()> {
pub fn send<T: EnvelopedMessage>(&self, request: T) -> Result<()> {
let envelope = request.into_envelope(0, None, None);
self.0.send(envelope, T::NAME)
self.0.client.send(envelope, T::NAME)
}
pub fn send_response<T: EnvelopedMessage>(
&self,
request_id: u32,
request: T,
) -> anyhow::Result<()> {
pub fn send_response<T: EnvelopedMessage>(&self, request_id: u32, request: T) -> Result<()> {
let envelope = request.into_envelope(0, Some(request_id), None);
self.0.send(envelope, T::NAME)
self.0.client.send(envelope, T::NAME)
}
pub fn request_lsp<T>(
&self,
project_id: u64,
timeout: Duration,
executor: BackgroundExecutor,
request: T,
) -> impl Future<
Output = Result<Option<TypedEnvelope<Vec<proto::ProtoLspResponse<T::Response>>>>>,
> + use<T>
where
T: LspRequestMessage,
{
let new_id = LspRequestId(
self.0
.next_lsp_request_id
.fetch_add(1, atomic::Ordering::Acquire),
);
let (tx, rx) = oneshot::channel();
{
self.0.request_ids.lock().insert(new_id, tx);
}
let query = proto::LspQuery {
project_id,
lsp_request_id: new_id.0,
request: Some(request.clone().to_proto_query()),
};
let request = self.request(query);
let request_ids = self.0.request_ids.clone();
async move {
match request.await {
Ok(_request_enqueued) => {}
Err(e) => {
request_ids.lock().remove(&new_id);
return Err(e).context("sending LSP proto request");
}
}
let response = rx.with_timeout(timeout, &executor).await;
{
request_ids.lock().remove(&new_id);
}
match response {
Ok(Ok(response)) => {
let response = response
.context("waiting for LSP proto response")?
.map(|response| {
anyhow::Ok(TypedEnvelope {
payload: response
.payload
.into_iter()
.map(|lsp_response| lsp_response.into_response::<T>())
.collect::<Result<Vec<_>>>()?,
sender_id: response.sender_id,
original_sender_id: response.original_sender_id,
message_id: response.message_id,
received_at: response.received_at,
})
})
.transpose()
.context("converting LSP proto response")?;
Ok(response)
}
Err(_cancelled_due_timeout) => Ok(None),
Ok(Err(_channel_dropped)) => Ok(None),
}
}
}
pub fn send_lsp_response<T: LspRequestMessage>(
&self,
project_id: u64,
lsp_request_id: LspRequestId,
server_responses: HashMap<u64, T::Response>,
) -> Result<()> {
self.send(proto::LspQueryResponse {
project_id,
lsp_request_id: lsp_request_id.0,
responses: server_responses
.into_iter()
.map(|(server_id, response)| proto::LspResponse {
server_id,
response: Some(T::response_to_proto_query(response)),
})
.collect(),
})
}
pub fn handle_lsp_response(&self, mut envelope: TypedEnvelope<proto::LspQueryResponse>) {
let request_id = LspRequestId(envelope.payload.lsp_request_id);
let mut response_senders = self.0.request_ids.lock();
if let Some(tx) = response_senders.remove(&request_id) {
let responses = envelope.payload.responses.drain(..).collect::<Vec<_>>();
tx.send(Ok(Some(proto::TypedEnvelope {
sender_id: envelope.sender_id,
original_sender_id: envelope.original_sender_id,
message_id: envelope.message_id,
received_at: envelope.received_at,
payload: responses
.into_iter()
.filter_map(|response| {
use proto::lsp_response::Response;
let server_id = response.server_id;
let response = match response.response? {
Response::GetReferencesResponse(response) => {
to_any_envelope(&envelope, response)
}
Response::GetDocumentColorResponse(response) => {
to_any_envelope(&envelope, response)
}
Response::GetHoverResponse(response) => {
to_any_envelope(&envelope, response)
}
Response::GetCodeActionsResponse(response) => {
to_any_envelope(&envelope, response)
}
Response::GetSignatureHelpResponse(response) => {
to_any_envelope(&envelope, response)
}
Response::GetCodeLensResponse(response) => {
to_any_envelope(&envelope, response)
}
Response::GetDocumentDiagnosticsResponse(response) => {
to_any_envelope(&envelope, response)
}
Response::GetDefinitionResponse(response) => {
to_any_envelope(&envelope, response)
}
Response::GetDeclarationResponse(response) => {
to_any_envelope(&envelope, response)
}
Response::GetTypeDefinitionResponse(response) => {
to_any_envelope(&envelope, response)
}
Response::GetImplementationResponse(response) => {
to_any_envelope(&envelope, response)
}
};
Some(proto::ProtoLspResponse {
server_id,
response,
})
})
.collect(),
})))
.ok();
}
}
pub fn add_request_handler<M, E, H, F>(&self, entity: gpui::WeakEntity<E>, handler: H)
@ -213,31 +378,35 @@ impl AnyProtoClient {
M: RequestMessage,
E: 'static,
H: 'static + Sync + Fn(Entity<E>, TypedEnvelope<M>, AsyncApp) -> F + Send + Sync,
F: 'static + Future<Output = anyhow::Result<M::Response>>,
F: 'static + Future<Output = Result<M::Response>>,
{
self.0.message_handler_set().lock().add_message_handler(
TypeId::of::<M>(),
entity.into(),
Arc::new(move |entity, envelope, client, cx| {
let entity = entity.downcast::<E>().unwrap();
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
let request_id = envelope.message_id();
handler(entity, *envelope, cx)
.then(move |result| async move {
match result {
Ok(response) => {
client.send_response(request_id, response)?;
Ok(())
self.0
.client
.message_handler_set()
.lock()
.add_message_handler(
TypeId::of::<M>(),
entity.into(),
Arc::new(move |entity, envelope, client, cx| {
let entity = entity.downcast::<E>().unwrap();
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
let request_id = envelope.message_id();
handler(entity, *envelope, cx)
.then(move |result| async move {
match result {
Ok(response) => {
client.send_response(request_id, response)?;
Ok(())
}
Err(error) => {
client.send_response(request_id, error.to_proto())?;
Err(error)
}
}
Err(error) => {
client.send_response(request_id, error.to_proto())?;
Err(error)
}
}
})
.boxed_local()
}),
)
})
.boxed_local()
}),
)
}
pub fn add_entity_request_handler<M, E, H, F>(&self, handler: H)
@ -245,7 +414,7 @@ impl AnyProtoClient {
M: EnvelopedMessage + RequestMessage + EntityMessage,
E: 'static,
H: 'static + Sync + Send + Fn(gpui::Entity<E>, TypedEnvelope<M>, AsyncApp) -> F,
F: 'static + Future<Output = anyhow::Result<M::Response>>,
F: 'static + Future<Output = Result<M::Response>>,
{
let message_type_id = TypeId::of::<M>();
let entity_type_id = TypeId::of::<E>();
@ -257,6 +426,7 @@ impl AnyProtoClient {
.remote_entity_id()
};
self.0
.client
.message_handler_set()
.lock()
.add_entity_message_handler(
@ -290,7 +460,7 @@ impl AnyProtoClient {
M: EnvelopedMessage + EntityMessage,
E: 'static,
H: 'static + Sync + Send + Fn(gpui::Entity<E>, TypedEnvelope<M>, AsyncApp) -> F,
F: 'static + Future<Output = anyhow::Result<()>>,
F: 'static + Future<Output = Result<()>>,
{
let message_type_id = TypeId::of::<M>();
let entity_type_id = TypeId::of::<E>();
@ -302,6 +472,7 @@ impl AnyProtoClient {
.remote_entity_id()
};
self.0
.client
.message_handler_set()
.lock()
.add_entity_message_handler(
@ -319,7 +490,7 @@ impl AnyProtoClient {
pub fn subscribe_to_entity<E: 'static>(&self, remote_id: u64, entity: &Entity<E>) {
let id = (TypeId::of::<E>(), remote_id);
let mut message_handlers = self.0.message_handler_set().lock();
let mut message_handlers = self.0.client.message_handler_set().lock();
if message_handlers
.entities_by_type_and_remote_id
.contains_key(&id)
@ -335,3 +506,16 @@ impl AnyProtoClient {
);
}
}
fn to_any_envelope<T: EnvelopedMessage>(
envelope: &TypedEnvelope<proto::LspQueryResponse>,
response: T,
) -> Box<dyn AnyTypedEnvelope> {
Box::new(proto::TypedEnvelope {
sender_id: envelope.sender_id,
original_sender_id: envelope.original_sender_id,
message_id: envelope.message_id,
received_at: envelope.received_at,
payload: response,
}) as Box<_>
}