Extract a proto
crate out of rpc
(#12852)
Release Notes: - N/A --------- Co-authored-by: Nathan <nathan@zed.dev>
This commit is contained in:
parent
57c40299a5
commit
77e88c1ded
16 changed files with 856 additions and 727 deletions
|
@ -1,7 +1,8 @@
|
|||
use crate::{ErrorCode, ErrorCodeExt, ErrorExt, RpcError};
|
||||
|
||||
use super::{
|
||||
proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, RequestMessage},
|
||||
proto::{
|
||||
self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, Receipt, RequestMessage,
|
||||
TypedEnvelope,
|
||||
},
|
||||
Connection,
|
||||
};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
|
@ -12,11 +13,11 @@ use futures::{
|
|||
FutureExt, SinkExt, Stream, StreamExt, TryFutureExt,
|
||||
};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use proto::{ErrorCode, ErrorCodeExt, ErrorExt, RpcError};
|
||||
use serde::{ser::SerializeStruct, Serialize};
|
||||
use std::{
|
||||
fmt, future,
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
sync::atomic::Ordering::SeqCst,
|
||||
sync::{
|
||||
atomic::{self, AtomicU32},
|
||||
|
@ -57,46 +58,6 @@ impl fmt::Display for ConnectionId {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct Receipt<T> {
|
||||
pub sender_id: ConnectionId,
|
||||
pub message_id: u32,
|
||||
payload_type: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T> Clone for Receipt<T> {
|
||||
fn clone(&self) -> Self {
|
||||
*self
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Copy for Receipt<T> {}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TypedEnvelope<T> {
|
||||
pub sender_id: ConnectionId,
|
||||
pub original_sender_id: Option<PeerId>,
|
||||
pub message_id: u32,
|
||||
pub payload: T,
|
||||
pub received_at: Instant,
|
||||
}
|
||||
|
||||
impl<T> TypedEnvelope<T> {
|
||||
pub fn original_sender_id(&self) -> Result<PeerId> {
|
||||
self.original_sender_id
|
||||
.ok_or_else(|| anyhow!("missing original_sender_id"))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: RequestMessage> TypedEnvelope<T> {
|
||||
pub fn receipt(&self) -> Receipt<T> {
|
||||
Receipt {
|
||||
sender_id: self.sender_id,
|
||||
message_id: self.message_id,
|
||||
payload_type: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Peer {
|
||||
epoch: AtomicU32,
|
||||
pub connections: RwLock<HashMap<ConnectionId, ConnectionState>>,
|
||||
|
@ -376,9 +337,12 @@ impl Peer {
|
|||
"incoming stream response: requester resumed"
|
||||
);
|
||||
} else {
|
||||
let message_type =
|
||||
proto::build_typed_envelope(connection_id, received_at, incoming)
|
||||
.map(|p| p.payload_type_name());
|
||||
let message_type = proto::build_typed_envelope(
|
||||
connection_id.into(),
|
||||
received_at,
|
||||
incoming,
|
||||
)
|
||||
.map(|p| p.payload_type_name());
|
||||
tracing::warn!(
|
||||
%connection_id,
|
||||
message_id,
|
||||
|
@ -391,16 +355,15 @@ impl Peer {
|
|||
None
|
||||
} else {
|
||||
tracing::trace!(%connection_id, message_id, "incoming message: received");
|
||||
proto::build_typed_envelope(connection_id, received_at, incoming).or_else(
|
||||
|| {
|
||||
proto::build_typed_envelope(connection_id.into(), received_at, incoming)
|
||||
.or_else(|| {
|
||||
tracing::error!(
|
||||
%connection_id,
|
||||
message_id,
|
||||
"unable to construct a typed envelope"
|
||||
);
|
||||
None
|
||||
},
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -475,7 +438,7 @@ impl Peer {
|
|||
let (response, received_at) = response.await?;
|
||||
Ok(TypedEnvelope {
|
||||
message_id: response.id,
|
||||
sender_id: receiver_id,
|
||||
sender_id: receiver_id.into(),
|
||||
original_sender_id: response.original_sender_id,
|
||||
payload: T::Response::from_envelope(response)
|
||||
.ok_or_else(|| anyhow!("received response of the wrong type"))?,
|
||||
|
@ -619,7 +582,7 @@ impl Peer {
|
|||
receipt: Receipt<T>,
|
||||
response: T::Response,
|
||||
) -> Result<()> {
|
||||
let connection = self.connection_state(receipt.sender_id)?;
|
||||
let connection = self.connection_state(receipt.sender_id.into())?;
|
||||
let message_id = connection
|
||||
.next_message_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
|
@ -634,7 +597,7 @@ impl Peer {
|
|||
}
|
||||
|
||||
pub fn end_stream<T: RequestMessage>(&self, receipt: Receipt<T>) -> Result<()> {
|
||||
let connection = self.connection_state(receipt.sender_id)?;
|
||||
let connection = self.connection_state(receipt.sender_id.into())?;
|
||||
let message_id = connection
|
||||
.next_message_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
|
@ -656,7 +619,7 @@ impl Peer {
|
|||
receipt: Receipt<T>,
|
||||
response: proto::Error,
|
||||
) -> Result<()> {
|
||||
let connection = self.connection_state(receipt.sender_id)?;
|
||||
let connection = self.connection_state(receipt.sender_id.into())?;
|
||||
let message_id = connection
|
||||
.next_message_id
|
||||
.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
|
@ -674,7 +637,7 @@ impl Peer {
|
|||
&self,
|
||||
envelope: Box<dyn AnyTypedEnvelope>,
|
||||
) -> Result<()> {
|
||||
let connection = self.connection_state(envelope.sender_id())?;
|
||||
let connection = self.connection_state(envelope.sender_id().into())?;
|
||||
let response = ErrorCode::Internal
|
||||
.message(format!(
|
||||
"message {} was not handled",
|
||||
|
@ -717,7 +680,6 @@ impl Serialize for Peer {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::TypedEnvelope;
|
||||
use async_tungstenite::tungstenite::Message as WebSocketMessage;
|
||||
use gpui::TestAppContext;
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue