Include login in connection-related tracing spans/events

Also, include metadata on more events and add an event called "signing out" with all this metadata to make it easier to search for.
This commit is contained in:
Nathan Sobo 2022-05-12 12:06:06 -06:00
parent 3226e07dcc
commit a3b9ad75b0
2 changed files with 27 additions and 15 deletions

View file

@ -2,7 +2,7 @@ use std::sync::Arc;
use super::db::{self, UserId}; use super::db::{self, UserId};
use crate::{AppState, Error}; use crate::{AppState, Error};
use anyhow::{Context, Result}; use anyhow::{anyhow, Context, Result};
use axum::{ use axum::{
http::{self, Request, StatusCode}, http::{self, Request, StatusCode},
middleware::Next, middleware::Next,
@ -51,7 +51,12 @@ pub async fn validate_header<B>(mut req: Request<B>, next: Next<B>) -> impl Into
} }
if credentials_valid { if credentials_valid {
req.extensions_mut().insert(user_id); let user = state
.db
.get_user_by_id(user_id)
.await?
.ok_or_else(|| anyhow!("user {} not found", user_id))?;
req.extensions_mut().insert(user);
Ok::<_, Error>(next.run(req).await) Ok::<_, Error>(next.run(req).await)
} else { } else {
Err(Error::Http( Err(Error::Http(

View file

@ -2,7 +2,7 @@ mod store;
use crate::{ use crate::{
auth, auth,
db::{self, ChannelId, MessageId, UserId}, db::{self, ChannelId, MessageId, User, UserId},
AppState, Result, AppState, Result,
}; };
use anyhow::anyhow; use anyhow::anyhow;
@ -49,7 +49,7 @@ use tokio::{
time::Sleep, time::Sleep,
}; };
use tower::ServiceBuilder; use tower::ServiceBuilder;
use tracing::{info_span, Instrument}; use tracing::{info_span, instrument, Instrument};
type MessageHandler = type MessageHandler =
Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>; Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
@ -244,12 +244,14 @@ impl Server {
self: &Arc<Self>, self: &Arc<Self>,
connection: Connection, connection: Connection,
address: String, address: String,
user_id: UserId, user: User,
mut send_connection_id: Option<mpsc::Sender<ConnectionId>>, mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
executor: E, executor: E,
) -> impl Future<Output = Result<()>> { ) -> impl Future<Output = Result<()>> {
let mut this = self.clone(); let mut this = self.clone();
let span = info_span!("handle connection", %user_id, %address); let user_id = user.id;
let login = user.github_login;
let span = info_span!("handle connection", %user_id, %login, %address);
async move { async move {
let (connection_id, handle_io, mut incoming_rx) = this let (connection_id, handle_io, mut incoming_rx) = this
.peer .peer
@ -264,7 +266,7 @@ impl Server {
}) })
.await; .await;
tracing::info!(%user_id, %connection_id, %address, "connection opened"); tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
if let Some(send_connection_id) = send_connection_id.as_mut() { if let Some(send_connection_id) = send_connection_id.as_mut() {
let _ = send_connection_id.send(connection_id).await; let _ = send_connection_id.send(connection_id).await;
@ -287,14 +289,14 @@ impl Server {
futures::select_biased! { futures::select_biased! {
result = handle_io => { result = handle_io => {
if let Err(error) = result { if let Err(error) = result {
tracing::error!(%error, "error handling I/O"); tracing::error!(%error, %user_id, %login, %connection_id, %address, "error handling I/O");
} }
break; break;
} }
message = next_message => { message = next_message => {
if let Some(message) = message { if let Some(message) = message {
let type_name = message.payload_type_name(); let type_name = message.payload_type_name();
let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name); let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
async { async {
if let Some(handler) = this.handlers.get(&message.payload_type_id()) { if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
let notifications = this.notifications.clone(); let notifications = this.notifications.clone();
@ -312,25 +314,27 @@ impl Server {
handle_message.await; handle_message.await;
} }
} else { } else {
tracing::error!("no message handler"); tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
} }
}.instrument(span).await; }.instrument(span).await;
} else { } else {
tracing::info!(%user_id, %connection_id, %address, "connection closed"); tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
break; break;
} }
} }
} }
} }
tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
if let Err(error) = this.sign_out(connection_id).await { if let Err(error) = this.sign_out(connection_id).await {
tracing::error!(%error, "error signing out"); tracing::error!(%user_id, %login, %connection_id, %address, %error, "error signing out");
} }
Ok(()) Ok(())
}.instrument(span) }.instrument(span)
} }
#[instrument(skip(self), err)]
async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> { async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
self.peer.disconnect(connection_id); self.peer.disconnect(connection_id);
let removed_connection = self.store_mut().await.remove_connection(connection_id)?; let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
@ -1420,7 +1424,7 @@ pub async fn handle_websocket_request(
TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>, TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
ConnectInfo(socket_address): ConnectInfo<SocketAddr>, ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
Extension(server): Extension<Arc<Server>>, Extension(server): Extension<Arc<Server>>,
Extension(user_id): Extension<UserId>, Extension(user): Extension<User>,
ws: WebSocketUpgrade, ws: WebSocketUpgrade,
) -> axum::response::Response { ) -> axum::response::Response {
if protocol_version != rpc::PROTOCOL_VERSION { if protocol_version != rpc::PROTOCOL_VERSION {
@ -1440,7 +1444,7 @@ pub async fn handle_websocket_request(
let connection = Connection::new(Box::pin(socket)); let connection = Connection::new(Box::pin(socket));
async move { async move {
server server
.handle_connection(connection, socket_address, user_id, None, RealExecutor) .handle_connection(connection, socket_address, user, None, RealExecutor)
.await .await
.log_err(); .log_err();
} }
@ -6451,6 +6455,7 @@ mod tests {
let client_name = name.to_string(); let client_name = name.to_string();
let mut client = Client::new(http.clone()); let mut client = Client::new(http.clone());
let server = self.server.clone(); let server = self.server.clone();
let db = self.app_state.db.clone();
let connection_killers = self.connection_killers.clone(); let connection_killers = self.connection_killers.clone();
let forbid_connections = self.forbid_connections.clone(); let forbid_connections = self.forbid_connections.clone();
let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16); let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
@ -6471,6 +6476,7 @@ mod tests {
assert_eq!(credentials.access_token, "the-token"); assert_eq!(credentials.access_token, "the-token");
let server = server.clone(); let server = server.clone();
let db = db.clone();
let connection_killers = connection_killers.clone(); let connection_killers = connection_killers.clone();
let forbid_connections = forbid_connections.clone(); let forbid_connections = forbid_connections.clone();
let client_name = client_name.clone(); let client_name = client_name.clone();
@ -6484,11 +6490,12 @@ mod tests {
let (client_conn, server_conn, killed) = let (client_conn, server_conn, killed) =
Connection::in_memory(cx.background()); Connection::in_memory(cx.background());
connection_killers.lock().insert(user_id, killed); connection_killers.lock().insert(user_id, killed);
let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
cx.background() cx.background()
.spawn(server.handle_connection( .spawn(server.handle_connection(
server_conn, server_conn,
client_name, client_name,
user_id, user,
Some(connection_id_tx), Some(connection_id_tx),
cx.background(), cx.background(),
)) ))