Introduce an epoch to ConnectionId and PeerId

This commit is contained in:
Antonio Scandurra 2022-12-14 15:55:56 +01:00
parent 9bd400cf16
commit 05e99eb67e
24 changed files with 714 additions and 382 deletions

View file

@ -1,10 +1,15 @@
syntax = "proto3";
package zed.messages;
message PeerId {
uint32 epoch = 1;
uint32 id = 2;
}
message Envelope {
uint32 id = 1;
optional uint32 responding_to = 2;
optional uint32 original_sender_id = 3;
PeerId original_sender_id = 3;
oneof payload {
Hello hello = 4;
Ack ack = 5;
@ -125,7 +130,7 @@ message Envelope {
// Messages
message Hello {
uint32 peer_id = 1;
PeerId peer_id = 1;
}
message Ping {}
@ -167,7 +172,7 @@ message Room {
message Participant {
uint64 user_id = 1;
uint32 peer_id = 2;
PeerId peer_id = 2;
repeated ParticipantProject projects = 3;
ParticipantLocation location = 4;
}
@ -319,7 +324,7 @@ message AddProjectCollaborator {
message RemoveProjectCollaborator {
uint64 project_id = 1;
uint32 peer_id = 2;
PeerId peer_id = 2;
}
message GetDefinition {
@ -438,7 +443,7 @@ message OpenBufferResponse {
message CreateBufferForPeer {
uint64 project_id = 1;
uint32 peer_id = 2;
PeerId peer_id = 2;
oneof variant {
BufferState state = 3;
BufferChunk chunk = 4;
@ -794,7 +799,7 @@ message UpdateDiagnostics {
message Follow {
uint64 project_id = 1;
uint32 leader_id = 2;
PeerId leader_id = 2;
}
message FollowResponse {
@ -804,7 +809,7 @@ message FollowResponse {
message UpdateFollowers {
uint64 project_id = 1;
repeated uint32 follower_ids = 2;
repeated PeerId follower_ids = 2;
oneof variant {
UpdateActiveView update_active_view = 3;
View create_view = 4;
@ -814,7 +819,7 @@ message UpdateFollowers {
message Unfollow {
uint64 project_id = 1;
uint32 leader_id = 2;
PeerId leader_id = 2;
}
message GetPrivateUserInfo {}
@ -828,12 +833,12 @@ message GetPrivateUserInfoResponse {
message UpdateActiveView {
optional uint64 id = 1;
optional uint32 leader_id = 2;
optional PeerId leader_id = 2;
}
message UpdateView {
uint64 id = 1;
optional uint32 leader_id = 2;
optional PeerId leader_id = 2;
oneof variant {
Editor editor = 3;
@ -849,7 +854,7 @@ message UpdateView {
message View {
uint64 id = 1;
optional uint32 leader_id = 2;
optional PeerId leader_id = 2;
oneof variant {
Editor editor = 3;
@ -865,7 +870,7 @@ message View {
}
message Collaborator {
uint32 peer_id = 1;
PeerId peer_id = 1;
uint32 replica_id = 2;
uint64 user_id = 3;
}

View file

@ -6,7 +6,10 @@ macro_rules! messages {
$(Some(envelope::Payload::$name(payload)) => {
Some(Box::new(TypedEnvelope {
sender_id,
original_sender_id: envelope.original_sender_id.map(PeerId),
original_sender_id: envelope.original_sender_id.map(|original_sender| PeerId {
epoch: original_sender.epoch,
id: original_sender.id
}),
message_id: envelope.id,
payload,
}))
@ -24,7 +27,7 @@ macro_rules! messages {
self,
id: u32,
responding_to: Option<u32>,
original_sender_id: Option<u32>,
original_sender_id: Option<PeerId>,
) -> Envelope {
Envelope {
id,

View file

@ -1,5 +1,5 @@
use super::{
proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage},
proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, RequestMessage},
Connection,
};
use anyhow::{anyhow, Context, Result};
@ -11,9 +11,8 @@ use futures::{
};
use parking_lot::{Mutex, RwLock};
use serde::{ser::SerializeStruct, Serialize};
use std::sync::atomic::Ordering::SeqCst;
use std::{fmt, sync::atomic::Ordering::SeqCst};
use std::{
fmt,
future::Future,
marker::PhantomData,
sync::{
@ -25,20 +24,32 @@ use std::{
use tracing::instrument;
#[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Serialize)]
pub struct ConnectionId(pub u32);
pub struct ConnectionId {
pub epoch: u32,
pub id: u32,
}
impl fmt::Display for ConnectionId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
impl Into<PeerId> for ConnectionId {
fn into(self) -> PeerId {
PeerId {
epoch: self.epoch,
id: self.id,
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct PeerId(pub u32);
impl From<PeerId> for ConnectionId {
fn from(peer_id: PeerId) -> Self {
Self {
epoch: peer_id.epoch,
id: peer_id.id,
}
}
}
impl fmt::Display for PeerId {
impl fmt::Display for ConnectionId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
write!(f, "{}/{}", self.epoch, self.id)
}
}
@ -70,6 +81,7 @@ pub struct TypedEnvelope<T> {
impl<T> TypedEnvelope<T> {
pub fn original_sender_id(&self) -> Result<PeerId> {
self.original_sender_id
.clone()
.ok_or_else(|| anyhow!("missing original_sender_id"))
}
}
@ -85,6 +97,7 @@ impl<T: RequestMessage> TypedEnvelope<T> {
}
pub struct Peer {
epoch: u32,
pub connections: RwLock<HashMap<ConnectionId, ConnectionState>>,
next_connection_id: AtomicU32,
}
@ -105,8 +118,9 @@ const WRITE_TIMEOUT: Duration = Duration::from_secs(2);
pub const RECEIVE_TIMEOUT: Duration = Duration::from_secs(5);
impl Peer {
pub fn new() -> Arc<Self> {
pub fn new(epoch: u32) -> Arc<Self> {
Arc::new(Self {
epoch,
connections: Default::default(),
next_connection_id: Default::default(),
})
@ -138,7 +152,10 @@ impl Peer {
let (mut incoming_tx, incoming_rx) = mpsc::channel(INCOMING_BUFFER_SIZE);
let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded();
let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst));
let connection_id = ConnectionId {
epoch: self.epoch,
id: self.next_connection_id.fetch_add(1, SeqCst),
};
let connection_state = ConnectionState {
outgoing_tx,
next_message_id: Default::default(),
@ -150,12 +167,12 @@ impl Peer {
let this = self.clone();
let response_channels = connection_state.response_channels.clone();
let handle_io = async move {
tracing::debug!(%connection_id, "handle io future: start");
tracing::debug!(?connection_id, "handle io future: start");
let _end_connection = util::defer(|| {
response_channels.lock().take();
this.connections.write().remove(&connection_id);
tracing::debug!(%connection_id, "handle io future: end");
tracing::debug!(?connection_id, "handle io future: end");
});
// Send messages on this frequency so the connection isn't closed.
@ -167,68 +184,68 @@ impl Peer {
futures::pin_mut!(receive_timeout);
loop {
tracing::debug!(%connection_id, "outer loop iteration start");
tracing::debug!(?connection_id, "outer loop iteration start");
let read_message = reader.read().fuse();
futures::pin_mut!(read_message);
loop {
tracing::debug!(%connection_id, "inner loop iteration start");
tracing::debug!(?connection_id, "inner loop iteration start");
futures::select_biased! {
outgoing = outgoing_rx.next().fuse() => match outgoing {
Some(outgoing) => {
tracing::debug!(%connection_id, "outgoing rpc message: writing");
tracing::debug!(?connection_id, "outgoing rpc message: writing");
futures::select_biased! {
result = writer.write(outgoing).fuse() => {
tracing::debug!(%connection_id, "outgoing rpc message: done writing");
tracing::debug!(?connection_id, "outgoing rpc message: done writing");
result.context("failed to write RPC message")?;
tracing::debug!(%connection_id, "keepalive interval: resetting after sending message");
tracing::debug!(?connection_id, "keepalive interval: resetting after sending message");
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
}
_ = create_timer(WRITE_TIMEOUT).fuse() => {
tracing::debug!(%connection_id, "outgoing rpc message: writing timed out");
tracing::debug!(?connection_id, "outgoing rpc message: writing timed out");
Err(anyhow!("timed out writing message"))?;
}
}
}
None => {
tracing::debug!(%connection_id, "outgoing rpc message: channel closed");
tracing::debug!(?connection_id, "outgoing rpc message: channel closed");
return Ok(())
},
},
_ = keepalive_timer => {
tracing::debug!(%connection_id, "keepalive interval: pinging");
tracing::debug!(?connection_id, "keepalive interval: pinging");
futures::select_biased! {
result = writer.write(proto::Message::Ping).fuse() => {
tracing::debug!(%connection_id, "keepalive interval: done pinging");
tracing::debug!(?connection_id, "keepalive interval: done pinging");
result.context("failed to send keepalive")?;
tracing::debug!(%connection_id, "keepalive interval: resetting after pinging");
tracing::debug!(?connection_id, "keepalive interval: resetting after pinging");
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
}
_ = create_timer(WRITE_TIMEOUT).fuse() => {
tracing::debug!(%connection_id, "keepalive interval: pinging timed out");
tracing::debug!(?connection_id, "keepalive interval: pinging timed out");
Err(anyhow!("timed out sending keepalive"))?;
}
}
}
incoming = read_message => {
let incoming = incoming.context("error reading rpc message from socket")?;
tracing::debug!(%connection_id, "incoming rpc message: received");
tracing::debug!(%connection_id, "receive timeout: resetting");
tracing::debug!(?connection_id, "incoming rpc message: received");
tracing::debug!(?connection_id, "receive timeout: resetting");
receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
if let proto::Message::Envelope(incoming) = incoming {
tracing::debug!(%connection_id, "incoming rpc message: processing");
tracing::debug!(?connection_id, "incoming rpc message: processing");
futures::select_biased! {
result = incoming_tx.send(incoming).fuse() => match result {
Ok(_) => {
tracing::debug!(%connection_id, "incoming rpc message: processed");
tracing::debug!(?connection_id, "incoming rpc message: processed");
}
Err(_) => {
tracing::debug!(%connection_id, "incoming rpc message: channel closed");
tracing::debug!(?connection_id, "incoming rpc message: channel closed");
return Ok(())
}
},
_ = create_timer(WRITE_TIMEOUT).fuse() => {
tracing::debug!(%connection_id, "incoming rpc message: processing timed out");
tracing::debug!(?connection_id, "incoming rpc message: processing timed out");
Err(anyhow!("timed out processing incoming message"))?
}
}
@ -236,7 +253,7 @@ impl Peer {
break;
},
_ = receive_timeout => {
tracing::debug!(%connection_id, "receive timeout: delay between messages too long");
tracing::debug!(?connection_id, "receive timeout: delay between messages too long");
Err(anyhow!("delay between messages too long"))?
}
}
@ -255,16 +272,12 @@ impl Peer {
let message_id = incoming.id;
tracing::debug!(?incoming, "incoming message future: start");
let _end = util::defer(move || {
tracing::debug!(
%connection_id,
message_id,
"incoming message future: end"
);
tracing::debug!(?connection_id, message_id, "incoming message future: end");
});
if let Some(responding_to) = incoming.responding_to {
tracing::debug!(
%connection_id,
?connection_id,
message_id,
responding_to,
"incoming response: received"
@ -274,7 +287,7 @@ impl Peer {
let requester_resumed = oneshot::channel();
if let Err(error) = tx.send((incoming, requester_resumed.0)) {
tracing::debug!(
%connection_id,
?connection_id,
message_id,
responding_to = responding_to,
?error,
@ -283,21 +296,21 @@ impl Peer {
}
tracing::debug!(
%connection_id,
?connection_id,
message_id,
responding_to,
"incoming response: waiting to resume requester"
);
let _ = requester_resumed.1.await;
tracing::debug!(
%connection_id,
?connection_id,
message_id,
responding_to,
"incoming response: requester resumed"
);
} else {
tracing::warn!(
%connection_id,
?connection_id,
message_id,
responding_to,
"incoming response: unknown request"
@ -306,14 +319,10 @@ impl Peer {
None
} else {
tracing::debug!(
%connection_id,
message_id,
"incoming message: received"
);
tracing::debug!(?connection_id, message_id, "incoming message: received");
proto::build_typed_envelope(connection_id, incoming).or_else(|| {
tracing::error!(
%connection_id,
?connection_id,
message_id,
"unable to construct a typed envelope"
);
@ -345,6 +354,7 @@ impl Peer {
pub fn reset(&self) {
self.connections.write().clear();
self.next_connection_id.store(0, SeqCst);
}
pub fn request<T: RequestMessage>(
@ -384,7 +394,7 @@ impl Peer {
.unbounded_send(proto::Message::Envelope(request.into_envelope(
message_id,
None,
original_sender_id.map(|id| id.0),
original_sender_id.map(Into::into),
)))
.map_err(|_| anyhow!("connection was closed"))?;
Ok(())
@ -433,7 +443,7 @@ impl Peer {
.unbounded_send(proto::Message::Envelope(message.into_envelope(
message_id,
None,
Some(sender_id.0),
Some(sender_id.into()),
)))?;
Ok(())
}
@ -480,7 +490,7 @@ impl Peer {
let connections = self.connections.read();
let connection = connections
.get(&connection_id)
.ok_or_else(|| anyhow!("no such connection: {}", connection_id))?;
.ok_or_else(|| anyhow!("no such connection: {:?}", connection_id))?;
Ok(connection.clone())
}
}
@ -515,9 +525,9 @@ mod tests {
let executor = cx.foreground();
// create 2 clients connected to 1 server
let server = Peer::new();
let client1 = Peer::new();
let client2 = Peer::new();
let server = Peer::new(0);
let client1 = Peer::new(0);
let client2 = Peer::new(0);
let (client1_to_server_conn, server_to_client_1_conn, _kill) =
Connection::in_memory(cx.background());
@ -609,8 +619,8 @@ mod tests {
#[gpui::test(iterations = 50)]
async fn test_order_of_response_and_incoming(cx: &mut TestAppContext) {
let executor = cx.foreground();
let server = Peer::new();
let client = Peer::new();
let server = Peer::new(0);
let client = Peer::new(0);
let (client_to_server_conn, server_to_client_conn, _kill) =
Connection::in_memory(cx.background());
@ -707,8 +717,8 @@ mod tests {
#[gpui::test(iterations = 50)]
async fn test_dropping_request_before_completion(cx: &mut TestAppContext) {
let executor = cx.foreground();
let server = Peer::new();
let client = Peer::new();
let server = Peer::new(0);
let client = Peer::new(0);
let (client_to_server_conn, server_to_client_conn, _kill) =
Connection::in_memory(cx.background());
@ -822,7 +832,7 @@ mod tests {
let (client_conn, mut server_conn, _kill) = Connection::in_memory(cx.background());
let client = Peer::new();
let client = Peer::new(0);
let (connection_id, io_handler, mut incoming) =
client.add_test_connection(client_conn, cx.background());
@ -857,7 +867,7 @@ mod tests {
let executor = cx.foreground();
let (client_conn, mut server_conn, _kill) = Connection::in_memory(cx.background());
let client = Peer::new();
let client = Peer::new(0);
let (connection_id, io_handler, mut incoming) =
client.add_test_connection(client_conn, cx.background());
executor.spawn(io_handler).detach();

View file

@ -1,14 +1,16 @@
use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
use anyhow::{anyhow, Result};
use async_tungstenite::tungstenite::Message as WebSocketMessage;
use futures::{SinkExt as _, StreamExt as _};
use prost::Message as _;
use serde::Serialize;
use std::any::{Any, TypeId};
use std::{cmp, iter, mem};
use std::fmt;
use std::str::FromStr;
use std::{
cmp,
fmt::Debug,
io,
io, iter, mem,
time::{Duration, SystemTime, UNIX_EPOCH},
};
@ -21,7 +23,7 @@ pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 's
self,
id: u32,
responding_to: Option<u32>,
original_sender_id: Option<u32>,
original_sender_id: Option<PeerId>,
) -> Envelope;
fn from_envelope(envelope: Envelope) -> Option<Self>;
}
@ -70,7 +72,67 @@ impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
}
fn original_sender_id(&self) -> Option<PeerId> {
self.original_sender_id
self.original_sender_id.clone()
}
}
impl PeerId {
pub fn from_u64(peer_id: u64) -> Self {
let epoch = (peer_id >> 32) as u32;
let id = peer_id as u32;
Self { epoch, id }
}
pub fn as_u64(self) -> u64 {
((self.epoch as u64) << 32) | (self.id as u64)
}
}
impl Copy for PeerId {}
impl Eq for PeerId {}
impl Ord for PeerId {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.epoch
.cmp(&other.epoch)
.then_with(|| self.id.cmp(&other.id))
}
}
impl PartialOrd for PeerId {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl std::hash::Hash for PeerId {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.epoch.hash(state);
self.id.hash(state);
}
}
impl fmt::Display for PeerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.epoch, self.id)
}
}
impl FromStr for PeerId {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut components = s.split('/');
let epoch = components
.next()
.ok_or_else(|| anyhow!("invalid peer id {:?}", s))?
.parse()?;
let id = components
.next()
.ok_or_else(|| anyhow!("invalid peer id {:?}", s))?
.parse()?;
Ok(PeerId { epoch, id })
}
}
@ -477,4 +539,25 @@ mod tests {
stream.read().await.unwrap();
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
}
#[gpui::test]
fn test_converting_peer_id_from_and_to_u64() {
let peer_id = PeerId { epoch: 10, id: 3 };
assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
let peer_id = PeerId {
epoch: u32::MAX,
id: 3,
};
assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
let peer_id = PeerId {
epoch: 10,
id: u32::MAX,
};
assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
let peer_id = PeerId {
epoch: u32::MAX,
id: u32::MAX,
};
assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
}
}

View file

@ -6,4 +6,4 @@ pub use conn::Connection;
pub use peer::*;
mod macros;
pub const PROTOCOL_VERSION: u32 = 41;
pub const PROTOCOL_VERSION: u32 = 42;