diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 5362f80df2..b8f2c929b2 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -464,6 +464,7 @@ impl Server { TypeId::of::(), Box::new(move |envelope, session| { let envelope = envelope.into_any().downcast::>().unwrap(); + let received_at = envelope.received_at; let span = info_span!( "handle message", payload_type = envelope.payload_type_name() @@ -478,12 +479,14 @@ impl Server { let future = (handler)(*envelope, session); async move { let result = future.await; - let duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0; + let total_duration_ms = received_at.elapsed().as_micros() as f64 / 1000.0; + let processing_duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0; + let queue_duration_ms = processing_duration_ms - total_duration_ms; match result { Err(error) => { - tracing::error!(%error, ?duration_ms, "error handling message") + tracing::error!(%error, ?total_duration_ms, ?processing_duration_ms, ?queue_duration_ms, "error handling message") } - Ok(()) => tracing::info!(?duration_ms, "finished handling message"), + Ok(()) => tracing::info!(?total_duration_ms, ?processing_duration_ms, ?queue_duration_ms, "finished handling message"), } } .instrument(span) diff --git a/crates/rpc/src/macros.rs b/crates/rpc/src/macros.rs index 85e2b0cf87..f85889a97b 100644 --- a/crates/rpc/src/macros.rs +++ b/crates/rpc/src/macros.rs @@ -1,7 +1,7 @@ #[macro_export] macro_rules! messages { ($(($name:ident, $priority:ident)),* $(,)?) => { - pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option> { + pub fn build_typed_envelope(sender_id: ConnectionId, received_at: Instant, envelope: Envelope) -> Option> { match envelope.payload { $(Some(envelope::Payload::$name(payload)) => { Some(Box::new(TypedEnvelope { @@ -12,6 +12,7 @@ macro_rules! messages { }), message_id: envelope.id, payload, + received_at, })) }, )* _ => None diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index c73f52a99e..486e758a3c 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -13,7 +13,7 @@ use futures::{ }; use parking_lot::{Mutex, RwLock}; use serde::{ser::SerializeStruct, Serialize}; -use std::{fmt, sync::atomic::Ordering::SeqCst}; +use std::{fmt, sync::atomic::Ordering::SeqCst, time::Instant}; use std::{ future::Future, marker::PhantomData, @@ -79,6 +79,7 @@ pub struct TypedEnvelope { pub original_sender_id: Option, pub message_id: u32, pub payload: T, + pub received_at: Instant, } impl TypedEnvelope { @@ -111,8 +112,16 @@ pub struct ConnectionState { next_message_id: Arc, #[allow(clippy::type_complexity)] #[serde(skip)] - response_channels: - Arc)>>>>>, + response_channels: Arc< + Mutex< + Option< + HashMap< + u32, + oneshot::Sender<(proto::Envelope, std::time::Instant, oneshot::Sender<()>)>, + >, + >, + >, + >, } const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); @@ -154,7 +163,7 @@ impl Peer { #[cfg(any(test, feature = "test-support"))] const INCOMING_BUFFER_SIZE: usize = 1; #[cfg(not(any(test, feature = "test-support")))] - const INCOMING_BUFFER_SIZE: usize = 64; + const INCOMING_BUFFER_SIZE: usize = 256; let (mut incoming_tx, incoming_rx) = mpsc::channel(INCOMING_BUFFER_SIZE); let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded(); @@ -238,10 +247,10 @@ impl Peer { tracing::trace!(%connection_id, "incoming rpc message: received"); tracing::trace!(%connection_id, "receive timeout: resetting"); receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); - if let proto::Message::Envelope(incoming) = incoming { + if let (proto::Message::Envelope(incoming), received_at) = incoming { tracing::trace!(%connection_id, "incoming rpc message: processing"); futures::select_biased! { - result = incoming_tx.send(incoming).fuse() => match result { + result = incoming_tx.send((incoming, received_at)).fuse() => match result { Ok(_) => { tracing::trace!(%connection_id, "incoming rpc message: processed"); } @@ -272,7 +281,7 @@ impl Peer { .write() .insert(connection_id, connection_state); - let incoming_rx = incoming_rx.filter_map(move |incoming| { + let incoming_rx = incoming_rx.filter_map(move |(incoming, received_at)| { let response_channels = response_channels.clone(); async move { let message_id = incoming.id; @@ -291,7 +300,7 @@ impl Peer { let channel = response_channels.lock().as_mut()?.remove(&responding_to); if let Some(tx) = channel { let requester_resumed = oneshot::channel(); - if let Err(error) = tx.send((incoming, requester_resumed.0)) { + if let Err(error) = tx.send((incoming, received_at, requester_resumed.0)) { tracing::trace!( %connection_id, message_id, @@ -315,8 +324,9 @@ impl Peer { "incoming response: requester resumed" ); } else { - let message_type = proto::build_typed_envelope(connection_id, incoming) - .map(|p| p.payload_type_name()); + let message_type = + proto::build_typed_envelope(connection_id, received_at, incoming) + .map(|p| p.payload_type_name()); tracing::warn!( %connection_id, message_id, @@ -329,14 +339,16 @@ impl Peer { None } else { tracing::trace!(%connection_id, message_id, "incoming message: received"); - proto::build_typed_envelope(connection_id, incoming).or_else(|| { - tracing::error!( - %connection_id, - message_id, - "unable to construct a typed envelope" - ); - None - }) + proto::build_typed_envelope(connection_id, received_at, incoming).or_else( + || { + tracing::error!( + %connection_id, + message_id, + "unable to construct a typed envelope" + ); + None + }, + ) } } }); @@ -425,7 +437,8 @@ impl Peer { }); async move { send?; - let (response, _barrier) = rx.await.map_err(|_| anyhow!("connection was closed"))?; + let (response, received_at, _barrier) = + rx.await.map_err(|_| anyhow!("connection was closed"))?; if let Some(proto::envelope::Payload::Error(error)) = &response.payload { Err(RpcError::from_proto(&error, T::NAME)) @@ -436,6 +449,7 @@ impl Peer { original_sender_id: response.original_sender_id, payload: T::Response::from_envelope(response) .ok_or_else(|| anyhow!("received response of the wrong type"))?, + received_at, }) } } diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 38e80103ca..40d0d6e3c4 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -8,6 +8,7 @@ use futures::{SinkExt as _, StreamExt as _}; use prost::Message as _; use serde::Serialize; use std::any::{Any, TypeId}; +use std::time::Instant; use std::{ cmp, fmt::Debug, @@ -515,8 +516,9 @@ impl MessageStream where S: futures::Stream> + Unpin, { - pub async fn read(&mut self) -> Result { + pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> { while let Some(bytes) = self.stream.next().await { + let received_at = Instant::now(); match bytes? { WebSocketMessage::Binary(bytes) => { zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap(); @@ -525,10 +527,10 @@ where self.encoding_buffer.clear(); self.encoding_buffer.shrink_to(MAX_BUFFER_LEN); - return Ok(Message::Envelope(envelope)); + return Ok((Message::Envelope(envelope), received_at)); } - WebSocketMessage::Ping(_) => return Ok(Message::Ping), - WebSocketMessage::Pong(_) => return Ok(Message::Pong), + WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)), + WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)), WebSocketMessage::Close(_) => break, _ => {} }