ZIm/crates/rpc/src/conn.rs
Kirill Bulatov 16366cf9f2
Use anyhow more idiomatically (#31052)
https://github.com/zed-industries/zed/issues/30972 brought up another
case where our context is not enough to track the actual source of the
issue: we get a general top-level error without inner error.

The reason for this was `.ok_or_else(|| anyhow!("failed to read HEAD
SHA"))?; ` on the top level.

The PR finally reworks the way we use anyhow to reduce such issues (or
at least make it simpler to bubble them up later in a fix).
On top of that, uses a few more anyhow methods for better readability.

* `.ok_or_else(|| anyhow!("..."))`, `map_err` and other similar error
conversion/option reporting cases are replaced with `context` and
`with_context` calls
* in addition to that, various `anyhow!("failed to do ...")` are
stripped with `.context("Doing ...")` messages instead to remove the
parasitic `failed to` text
* `anyhow::ensure!` is used instead of `if ... { return Err(...); }`
calls
* `anyhow::bail!` is used instead of `return Err(anyhow!(...));`

Release Notes:

- N/A
2025-05-20 23:06:07 +00:00

103 lines
3.4 KiB
Rust

use async_tungstenite::tungstenite::Message as WebSocketMessage;
use futures::{SinkExt as _, StreamExt as _};
pub struct Connection {
pub(crate) tx:
Box<dyn 'static + Send + Unpin + futures::Sink<WebSocketMessage, Error = anyhow::Error>>,
pub(crate) rx:
Box<dyn 'static + Send + Unpin + futures::Stream<Item = anyhow::Result<WebSocketMessage>>>,
}
impl Connection {
pub fn new<S>(stream: S) -> Self
where
S: 'static
+ Send
+ Unpin
+ futures::Sink<WebSocketMessage, Error = anyhow::Error>
+ futures::Stream<Item = anyhow::Result<WebSocketMessage>>,
{
let (tx, rx) = stream.split();
Self {
tx: Box::new(tx),
rx: Box::new(rx),
}
}
pub async fn send(&mut self, message: WebSocketMessage) -> anyhow::Result<()> {
self.tx.send(message).await
}
#[cfg(any(test, feature = "test-support"))]
pub fn in_memory(
executor: gpui::BackgroundExecutor,
) -> (Self, Self, std::sync::Arc<std::sync::atomic::AtomicBool>) {
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering::SeqCst},
};
let killed = Arc::new(AtomicBool::new(false));
let (a_tx, a_rx) = channel(killed.clone(), executor.clone());
let (b_tx, b_rx) = channel(killed.clone(), executor);
return (
Self { tx: a_tx, rx: b_rx },
Self { tx: b_tx, rx: a_rx },
killed,
);
#[allow(clippy::type_complexity)]
fn channel(
killed: Arc<AtomicBool>,
executor: gpui::BackgroundExecutor,
) -> (
Box<dyn Send + Unpin + futures::Sink<WebSocketMessage, Error = anyhow::Error>>,
Box<dyn Send + Unpin + futures::Stream<Item = anyhow::Result<WebSocketMessage>>>,
) {
use anyhow::anyhow;
use futures::channel::mpsc;
use std::io::{Error, ErrorKind};
let (tx, rx) = mpsc::unbounded::<WebSocketMessage>();
let tx = tx.sink_map_err(|error| anyhow!(error)).with({
let killed = killed.clone();
let executor = executor.clone();
move |msg| {
let killed = killed.clone();
let executor = executor.clone();
Box::pin(async move {
executor.simulate_random_delay().await;
// Writes to a half-open TCP connection will error.
if killed.load(SeqCst) {
std::io::Result::Err(Error::new(ErrorKind::Other, "connection lost"))?;
}
Ok(msg)
})
}
});
let rx = rx.then({
let executor = executor.clone();
move |msg| {
let killed = killed.clone();
let executor = executor.clone();
Box::pin(async move {
executor.simulate_random_delay().await;
// Reads from a half-open TCP connection will hang.
if killed.load(SeqCst) {
futures::future::pending::<()>().await;
}
Ok(msg)
})
}
});
(Box::new(tx), Box::new(rx))
}
}
}