sibling of 93e6b01486
This commit is contained in:
parent
00725273e4
commit
bc075340bb
264 changed files with 5974 additions and 14197 deletions
|
@ -1,6 +1,6 @@
|
|||
use anyhow::{Context as _, Result, anyhow};
|
||||
use collections::HashMap;
|
||||
use futures::{FutureExt, StreamExt, channel::oneshot, future, select};
|
||||
use futures::{FutureExt, StreamExt, channel::oneshot, select};
|
||||
use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
|
||||
use parking_lot::Mutex;
|
||||
use postage::barrier;
|
||||
|
@ -10,19 +10,15 @@ use smol::channel;
|
|||
use std::{
|
||||
fmt,
|
||||
path::PathBuf,
|
||||
pin::pin,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicI32, Ordering::SeqCst},
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use util::{ResultExt, TryFutureExt};
|
||||
use util::TryFutureExt;
|
||||
|
||||
use crate::{
|
||||
transport::{StdioTransport, Transport},
|
||||
types::{CancelledParams, ClientNotification, Notification as _, notifications::Cancelled},
|
||||
};
|
||||
use crate::transport::{StdioTransport, Transport};
|
||||
|
||||
const JSON_RPC_VERSION: &str = "2.0";
|
||||
const REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
@ -36,7 +32,6 @@ pub const INTERNAL_ERROR: i32 = -32603;
|
|||
|
||||
type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
|
||||
type NotificationHandler = Box<dyn Send + FnMut(Value, AsyncApp)>;
|
||||
type RequestHandler = Box<dyn Send + FnMut(RequestId, &RawValue, AsyncApp)>;
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
|
@ -83,15 +78,6 @@ pub struct Request<'a, T> {
|
|||
pub params: T,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct AnyRequest<'a> {
|
||||
pub jsonrpc: &'a str,
|
||||
pub id: RequestId,
|
||||
pub method: &'a str,
|
||||
#[serde(skip_serializing_if = "is_null_value")]
|
||||
pub params: Option<&'a RawValue>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct AnyResponse<'a> {
|
||||
jsonrpc: &'a str,
|
||||
|
@ -191,23 +177,15 @@ impl Client {
|
|||
Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
|
||||
let response_handlers =
|
||||
Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
|
||||
let request_handlers = Arc::new(Mutex::new(HashMap::<_, RequestHandler>::default()));
|
||||
|
||||
let receive_input_task = cx.spawn({
|
||||
let notification_handlers = notification_handlers.clone();
|
||||
let response_handlers = response_handlers.clone();
|
||||
let request_handlers = request_handlers.clone();
|
||||
let transport = transport.clone();
|
||||
async move |cx| {
|
||||
Self::handle_input(
|
||||
transport,
|
||||
notification_handlers,
|
||||
request_handlers,
|
||||
response_handlers,
|
||||
cx,
|
||||
)
|
||||
.log_err()
|
||||
.await
|
||||
Self::handle_input(transport, notification_handlers, response_handlers, cx)
|
||||
.log_err()
|
||||
.await
|
||||
}
|
||||
});
|
||||
let receive_err_task = cx.spawn({
|
||||
|
@ -253,24 +231,13 @@ impl Client {
|
|||
async fn handle_input(
|
||||
transport: Arc<dyn Transport>,
|
||||
notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
|
||||
request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
|
||||
response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
|
||||
cx: &mut AsyncApp,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut receiver = transport.receive();
|
||||
|
||||
while let Some(message) = receiver.next().await {
|
||||
log::trace!("recv: {}", &message);
|
||||
if let Ok(request) = serde_json::from_str::<AnyRequest>(&message) {
|
||||
let mut request_handlers = request_handlers.lock();
|
||||
if let Some(handler) = request_handlers.get_mut(request.method) {
|
||||
handler(
|
||||
request.id,
|
||||
request.params.unwrap_or(RawValue::NULL),
|
||||
cx.clone(),
|
||||
);
|
||||
}
|
||||
} else if let Ok(response) = serde_json::from_str::<AnyResponse>(&message) {
|
||||
if let Ok(response) = serde_json::from_str::<AnyResponse>(&message) {
|
||||
if let Some(handlers) = response_handlers.lock().as_mut() {
|
||||
if let Some(handler) = handlers.remove(&response.id) {
|
||||
handler(Ok(message.to_string()));
|
||||
|
@ -281,8 +248,6 @@ impl Client {
|
|||
if let Some(handler) = notification_handlers.get_mut(notification.method.as_str()) {
|
||||
handler(notification.params.unwrap_or(Value::Null), cx.clone());
|
||||
}
|
||||
} else {
|
||||
log::error!("Unhandled JSON from context_server: {}", message);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -330,17 +295,6 @@ impl Client {
|
|||
&self,
|
||||
method: &str,
|
||||
params: impl Serialize,
|
||||
) -> Result<T> {
|
||||
self.request_with(method, params, None, Some(REQUEST_TIMEOUT))
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn request_with<T: DeserializeOwned>(
|
||||
&self,
|
||||
method: &str,
|
||||
params: impl Serialize,
|
||||
cancel_rx: Option<oneshot::Receiver<()>>,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<T> {
|
||||
let id = self.next_id.fetch_add(1, SeqCst);
|
||||
let request = serde_json::to_string(&Request {
|
||||
|
@ -376,23 +330,7 @@ impl Client {
|
|||
handle_response?;
|
||||
send?;
|
||||
|
||||
let mut timeout_fut = pin!(
|
||||
match timeout {
|
||||
Some(timeout) => future::Either::Left(executor.timer(timeout)),
|
||||
None => future::Either::Right(future::pending()),
|
||||
}
|
||||
.fuse()
|
||||
);
|
||||
let mut cancel_fut = pin!(
|
||||
match cancel_rx {
|
||||
Some(rx) => future::Either::Left(async {
|
||||
rx.await.log_err();
|
||||
}),
|
||||
None => future::Either::Right(future::pending()),
|
||||
}
|
||||
.fuse()
|
||||
);
|
||||
|
||||
let mut timeout = executor.timer(REQUEST_TIMEOUT).fuse();
|
||||
select! {
|
||||
response = rx.fuse() => {
|
||||
let elapsed = started.elapsed();
|
||||
|
@ -411,18 +349,8 @@ impl Client {
|
|||
Err(_) => anyhow::bail!("cancelled")
|
||||
}
|
||||
}
|
||||
_ = cancel_fut => {
|
||||
self.notify(
|
||||
Cancelled::METHOD,
|
||||
ClientNotification::Cancelled(CancelledParams {
|
||||
request_id: RequestId::Int(id),
|
||||
reason: None
|
||||
})
|
||||
).log_err();
|
||||
anyhow::bail!(RequestCanceled)
|
||||
}
|
||||
_ = timeout_fut => {
|
||||
log::error!("cancelled csp request task for {method:?} id {id} which took over {:?}", timeout.unwrap());
|
||||
_ = timeout => {
|
||||
log::error!("cancelled csp request task for {method:?} id {id} which took over {:?}", REQUEST_TIMEOUT);
|
||||
anyhow::bail!("Context server request timeout");
|
||||
}
|
||||
}
|
||||
|
@ -452,17 +380,6 @@ impl Client {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RequestCanceled;
|
||||
|
||||
impl std::error::Error for RequestCanceled {}
|
||||
|
||||
impl std::fmt::Display for RequestCanceled {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str("Context server request was canceled")
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ContextServerId {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.0.fmt(f)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue