Merge pull request #990 from zed-industries/more-tracing

Improve tracing support
This commit is contained in:
Nathan Sobo 2022-05-12 14:45:50 -06:00 committed by GitHub
commit 7847707090
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 178 additions and 98 deletions

33
Cargo.lock generated
View file

@ -876,6 +876,7 @@ dependencies = [
"tonic", "tonic",
"tower", "tower",
"tracing", "tracing",
"tracing-log",
"tracing-opentelemetry", "tracing-opentelemetry",
"tracing-subscriber", "tracing-subscriber",
"util", "util",
@ -2590,6 +2591,15 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
[[package]] [[package]]
name = "matches" name = "matches"
version = "0.1.8" version = "0.1.8"
@ -3736,6 +3746,15 @@ dependencies = [
"regex-syntax", "regex-syntax",
] ]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
]
[[package]] [[package]]
name = "regex-syntax" name = "regex-syntax"
version = "0.6.25" version = "0.6.25"
@ -3848,7 +3867,6 @@ dependencies = [
"collections", "collections",
"futures", "futures",
"gpui", "gpui",
"log",
"parking_lot", "parking_lot",
"prost 0.8.0", "prost 0.8.0",
"prost-build 0.8.0", "prost-build 0.8.0",
@ -3858,6 +3876,7 @@ dependencies = [
"smol", "smol",
"smol-timeout", "smol-timeout",
"tempdir", "tempdir",
"tracing",
"util", "util",
"zstd", "zstd",
] ]
@ -5246,9 +5265,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.26" version = "0.1.34"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"log", "log",
@ -5259,9 +5278,9 @@ dependencies = [
[[package]] [[package]]
name = "tracing-attributes" name = "tracing-attributes"
version = "0.1.15" version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2" checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -5319,9 +5338,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596" checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596"
dependencies = [ dependencies = [
"ansi_term 0.12.1", "ansi_term 0.12.1",
"lazy_static",
"matchers",
"regex",
"sharded-slab", "sharded-slab",
"smallvec", "smallvec",
"thread_local", "thread_local",
"tracing",
"tracing-core", "tracing-core",
"tracing-log", "tracing-log",
] ]

View file

@ -24,11 +24,9 @@ axum = { version = "0.5", features = ["json", "headers", "ws"] }
base64 = "0.13" base64 = "0.13"
clap = { version = "3.1", features = ["derive"], optional = true } clap = { version = "3.1", features = ["derive"], optional = true }
envy = "0.4.2" envy = "0.4.2"
env_logger = "0.8"
futures = "0.3" futures = "0.3"
lazy_static = "1.4" lazy_static = "1.4"
lipsum = { version = "0.8", optional = true } lipsum = { version = "0.8", optional = true }
log = { version = "0.4.16", features = ["kv_unstable_serde"] }
opentelemetry = { version = "0.17", features = ["rt-tokio"] } opentelemetry = { version = "0.17", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.10", features = ["tls-roots"] } opentelemetry-otlp = { version = "0.10", features = ["tls-roots"] }
parking_lot = "0.11.1" parking_lot = "0.11.1"
@ -44,9 +42,10 @@ tokio-tungstenite = "0.17"
tonic = "0.6" tonic = "0.6"
tower = "0.4" tower = "0.4"
toml = "0.5.8" toml = "0.5.8"
tracing = "0.1" tracing = "0.1.34"
tracing-log = "0.1.3"
tracing-opentelemetry = "0.17" tracing-opentelemetry = "0.17"
tracing-subscriber = "0.3" tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
[dependencies.sqlx] [dependencies.sqlx]
version = "0.5.2" version = "0.5.2"
@ -59,6 +58,7 @@ rpc = { path = "../rpc", features = ["test-support"] }
client = { path = "../client", features = ["test-support"] } client = { path = "../client", features = ["test-support"] }
editor = { path = "../editor", features = ["test-support"] } editor = { path = "../editor", features = ["test-support"] }
language = { path = "../language", features = ["test-support"] } language = { path = "../language", features = ["test-support"] }
log = { version = "0.4.16", features = ["kv_unstable_serde"] }
lsp = { path = "../lsp", features = ["test-support"] } lsp = { path = "../lsp", features = ["test-support"] }
project = { path = "../project", features = ["test-support"] } project = { path = "../project", features = ["test-support"] }
settings = { path = "../settings", features = ["test-support"] } settings = { path = "../settings", features = ["test-support"] }

View file

@ -1,3 +1,2 @@
ZED_ENVIRONMENT=production ZED_ENVIRONMENT=production
RUST_LOG=info RUST_LOG=info,rpc=debug
TRACE_LEVEL=debug

View file

@ -1,3 +1,2 @@
ZED_ENVIRONMENT=staging ZED_ENVIRONMENT=staging
RUST_LOG=info RUST_LOG=info,rpc=debug
TRACE_LEVEL=debug

View file

@ -83,8 +83,6 @@ spec:
key: token key: token
- name: RUST_LOG - name: RUST_LOG
value: ${RUST_LOG} value: ${RUST_LOG}
- name: TRACE_LEVEL
value: ${TRACE_LEVEL}
- name: HONEYCOMB_DATASET - name: HONEYCOMB_DATASET
value: "collab" value: "collab"
- name: HONEYCOMB_API_KEY - name: HONEYCOMB_API_KEY

View file

@ -2,7 +2,7 @@ use std::sync::Arc;
use super::db::{self, UserId}; use super::db::{self, UserId};
use crate::{AppState, Error}; use crate::{AppState, Error};
use anyhow::{Context, Result}; use anyhow::{anyhow, Context, Result};
use axum::{ use axum::{
http::{self, Request, StatusCode}, http::{self, Request, StatusCode},
middleware::Next, middleware::Next,
@ -51,7 +51,12 @@ pub async fn validate_header<B>(mut req: Request<B>, next: Next<B>) -> impl Into
} }
if credentials_valid { if credentials_valid {
req.extensions_mut().insert(user_id); let user = state
.db
.get_user_by_id(user_id)
.await?
.ok_or_else(|| anyhow!("user {} not found", user_id))?;
req.extensions_mut().insert(user);
Ok::<_, Error>(next.run(req).await) Ok::<_, Error>(next.run(req).await)
} else { } else {
Err(Error::Http( Err(Error::Http(

View file

@ -11,7 +11,9 @@ use std::{
net::{SocketAddr, TcpListener}, net::{SocketAddr, TcpListener},
sync::Arc, sync::Arc,
}; };
use tracing::metadata::LevelFilter; use tracing_log::LogTracer;
use tracing_subscriber::filter::EnvFilter;
use util::ResultExt;
#[derive(Default, Deserialize)] #[derive(Default, Deserialize)]
pub struct Config { pub struct Config {
@ -20,7 +22,7 @@ pub struct Config {
pub api_token: String, pub api_token: String,
pub honeycomb_api_key: Option<String>, pub honeycomb_api_key: Option<String>,
pub honeycomb_dataset: Option<String>, pub honeycomb_dataset: Option<String>,
pub trace_level: Option<String>, pub rust_log: Option<String>,
} }
pub struct AppState { pub struct AppState {
@ -41,10 +43,8 @@ impl AppState {
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
env_logger::init();
if let Err(error) = env::load_dotenv() { if let Err(error) = env::load_dotenv() {
log::error!( eprintln!(
"error loading .env.toml (this is expected in production): {}", "error loading .env.toml (this is expected in production): {}",
error error
); );
@ -119,42 +119,44 @@ pub fn init_tracing(config: &Config) -> Option<()> {
use std::str::FromStr; use std::str::FromStr;
use tracing_opentelemetry::OpenTelemetryLayer; use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::layer::SubscriberExt;
let rust_log = config.rust_log.clone()?;
let (honeycomb_api_key, honeycomb_dataset) = config LogTracer::init().log_err()?;
let open_telemetry_layer = config
.honeycomb_api_key .honeycomb_api_key
.clone() .clone()
.zip(config.honeycomb_dataset.clone())?; .zip(config.honeycomb_dataset.clone())
.map(|(honeycomb_api_key, honeycomb_dataset)| {
let mut metadata = tonic::metadata::MetadataMap::new();
metadata.insert("x-honeycomb-team", honeycomb_api_key.parse().unwrap());
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("https://api.honeycomb.io")
.with_metadata(metadata),
)
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
opentelemetry::sdk::Resource::new(vec![KeyValue::new(
"service.name",
honeycomb_dataset,
)]),
))
.install_batch(opentelemetry::runtime::Tokio)
.expect("failed to initialize tracing");
let mut metadata = tonic::metadata::MetadataMap::new(); OpenTelemetryLayer::new(tracer)
metadata.insert("x-honeycomb-team", honeycomb_api_key.parse().unwrap()); });
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("https://api.honeycomb.io")
.with_metadata(metadata),
)
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
opentelemetry::sdk::Resource::new(vec![KeyValue::new(
"service.name",
honeycomb_dataset,
)]),
))
.install_batch(opentelemetry::runtime::Tokio)
.expect("failed to initialize tracing");
let subscriber = tracing_subscriber::Registry::default() let subscriber = tracing_subscriber::Registry::default()
.with(OpenTelemetryLayer::new(tracer)) .with(open_telemetry_layer)
.with(tracing_subscriber::fmt::layer())
.with( .with(
config tracing_subscriber::fmt::layer()
.trace_level .event_format(tracing_subscriber::fmt::format().pretty()),
.as_ref() )
.map_or(LevelFilter::INFO, |level| { .with(EnvFilter::from_str(rust_log.as_str()).log_err()?);
LevelFilter::from_str(level).unwrap()
}),
);
tracing::subscriber::set_global_default(subscriber).unwrap(); tracing::subscriber::set_global_default(subscriber).unwrap();

View file

@ -2,7 +2,7 @@ mod store;
use crate::{ use crate::{
auth, auth,
db::{self, ChannelId, MessageId, UserId}, db::{self, ChannelId, MessageId, User, UserId},
AppState, Result, AppState, Result,
}; };
use anyhow::anyhow; use anyhow::anyhow;
@ -49,7 +49,7 @@ use tokio::{
time::Sleep, time::Sleep,
}; };
use tower::ServiceBuilder; use tower::ServiceBuilder;
use tracing::{info_span, Instrument}; use tracing::{info_span, instrument, Instrument};
type MessageHandler = type MessageHandler =
Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>; Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
@ -244,12 +244,14 @@ impl Server {
self: &Arc<Self>, self: &Arc<Self>,
connection: Connection, connection: Connection,
address: String, address: String,
user_id: UserId, user: User,
mut send_connection_id: Option<mpsc::Sender<ConnectionId>>, mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
executor: E, executor: E,
) -> impl Future<Output = Result<()>> { ) -> impl Future<Output = Result<()>> {
let mut this = self.clone(); let mut this = self.clone();
let span = info_span!("handle connection", %user_id, %address); let user_id = user.id;
let login = user.github_login;
let span = info_span!("handle connection", %user_id, %login, %address);
async move { async move {
let (connection_id, handle_io, mut incoming_rx) = this let (connection_id, handle_io, mut incoming_rx) = this
.peer .peer
@ -264,7 +266,7 @@ impl Server {
}) })
.await; .await;
tracing::info!(%user_id, %connection_id, %address, "connection opened"); tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
if let Some(send_connection_id) = send_connection_id.as_mut() { if let Some(send_connection_id) = send_connection_id.as_mut() {
let _ = send_connection_id.send(connection_id).await; let _ = send_connection_id.send(connection_id).await;
@ -287,14 +289,14 @@ impl Server {
futures::select_biased! { futures::select_biased! {
result = handle_io => { result = handle_io => {
if let Err(error) = result { if let Err(error) = result {
tracing::error!(%error, "error handling I/O"); tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
} }
break; break;
} }
message = next_message => { message = next_message => {
if let Some(message) = message { if let Some(message) = message {
let type_name = message.payload_type_name(); let type_name = message.payload_type_name();
let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name); let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
async { async {
if let Some(handler) = this.handlers.get(&message.payload_type_id()) { if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
let notifications = this.notifications.clone(); let notifications = this.notifications.clone();
@ -312,25 +314,27 @@ impl Server {
handle_message.await; handle_message.await;
} }
} else { } else {
tracing::error!("no message handler"); tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
} }
}.instrument(span).await; }.instrument(span).await;
} else { } else {
tracing::info!(%user_id, %connection_id, %address, "connection closed"); tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
break; break;
} }
} }
} }
} }
tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
if let Err(error) = this.sign_out(connection_id).await { if let Err(error) = this.sign_out(connection_id).await {
tracing::error!(%error, "error signing out"); tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
} }
Ok(()) Ok(())
}.instrument(span) }.instrument(span)
} }
#[instrument(skip(self), err)]
async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> { async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
self.peer.disconnect(connection_id); self.peer.disconnect(connection_id);
let removed_connection = self.store_mut().await.remove_connection(connection_id)?; let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
@ -1116,29 +1120,6 @@ impl Server {
Ok(()) Ok(())
} }
// #[instrument(skip(self, state, user_ids))]
// fn update_contacts_for_users<'a>(
// self: &Arc<Self>,
// state: &Store,
// user_ids: impl IntoIterator<Item = &'a UserId>,
// ) {
// for user_id in user_ids {
// let contacts = state.contacts_for_user(*user_id);
// for connection_id in state.connection_ids_for_user(*user_id) {
// self.peer
// .send(
// connection_id,
// proto::UpdateContacts {
// contacts: contacts.clone(),
// pending_requests_from_user_ids: Default::default(),
// pending_requests_to_user_ids: Default::default(),
// },
// )
// .trace_err();
// }
// }
// }
async fn join_channel( async fn join_channel(
self: Arc<Self>, self: Arc<Self>,
request: TypedEnvelope<proto::JoinChannel>, request: TypedEnvelope<proto::JoinChannel>,
@ -1443,7 +1424,7 @@ pub async fn handle_websocket_request(
TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>, TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
ConnectInfo(socket_address): ConnectInfo<SocketAddr>, ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
Extension(server): Extension<Arc<Server>>, Extension(server): Extension<Arc<Server>>,
Extension(user_id): Extension<UserId>, Extension(user): Extension<User>,
ws: WebSocketUpgrade, ws: WebSocketUpgrade,
) -> axum::response::Response { ) -> axum::response::Response {
if protocol_version != rpc::PROTOCOL_VERSION { if protocol_version != rpc::PROTOCOL_VERSION {
@ -1463,7 +1444,7 @@ pub async fn handle_websocket_request(
let connection = Connection::new(Box::pin(socket)); let connection = Connection::new(Box::pin(socket));
async move { async move {
server server
.handle_connection(connection, socket_address, user_id, None, RealExecutor) .handle_connection(connection, socket_address, user, None, RealExecutor)
.await .await
.log_err(); .log_err();
} }
@ -6462,6 +6443,7 @@ mod tests {
let client_name = name.to_string(); let client_name = name.to_string();
let mut client = Client::new(http.clone()); let mut client = Client::new(http.clone());
let server = self.server.clone(); let server = self.server.clone();
let db = self.app_state.db.clone();
let connection_killers = self.connection_killers.clone(); let connection_killers = self.connection_killers.clone();
let forbid_connections = self.forbid_connections.clone(); let forbid_connections = self.forbid_connections.clone();
let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16); let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
@ -6482,6 +6464,7 @@ mod tests {
assert_eq!(credentials.access_token, "the-token"); assert_eq!(credentials.access_token, "the-token");
let server = server.clone(); let server = server.clone();
let db = db.clone();
let connection_killers = connection_killers.clone(); let connection_killers = connection_killers.clone();
let forbid_connections = forbid_connections.clone(); let forbid_connections = forbid_connections.clone();
let client_name = client_name.clone(); let client_name = client_name.clone();
@ -6495,11 +6478,12 @@ mod tests {
let (client_conn, server_conn, killed) = let (client_conn, server_conn, killed) =
Connection::in_memory(cx.background()); Connection::in_memory(cx.background());
connection_killers.lock().insert(user_id, killed); connection_killers.lock().insert(user_id, killed);
let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
cx.background() cx.background()
.spawn(server.handle_connection( .spawn(server.handle_connection(
server_conn, server_conn,
client_name, client_name,
user_id, user,
Some(connection_id_tx), Some(connection_id_tx),
cx.background(), cx.background(),
)) ))

View file

@ -21,13 +21,13 @@ async-lock = "2.4"
async-tungstenite = "0.16" async-tungstenite = "0.16"
base64 = "0.13" base64 = "0.13"
futures = "0.3" futures = "0.3"
log = { version = "0.4.16", features = ["kv_unstable_serde"] }
parking_lot = "0.11.1" parking_lot = "0.11.1"
prost = "0.8" prost = "0.8"
rand = "0.8" rand = "0.8"
rsa = "0.4" rsa = "0.4"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
smol-timeout = "0.6" smol-timeout = "0.6"
tracing = { version = "0.1.34", features = ["log"] }
zstd = "0.9" zstd = "0.9"
[build-dependencies] [build-dependencies]

View file

@ -22,6 +22,7 @@ use std::{
}, },
time::Duration, time::Duration,
}; };
use tracing::instrument;
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct ConnectionId(pub u32); pub struct ConnectionId(pub u32);
@ -108,6 +109,7 @@ impl Peer {
}) })
} }
#[instrument(skip_all)]
pub async fn add_connection<F, Fut, Out>( pub async fn add_connection<F, Fut, Out>(
self: &Arc<Self>, self: &Arc<Self>,
connection: Connection, connection: Connection,
@ -145,9 +147,12 @@ impl Peer {
let this = self.clone(); let this = self.clone();
let response_channels = connection_state.response_channels.clone(); let response_channels = connection_state.response_channels.clone();
let handle_io = async move { let handle_io = async move {
tracing::debug!(%connection_id, "handle io future: start");
let _end_connection = util::defer(|| { let _end_connection = util::defer(|| {
response_channels.lock().take(); response_channels.lock().take();
this.connections.write().remove(&connection_id); this.connections.write().remove(&connection_id);
tracing::debug!(%connection_id, "handle io future: end");
}); });
// Send messages on this frequency so the connection isn't closed. // Send messages on this frequency so the connection isn't closed.
@ -159,49 +164,68 @@ impl Peer {
futures::pin_mut!(receive_timeout); futures::pin_mut!(receive_timeout);
loop { loop {
tracing::debug!(%connection_id, "outer loop iteration start");
let read_message = reader.read().fuse(); let read_message = reader.read().fuse();
futures::pin_mut!(read_message); futures::pin_mut!(read_message);
loop { loop {
tracing::debug!(%connection_id, "inner loop iteration start");
futures::select_biased! { futures::select_biased! {
outgoing = outgoing_rx.next().fuse() => match outgoing { outgoing = outgoing_rx.next().fuse() => match outgoing {
Some(outgoing) => { Some(outgoing) => {
tracing::debug!(%connection_id, "outgoing rpc message: writing");
if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await { if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await {
tracing::debug!(%connection_id, "outgoing rpc message: done writing");
result.context("failed to write RPC message")?; result.context("failed to write RPC message")?;
tracing::debug!(%connection_id, "keepalive interval: resetting after sending message");
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
} else { } else {
tracing::debug!(%connection_id, "outgoing rpc message: writing timed out");
Err(anyhow!("timed out writing message"))?; Err(anyhow!("timed out writing message"))?;
} }
} }
None => { None => {
log::info!("outgoing channel closed"); tracing::debug!(%connection_id, "outgoing rpc message: channel closed");
return Ok(()) return Ok(())
}, },
}, },
incoming = read_message => { incoming = read_message => {
let incoming = incoming.context("received invalid RPC message")?; let incoming = incoming.context("error reading rpc message from socket")?;
tracing::debug!(%connection_id, "incoming rpc message: received");
tracing::debug!(%connection_id, "receive timeout: resetting");
receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
if let proto::Message::Envelope(incoming) = incoming { if let proto::Message::Envelope(incoming) = incoming {
tracing::debug!(%connection_id, "incoming rpc message: processing");
match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await { match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await {
Some(Ok(_)) => {}, Some(Ok(_)) => {
tracing::debug!(%connection_id, "incoming rpc message: processed");
},
Some(Err(_)) => { Some(Err(_)) => {
log::info!("incoming channel closed"); tracing::debug!(%connection_id, "incoming rpc message: channel closed");
return Ok(()) return Ok(())
}, },
None => Err(anyhow!("timed out processing incoming message"))?, None => {
tracing::debug!(%connection_id, "incoming rpc message: processing timed out");
Err(anyhow!("timed out processing incoming message"))?
},
} }
} }
break; break;
}, },
_ = keepalive_timer => { _ = keepalive_timer => {
tracing::debug!(%connection_id, "keepalive interval: pinging");
if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await { if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await {
tracing::debug!(%connection_id, "keepalive interval: done pinging");
result.context("failed to send keepalive")?; result.context("failed to send keepalive")?;
tracing::debug!(%connection_id, "keepalive interval: resetting after pinging");
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
} else { } else {
tracing::debug!(%connection_id, "keepalive interval: pinging timed out");
Err(anyhow!("timed out sending keepalive"))?; Err(anyhow!("timed out sending keepalive"))?;
} }
} }
_ = receive_timeout => { _ = receive_timeout => {
tracing::debug!(%connection_id, "receive timeout: delay between messages too long");
Err(anyhow!("delay between messages too long"))? Err(anyhow!("delay between messages too long"))?
} }
} }
@ -217,25 +241,71 @@ impl Peer {
let incoming_rx = incoming_rx.filter_map(move |incoming| { let incoming_rx = incoming_rx.filter_map(move |incoming| {
let response_channels = response_channels.clone(); let response_channels = response_channels.clone();
async move { async move {
let message_id = incoming.id;
tracing::debug!(?incoming, "incoming message future: start");
let _end = util::defer(move || {
tracing::debug!(
%connection_id,
message_id,
"incoming message future: end"
);
});
if let Some(responding_to) = incoming.responding_to { if let Some(responding_to) = incoming.responding_to {
tracing::debug!(
%connection_id,
message_id,
responding_to,
"incoming response: received"
);
let channel = response_channels.lock().as_mut()?.remove(&responding_to); let channel = response_channels.lock().as_mut()?.remove(&responding_to);
if let Some(tx) = channel { if let Some(tx) = channel {
let requester_resumed = oneshot::channel(); let requester_resumed = oneshot::channel();
if let Err(error) = tx.send((incoming, requester_resumed.0)) { if let Err(error) = tx.send((incoming, requester_resumed.0)) {
log::debug!( tracing::debug!(
"received RPC but request future was dropped {:?}", %connection_id,
error.0 message_id,
responding_to = responding_to,
?error,
"incoming response: request future dropped",
); );
} }
tracing::debug!(
%connection_id,
message_id,
responding_to,
"incoming response: waiting to resume requester"
);
let _ = requester_resumed.1.await; let _ = requester_resumed.1.await;
tracing::debug!(
%connection_id,
message_id,
responding_to,
"incoming response: requester resumed"
);
} else { } else {
log::warn!("received RPC response to unknown request {}", responding_to); tracing::warn!(
%connection_id,
message_id,
responding_to,
"incoming response: unknown request"
);
} }
None None
} else { } else {
tracing::debug!(
%connection_id,
message_id,
"incoming message: received"
);
proto::build_typed_envelope(connection_id, incoming).or_else(|| { proto::build_typed_envelope(connection_id, incoming).or_else(|| {
log::error!("unable to construct a typed envelope"); tracing::error!(
%connection_id,
message_id,
"unable to construct a typed envelope"
);
None None
}) })
} }