diff --git a/Cargo.lock b/Cargo.lock index cfd27bbdd0..87e32ed97d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3157,6 +3157,7 @@ dependencies = [ name = "journal" version = "0.1.0" dependencies = [ + "anyhow", "chrono", "dirs 4.0.0", "editor", diff --git a/crates/client/src/telemetry.rs b/crates/client/src/telemetry.rs index 748eb48f7e..9d486619d2 100644 --- a/crates/client/src/telemetry.rs +++ b/crates/client/src/telemetry.rs @@ -224,7 +224,7 @@ impl Telemetry { .header("Content-Type", "application/json") .body(json_bytes.into())?; this.http_client.send(request).await?; - Ok(()) + anyhow::Ok(()) } .log_err(), ) @@ -320,7 +320,7 @@ impl Telemetry { .header("Content-Type", "application/json") .body(json_bytes.into())?; this.http_client.send(request).await?; - Ok(()) + anyhow::Ok(()) } .log_err(), ) diff --git a/crates/collab_ui/src/contact_finder.rs b/crates/collab_ui/src/contact_finder.rs index ff783c6274..3b93414e1f 100644 --- a/crates/collab_ui/src/contact_finder.rs +++ b/crates/collab_ui/src/contact_finder.rs @@ -68,7 +68,7 @@ impl PickerDelegate for ContactFinder { this.potential_contacts = potential_contacts.into(); cx.notify(); }); - Ok(()) + anyhow::Ok(()) } .log_err() .await; diff --git a/crates/journal/Cargo.toml b/crates/journal/Cargo.toml index b532397dd1..a2656ad219 100644 --- a/crates/journal/Cargo.toml +++ b/crates/journal/Cargo.toml @@ -13,6 +13,7 @@ editor = { path = "../editor" } gpui = { path = "../gpui" } util = { path = "../util" } workspace = { path = "../workspace" } +anyhow = "1.0" chrono = "0.4" dirs = "4.0" log = { version = "0.4.16", features = ["kv_unstable_serde"] } diff --git a/crates/journal/src/journal.rs b/crates/journal/src/journal.rs index 76a56af93d..1ad97e61b1 100644 --- a/crates/journal/src/journal.rs +++ b/crates/journal/src/journal.rs @@ -73,7 +73,7 @@ pub fn new_journal_entry(app_state: Arc, cx: &mut MutableAppContext) { } } - Ok(()) + anyhow::Ok(()) } .log_err() }) diff --git a/crates/lsp/src/lsp.rs b/crates/lsp/src/lsp.rs index 6982445982..81568b9e3e 100644 --- a/crates/lsp/src/lsp.rs +++ b/crates/lsp/src/lsp.rs @@ -160,15 +160,13 @@ impl LanguageServer { server: Option, root_path: &Path, cx: AsyncAppContext, - mut on_unhandled_notification: F, + on_unhandled_notification: F, ) -> Self where Stdin: AsyncWrite + Unpin + Send + 'static, Stdout: AsyncRead + Unpin + Send + 'static, F: FnMut(AnyNotification) + 'static + Send, { - let mut stdin = BufWriter::new(stdin); - let mut stdout = BufReader::new(stdout); let (outbound_tx, outbound_rx) = channel::unbounded::>(); let notification_handlers = Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default())); @@ -177,89 +175,19 @@ impl LanguageServer { let input_task = cx.spawn(|cx| { let notification_handlers = notification_handlers.clone(); let response_handlers = response_handlers.clone(); - async move { - let _clear_response_handlers = util::defer({ - let response_handlers = response_handlers.clone(); - move || { - response_handlers.lock().take(); - } - }); - let mut buffer = Vec::new(); - loop { - buffer.clear(); - stdout.read_until(b'\n', &mut buffer).await?; - stdout.read_until(b'\n', &mut buffer).await?; - let message_len: usize = std::str::from_utf8(&buffer)? - .strip_prefix(CONTENT_LEN_HEADER) - .ok_or_else(|| anyhow!("invalid header"))? - .trim_end() - .parse()?; - - buffer.resize(message_len, 0); - stdout.read_exact(&mut buffer).await?; - log::trace!("incoming message:{}", String::from_utf8_lossy(&buffer)); - - if let Ok(msg) = serde_json::from_slice::(&buffer) { - if let Some(handler) = notification_handlers.lock().get_mut(msg.method) { - handler(msg.id, msg.params.get(), cx.clone()); - } else { - on_unhandled_notification(msg); - } - } else if let Ok(AnyResponse { - id, error, result, .. - }) = serde_json::from_slice(&buffer) - { - if let Some(handler) = response_handlers - .lock() - .as_mut() - .and_then(|handlers| handlers.remove(&id)) - { - if let Some(error) = error { - handler(Err(error)); - } else if let Some(result) = result { - handler(Ok(result.get())); - } else { - handler(Ok("null")); - } - } - } else { - warn!( - "Failed to deserialize message:\n{}", - std::str::from_utf8(&buffer)? - ); - } - - // Don't starve the main thread when receiving lots of messages at once. - smol::future::yield_now().await; - } - } + Self::handle_input( + stdout, + on_unhandled_notification, + notification_handlers, + response_handlers, + cx, + ) .log_err() }); let (output_done_tx, output_done_rx) = barrier::channel(); let output_task = cx.background().spawn({ let response_handlers = response_handlers.clone(); - async move { - let _clear_response_handlers = util::defer({ - let response_handlers = response_handlers.clone(); - move || { - response_handlers.lock().take(); - } - }); - let mut content_len_buffer = Vec::new(); - while let Ok(message) = outbound_rx.recv().await { - log::trace!("outgoing message:{}", String::from_utf8_lossy(&message)); - content_len_buffer.clear(); - write!(content_len_buffer, "{}", message.len()).unwrap(); - stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?; - stdin.write_all(&content_len_buffer).await?; - stdin.write_all("\r\n\r\n".as_bytes()).await?; - stdin.write_all(&message).await?; - stdin.flush().await?; - } - drop(output_done_tx); - Ok(()) - } - .log_err() + Self::handle_output(stdin, outbound_rx, output_done_tx, response_handlers).log_err() }); Self { @@ -278,6 +206,105 @@ impl LanguageServer { } } + async fn handle_input( + stdout: Stdout, + mut on_unhandled_notification: F, + notification_handlers: Arc>>, + response_handlers: Arc>>>, + cx: AsyncAppContext, + ) -> anyhow::Result<()> + where + Stdout: AsyncRead + Unpin + Send + 'static, + F: FnMut(AnyNotification) + 'static + Send, + { + let mut stdout = BufReader::new(stdout); + let _clear_response_handlers = util::defer({ + let response_handlers = response_handlers.clone(); + move || { + response_handlers.lock().take(); + } + }); + let mut buffer = Vec::new(); + loop { + buffer.clear(); + stdout.read_until(b'\n', &mut buffer).await?; + stdout.read_until(b'\n', &mut buffer).await?; + let message_len: usize = std::str::from_utf8(&buffer)? + .strip_prefix(CONTENT_LEN_HEADER) + .ok_or_else(|| anyhow!("invalid header"))? + .trim_end() + .parse()?; + + buffer.resize(message_len, 0); + stdout.read_exact(&mut buffer).await?; + log::trace!("incoming message:{}", String::from_utf8_lossy(&buffer)); + + if let Ok(msg) = serde_json::from_slice::(&buffer) { + if let Some(handler) = notification_handlers.lock().get_mut(msg.method) { + handler(msg.id, msg.params.get(), cx.clone()); + } else { + on_unhandled_notification(msg); + } + } else if let Ok(AnyResponse { + id, error, result, .. + }) = serde_json::from_slice(&buffer) + { + if let Some(handler) = response_handlers + .lock() + .as_mut() + .and_then(|handlers| handlers.remove(&id)) + { + if let Some(error) = error { + handler(Err(error)); + } else if let Some(result) = result { + handler(Ok(result.get())); + } else { + handler(Ok("null")); + } + } + } else { + warn!( + "Failed to deserialize message:\n{}", + std::str::from_utf8(&buffer)? + ); + } + + // Don't starve the main thread when receiving lots of messages at once. + smol::future::yield_now().await; + } + } + + async fn handle_output( + stdin: Stdin, + outbound_rx: channel::Receiver>, + output_done_tx: barrier::Sender, + response_handlers: Arc>>>, + ) -> anyhow::Result<()> + where + Stdin: AsyncWrite + Unpin + Send + 'static, + { + let mut stdin = BufWriter::new(stdin); + let _clear_response_handlers = util::defer({ + let response_handlers = response_handlers.clone(); + move || { + response_handlers.lock().take(); + } + }); + let mut content_len_buffer = Vec::new(); + while let Ok(message) = outbound_rx.recv().await { + log::trace!("outgoing message:{}", String::from_utf8_lossy(&message)); + content_len_buffer.clear(); + write!(content_len_buffer, "{}", message.len()).unwrap(); + stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?; + stdin.write_all(&content_len_buffer).await?; + stdin.write_all("\r\n\r\n".as_bytes()).await?; + stdin.write_all(&message).await?; + stdin.flush().await?; + } + drop(output_done_tx); + Ok(()) + } + /// Initializes a language server. /// Note that `options` is used directly to construct [`InitializeParams`], /// which is why it is owned. @@ -389,7 +416,7 @@ impl LanguageServer { output_done.recv().await; log::debug!("language server shutdown finished"); drop(tasks); - Ok(()) + anyhow::Ok(()) } .log_err(), ) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 2ebea8d07d..a970c7bb19 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -5831,7 +5831,7 @@ impl Project { })?; } - Ok(()) + anyhow::Ok(()) } .log_err(), ) diff --git a/crates/util/src/util.rs b/crates/util/src/util.rs index 5ecf889f9b..3824312a4f 100644 --- a/crates/util/src/util.rs +++ b/crates/util/src/util.rs @@ -124,11 +124,15 @@ pub trait TryFutureExt { fn warn_on_err(self) -> LogErrorFuture where Self: Sized; + fn unwrap(self) -> UnwrapFuture + where + Self: Sized; } -impl TryFutureExt for F +impl TryFutureExt for F where - F: Future>, + F: Future>, + E: std::fmt::Debug, { fn log_err(self) -> LogErrorFuture where @@ -143,17 +147,25 @@ where { LogErrorFuture(self, log::Level::Warn) } + + fn unwrap(self) -> UnwrapFuture + where + Self: Sized, + { + UnwrapFuture(self) + } } pub struct LogErrorFuture(F, log::Level); -impl Future for LogErrorFuture +impl Future for LogErrorFuture where - F: Future>, + F: Future>, + E: std::fmt::Debug, { type Output = Option; - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let level = self.1; let inner = unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().0) }; match inner.poll(cx) { @@ -169,6 +181,24 @@ where } } +pub struct UnwrapFuture(F); + +impl Future for UnwrapFuture +where + F: Future>, + E: std::fmt::Debug, +{ + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().0) }; + match inner.poll(cx) { + Poll::Ready(result) => Poll::Ready(result.unwrap()), + Poll::Pending => Poll::Pending, + } + } +} + struct Defer(Option); impl Drop for Defer {