Split protobufs into separate files (#28130)

The one big protobuf file was getting a bit difficult to navigate. I
split it into separate topic-specific files that import each other.

Release Notes:

- N/A
This commit is contained in:
Max Brunsfeld 2025-04-04 16:15:49 -07:00 committed by GitHub
parent e74af03065
commit 8ab252c42d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 3454 additions and 3387 deletions

View file

@ -1,8 +1,8 @@
use super::{
Connection,
message_stream::{Message, MessageStream},
proto::{
self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, Receipt, RequestMessage,
TypedEnvelope,
self, AnyTypedEnvelope, EnvelopedMessage, PeerId, Receipt, RequestMessage, TypedEnvelope,
},
};
use anyhow::{Context as _, Result, anyhow};
@ -67,7 +67,7 @@ pub struct Peer {
#[derive(Clone, Serialize)]
pub struct ConnectionState {
#[serde(skip)]
outgoing_tx: mpsc::UnboundedSender<proto::Message>,
outgoing_tx: mpsc::UnboundedSender<Message>,
next_message_id: Arc<AtomicU32>,
#[allow(clippy::type_complexity)]
#[serde(skip)]
@ -209,7 +209,7 @@ impl Peer {
_ = keepalive_timer => {
tracing::trace!(%connection_id, "keepalive interval: pinging");
futures::select_biased! {
result = writer.write(proto::Message::Ping).fuse() => {
result = writer.write(Message::Ping).fuse() => {
tracing::trace!(%connection_id, "keepalive interval: done pinging");
result.context("failed to send keepalive")?;
tracing::trace!(%connection_id, "keepalive interval: resetting after pinging");
@ -226,7 +226,7 @@ impl Peer {
tracing::trace!(%connection_id, "incoming rpc message: received");
tracing::trace!(%connection_id, "receive timeout: resetting");
receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
if let (proto::Message::Envelope(incoming), received_at) = incoming {
if let (Message::Envelope(incoming), received_at) = incoming {
tracing::trace!(%connection_id, "incoming rpc message: processing");
futures::select_biased! {
result = incoming_tx.send((incoming, received_at)).fuse() => match result {
@ -469,7 +469,7 @@ impl Peer {
.insert(envelope.id, tx);
connection
.outgoing_tx
.unbounded_send(proto::Message::Envelope(envelope))
.unbounded_send(Message::Envelope(envelope))
.map_err(|_| anyhow!("connection was closed"))?;
Ok(())
});
@ -500,7 +500,7 @@ impl Peer {
.insert(message_id, tx);
connection
.outgoing_tx
.unbounded_send(proto::Message::Envelope(
.unbounded_send(Message::Envelope(
request.into_envelope(message_id, None, None),
))
.map_err(|_| anyhow!("connection was closed"))?;
@ -545,11 +545,9 @@ impl Peer {
let message_id = connection
.next_message_id
.fetch_add(1, atomic::Ordering::SeqCst);
connection
.outgoing_tx
.unbounded_send(proto::Message::Envelope(
message.into_envelope(message_id, None, None),
))?;
connection.outgoing_tx.unbounded_send(Message::Envelope(
message.into_envelope(message_id, None, None),
))?;
Ok(())
}
@ -557,7 +555,7 @@ impl Peer {
let connection = self.connection_state(receiver_id)?;
connection
.outgoing_tx
.unbounded_send(proto::Message::Envelope(message))?;
.unbounded_send(Message::Envelope(message))?;
Ok(())
}
@ -573,7 +571,7 @@ impl Peer {
.fetch_add(1, atomic::Ordering::SeqCst);
connection
.outgoing_tx
.unbounded_send(proto::Message::Envelope(message.into_envelope(
.unbounded_send(Message::Envelope(message.into_envelope(
message_id,
None,
Some(sender_id.into()),
@ -592,7 +590,7 @@ impl Peer {
.fetch_add(1, atomic::Ordering::SeqCst);
connection
.outgoing_tx
.unbounded_send(proto::Message::Envelope(response.into_envelope(
.unbounded_send(Message::Envelope(response.into_envelope(
message_id,
Some(receipt.message_id),
None,
@ -610,7 +608,7 @@ impl Peer {
connection
.outgoing_tx
.unbounded_send(proto::Message::Envelope(message.into_envelope(
.unbounded_send(Message::Envelope(message.into_envelope(
message_id,
Some(receipt.message_id),
None,
@ -629,7 +627,7 @@ impl Peer {
.fetch_add(1, atomic::Ordering::SeqCst);
connection
.outgoing_tx
.unbounded_send(proto::Message::Envelope(response.into_envelope(
.unbounded_send(Message::Envelope(response.into_envelope(
message_id,
Some(receipt.message_id),
None,
@ -652,7 +650,7 @@ impl Peer {
.fetch_add(1, atomic::Ordering::SeqCst);
connection
.outgoing_tx
.unbounded_send(proto::Message::Envelope(response.into_envelope(
.unbounded_send(Message::Envelope(response.into_envelope(
message_id,
Some(request_message_id),
None,