Use an unbounded channel for peer's outgoing messages
Using a bounded channel may have blocked the collaboration server from making progress handling RPC traffic. There's no need to apply backpressure to calling code within the same process - suspending a task that is attempting to call `send` has an even greater memory cost than just buffering a protobuf message. We do still want a bounded channel for incoming messages, so that we provide backpressure to noisy peers - blocking their writes as opposed to allowing them to buffer arbitrarily many messages in our server. Co-Authored-By: Antonio Scandurra <me@as-cii.com> Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
parent
82afacd33d
commit
d4fe1115e7
7 changed files with 341 additions and 472 deletions
|
@ -89,7 +89,7 @@ pub struct Peer {
|
|||
|
||||
#[derive(Clone)]
|
||||
pub struct ConnectionState {
|
||||
outgoing_tx: mpsc::Sender<proto::Envelope>,
|
||||
outgoing_tx: futures::channel::mpsc::UnboundedSender<proto::Envelope>,
|
||||
next_message_id: Arc<AtomicU32>,
|
||||
response_channels: Arc<Mutex<Option<HashMap<u32, mpsc::Sender<proto::Envelope>>>>>,
|
||||
}
|
||||
|
@ -112,9 +112,14 @@ impl Peer {
|
|||
impl Future<Output = anyhow::Result<()>> + Send,
|
||||
BoxStream<'static, Box<dyn AnyTypedEnvelope>>,
|
||||
) {
|
||||
let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst));
|
||||
// For outgoing messages, use an unbounded channel so that application code
|
||||
// can always send messages without yielding. For incoming messages, use a
|
||||
// bounded channel so that other peers will receive backpressure if they send
|
||||
// messages faster than this peer can process them.
|
||||
let (mut incoming_tx, incoming_rx) = mpsc::channel(64);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(64);
|
||||
let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
|
||||
|
||||
let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst));
|
||||
let connection_state = ConnectionState {
|
||||
outgoing_tx,
|
||||
next_message_id: Default::default(),
|
||||
|
@ -131,6 +136,16 @@ impl Peer {
|
|||
futures::pin_mut!(read_message);
|
||||
loop {
|
||||
futures::select_biased! {
|
||||
outgoing = outgoing_rx.next().fuse() => match outgoing {
|
||||
Some(outgoing) => {
|
||||
match writer.write_message(&outgoing).timeout(WRITE_TIMEOUT).await {
|
||||
None => break 'outer Err(anyhow!("timed out writing RPC message")),
|
||||
Some(Err(result)) => break 'outer Err(result).context("failed to write RPC message"),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
None => break 'outer Ok(()),
|
||||
},
|
||||
incoming = read_message => match incoming {
|
||||
Ok(incoming) => {
|
||||
if incoming_tx.send(incoming).await.is_err() {
|
||||
|
@ -142,16 +157,6 @@ impl Peer {
|
|||
break 'outer Err(error).context("received invalid RPC message")
|
||||
}
|
||||
},
|
||||
outgoing = outgoing_rx.recv().fuse() => match outgoing {
|
||||
Some(outgoing) => {
|
||||
match writer.write_message(&outgoing).timeout(WRITE_TIMEOUT).await {
|
||||
None => break 'outer Err(anyhow!("timed out writing RPC message")),
|
||||
Some(Err(result)) => break 'outer Err(result).context("failed to write RPC message"),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
None => break 'outer Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -223,9 +228,9 @@ impl Peer {
|
|||
request: T,
|
||||
) -> impl Future<Output = Result<T::Response>> {
|
||||
let this = self.clone();
|
||||
let (tx, mut rx) = mpsc::channel(1);
|
||||
async move {
|
||||
let mut connection = this.connection_state(receiver_id)?;
|
||||
let (tx, mut rx) = mpsc::channel(1);
|
||||
let connection = this.connection_state(receiver_id)?;
|
||||
let message_id = connection.next_message_id.fetch_add(1, SeqCst);
|
||||
connection
|
||||
.response_channels
|
||||
|
@ -235,8 +240,11 @@ impl Peer {
|
|||
.insert(message_id, tx);
|
||||
connection
|
||||
.outgoing_tx
|
||||
.send(request.into_envelope(message_id, None, original_sender_id.map(|id| id.0)))
|
||||
.await
|
||||
.unbounded_send(request.into_envelope(
|
||||
message_id,
|
||||
None,
|
||||
original_sender_id.map(|id| id.0),
|
||||
))
|
||||
.map_err(|_| anyhow!("connection was closed"))?;
|
||||
let response = rx
|
||||
.recv()
|
||||
|
@ -255,19 +263,15 @@ impl Peer {
|
|||
self: &Arc<Self>,
|
||||
receiver_id: ConnectionId,
|
||||
message: T,
|
||||
) -> impl Future<Output = Result<()>> {
|
||||
let this = self.clone();
|
||||
async move {
|
||||
let mut connection = this.connection_state(receiver_id)?;
|
||||
let message_id = connection
|
||||
.next_message_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
connection
|
||||
.outgoing_tx
|
||||
.send(message.into_envelope(message_id, None, None))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
) -> Result<()> {
|
||||
let connection = self.connection_state(receiver_id)?;
|
||||
let message_id = connection
|
||||
.next_message_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
connection
|
||||
.outgoing_tx
|
||||
.unbounded_send(message.into_envelope(message_id, None, None))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn forward_send<T: EnvelopedMessage>(
|
||||
|
@ -275,57 +279,45 @@ impl Peer {
|
|||
sender_id: ConnectionId,
|
||||
receiver_id: ConnectionId,
|
||||
message: T,
|
||||
) -> impl Future<Output = Result<()>> {
|
||||
let this = self.clone();
|
||||
async move {
|
||||
let mut connection = this.connection_state(receiver_id)?;
|
||||
let message_id = connection
|
||||
.next_message_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
connection
|
||||
.outgoing_tx
|
||||
.send(message.into_envelope(message_id, None, Some(sender_id.0)))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
) -> Result<()> {
|
||||
let connection = self.connection_state(receiver_id)?;
|
||||
let message_id = connection
|
||||
.next_message_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
connection
|
||||
.outgoing_tx
|
||||
.unbounded_send(message.into_envelope(message_id, None, Some(sender_id.0)))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn respond<T: RequestMessage>(
|
||||
self: &Arc<Self>,
|
||||
receipt: Receipt<T>,
|
||||
response: T::Response,
|
||||
) -> impl Future<Output = Result<()>> {
|
||||
let this = self.clone();
|
||||
async move {
|
||||
let mut connection = this.connection_state(receipt.sender_id)?;
|
||||
let message_id = connection
|
||||
.next_message_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
connection
|
||||
.outgoing_tx
|
||||
.send(response.into_envelope(message_id, Some(receipt.message_id), None))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
) -> Result<()> {
|
||||
let connection = self.connection_state(receipt.sender_id)?;
|
||||
let message_id = connection
|
||||
.next_message_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
connection
|
||||
.outgoing_tx
|
||||
.unbounded_send(response.into_envelope(message_id, Some(receipt.message_id), None))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn respond_with_error<T: RequestMessage>(
|
||||
self: &Arc<Self>,
|
||||
receipt: Receipt<T>,
|
||||
response: proto::Error,
|
||||
) -> impl Future<Output = Result<()>> {
|
||||
let this = self.clone();
|
||||
async move {
|
||||
let mut connection = this.connection_state(receipt.sender_id)?;
|
||||
let message_id = connection
|
||||
.next_message_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
connection
|
||||
.outgoing_tx
|
||||
.send(response.into_envelope(message_id, Some(receipt.message_id), None))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
) -> Result<()> {
|
||||
let connection = self.connection_state(receipt.sender_id)?;
|
||||
let message_id = connection
|
||||
.next_message_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
connection
|
||||
.outgoing_tx
|
||||
.unbounded_send(response.into_envelope(message_id, Some(receipt.message_id), None))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn connection_state(&self, connection_id: ConnectionId) -> Result<ConnectionState> {
|
||||
|
@ -447,7 +439,7 @@ mod tests {
|
|||
let envelope = envelope.into_any();
|
||||
if let Some(envelope) = envelope.downcast_ref::<TypedEnvelope<proto::Ping>>() {
|
||||
let receipt = envelope.receipt();
|
||||
peer.respond(receipt, proto::Ack {}).await?
|
||||
peer.respond(receipt, proto::Ack {})?
|
||||
} else if let Some(envelope) =
|
||||
envelope.downcast_ref::<TypedEnvelope<proto::OpenBuffer>>()
|
||||
{
|
||||
|
@ -475,7 +467,7 @@ mod tests {
|
|||
}
|
||||
};
|
||||
|
||||
peer.respond(receipt, response).await?
|
||||
peer.respond(receipt, response)?
|
||||
} else {
|
||||
panic!("unknown message type");
|
||||
}
|
||||
|
@ -518,7 +510,6 @@ mod tests {
|
|||
message: "message 1".to_string(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
server
|
||||
.send(
|
||||
|
@ -527,12 +518,8 @@ mod tests {
|
|||
message: "message 2".to_string(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
server
|
||||
.respond(request.receipt(), proto::Ack {})
|
||||
.await
|
||||
.unwrap();
|
||||
server.respond(request.receipt(), proto::Ack {}).unwrap();
|
||||
|
||||
// Prevent the connection from being dropped
|
||||
server_incoming.next().await;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue