Merge pull request #940 from zed-industries/telemetry

Instrument the collab server with OpenTelemetry collecting into Honeycomb.io
This commit is contained in:
Antonio Scandurra 2022-04-29 17:50:55 +02:00 committed by GitHub
commit cddafa5fef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 612 additions and 153 deletions

View file

@ -25,7 +25,6 @@ use axum::{
use collections::{HashMap, HashSet};
use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use log::{as_debug, as_display};
use rpc::{
proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
Connection, ConnectionId, Peer, TypedEnvelope,
@ -38,7 +37,7 @@ use std::{
ops::{Deref, DerefMut},
rc::Rc,
sync::Arc,
time::{Duration, Instant},
time::Duration,
};
use store::{Store, Worktree};
use time::OffsetDateTime;
@ -47,11 +46,10 @@ use tokio::{
time::Sleep,
};
use tower::ServiceBuilder;
use util::ResultExt;
use tracing::{info_span, instrument, Instrument};
type MessageHandler = Box<
dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, Result<()>>,
>;
type MessageHandler =
Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
pub struct Server {
peer: Arc<Peer>,
@ -156,7 +154,21 @@ impl Server {
TypeId::of::<M>(),
Box::new(move |server, envelope| {
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
(handler)(server, *envelope).boxed()
let span = info_span!(
"handle message",
payload_type = envelope.payload_type_name(),
payload = serde_json::to_string_pretty(&envelope.payload)
.unwrap()
.as_str(),
);
let future = (handler)(server, *envelope);
async move {
if let Err(error) = future.await {
tracing::error!(%error, "error handling message");
}
}
.instrument(span)
.boxed()
}),
);
if prev_handler.is_some() {
@ -209,7 +221,7 @@ impl Server {
let receipt = envelope.receipt();
let handler = handler.clone();
async move {
let mut store = server.store.write().await;
let mut store = server.state_mut().await;
let response = (handler)(server.clone(), &mut *store, envelope);
match response {
Ok(response) => {
@ -233,12 +245,13 @@ impl Server {
pub fn handle_connection<E: Executor>(
self: &Arc<Self>,
connection: Connection,
addr: String,
address: String,
user_id: UserId,
mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
executor: E,
) -> impl Future<Output = ()> {
let mut this = self.clone();
let span = info_span!("handle connection", %user_id, %address);
async move {
let (connection_id, handle_io, mut incoming_rx) = this
.peer
@ -253,6 +266,8 @@ impl Server {
})
.await;
tracing::info!(%user_id, %connection_id, %address, "connection opened");
if let Some(send_connection_id) = send_connection_id.as_mut() {
let _ = send_connection_id.send(connection_id).await;
}
@ -270,50 +285,47 @@ impl Server {
futures::pin_mut!(next_message);
futures::select_biased! {
result = handle_io => {
if let Err(err) = result {
log::error!("error handling rpc connection {:?} - {:?}", addr, err);
if let Err(error) = result {
tracing::error!(%error, "error handling I/O");
}
break;
}
message = next_message => {
if let Some(message) = message {
let start_time = Instant::now();
let type_name = message.payload_type_name();
log::info!(connection_id = connection_id.0, type_name = type_name; "rpc message received");
if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
let notifications = this.notifications.clone();
let is_background = message.is_background();
let handle_message = (handler)(this.clone(), message);
let handle_message = async move {
if let Err(err) = handle_message.await {
log::error!(connection_id = connection_id.0, type = type_name, error = as_display!(err); "rpc message error");
let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
async {
if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
let notifications = this.notifications.clone();
let is_background = message.is_background();
let handle_message = (handler)(this.clone(), message);
let handle_message = async move {
handle_message.await;
if let Some(mut notifications) = notifications {
let _ = notifications.send(()).await;
}
};
if is_background {
executor.spawn_detached(handle_message);
} else {
log::info!(connection_id = connection_id.0, type = type_name, duration = as_debug!(start_time.elapsed()); "rpc message handled");
handle_message.await;
}
if let Some(mut notifications) = notifications {
let _ = notifications.send(()).await;
}
};
if is_background {
executor.spawn_detached(handle_message);
} else {
handle_message.await;
tracing::error!("no message handler");
}
} else {
log::warn!("unhandled message: {}", type_name);
}
}.instrument(span).await;
} else {
log::info!(address = as_debug!(addr); "rpc connection closed");
tracing::info!(%user_id, %connection_id, %address, "connection closed");
break;
}
}
}
}
if let Err(err) = this.sign_out(connection_id).await {
log::error!("error signing out connection {:?} - {:?}", addr, err);
if let Err(error) = this.sign_out(connection_id).await {
tracing::error!(%error, "error signing out");
}
}
}.instrument(span)
}
async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
@ -849,6 +861,7 @@ impl Server {
Ok(proto::GetUsersResponse { users })
}
#[instrument(skip(self, state, user_ids))]
fn update_contacts_for_users<'a>(
self: &Arc<Self>,
state: &Store,
@ -864,7 +877,7 @@ impl Server {
contacts: contacts.clone(),
},
)
.log_err();
.trace_err();
}
}
}
@ -1084,6 +1097,14 @@ impl<'a> Drop for StoreWriteGuard<'a> {
fn drop(&mut self) {
#[cfg(test)]
self.check_invariants();
let metrics = self.metrics();
tracing::info!(
connections = metrics.connections,
registered_projects = metrics.registered_projects,
shared_projects = metrics.shared_projects,
"metrics"
);
}
}
@ -1099,13 +1120,14 @@ impl Executor for RealExecutor {
}
}
#[instrument(skip(f))]
fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
where
F: FnMut(ConnectionId) -> anyhow::Result<()>,
{
for receiver_id in receiver_ids {
if receiver_id != sender_id {
f(receiver_id).log_err();
f(receiver_id).trace_err();
}
}
}
@ -1206,6 +1228,29 @@ fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
}
}
pub trait ResultExt {
type Ok;
fn trace_err(self) -> Option<Self::Ok>;
}
impl<T, E> ResultExt for Result<T, E>
where
E: std::fmt::Debug,
{
type Ok = T;
fn trace_err(self) -> Option<T> {
match self {
Ok(value) => Some(value),
Err(error) => {
tracing::error!("{:?}", error);
None
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;