Restructure fake language server to setup request handlers in advance
Co-Authored-By: Antonio Scandurra <me@as-cii.com> Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
parent
680d1fedc2
commit
01664d494c
4 changed files with 314 additions and 282 deletions
|
@ -56,6 +56,18 @@ struct Request<'a, T> {
|
|||
params: T,
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
#[derive(Deserialize)]
|
||||
struct AnyRequest<'a> {
|
||||
id: usize,
|
||||
#[serde(borrow)]
|
||||
jsonrpc: &'a str,
|
||||
#[serde(borrow)]
|
||||
method: &'a str,
|
||||
#[serde(borrow)]
|
||||
params: &'a RawValue,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct AnyResponse<'a> {
|
||||
id: usize,
|
||||
|
@ -469,19 +481,19 @@ impl Drop for Subscription {
|
|||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub struct FakeLanguageServer {
|
||||
buffer: Vec<u8>,
|
||||
stdin: smol::io::BufReader<async_pipe::PipeReader>,
|
||||
stdout: smol::io::BufWriter<async_pipe::PipeWriter>,
|
||||
executor: std::rc::Rc<executor::Foreground>,
|
||||
handlers: Arc<
|
||||
Mutex<
|
||||
HashMap<
|
||||
&'static str,
|
||||
Box<dyn Send + FnOnce(usize, &[u8]) -> (Vec<u8>, barrier::Sender)>,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
outgoing_tx: channel::Sender<Vec<u8>>,
|
||||
incoming_rx: channel::Receiver<Vec<u8>>,
|
||||
pub started: Arc<std::sync::atomic::AtomicBool>,
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub struct RequestId<T> {
|
||||
id: usize,
|
||||
_type: std::marker::PhantomData<T>,
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
impl LanguageServer {
|
||||
pub async fn fake(cx: &gpui::TestAppContext) -> (Arc<Self>, FakeLanguageServer) {
|
||||
|
@ -492,28 +504,18 @@ impl LanguageServer {
|
|||
capabilities: ServerCapabilities,
|
||||
cx: &gpui::TestAppContext,
|
||||
) -> (Arc<Self>, FakeLanguageServer) {
|
||||
let stdin = async_pipe::pipe();
|
||||
let stdout = async_pipe::pipe();
|
||||
let mut fake = FakeLanguageServer {
|
||||
stdin: smol::io::BufReader::new(stdin.1),
|
||||
stdout: smol::io::BufWriter::new(stdout.0),
|
||||
buffer: Vec::new(),
|
||||
executor: cx.foreground(),
|
||||
started: Arc::new(std::sync::atomic::AtomicBool::new(true)),
|
||||
};
|
||||
let (stdin_writer, stdin_reader) = async_pipe::pipe();
|
||||
let (stdout_writer, stdout_reader) = async_pipe::pipe();
|
||||
|
||||
let mut fake = FakeLanguageServer::new(cx, stdin_reader, stdout_writer);
|
||||
fake.handle_request::<request::Initialize, _>(move |_| InitializeResult {
|
||||
capabilities,
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let server =
|
||||
Self::new_internal(stdin.0, stdout.1, Path::new("/"), cx.background()).unwrap();
|
||||
|
||||
let (init_id, _) = fake.receive_request::<request::Initialize>().await;
|
||||
fake.respond(
|
||||
init_id,
|
||||
InitializeResult {
|
||||
capabilities,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), cx.background())
|
||||
.unwrap();
|
||||
fake.receive_notification::<notification::Initialized>()
|
||||
.await;
|
||||
|
||||
|
@ -523,6 +525,75 @@ impl LanguageServer {
|
|||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
impl FakeLanguageServer {
|
||||
fn new(
|
||||
cx: &gpui::TestAppContext,
|
||||
stdin: async_pipe::PipeReader,
|
||||
stdout: async_pipe::PipeWriter,
|
||||
) -> Self {
|
||||
use futures::StreamExt as _;
|
||||
|
||||
let (incoming_tx, incoming_rx) = channel::unbounded();
|
||||
let (outgoing_tx, mut outgoing_rx) = channel::unbounded();
|
||||
let this = Self {
|
||||
outgoing_tx: outgoing_tx.clone(),
|
||||
incoming_rx,
|
||||
handlers: Default::default(),
|
||||
started: Arc::new(std::sync::atomic::AtomicBool::new(true)),
|
||||
};
|
||||
|
||||
// Receive incoming messages
|
||||
let handlers = this.handlers.clone();
|
||||
cx.background()
|
||||
.spawn(async move {
|
||||
let mut buffer = Vec::new();
|
||||
let mut stdin = smol::io::BufReader::new(stdin);
|
||||
while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
|
||||
if let Ok(request) = serde_json::from_slice::<AnyRequest>(&mut buffer) {
|
||||
assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
|
||||
|
||||
let handler = handlers.lock().remove(request.method);
|
||||
if let Some(handler) = handler {
|
||||
let (response, sent) =
|
||||
handler(request.id, request.params.get().as_bytes());
|
||||
log::debug!("handled lsp request. method:{}", request.method);
|
||||
outgoing_tx.send(response).await.unwrap();
|
||||
drop(sent);
|
||||
} else {
|
||||
log::debug!("unhandled lsp request. method:{}", request.method);
|
||||
outgoing_tx
|
||||
.send(
|
||||
serde_json::to_vec(&AnyResponse {
|
||||
id: request.id,
|
||||
error: Some(Error {
|
||||
message: "no handler".to_string(),
|
||||
}),
|
||||
result: None,
|
||||
})
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
} else {
|
||||
incoming_tx.send(buffer.clone()).await.unwrap();
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
|
||||
// Send outgoing messages
|
||||
cx.background()
|
||||
.spawn(async move {
|
||||
let mut stdout = smol::io::BufWriter::new(stdout);
|
||||
while let Some(notification) = outgoing_rx.next().await {
|
||||
Self::send(&mut stdout, ¬ification).await;
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
|
||||
this
|
||||
}
|
||||
|
||||
pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
|
||||
if !self.started.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
panic!("can't simulate an LSP notification before the server has been started");
|
||||
|
@ -533,54 +604,53 @@ impl FakeLanguageServer {
|
|||
params,
|
||||
})
|
||||
.unwrap();
|
||||
self.send(message).await;
|
||||
self.outgoing_tx.send(message).await.unwrap();
|
||||
}
|
||||
|
||||
pub async fn respond<'a, T: request::Request>(
|
||||
&mut self,
|
||||
request_id: RequestId<T>,
|
||||
result: T::Result,
|
||||
) {
|
||||
let result = serde_json::to_string(&result).unwrap();
|
||||
let message = serde_json::to_vec(&AnyResponse {
|
||||
id: request_id.id,
|
||||
error: None,
|
||||
result: Some(&RawValue::from_string(result).unwrap()),
|
||||
})
|
||||
.unwrap();
|
||||
self.send(message).await;
|
||||
}
|
||||
pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
|
||||
use futures::StreamExt as _;
|
||||
|
||||
pub async fn receive_request<T: request::Request>(&mut self) -> (RequestId<T>, T::Params) {
|
||||
let executor = self.executor.clone();
|
||||
executor.start_waiting();
|
||||
loop {
|
||||
self.receive().await;
|
||||
if let Ok(request) = serde_json::from_slice::<Request<T::Params>>(&self.buffer) {
|
||||
assert_eq!(request.method, T::METHOD);
|
||||
assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
|
||||
executor.finish_waiting();
|
||||
return (
|
||||
RequestId {
|
||||
id: request.id,
|
||||
_type: std::marker::PhantomData,
|
||||
},
|
||||
request.params,
|
||||
);
|
||||
let bytes = self.incoming_rx.next().await.unwrap();
|
||||
if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
|
||||
assert_eq!(notification.method, T::METHOD);
|
||||
return notification.params;
|
||||
} else {
|
||||
log::info!(
|
||||
"skipping message in fake language server {:?}",
|
||||
std::str::from_utf8(&self.buffer)
|
||||
std::str::from_utf8(&bytes)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
|
||||
self.receive().await;
|
||||
let notification = serde_json::from_slice::<Notification<T::Params>>(&self.buffer).unwrap();
|
||||
assert_eq!(notification.method, T::METHOD);
|
||||
notification.params
|
||||
pub fn handle_request<T, F>(&mut self, handler: F) -> barrier::Receiver
|
||||
where
|
||||
T: 'static + request::Request,
|
||||
F: 'static + Send + FnOnce(T::Params) -> T::Result,
|
||||
{
|
||||
let (responded_tx, responded_rx) = barrier::channel();
|
||||
let prev_handler = self.handlers.lock().insert(
|
||||
T::METHOD,
|
||||
Box::new(|id, params| {
|
||||
let result = handler(serde_json::from_slice::<T::Params>(params).unwrap());
|
||||
let result = serde_json::to_string(&result).unwrap();
|
||||
let result = serde_json::from_str::<&RawValue>(&result).unwrap();
|
||||
let response = AnyResponse {
|
||||
id,
|
||||
error: None,
|
||||
result: Some(result),
|
||||
};
|
||||
(serde_json::to_vec(&response).unwrap(), responded_tx)
|
||||
}),
|
||||
);
|
||||
if prev_handler.is_some() {
|
||||
panic!(
|
||||
"registered a new handler for LSP method '{}' before the previous handler was called",
|
||||
T::METHOD
|
||||
);
|
||||
}
|
||||
responded_rx
|
||||
}
|
||||
|
||||
pub async fn start_progress(&mut self, token: impl Into<String>) {
|
||||
|
@ -599,39 +669,37 @@ impl FakeLanguageServer {
|
|||
.await;
|
||||
}
|
||||
|
||||
async fn send(&mut self, message: Vec<u8>) {
|
||||
self.stdout
|
||||
async fn send(stdout: &mut smol::io::BufWriter<async_pipe::PipeWriter>, message: &[u8]) {
|
||||
stdout
|
||||
.write_all(CONTENT_LEN_HEADER.as_bytes())
|
||||
.await
|
||||
.unwrap();
|
||||
self.stdout
|
||||
stdout
|
||||
.write_all((format!("{}", message.len())).as_bytes())
|
||||
.await
|
||||
.unwrap();
|
||||
self.stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
|
||||
self.stdout.write_all(&message).await.unwrap();
|
||||
self.stdout.flush().await.unwrap();
|
||||
stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
|
||||
stdout.write_all(&message).await.unwrap();
|
||||
stdout.flush().await.unwrap();
|
||||
}
|
||||
|
||||
async fn receive(&mut self) {
|
||||
self.buffer.clear();
|
||||
self.stdin
|
||||
.read_until(b'\n', &mut self.buffer)
|
||||
.await
|
||||
.unwrap();
|
||||
self.stdin
|
||||
.read_until(b'\n', &mut self.buffer)
|
||||
.await
|
||||
.unwrap();
|
||||
let message_len: usize = std::str::from_utf8(&self.buffer)
|
||||
async fn receive(
|
||||
stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
|
||||
buffer: &mut Vec<u8>,
|
||||
) -> Result<()> {
|
||||
buffer.clear();
|
||||
stdin.read_until(b'\n', buffer).await?;
|
||||
stdin.read_until(b'\n', buffer).await?;
|
||||
let message_len: usize = std::str::from_utf8(buffer)
|
||||
.unwrap()
|
||||
.strip_prefix(CONTENT_LEN_HEADER)
|
||||
.unwrap()
|
||||
.trim_end()
|
||||
.parse()
|
||||
.unwrap();
|
||||
self.buffer.resize(message_len, 0);
|
||||
self.stdin.read_exact(&mut self.buffer).await.unwrap();
|
||||
buffer.resize(message_len, 0);
|
||||
stdin.read_exact(buffer).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -757,9 +825,9 @@ mod tests {
|
|||
"file://b/c"
|
||||
);
|
||||
|
||||
fake.handle_request::<request::Shutdown, _>(|_| ());
|
||||
|
||||
drop(server);
|
||||
let (shutdown_request, _) = fake.receive_request::<request::Shutdown>().await;
|
||||
fake.respond(shutdown_request, ()).await;
|
||||
fake.receive_notification::<notification::Exit>().await;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue