Use tracing instead of log in collab and rpc crates
Co-Authored-By: Antonio Scandurra <me@as-cii.com>
This commit is contained in:
parent
9f6e82720d
commit
9ca6e29a17
5 changed files with 52 additions and 56 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -3865,7 +3865,6 @@ dependencies = [
|
||||||
"collections",
|
"collections",
|
||||||
"futures",
|
"futures",
|
||||||
"gpui",
|
"gpui",
|
||||||
"log",
|
|
||||||
"parking_lot",
|
"parking_lot",
|
||||||
"prost 0.8.0",
|
"prost 0.8.0",
|
||||||
"prost-build 0.8.0",
|
"prost-build 0.8.0",
|
||||||
|
|
|
@ -24,11 +24,9 @@ axum = { version = "0.5", features = ["json", "headers", "ws"] }
|
||||||
base64 = "0.13"
|
base64 = "0.13"
|
||||||
clap = { version = "3.1", features = ["derive"], optional = true }
|
clap = { version = "3.1", features = ["derive"], optional = true }
|
||||||
envy = "0.4.2"
|
envy = "0.4.2"
|
||||||
env_logger = "0.8"
|
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
lazy_static = "1.4"
|
lazy_static = "1.4"
|
||||||
lipsum = { version = "0.8", optional = true }
|
lipsum = { version = "0.8", optional = true }
|
||||||
log = { version = "0.4.16", features = ["kv_unstable_serde"] }
|
|
||||||
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
|
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
|
||||||
opentelemetry-otlp = { version = "0.10", features = ["tls-roots"] }
|
opentelemetry-otlp = { version = "0.10", features = ["tls-roots"] }
|
||||||
parking_lot = "0.11.1"
|
parking_lot = "0.11.1"
|
||||||
|
@ -60,6 +58,7 @@ rpc = { path = "../rpc", features = ["test-support"] }
|
||||||
client = { path = "../client", features = ["test-support"] }
|
client = { path = "../client", features = ["test-support"] }
|
||||||
editor = { path = "../editor", features = ["test-support"] }
|
editor = { path = "../editor", features = ["test-support"] }
|
||||||
language = { path = "../language", features = ["test-support"] }
|
language = { path = "../language", features = ["test-support"] }
|
||||||
|
log = { version = "0.4.16", features = ["kv_unstable_serde"] }
|
||||||
lsp = { path = "../lsp", features = ["test-support"] }
|
lsp = { path = "../lsp", features = ["test-support"] }
|
||||||
project = { path = "../project", features = ["test-support"] }
|
project = { path = "../project", features = ["test-support"] }
|
||||||
settings = { path = "../settings", features = ["test-support"] }
|
settings = { path = "../settings", features = ["test-support"] }
|
||||||
|
|
|
@ -44,7 +44,7 @@ impl AppState {
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
if let Err(error) = env::load_dotenv() {
|
if let Err(error) = env::load_dotenv() {
|
||||||
log::error!(
|
eprintln!(
|
||||||
"error loading .env.toml (this is expected in production): {}",
|
"error loading .env.toml (this is expected in production): {}",
|
||||||
error
|
error
|
||||||
);
|
);
|
||||||
|
|
|
@ -21,7 +21,6 @@ async-lock = "2.4"
|
||||||
async-tungstenite = "0.16"
|
async-tungstenite = "0.16"
|
||||||
base64 = "0.13"
|
base64 = "0.13"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
log = { version = "0.4.16", features = ["kv_unstable_serde"] }
|
|
||||||
parking_lot = "0.11.1"
|
parking_lot = "0.11.1"
|
||||||
prost = "0.8"
|
prost = "0.8"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
|
|
|
@ -9,7 +9,6 @@ use futures::{
|
||||||
stream::BoxStream,
|
stream::BoxStream,
|
||||||
FutureExt, SinkExt, StreamExt,
|
FutureExt, SinkExt, StreamExt,
|
||||||
};
|
};
|
||||||
use log::as_debug;
|
|
||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::{Mutex, RwLock};
|
||||||
use smol_timeout::TimeoutExt;
|
use smol_timeout::TimeoutExt;
|
||||||
use std::sync::atomic::Ordering::SeqCst;
|
use std::sync::atomic::Ordering::SeqCst;
|
||||||
|
@ -148,12 +147,12 @@ impl Peer {
|
||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
let response_channels = connection_state.response_channels.clone();
|
let response_channels = connection_state.response_channels.clone();
|
||||||
let handle_io = async move {
|
let handle_io = async move {
|
||||||
log::debug!(connection_id = connection_id.0; "handle io future: start");
|
tracing::debug!(%connection_id, "handle io future: start");
|
||||||
|
|
||||||
let _end_connection = util::defer(|| {
|
let _end_connection = util::defer(|| {
|
||||||
response_channels.lock().take();
|
response_channels.lock().take();
|
||||||
this.connections.write().remove(&connection_id);
|
this.connections.write().remove(&connection_id);
|
||||||
log::debug!(connection_id = connection_id.0; "handle io future: end");
|
tracing::debug!(%connection_id, "handle io future: end");
|
||||||
});
|
});
|
||||||
|
|
||||||
// Send messages on this frequency so the connection isn't closed.
|
// Send messages on this frequency so the connection isn't closed.
|
||||||
|
@ -165,48 +164,48 @@ impl Peer {
|
||||||
futures::pin_mut!(receive_timeout);
|
futures::pin_mut!(receive_timeout);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
log::debug!(connection_id = connection_id.0; "outer loop iteration start");
|
tracing::debug!(%connection_id, "outer loop iteration start");
|
||||||
let read_message = reader.read().fuse();
|
let read_message = reader.read().fuse();
|
||||||
futures::pin_mut!(read_message);
|
futures::pin_mut!(read_message);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
log::debug!(connection_id = connection_id.0; "inner loop iteration start");
|
tracing::debug!(%connection_id, "inner loop iteration start");
|
||||||
futures::select_biased! {
|
futures::select_biased! {
|
||||||
outgoing = outgoing_rx.next().fuse() => match outgoing {
|
outgoing = outgoing_rx.next().fuse() => match outgoing {
|
||||||
Some(outgoing) => {
|
Some(outgoing) => {
|
||||||
log::debug!(connection_id = connection_id.0; "outgoing rpc message: writing");
|
tracing::debug!(%connection_id, "outgoing rpc message: writing");
|
||||||
if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await {
|
if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await {
|
||||||
log::debug!(connection_id = connection_id.0; "outgoing rpc message: done writing");
|
tracing::debug!(%connection_id, "outgoing rpc message: done writing");
|
||||||
result.context("failed to write RPC message")?;
|
result.context("failed to write RPC message")?;
|
||||||
log::debug!(connection_id = connection_id.0; "keepalive interval: resetting after sending message");
|
tracing::debug!(%connection_id, "keepalive interval: resetting after sending message");
|
||||||
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
|
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
|
||||||
} else {
|
} else {
|
||||||
log::debug!(connection_id = connection_id.0; "outgoing rpc message: writing timed out");
|
tracing::debug!(%connection_id, "outgoing rpc message: writing timed out");
|
||||||
Err(anyhow!("timed out writing message"))?;
|
Err(anyhow!("timed out writing message"))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
log::debug!(connection_id = connection_id.0; "outgoing rpc message: channel closed");
|
tracing::debug!(%connection_id, "outgoing rpc message: channel closed");
|
||||||
return Ok(())
|
return Ok(())
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
incoming = read_message => {
|
incoming = read_message => {
|
||||||
log::debug!(connection_id = connection_id.0; "incoming rpc message: received");
|
tracing::debug!(%connection_id, "incoming rpc message: received");
|
||||||
let incoming = incoming.context("received invalid RPC message")?;
|
let incoming = incoming.context("received invalid RPC message")?;
|
||||||
log::debug!(connection_id = connection_id.0; "receive timeout: resetting");
|
tracing::debug!(%connection_id, "receive timeout: resetting");
|
||||||
receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
|
receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
|
||||||
if let proto::Message::Envelope(incoming) = incoming {
|
if let proto::Message::Envelope(incoming) = incoming {
|
||||||
log::debug!(connection_id = connection_id.0; "incoming rpc message: processing");
|
tracing::debug!(%connection_id, "incoming rpc message: processing");
|
||||||
match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await {
|
match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await {
|
||||||
Some(Ok(_)) => {
|
Some(Ok(_)) => {
|
||||||
log::debug!(connection_id = connection_id.0; "incoming rpc message: processed");
|
tracing::debug!(%connection_id, "incoming rpc message: processed");
|
||||||
},
|
},
|
||||||
Some(Err(_)) => {
|
Some(Err(_)) => {
|
||||||
log::debug!(connection_id = connection_id.0; "incoming rpc message: channel closed");
|
tracing::debug!(%connection_id, "incoming rpc message: channel closed");
|
||||||
return Ok(())
|
return Ok(())
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
log::debug!(connection_id = connection_id.0; "incoming rpc message: processing timed out");
|
tracing::debug!(%connection_id, "incoming rpc message: processing timed out");
|
||||||
Err(anyhow!("timed out processing incoming message"))?
|
Err(anyhow!("timed out processing incoming message"))?
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -214,19 +213,19 @@ impl Peer {
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
_ = keepalive_timer => {
|
_ = keepalive_timer => {
|
||||||
log::debug!(connection_id = connection_id.0; "keepalive interval: pinging");
|
tracing::debug!(%connection_id, "keepalive interval: pinging");
|
||||||
if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await {
|
if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await {
|
||||||
log::debug!(connection_id = connection_id.0; "keepalive interval: done pinging");
|
tracing::debug!(%connection_id, "keepalive interval: done pinging");
|
||||||
result.context("failed to send keepalive")?;
|
result.context("failed to send keepalive")?;
|
||||||
log::debug!(connection_id = connection_id.0; "keepalive interval: resetting after pinging");
|
tracing::debug!(%connection_id, "keepalive interval: resetting after pinging");
|
||||||
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
|
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
|
||||||
} else {
|
} else {
|
||||||
log::debug!(connection_id = connection_id.0; "keepalive interval: pinging timed out");
|
tracing::debug!(%connection_id, "keepalive interval: pinging timed out");
|
||||||
Err(anyhow!("timed out sending keepalive"))?;
|
Err(anyhow!("timed out sending keepalive"))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = receive_timeout => {
|
_ = receive_timeout => {
|
||||||
log::debug!(connection_id = connection_id.0; "receive timeout: delay between messages too long");
|
tracing::debug!(%connection_id, "receive timeout: delay between messages too long");
|
||||||
Err(anyhow!("delay between messages too long"))?
|
Err(anyhow!("delay between messages too long"))?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -243,68 +242,68 @@ impl Peer {
|
||||||
let response_channels = response_channels.clone();
|
let response_channels = response_channels.clone();
|
||||||
async move {
|
async move {
|
||||||
let message_id = incoming.id;
|
let message_id = incoming.id;
|
||||||
log::debug!(incoming = as_debug!(&incoming); "incoming message future: start");
|
tracing::debug!(?incoming, "incoming message future: start");
|
||||||
let _end = util::defer(move || {
|
let _end = util::defer(move || {
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
connection_id = connection_id.0,
|
%connection_id,
|
||||||
message_id = message_id;
|
message_id,
|
||||||
"incoming message future: end"
|
"incoming message future: end"
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
if let Some(responding_to) = incoming.responding_to {
|
if let Some(responding_to) = incoming.responding_to {
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
connection_id = connection_id.0,
|
%connection_id,
|
||||||
message_id = message_id,
|
message_id,
|
||||||
responding_to = responding_to;
|
responding_to,
|
||||||
"incoming response: received"
|
"incoming response: received"
|
||||||
);
|
);
|
||||||
let channel = response_channels.lock().as_mut()?.remove(&responding_to);
|
let channel = response_channels.lock().as_mut()?.remove(&responding_to);
|
||||||
if let Some(tx) = channel {
|
if let Some(tx) = channel {
|
||||||
let requester_resumed = oneshot::channel();
|
let requester_resumed = oneshot::channel();
|
||||||
if let Err(error) = tx.send((incoming, requester_resumed.0)) {
|
if let Err(error) = tx.send((incoming, requester_resumed.0)) {
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
connection_id = connection_id.0,
|
%connection_id,
|
||||||
message_id = message_id,
|
message_id,
|
||||||
responding_to = responding_to,
|
responding_to = responding_to,
|
||||||
error = as_debug!(error);
|
?error,
|
||||||
"incoming response: request future dropped",
|
"incoming response: request future dropped",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
connection_id = connection_id.0,
|
%connection_id,
|
||||||
message_id = message_id,
|
message_id,
|
||||||
responding_to = responding_to;
|
responding_to,
|
||||||
"incoming response: waiting to resume requester"
|
"incoming response: waiting to resume requester"
|
||||||
);
|
);
|
||||||
let _ = requester_resumed.1.await;
|
let _ = requester_resumed.1.await;
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
connection_id = connection_id.0,
|
%connection_id,
|
||||||
message_id = message_id,
|
message_id,
|
||||||
responding_to = responding_to;
|
responding_to,
|
||||||
"incoming response: requester resumed"
|
"incoming response: requester resumed"
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
log::warn!(
|
tracing::warn!(
|
||||||
connection_id = connection_id.0,
|
%connection_id,
|
||||||
message_id = message_id,
|
message_id,
|
||||||
responding_to = responding_to;
|
responding_to,
|
||||||
"incoming response: unknown request"
|
"incoming response: unknown request"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
log::debug!(
|
tracing::debug!(
|
||||||
connection_id = connection_id.0,
|
%connection_id,
|
||||||
message_id = message_id;
|
message_id,
|
||||||
"incoming message: received"
|
"incoming message: received"
|
||||||
);
|
);
|
||||||
proto::build_typed_envelope(connection_id, incoming).or_else(|| {
|
proto::build_typed_envelope(connection_id, incoming).or_else(|| {
|
||||||
log::error!(
|
tracing::error!(
|
||||||
connection_id = connection_id.0,
|
%connection_id,
|
||||||
message_id = message_id;
|
message_id,
|
||||||
"unable to construct a typed envelope"
|
"unable to construct a typed envelope"
|
||||||
);
|
);
|
||||||
None
|
None
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue