This commit is contained in:
Antonio Scandurra 2022-12-15 16:34:59 +01:00
parent 5720c43fe7
commit 5a334622ea
5 changed files with 40 additions and 41 deletions

View file

@ -488,9 +488,9 @@ impl Server {
move |duration| executor.sleep(duration) move |duration| executor.sleep(duration)
}); });
tracing::info!(%user_id, %login, ?connection_id, %address, "connection opened"); tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?; this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
tracing::info!(%user_id, %login, ?connection_id, %address, "sent hello message"); tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
if let Some(send_connection_id) = send_connection_id.take() { if let Some(send_connection_id) = send_connection_id.take() {
let _ = send_connection_id.send(connection_id); let _ = send_connection_id.send(connection_id);
@ -552,7 +552,7 @@ impl Server {
_ = teardown.changed().fuse() => return Ok(()), _ = teardown.changed().fuse() => return Ok(()),
result = handle_io => { result = handle_io => {
if let Err(error) = result { if let Err(error) = result {
tracing::error!(?error, %user_id, %login, ?connection_id, %address, "error handling I/O"); tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
} }
break; break;
} }
@ -560,7 +560,7 @@ impl Server {
message = next_message => { message = next_message => {
if let Some(message) = message { if let Some(message) = message {
let type_name = message.payload_type_name(); let type_name = message.payload_type_name();
let span = tracing::info_span!("receive message", %user_id, %login, ?connection_id, %address, type_name); let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
let span_enter = span.enter(); let span_enter = span.enter();
if let Some(handler) = this.handlers.get(&message.payload_type_id()) { if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
let is_background = message.is_background(); let is_background = message.is_background();
@ -574,10 +574,10 @@ impl Server {
foreground_message_handlers.push(handle_message); foreground_message_handlers.push(handle_message);
} }
} else { } else {
tracing::error!(%user_id, %login, ?connection_id, %address, "no message handler"); tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
} }
} else { } else {
tracing::info!(%user_id, %login, ?connection_id, %address, "connection closed"); tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
break; break;
} }
} }
@ -585,9 +585,9 @@ impl Server {
} }
drop(foreground_message_handlers); drop(foreground_message_handlers);
tracing::info!(%user_id, %login, ?connection_id, %address, "signing out"); tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
if let Err(error) = sign_out(session, teardown, executor).await { if let Err(error) = sign_out(session, teardown, executor).await {
tracing::error!(%user_id, %login, ?connection_id, %address, ?error, "error signing out"); tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
} }
Ok(()) Ok(())

View file

@ -9,7 +9,7 @@ message PeerId {
message Envelope { message Envelope {
uint32 id = 1; uint32 id = 1;
optional uint32 responding_to = 2; optional uint32 responding_to = 2;
PeerId original_sender_id = 3; optional PeerId original_sender_id = 3;
oneof payload { oneof payload {
Hello hello = 4; Hello hello = 4;
Ack ack = 5; Ack ack = 5;

View file

@ -81,7 +81,6 @@ pub struct TypedEnvelope<T> {
impl<T> TypedEnvelope<T> { impl<T> TypedEnvelope<T> {
pub fn original_sender_id(&self) -> Result<PeerId> { pub fn original_sender_id(&self) -> Result<PeerId> {
self.original_sender_id self.original_sender_id
.clone()
.ok_or_else(|| anyhow!("missing original_sender_id")) .ok_or_else(|| anyhow!("missing original_sender_id"))
} }
} }
@ -171,12 +170,12 @@ impl Peer {
let this = self.clone(); let this = self.clone();
let response_channels = connection_state.response_channels.clone(); let response_channels = connection_state.response_channels.clone();
let handle_io = async move { let handle_io = async move {
tracing::debug!(?connection_id, "handle io future: start"); tracing::debug!(%connection_id, "handle io future: start");
let _end_connection = util::defer(|| { let _end_connection = util::defer(|| {
response_channels.lock().take(); response_channels.lock().take();
this.connections.write().remove(&connection_id); this.connections.write().remove(&connection_id);
tracing::debug!(?connection_id, "handle io future: end"); tracing::debug!(%connection_id, "handle io future: end");
}); });
// Send messages on this frequency so the connection isn't closed. // Send messages on this frequency so the connection isn't closed.
@ -188,68 +187,68 @@ impl Peer {
futures::pin_mut!(receive_timeout); futures::pin_mut!(receive_timeout);
loop { loop {
tracing::debug!(?connection_id, "outer loop iteration start"); tracing::debug!(%connection_id, "outer loop iteration start");
let read_message = reader.read().fuse(); let read_message = reader.read().fuse();
futures::pin_mut!(read_message); futures::pin_mut!(read_message);
loop { loop {
tracing::debug!(?connection_id, "inner loop iteration start"); tracing::debug!(%connection_id, "inner loop iteration start");
futures::select_biased! { futures::select_biased! {
outgoing = outgoing_rx.next().fuse() => match outgoing { outgoing = outgoing_rx.next().fuse() => match outgoing {
Some(outgoing) => { Some(outgoing) => {
tracing::debug!(?connection_id, "outgoing rpc message: writing"); tracing::debug!(%connection_id, "outgoing rpc message: writing");
futures::select_biased! { futures::select_biased! {
result = writer.write(outgoing).fuse() => { result = writer.write(outgoing).fuse() => {
tracing::debug!(?connection_id, "outgoing rpc message: done writing"); tracing::debug!(%connection_id, "outgoing rpc message: done writing");
result.context("failed to write RPC message")?; result.context("failed to write RPC message")?;
tracing::debug!(?connection_id, "keepalive interval: resetting after sending message"); tracing::debug!(%connection_id, "keepalive interval: resetting after sending message");
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
} }
_ = create_timer(WRITE_TIMEOUT).fuse() => { _ = create_timer(WRITE_TIMEOUT).fuse() => {
tracing::debug!(?connection_id, "outgoing rpc message: writing timed out"); tracing::debug!(%connection_id, "outgoing rpc message: writing timed out");
Err(anyhow!("timed out writing message"))?; Err(anyhow!("timed out writing message"))?;
} }
} }
} }
None => { None => {
tracing::debug!(?connection_id, "outgoing rpc message: channel closed"); tracing::debug!(%connection_id, "outgoing rpc message: channel closed");
return Ok(()) return Ok(())
}, },
}, },
_ = keepalive_timer => { _ = keepalive_timer => {
tracing::debug!(?connection_id, "keepalive interval: pinging"); tracing::debug!(%connection_id, "keepalive interval: pinging");
futures::select_biased! { futures::select_biased! {
result = writer.write(proto::Message::Ping).fuse() => { result = writer.write(proto::Message::Ping).fuse() => {
tracing::debug!(?connection_id, "keepalive interval: done pinging"); tracing::debug!(%connection_id, "keepalive interval: done pinging");
result.context("failed to send keepalive")?; result.context("failed to send keepalive")?;
tracing::debug!(?connection_id, "keepalive interval: resetting after pinging"); tracing::debug!(%connection_id, "keepalive interval: resetting after pinging");
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
} }
_ = create_timer(WRITE_TIMEOUT).fuse() => { _ = create_timer(WRITE_TIMEOUT).fuse() => {
tracing::debug!(?connection_id, "keepalive interval: pinging timed out"); tracing::debug!(%connection_id, "keepalive interval: pinging timed out");
Err(anyhow!("timed out sending keepalive"))?; Err(anyhow!("timed out sending keepalive"))?;
} }
} }
} }
incoming = read_message => { incoming = read_message => {
let incoming = incoming.context("error reading rpc message from socket")?; let incoming = incoming.context("error reading rpc message from socket")?;
tracing::debug!(?connection_id, "incoming rpc message: received"); tracing::debug!(%connection_id, "incoming rpc message: received");
tracing::debug!(?connection_id, "receive timeout: resetting"); tracing::debug!(%connection_id, "receive timeout: resetting");
receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
if let proto::Message::Envelope(incoming) = incoming { if let proto::Message::Envelope(incoming) = incoming {
tracing::debug!(?connection_id, "incoming rpc message: processing"); tracing::debug!(%connection_id, "incoming rpc message: processing");
futures::select_biased! { futures::select_biased! {
result = incoming_tx.send(incoming).fuse() => match result { result = incoming_tx.send(incoming).fuse() => match result {
Ok(_) => { Ok(_) => {
tracing::debug!(?connection_id, "incoming rpc message: processed"); tracing::debug!(%connection_id, "incoming rpc message: processed");
} }
Err(_) => { Err(_) => {
tracing::debug!(?connection_id, "incoming rpc message: channel closed"); tracing::debug!(%connection_id, "incoming rpc message: channel closed");
return Ok(()) return Ok(())
} }
}, },
_ = create_timer(WRITE_TIMEOUT).fuse() => { _ = create_timer(WRITE_TIMEOUT).fuse() => {
tracing::debug!(?connection_id, "incoming rpc message: processing timed out"); tracing::debug!(%connection_id, "incoming rpc message: processing timed out");
Err(anyhow!("timed out processing incoming message"))? Err(anyhow!("timed out processing incoming message"))?
} }
} }
@ -257,7 +256,7 @@ impl Peer {
break; break;
}, },
_ = receive_timeout => { _ = receive_timeout => {
tracing::debug!(?connection_id, "receive timeout: delay between messages too long"); tracing::debug!(%connection_id, "receive timeout: delay between messages too long");
Err(anyhow!("delay between messages too long"))? Err(anyhow!("delay between messages too long"))?
} }
} }
@ -276,12 +275,12 @@ impl Peer {
let message_id = incoming.id; let message_id = incoming.id;
tracing::debug!(?incoming, "incoming message future: start"); tracing::debug!(?incoming, "incoming message future: start");
let _end = util::defer(move || { let _end = util::defer(move || {
tracing::debug!(?connection_id, message_id, "incoming message future: end"); tracing::debug!(%connection_id, message_id, "incoming message future: end");
}); });
if let Some(responding_to) = incoming.responding_to { if let Some(responding_to) = incoming.responding_to {
tracing::debug!( tracing::debug!(
?connection_id, %connection_id,
message_id, message_id,
responding_to, responding_to,
"incoming response: received" "incoming response: received"
@ -291,7 +290,7 @@ impl Peer {
let requester_resumed = oneshot::channel(); let requester_resumed = oneshot::channel();
if let Err(error) = tx.send((incoming, requester_resumed.0)) { if let Err(error) = tx.send((incoming, requester_resumed.0)) {
tracing::debug!( tracing::debug!(
?connection_id, %connection_id,
message_id, message_id,
responding_to = responding_to, responding_to = responding_to,
?error, ?error,
@ -300,21 +299,21 @@ impl Peer {
} }
tracing::debug!( tracing::debug!(
?connection_id, %connection_id,
message_id, message_id,
responding_to, responding_to,
"incoming response: waiting to resume requester" "incoming response: waiting to resume requester"
); );
let _ = requester_resumed.1.await; let _ = requester_resumed.1.await;
tracing::debug!( tracing::debug!(
?connection_id, %connection_id,
message_id, message_id,
responding_to, responding_to,
"incoming response: requester resumed" "incoming response: requester resumed"
); );
} else { } else {
tracing::warn!( tracing::warn!(
?connection_id, %connection_id,
message_id, message_id,
responding_to, responding_to,
"incoming response: unknown request" "incoming response: unknown request"
@ -323,10 +322,10 @@ impl Peer {
None None
} else { } else {
tracing::debug!(?connection_id, message_id, "incoming message: received"); tracing::debug!(%connection_id, message_id, "incoming message: received");
proto::build_typed_envelope(connection_id, incoming).or_else(|| { proto::build_typed_envelope(connection_id, incoming).or_else(|| {
tracing::error!( tracing::error!(
?connection_id, %connection_id,
message_id, message_id,
"unable to construct a typed envelope" "unable to construct a typed envelope"
); );
@ -499,7 +498,7 @@ impl Peer {
let connections = self.connections.read(); let connections = self.connections.read();
let connection = connections let connection = connections
.get(&connection_id) .get(&connection_id)
.ok_or_else(|| anyhow!("no such connection: {:?}", connection_id))?; .ok_or_else(|| anyhow!("no such connection: {}", connection_id))?;
Ok(connection.clone()) Ok(connection.clone())
} }
} }

View file

@ -72,7 +72,7 @@ impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
} }
fn original_sender_id(&self) -> Option<PeerId> { fn original_sender_id(&self) -> Option<PeerId> {
self.original_sender_id.clone() self.original_sender_id
} }
} }

View file

@ -2050,7 +2050,7 @@ impl Workspace {
.get(&leader_id) .get(&leader_id)
.map(|c| c.replica_id) .map(|c| c.replica_id)
}) })
.ok_or_else(|| anyhow!("no such collaborator {:?}", leader_id))?; .ok_or_else(|| anyhow!("no such collaborator {}", leader_id))?;
let item_builders = cx.update(|cx| { let item_builders = cx.update(|cx| {
cx.default_global::<FollowableItemBuilders>() cx.default_global::<FollowableItemBuilders>()