diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 8b16a3a05b..6a8ae61ed0 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -488,9 +488,9 @@ impl Server { move |duration| executor.sleep(duration) }); - tracing::info!(%user_id, %login, ?connection_id, %address, "connection opened"); + tracing::info!(%user_id, %login, %connection_id, %address, "connection opened"); this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?; - tracing::info!(%user_id, %login, ?connection_id, %address, "sent hello message"); + tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message"); if let Some(send_connection_id) = send_connection_id.take() { let _ = send_connection_id.send(connection_id); @@ -552,7 +552,7 @@ impl Server { _ = teardown.changed().fuse() => return Ok(()), result = handle_io => { if let Err(error) = result { - tracing::error!(?error, %user_id, %login, ?connection_id, %address, "error handling I/O"); + tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O"); } break; } @@ -560,7 +560,7 @@ impl Server { message = next_message => { if let Some(message) = message { let type_name = message.payload_type_name(); - let span = tracing::info_span!("receive message", %user_id, %login, ?connection_id, %address, type_name); + let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name); let span_enter = span.enter(); if let Some(handler) = this.handlers.get(&message.payload_type_id()) { let is_background = message.is_background(); @@ -574,10 +574,10 @@ impl Server { foreground_message_handlers.push(handle_message); } } else { - tracing::error!(%user_id, %login, ?connection_id, %address, "no message handler"); + tracing::error!(%user_id, %login, %connection_id, %address, "no message handler"); } } else { - tracing::info!(%user_id, %login, ?connection_id, %address, "connection closed"); + tracing::info!(%user_id, %login, %connection_id, %address, "connection closed"); break; } } @@ -585,9 +585,9 @@ impl Server { } drop(foreground_message_handlers); - tracing::info!(%user_id, %login, ?connection_id, %address, "signing out"); + tracing::info!(%user_id, %login, %connection_id, %address, "signing out"); if let Err(error) = sign_out(session, teardown, executor).await { - tracing::error!(%user_id, %login, ?connection_id, %address, ?error, "error signing out"); + tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out"); } Ok(()) diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 0bd181862b..9528bd10b7 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -9,7 +9,7 @@ message PeerId { message Envelope { uint32 id = 1; optional uint32 responding_to = 2; - PeerId original_sender_id = 3; + optional PeerId original_sender_id = 3; oneof payload { Hello hello = 4; Ack ack = 5; diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 59a30a0f47..d2a4e6e080 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -81,7 +81,6 @@ pub struct TypedEnvelope { impl TypedEnvelope { pub fn original_sender_id(&self) -> Result { self.original_sender_id - .clone() .ok_or_else(|| anyhow!("missing original_sender_id")) } } @@ -171,12 +170,12 @@ impl Peer { let this = self.clone(); let response_channels = connection_state.response_channels.clone(); let handle_io = async move { - tracing::debug!(?connection_id, "handle io future: start"); + tracing::debug!(%connection_id, "handle io future: start"); let _end_connection = util::defer(|| { response_channels.lock().take(); this.connections.write().remove(&connection_id); - tracing::debug!(?connection_id, "handle io future: end"); + tracing::debug!(%connection_id, "handle io future: end"); }); // Send messages on this frequency so the connection isn't closed. @@ -188,68 +187,68 @@ impl Peer { futures::pin_mut!(receive_timeout); loop { - tracing::debug!(?connection_id, "outer loop iteration start"); + tracing::debug!(%connection_id, "outer loop iteration start"); let read_message = reader.read().fuse(); futures::pin_mut!(read_message); loop { - tracing::debug!(?connection_id, "inner loop iteration start"); + tracing::debug!(%connection_id, "inner loop iteration start"); futures::select_biased! { outgoing = outgoing_rx.next().fuse() => match outgoing { Some(outgoing) => { - tracing::debug!(?connection_id, "outgoing rpc message: writing"); + tracing::debug!(%connection_id, "outgoing rpc message: writing"); futures::select_biased! { result = writer.write(outgoing).fuse() => { - tracing::debug!(?connection_id, "outgoing rpc message: done writing"); + tracing::debug!(%connection_id, "outgoing rpc message: done writing"); result.context("failed to write RPC message")?; - tracing::debug!(?connection_id, "keepalive interval: resetting after sending message"); + tracing::debug!(%connection_id, "keepalive interval: resetting after sending message"); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); } _ = create_timer(WRITE_TIMEOUT).fuse() => { - tracing::debug!(?connection_id, "outgoing rpc message: writing timed out"); + tracing::debug!(%connection_id, "outgoing rpc message: writing timed out"); Err(anyhow!("timed out writing message"))?; } } } None => { - tracing::debug!(?connection_id, "outgoing rpc message: channel closed"); + tracing::debug!(%connection_id, "outgoing rpc message: channel closed"); return Ok(()) }, }, _ = keepalive_timer => { - tracing::debug!(?connection_id, "keepalive interval: pinging"); + tracing::debug!(%connection_id, "keepalive interval: pinging"); futures::select_biased! { result = writer.write(proto::Message::Ping).fuse() => { - tracing::debug!(?connection_id, "keepalive interval: done pinging"); + tracing::debug!(%connection_id, "keepalive interval: done pinging"); result.context("failed to send keepalive")?; - tracing::debug!(?connection_id, "keepalive interval: resetting after pinging"); + tracing::debug!(%connection_id, "keepalive interval: resetting after pinging"); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); } _ = create_timer(WRITE_TIMEOUT).fuse() => { - tracing::debug!(?connection_id, "keepalive interval: pinging timed out"); + tracing::debug!(%connection_id, "keepalive interval: pinging timed out"); Err(anyhow!("timed out sending keepalive"))?; } } } incoming = read_message => { let incoming = incoming.context("error reading rpc message from socket")?; - tracing::debug!(?connection_id, "incoming rpc message: received"); - tracing::debug!(?connection_id, "receive timeout: resetting"); + tracing::debug!(%connection_id, "incoming rpc message: received"); + tracing::debug!(%connection_id, "receive timeout: resetting"); receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); if let proto::Message::Envelope(incoming) = incoming { - tracing::debug!(?connection_id, "incoming rpc message: processing"); + tracing::debug!(%connection_id, "incoming rpc message: processing"); futures::select_biased! { result = incoming_tx.send(incoming).fuse() => match result { Ok(_) => { - tracing::debug!(?connection_id, "incoming rpc message: processed"); + tracing::debug!(%connection_id, "incoming rpc message: processed"); } Err(_) => { - tracing::debug!(?connection_id, "incoming rpc message: channel closed"); + tracing::debug!(%connection_id, "incoming rpc message: channel closed"); return Ok(()) } }, _ = create_timer(WRITE_TIMEOUT).fuse() => { - tracing::debug!(?connection_id, "incoming rpc message: processing timed out"); + tracing::debug!(%connection_id, "incoming rpc message: processing timed out"); Err(anyhow!("timed out processing incoming message"))? } } @@ -257,7 +256,7 @@ impl Peer { break; }, _ = receive_timeout => { - tracing::debug!(?connection_id, "receive timeout: delay between messages too long"); + tracing::debug!(%connection_id, "receive timeout: delay between messages too long"); Err(anyhow!("delay between messages too long"))? } } @@ -276,12 +275,12 @@ impl Peer { let message_id = incoming.id; tracing::debug!(?incoming, "incoming message future: start"); let _end = util::defer(move || { - tracing::debug!(?connection_id, message_id, "incoming message future: end"); + tracing::debug!(%connection_id, message_id, "incoming message future: end"); }); if let Some(responding_to) = incoming.responding_to { tracing::debug!( - ?connection_id, + %connection_id, message_id, responding_to, "incoming response: received" @@ -291,7 +290,7 @@ impl Peer { let requester_resumed = oneshot::channel(); if let Err(error) = tx.send((incoming, requester_resumed.0)) { tracing::debug!( - ?connection_id, + %connection_id, message_id, responding_to = responding_to, ?error, @@ -300,21 +299,21 @@ impl Peer { } tracing::debug!( - ?connection_id, + %connection_id, message_id, responding_to, "incoming response: waiting to resume requester" ); let _ = requester_resumed.1.await; tracing::debug!( - ?connection_id, + %connection_id, message_id, responding_to, "incoming response: requester resumed" ); } else { tracing::warn!( - ?connection_id, + %connection_id, message_id, responding_to, "incoming response: unknown request" @@ -323,10 +322,10 @@ impl Peer { None } else { - tracing::debug!(?connection_id, message_id, "incoming message: received"); + tracing::debug!(%connection_id, message_id, "incoming message: received"); proto::build_typed_envelope(connection_id, incoming).or_else(|| { tracing::error!( - ?connection_id, + %connection_id, message_id, "unable to construct a typed envelope" ); @@ -499,7 +498,7 @@ impl Peer { let connections = self.connections.read(); let connection = connections .get(&connection_id) - .ok_or_else(|| anyhow!("no such connection: {:?}", connection_id))?; + .ok_or_else(|| anyhow!("no such connection: {}", connection_id))?; Ok(connection.clone()) } } diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 07c15e0f16..385caf3565 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -72,7 +72,7 @@ impl AnyTypedEnvelope for TypedEnvelope { } fn original_sender_id(&self) -> Option { - self.original_sender_id.clone() + self.original_sender_id } } diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 2aee12d314..c30dc2ea29 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -2050,7 +2050,7 @@ impl Workspace { .get(&leader_id) .map(|c| c.replica_id) }) - .ok_or_else(|| anyhow!("no such collaborator {:?}", leader_id))?; + .ok_or_else(|| anyhow!("no such collaborator {}", leader_id))?; let item_builders = cx.update(|cx| { cx.default_global::()