Get zed.dev working with new collab backend

Co-Authored-By: Antonio Scandurra <me@as-cii.com>
This commit is contained in:
Nathan Sobo 2022-04-26 11:15:41 -06:00
parent be040b60b7
commit 2adb9fe472
10 changed files with 6632 additions and 6624 deletions

View file

@ -1,14 +1,14 @@
use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
use async_tungstenite::tungstenite::Message as WebSocketMessage;
use futures::{SinkExt as _, StreamExt as _};
pub struct Connection {
pub(crate) tx:
Box<dyn 'static + Send + Unpin + futures::Sink<WebSocketMessage, Error = WebSocketError>>,
Box<dyn 'static + Send + Unpin + futures::Sink<WebSocketMessage, Error = anyhow::Error>>,
pub(crate) rx: Box<
dyn 'static
+ Send
+ Unpin
+ futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>,
+ futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>>,
>,
}
@ -18,8 +18,8 @@ impl Connection {
S: 'static
+ Send
+ Unpin
+ futures::Sink<WebSocketMessage, Error = WebSocketError>
+ futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>,
+ futures::Sink<WebSocketMessage, Error = anyhow::Error>
+ futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>>,
{
let (tx, rx) = stream.split();
Self {
@ -28,7 +28,7 @@ impl Connection {
}
}
pub async fn send(&mut self, message: WebSocketMessage) -> Result<(), WebSocketError> {
pub async fn send(&mut self, message: WebSocketMessage) -> Result<(), anyhow::Error> {
self.tx.send(message).await
}
@ -54,40 +54,37 @@ impl Connection {
killed: Arc<AtomicBool>,
executor: Arc<gpui::executor::Background>,
) -> (
Box<dyn Send + Unpin + futures::Sink<WebSocketMessage, Error = WebSocketError>>,
Box<
dyn Send + Unpin + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>,
>,
Box<dyn Send + Unpin + futures::Sink<WebSocketMessage, Error = anyhow::Error>>,
Box<dyn Send + Unpin + futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>>>,
) {
use anyhow::anyhow;
use futures::channel::mpsc;
use std::io::{Error, ErrorKind};
let (tx, rx) = mpsc::unbounded::<WebSocketMessage>();
let tx = tx
.sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e)))
.with({
let tx = tx.sink_map_err(|error| anyhow!(error)).with({
let killed = killed.clone();
let executor = Arc::downgrade(&executor);
move |msg| {
let killed = killed.clone();
let executor = Arc::downgrade(&executor);
move |msg| {
let killed = killed.clone();
let executor = executor.clone();
Box::pin(async move {
if let Some(executor) = executor.upgrade() {
executor.simulate_random_delay().await;
}
let executor = executor.clone();
Box::pin(async move {
if let Some(executor) = executor.upgrade() {
executor.simulate_random_delay().await;
}
// Writes to a half-open TCP connection will error.
if killed.load(SeqCst) {
std::io::Result::Err(
Error::new(ErrorKind::Other, "connection lost").into(),
)?;
}
// Writes to a half-open TCP connection will error.
if killed.load(SeqCst) {
std::io::Result::Err(
Error::new(ErrorKind::Other, "connection lost").into(),
)?;
}
Ok(msg)
})
}
});
Ok(msg)
})
}
});
let rx = rx.then({
let killed = killed.clone();

View file

@ -1,6 +1,6 @@
use super::{ConnectionId, PeerId, TypedEnvelope};
use anyhow::Result;
use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
use anyhow::{anyhow, Result};
use async_tungstenite::tungstenite::Message as WebSocketMessage;
use futures::{SinkExt as _, StreamExt as _};
use prost::Message as _;
use std::any::{Any, TypeId};
@ -318,9 +318,9 @@ impl<S> MessageStream<S> {
impl<S> MessageStream<S>
where
S: futures::Sink<WebSocketMessage, Error = WebSocketError> + Unpin,
S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
{
pub async fn write(&mut self, message: Message) -> Result<(), WebSocketError> {
pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
#[cfg(any(test, feature = "test-support"))]
const COMPRESSION_LEVEL: i32 = -7;
@ -357,9 +357,9 @@ where
impl<S> MessageStream<S>
where
S: futures::Stream<Item = Result<WebSocketMessage, WebSocketError>> + Unpin,
S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
{
pub async fn read(&mut self) -> Result<Message, WebSocketError> {
pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
while let Some(bytes) = self.stream.next().await {
match bytes? {
WebSocketMessage::Binary(bytes) => {
@ -375,7 +375,7 @@ where
_ => {}
}
}
Err(WebSocketError::ConnectionClosed)
Err(anyhow!("connection closed"))
}
}