Allow FakeLanguageServer handlers to handle multiple requests

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
Max Brunsfeld 2022-02-16 16:19:27 -08:00
parent c4dff12d69
commit 90f31bb123
3 changed files with 170 additions and 145 deletions

View file

@ -1,5 +1,5 @@
use anyhow::{anyhow, Context, Result};
use futures::{io::BufWriter, AsyncRead, AsyncWrite};
use futures::{channel::mpsc, io::BufWriter, AsyncRead, AsyncWrite};
use gpui::{executor, Task};
use parking_lot::{Mutex, RwLock};
use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch};
@ -481,16 +481,10 @@ impl Drop for Subscription {
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
handlers: Arc<
Mutex<
HashMap<
&'static str,
Box<dyn Send + FnOnce(usize, &[u8]) -> (Vec<u8>, barrier::Sender)>,
>,
>,
>,
outgoing_tx: channel::Sender<Vec<u8>>,
incoming_rx: channel::Receiver<Vec<u8>>,
handlers:
Arc<Mutex<HashMap<&'static str, Box<dyn Send + Sync + FnMut(usize, &[u8]) -> Vec<u8>>>>>,
outgoing_tx: mpsc::UnboundedSender<Vec<u8>>,
incoming_rx: mpsc::UnboundedReceiver<Vec<u8>>,
}
#[cfg(any(test, feature = "test-support"))]
@ -508,8 +502,9 @@ impl LanguageServer {
let mut fake = FakeLanguageServer::new(executor.clone(), stdin_reader, stdout_writer);
fake.handle_request::<request::Initialize, _>({
let capabilities = capabilities.clone();
move |_| InitializeResult {
capabilities,
capabilities: capabilities.clone(),
..Default::default()
}
});
@ -530,8 +525,8 @@ impl FakeLanguageServer {
) -> Self {
use futures::StreamExt as _;
let (incoming_tx, incoming_rx) = channel::unbounded();
let (outgoing_tx, mut outgoing_rx) = channel::unbounded();
let (incoming_tx, incoming_rx) = mpsc::unbounded();
let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded();
let this = Self {
outgoing_tx: outgoing_tx.clone(),
incoming_rx,
@ -545,36 +540,31 @@ impl FakeLanguageServer {
let mut buffer = Vec::new();
let mut stdin = smol::io::BufReader::new(stdin);
while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
if let Ok(request) = serde_json::from_slice::<AnyRequest>(&mut buffer) {
if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
let handler = handlers.lock().remove(request.method);
if let Some(handler) = handler {
let (response, sent) =
handler(request.id, request.params.get().as_bytes());
if let Some(handler) = handlers.lock().get_mut(request.method) {
let response = handler(request.id, request.params.get().as_bytes());
log::debug!("handled lsp request. method:{}", request.method);
outgoing_tx.send(response).await.unwrap();
drop(sent);
outgoing_tx.unbounded_send(response)?;
} else {
log::debug!("unhandled lsp request. method:{}", request.method);
outgoing_tx
.send(
serde_json::to_vec(&AnyResponse {
id: request.id,
error: Some(Error {
message: "no handler".to_string(),
}),
result: None,
})
.unwrap(),
)
.await
.unwrap();
outgoing_tx.unbounded_send(
serde_json::to_vec(&AnyResponse {
id: request.id,
error: Some(Error {
message: "no handler".to_string(),
}),
result: None,
})
.unwrap(),
)?;
}
} else {
incoming_tx.send(buffer.clone()).await.unwrap();
incoming_tx.unbounded_send(buffer.clone())?;
}
}
Ok::<_, anyhow::Error>(())
})
.detach();
@ -598,7 +588,7 @@ impl FakeLanguageServer {
params,
})
.unwrap();
self.outgoing_tx.send(message).await.unwrap();
self.outgoing_tx.unbounded_send(message).unwrap();
}
pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
@ -618,15 +608,15 @@ impl FakeLanguageServer {
}
}
pub fn handle_request<T, F>(&mut self, handler: F) -> barrier::Receiver
pub fn handle_request<T, F>(&mut self, mut handler: F) -> mpsc::UnboundedReceiver<()>
where
T: 'static + request::Request,
F: 'static + Send + FnOnce(T::Params) -> T::Result,
F: 'static + Send + Sync + FnMut(T::Params) -> T::Result,
{
let (responded_tx, responded_rx) = barrier::channel();
let prev_handler = self.handlers.lock().insert(
let (responded_tx, responded_rx) = mpsc::unbounded();
self.handlers.lock().insert(
T::METHOD,
Box::new(|id, params| {
Box::new(move |id, params| {
let result = handler(serde_json::from_slice::<T::Params>(params).unwrap());
let result = serde_json::to_string(&result).unwrap();
let result = serde_json::from_str::<&RawValue>(&result).unwrap();
@ -635,18 +625,20 @@ impl FakeLanguageServer {
error: None,
result: Some(result),
};
(serde_json::to_vec(&response).unwrap(), responded_tx)
responded_tx.unbounded_send(()).ok();
serde_json::to_vec(&response).unwrap()
}),
);
if prev_handler.is_some() {
panic!(
"registered a new handler for LSP method '{}' before the previous handler was called",
T::METHOD
);
}
responded_rx
}
pub fn remove_request_handler<T>(&mut self)
where
T: 'static + request::Request,
{
self.handlers.lock().remove(T::METHOD);
}
pub async fn start_progress(&mut self, token: impl Into<String>) {
self.notify::<notification::Progress>(ProgressParams {
token: NumberOrString::String(token.into()),