diff --git a/Cargo.lock b/Cargo.lock index 57dcade5b9..8e9934e357 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -876,6 +876,7 @@ dependencies = [ "tonic", "tower", "tracing", + "tracing-log", "tracing-opentelemetry", "tracing-subscriber", "util", @@ -2588,6 +2589,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.8" @@ -3734,6 +3744,15 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] + [[package]] name = "regex-syntax" version = "0.6.25" @@ -3856,6 +3875,7 @@ dependencies = [ "smol", "smol-timeout", "tempdir", + "tracing", "util", "zstd", ] @@ -5244,9 +5264,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.26" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" dependencies = [ "cfg-if 1.0.0", "log", @@ -5257,9 +5277,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.15" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2" +checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" dependencies = [ "proc-macro2", "quote", @@ -5317,9 +5337,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596" dependencies = [ "ansi_term 0.12.1", + "lazy_static", + "matchers", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml index a5541990d3..9fdab47d3e 100644 --- a/crates/collab/Cargo.toml +++ b/crates/collab/Cargo.toml @@ -44,9 +44,10 @@ tokio-tungstenite = "0.17" tonic = "0.6" tower = "0.4" toml = "0.5.8" -tracing = "0.1" +tracing = "0.1.34" +tracing-log = "0.1.3" tracing-opentelemetry = "0.17" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3.11", features = ["env-filter"] } [dependencies.sqlx] version = "0.5.2" diff --git a/crates/collab/k8s/environments/production.sh b/crates/collab/k8s/environments/production.sh index bac7fbedf7..039c1f60a5 100644 --- a/crates/collab/k8s/environments/production.sh +++ b/crates/collab/k8s/environments/production.sh @@ -1,3 +1,2 @@ ZED_ENVIRONMENT=production -RUST_LOG=info -TRACE_LEVEL=debug +RUST_LOG=info,rpc=debug diff --git a/crates/collab/k8s/environments/staging.sh b/crates/collab/k8s/environments/staging.sh index ed7121715d..ece0851ea1 100644 --- a/crates/collab/k8s/environments/staging.sh +++ b/crates/collab/k8s/environments/staging.sh @@ -1,3 +1,2 @@ ZED_ENVIRONMENT=staging -RUST_LOG=info -TRACE_LEVEL=debug +RUST_LOG=info,rpc=debug diff --git a/crates/collab/k8s/manifest.template.yml b/crates/collab/k8s/manifest.template.yml index 2e9f0ae298..73b20409fd 100644 --- a/crates/collab/k8s/manifest.template.yml +++ b/crates/collab/k8s/manifest.template.yml @@ -83,8 +83,6 @@ spec: key: token - name: RUST_LOG value: ${RUST_LOG} - - name: TRACE_LEVEL - value: ${TRACE_LEVEL} - name: HONEYCOMB_DATASET value: "collab" - name: HONEYCOMB_API_KEY diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index d7f8905a9a..772a518853 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -11,7 +11,9 @@ use std::{ net::{SocketAddr, TcpListener}, sync::Arc, }; -use tracing::metadata::LevelFilter; +use tracing_log::LogTracer; +use tracing_subscriber::filter::EnvFilter; +use util::ResultExt; #[derive(Default, Deserialize)] pub struct Config { @@ -20,7 +22,7 @@ pub struct Config { pub api_token: String, pub honeycomb_api_key: Option, pub honeycomb_dataset: Option, - pub trace_level: Option, + pub rust_log: Option, } pub struct AppState { @@ -41,8 +43,6 @@ impl AppState { #[tokio::main] async fn main() -> Result<()> { - env_logger::init(); - if let Err(error) = env::load_dotenv() { log::error!( "error loading .env.toml (this is expected in production): {}", @@ -119,42 +119,44 @@ pub fn init_tracing(config: &Config) -> Option<()> { use std::str::FromStr; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::layer::SubscriberExt; + let rust_trace = config.rust_log.clone()?; - let (honeycomb_api_key, honeycomb_dataset) = config + LogTracer::init().log_err()?; + + let open_telemetry_layer = config .honeycomb_api_key .clone() - .zip(config.honeycomb_dataset.clone())?; + .zip(config.honeycomb_dataset.clone()) + .map(|(honeycomb_api_key, honeycomb_dataset)| { + let mut metadata = tonic::metadata::MetadataMap::new(); + metadata.insert("x-honeycomb-team", honeycomb_api_key.parse().unwrap()); + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint("https://api.honeycomb.io") + .with_metadata(metadata), + ) + .with_trace_config(opentelemetry::sdk::trace::config().with_resource( + opentelemetry::sdk::Resource::new(vec![KeyValue::new( + "service.name", + honeycomb_dataset, + )]), + )) + .install_batch(opentelemetry::runtime::Tokio) + .expect("failed to initialize tracing"); - let mut metadata = tonic::metadata::MetadataMap::new(); - metadata.insert("x-honeycomb-team", honeycomb_api_key.parse().unwrap()); - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint("https://api.honeycomb.io") - .with_metadata(metadata), - ) - .with_trace_config(opentelemetry::sdk::trace::config().with_resource( - opentelemetry::sdk::Resource::new(vec![KeyValue::new( - "service.name", - honeycomb_dataset, - )]), - )) - .install_batch(opentelemetry::runtime::Tokio) - .expect("failed to initialize tracing"); + OpenTelemetryLayer::new(tracer) + }); let subscriber = tracing_subscriber::Registry::default() - .with(OpenTelemetryLayer::new(tracer)) - .with(tracing_subscriber::fmt::layer()) + .with(open_telemetry_layer) .with( - config - .trace_level - .as_ref() - .map_or(LevelFilter::INFO, |level| { - LevelFilter::from_str(level).unwrap() - }), - ); + tracing_subscriber::fmt::layer() + .event_format(tracing_subscriber::fmt::format().pretty()), + ) + .with(EnvFilter::from_str(rust_trace.as_str()).log_err()?); tracing::subscriber::set_global_default(subscriber).unwrap(); diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 2750d9078f..7421a66312 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -28,6 +28,7 @@ rand = "0.8" rsa = "0.4" serde = { version = "1", features = ["derive"] } smol-timeout = "0.6" +tracing = "0.1.34" zstd = "0.9" [build-dependencies] diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 7d7d1c7194..05f4b33f0e 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -9,6 +9,7 @@ use futures::{ stream::BoxStream, FutureExt, SinkExt, StreamExt, }; +use log::as_debug; use parking_lot::{Mutex, RwLock}; use smol_timeout::TimeoutExt; use std::sync::atomic::Ordering::SeqCst; @@ -22,6 +23,7 @@ use std::{ }, time::Duration, }; +use tracing::instrument; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub struct ConnectionId(pub u32); @@ -108,6 +110,7 @@ impl Peer { }) } + #[instrument(skip_all)] pub async fn add_connection( self: &Arc, connection: Connection, @@ -145,9 +148,12 @@ impl Peer { let this = self.clone(); let response_channels = connection_state.response_channels.clone(); let handle_io = async move { + log::debug!(connection_id = connection_id.0; "handle io future: start"); + let _end_connection = util::defer(|| { response_channels.lock().take(); this.connections.write().remove(&connection_id); + log::debug!(connection_id = connection_id.0; "handle io future: end"); }); // Send messages on this frequency so the connection isn't closed. @@ -159,49 +165,68 @@ impl Peer { futures::pin_mut!(receive_timeout); loop { + log::debug!(connection_id = connection_id.0; "outer loop iteration start"); let read_message = reader.read().fuse(); futures::pin_mut!(read_message); loop { + log::debug!(connection_id = connection_id.0; "inner loop iteration start"); futures::select_biased! { outgoing = outgoing_rx.next().fuse() => match outgoing { Some(outgoing) => { + log::debug!(connection_id = connection_id.0; "outgoing rpc message: writing"); if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await { + log::debug!(connection_id = connection_id.0; "outgoing rpc message: done writing"); result.context("failed to write RPC message")?; + log::debug!(connection_id = connection_id.0; "keepalive interval: resetting after sending message"); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); } else { + log::debug!(connection_id = connection_id.0; "outgoing rpc message: writing timed out"); Err(anyhow!("timed out writing message"))?; } } None => { - log::info!("outgoing channel closed"); + log::debug!(connection_id = connection_id.0; "outgoing rpc message: channel closed"); return Ok(()) }, }, incoming = read_message => { + log::debug!(connection_id = connection_id.0; "incoming rpc message: received"); let incoming = incoming.context("received invalid RPC message")?; + log::debug!(connection_id = connection_id.0; "receive timeout: resetting"); receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); if let proto::Message::Envelope(incoming) = incoming { + log::debug!(connection_id = connection_id.0; "incoming rpc message: processing"); match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await { - Some(Ok(_)) => {}, + Some(Ok(_)) => { + log::debug!(connection_id = connection_id.0; "incoming rpc message: processed"); + }, Some(Err(_)) => { - log::info!("incoming channel closed"); + log::debug!(connection_id = connection_id.0; "incoming rpc message: channel closed"); return Ok(()) }, - None => Err(anyhow!("timed out processing incoming message"))?, + None => { + log::debug!(connection_id = connection_id.0; "incoming rpc message: processing timed out"); + Err(anyhow!("timed out processing incoming message"))? + }, } } break; }, _ = keepalive_timer => { + log::debug!(connection_id = connection_id.0; "keepalive interval: pinging"); if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await { + log::debug!(connection_id = connection_id.0; "keepalive interval: done pinging"); result.context("failed to send keepalive")?; + log::debug!(connection_id = connection_id.0; "keepalive interval: resetting after pinging"); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); } else { + log::debug!(connection_id = connection_id.0; "keepalive interval: pinging timed out"); Err(anyhow!("timed out sending keepalive"))?; } } _ = receive_timeout => { + log::debug!(connection_id = connection_id.0; "receive timeout: delay between messages too long"); Err(anyhow!("delay between messages too long"))? } } @@ -217,25 +242,71 @@ impl Peer { let incoming_rx = incoming_rx.filter_map(move |incoming| { let response_channels = response_channels.clone(); async move { + let message_id = incoming.id; + log::debug!(incoming = as_debug!(&incoming); "incoming message future: start"); + let _end = util::defer(move || { + log::debug!( + connection_id = connection_id.0, + message_id = message_id; + "incoming message future: end" + ); + }); + if let Some(responding_to) = incoming.responding_to { + log::debug!( + connection_id = connection_id.0, + message_id = message_id, + responding_to = responding_to; + "incoming response: received" + ); let channel = response_channels.lock().as_mut()?.remove(&responding_to); if let Some(tx) = channel { let requester_resumed = oneshot::channel(); if let Err(error) = tx.send((incoming, requester_resumed.0)) { log::debug!( - "received RPC but request future was dropped {:?}", - error.0 + connection_id = connection_id.0, + message_id = message_id, + responding_to = responding_to, + error = as_debug!(error); + "incoming response: request future dropped", ); } + + log::debug!( + connection_id = connection_id.0, + message_id = message_id, + responding_to = responding_to; + "incoming response: waiting to resume requester" + ); let _ = requester_resumed.1.await; + log::debug!( + connection_id = connection_id.0, + message_id = message_id, + responding_to = responding_to; + "incoming response: requester resumed" + ); } else { - log::warn!("received RPC response to unknown request {}", responding_to); + log::warn!( + connection_id = connection_id.0, + message_id = message_id, + responding_to = responding_to; + "incoming response: unknown request" + ); } None } else { + log::debug!( + connection_id = connection_id.0, + message_id = message_id; + "incoming message: received" + ); proto::build_typed_envelope(connection_id, incoming).or_else(|| { - log::error!("unable to construct a typed envelope"); + log::error!( + connection_id = connection_id.0, + message_id = message_id; + "unable to construct a typed envelope" + ); None }) }