Start moving from logging to tracing on collab server
Install some spans. Probably more work to do here. Co-Authored-By: Antonio Scandurra <me@as-cii.com>
This commit is contained in:
parent
2d9d30f74a
commit
1fe964ac16
8 changed files with 63 additions and 85 deletions
|
@ -25,7 +25,6 @@ use axum::{
|
|||
use collections::{HashMap, HashSet};
|
||||
use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
|
||||
use lazy_static::lazy_static;
|
||||
use log::{as_debug, as_display};
|
||||
use rpc::{
|
||||
proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
|
||||
Connection, ConnectionId, Peer, TypedEnvelope,
|
||||
|
@ -38,7 +37,7 @@ use std::{
|
|||
ops::{Deref, DerefMut},
|
||||
rc::Rc,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
time::Duration,
|
||||
};
|
||||
use store::{Store, Worktree};
|
||||
use time::OffsetDateTime;
|
||||
|
@ -47,12 +46,11 @@ use tokio::{
|
|||
time::Sleep,
|
||||
};
|
||||
use tower::ServiceBuilder;
|
||||
use tracing::{info_span, Instrument};
|
||||
use tracing::{info_span, instrument, Instrument};
|
||||
use util::ResultExt;
|
||||
|
||||
type MessageHandler = Box<
|
||||
dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, Result<()>>,
|
||||
>;
|
||||
type MessageHandler =
|
||||
Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
|
||||
|
||||
pub struct Server {
|
||||
peer: Arc<Peer>,
|
||||
|
@ -161,7 +159,14 @@ impl Server {
|
|||
"handle message",
|
||||
payload_type = envelope.payload_type_name()
|
||||
);
|
||||
(handler)(server, *envelope).instrument(span).boxed()
|
||||
let future = (handler)(server, *envelope);
|
||||
async move {
|
||||
if let Err(error) = future.await {
|
||||
tracing::error!(%error, "error handling message");
|
||||
}
|
||||
}
|
||||
.instrument(span)
|
||||
.boxed()
|
||||
}),
|
||||
);
|
||||
if prev_handler.is_some() {
|
||||
|
@ -238,12 +243,13 @@ impl Server {
|
|||
pub fn handle_connection<E: Executor>(
|
||||
self: &Arc<Self>,
|
||||
connection: Connection,
|
||||
addr: String,
|
||||
address: String,
|
||||
user_id: UserId,
|
||||
mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
|
||||
executor: E,
|
||||
) -> impl Future<Output = ()> {
|
||||
let mut this = self.clone();
|
||||
let span = info_span!("handle connection", %user_id, %address);
|
||||
async move {
|
||||
let (connection_id, handle_io, mut incoming_rx) = this
|
||||
.peer
|
||||
|
@ -258,6 +264,8 @@ impl Server {
|
|||
})
|
||||
.await;
|
||||
|
||||
tracing::info!(%user_id, %connection_id, %address, "connection opened");
|
||||
|
||||
if let Some(send_connection_id) = send_connection_id.as_mut() {
|
||||
let _ = send_connection_id.send(connection_id).await;
|
||||
}
|
||||
|
@ -275,50 +283,47 @@ impl Server {
|
|||
futures::pin_mut!(next_message);
|
||||
futures::select_biased! {
|
||||
result = handle_io => {
|
||||
if let Err(err) = result {
|
||||
log::error!("error handling rpc connection {:?} - {:?}", addr, err);
|
||||
if let Err(error) = result {
|
||||
tracing::error!(%error, "error handling I/O");
|
||||
}
|
||||
break;
|
||||
}
|
||||
message = next_message => {
|
||||
if let Some(message) = message {
|
||||
let start_time = Instant::now();
|
||||
let type_name = message.payload_type_name();
|
||||
log::info!(connection_id = connection_id.0, type_name = type_name; "rpc message received");
|
||||
if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
|
||||
let notifications = this.notifications.clone();
|
||||
let is_background = message.is_background();
|
||||
let handle_message = (handler)(this.clone(), message);
|
||||
let handle_message = async move {
|
||||
if let Err(err) = handle_message.await {
|
||||
log::error!(connection_id = connection_id.0, type = type_name, error = as_display!(err); "rpc message error");
|
||||
let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
|
||||
async {
|
||||
if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
|
||||
let notifications = this.notifications.clone();
|
||||
let is_background = message.is_background();
|
||||
let handle_message = (handler)(this.clone(), message);
|
||||
let handle_message = async move {
|
||||
handle_message.await;
|
||||
if let Some(mut notifications) = notifications {
|
||||
let _ = notifications.send(()).await;
|
||||
}
|
||||
};
|
||||
if is_background {
|
||||
executor.spawn_detached(handle_message);
|
||||
} else {
|
||||
log::info!(connection_id = connection_id.0, type = type_name, duration = as_debug!(start_time.elapsed()); "rpc message handled");
|
||||
handle_message.await;
|
||||
}
|
||||
if let Some(mut notifications) = notifications {
|
||||
let _ = notifications.send(()).await;
|
||||
}
|
||||
};
|
||||
if is_background {
|
||||
executor.spawn_detached(handle_message);
|
||||
} else {
|
||||
handle_message.await;
|
||||
tracing::error!("no message handler");
|
||||
}
|
||||
} else {
|
||||
log::warn!("unhandled message: {}", type_name);
|
||||
}
|
||||
}.instrument(span).await;
|
||||
} else {
|
||||
log::info!(address = as_debug!(addr); "rpc connection closed");
|
||||
tracing::info!(%user_id, %connection_id, %address, "connection closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(err) = this.sign_out(connection_id).await {
|
||||
log::error!("error signing out connection {:?} - {:?}", addr, err);
|
||||
if let Err(error) = this.sign_out(connection_id).await {
|
||||
tracing::error!(%error, "error signing out");
|
||||
}
|
||||
}
|
||||
}.instrument(span)
|
||||
}
|
||||
|
||||
async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
|
||||
|
@ -854,6 +859,7 @@ impl Server {
|
|||
Ok(proto::GetUsersResponse { users })
|
||||
}
|
||||
|
||||
#[instrument(skip(self, state, user_ids))]
|
||||
fn update_contacts_for_users<'a>(
|
||||
self: &Arc<Self>,
|
||||
state: &Store,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue