Preserve the order of responses with respect to all other incoming messages

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
Co-Authored-By: Max Brunsfeld <max@zed.dev>
This commit is contained in:
Antonio Scandurra 2022-01-12 18:26:00 +01:00
parent 9e4b118214
commit 8b53868f8a
5 changed files with 57 additions and 46 deletions

View file

@ -1,7 +1,8 @@
use super::proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage};
use super::Connection;
use anyhow::{anyhow, Context, Result};
use futures::FutureExt as _;
use futures::stream::BoxStream;
use futures::{FutureExt as _, StreamExt};
use parking_lot::{Mutex, RwLock};
use postage::{
mpsc,
@ -109,7 +110,7 @@ impl Peer {
) -> (
ConnectionId,
impl Future<Output = anyhow::Result<()>> + Send,
mpsc::Receiver<Box<dyn AnyTypedEnvelope>>,
BoxStream<'static, Box<dyn AnyTypedEnvelope>>,
) {
let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst));
let (mut incoming_tx, incoming_rx) = mpsc::channel(64);
@ -132,23 +133,9 @@ impl Peer {
futures::select_biased! {
incoming = read_message => match incoming {
Ok(incoming) => {
if let Some(responding_to) = incoming.responding_to {
let channel = response_channels.lock().as_mut().unwrap().remove(&responding_to);
if let Some(mut tx) = channel {
tx.send(incoming).await.ok();
} else {
log::warn!("received RPC response to unknown request {}", responding_to);
}
} else {
if let Some(envelope) = proto::build_typed_envelope(connection_id, incoming) {
if incoming_tx.send(envelope).await.is_err() {
break 'outer Ok(())
}
} else {
log::error!("unable to construct a typed envelope");
}
if incoming_tx.send(incoming).await.is_err() {
break 'outer Ok(());
}
break;
}
Err(error) => {
@ -174,11 +161,38 @@ impl Peer {
result
};
let response_channels = connection_state.response_channels.clone();
self.connections
.write()
.insert(connection_id, connection_state);
(connection_id, handle_io, incoming_rx)
let incoming_rx = incoming_rx.filter_map(move |incoming| {
let response_channels = response_channels.clone();
async move {
if let Some(responding_to) = incoming.responding_to {
let channel = response_channels
.lock()
.as_mut()
.unwrap()
.remove(&responding_to);
if let Some(mut tx) = channel {
tx.send(incoming).await.ok();
} else {
log::warn!("received RPC response to unknown request {}", responding_to);
}
None
} else {
if let Some(envelope) = proto::build_typed_envelope(connection_id, incoming) {
Some(envelope)
} else {
log::error!("unable to construct a typed envelope");
None
}
}
}
});
(connection_id, handle_io, incoming_rx.boxed())
}
pub fn disconnect(&self, connection_id: ConnectionId) {
@ -332,7 +346,6 @@ mod tests {
use super::*;
use crate::TypedEnvelope;
use async_tungstenite::tungstenite::Message as WebSocketMessage;
use futures::StreamExt as _;
#[test]
fn test_request_response() {
@ -421,7 +434,7 @@ mod tests {
client2.disconnect(client1_conn_id);
async fn handle_messages(
mut messages: mpsc::Receiver<Box<dyn AnyTypedEnvelope>>,
mut messages: BoxStream<'static, Box<dyn AnyTypedEnvelope>>,
peer: Arc<Peer>,
) -> Result<()> {
while let Some(envelope) = messages.next().await {