Merge branch 'main' into request-to-join-project

This commit is contained in:
Antonio Scandurra 2022-05-17 14:55:20 +02:00
commit 225536accc
59 changed files with 3330 additions and 7719 deletions

View file

@ -21,13 +21,13 @@ async-lock = "2.4"
async-tungstenite = "0.16"
base64 = "0.13"
futures = "0.3"
log = { version = "0.4.16", features = ["kv_unstable_serde"] }
parking_lot = "0.11.1"
prost = "0.8"
rand = "0.8"
rsa = "0.4"
serde = { version = "1", features = ["derive"] }
smol-timeout = "0.6"
tracing = { version = "0.1.34", features = ["log"] }
zstd = "0.9"
[build-dependencies]

View file

@ -22,6 +22,7 @@ use std::{
},
time::Duration,
};
use tracing::instrument;
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct ConnectionId(pub u32);
@ -108,6 +109,7 @@ impl Peer {
})
}
#[instrument(skip_all)]
pub async fn add_connection<F, Fut, Out>(
self: &Arc<Self>,
connection: Connection,
@ -145,9 +147,12 @@ impl Peer {
let this = self.clone();
let response_channels = connection_state.response_channels.clone();
let handle_io = async move {
tracing::debug!(%connection_id, "handle io future: start");
let _end_connection = util::defer(|| {
response_channels.lock().take();
this.connections.write().remove(&connection_id);
tracing::debug!(%connection_id, "handle io future: end");
});
// Send messages on this frequency so the connection isn't closed.
@ -159,49 +164,68 @@ impl Peer {
futures::pin_mut!(receive_timeout);
loop {
tracing::debug!(%connection_id, "outer loop iteration start");
let read_message = reader.read().fuse();
futures::pin_mut!(read_message);
loop {
tracing::debug!(%connection_id, "inner loop iteration start");
futures::select_biased! {
outgoing = outgoing_rx.next().fuse() => match outgoing {
Some(outgoing) => {
tracing::debug!(%connection_id, "outgoing rpc message: writing");
if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await {
tracing::debug!(%connection_id, "outgoing rpc message: done writing");
result.context("failed to write RPC message")?;
tracing::debug!(%connection_id, "keepalive interval: resetting after sending message");
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
} else {
tracing::debug!(%connection_id, "outgoing rpc message: writing timed out");
Err(anyhow!("timed out writing message"))?;
}
}
None => {
log::info!("outgoing channel closed");
tracing::debug!(%connection_id, "outgoing rpc message: channel closed");
return Ok(())
},
},
incoming = read_message => {
let incoming = incoming.context("received invalid RPC message")?;
let incoming = incoming.context("error reading rpc message from socket")?;
tracing::debug!(%connection_id, "incoming rpc message: received");
tracing::debug!(%connection_id, "receive timeout: resetting");
receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
if let proto::Message::Envelope(incoming) = incoming {
tracing::debug!(%connection_id, "incoming rpc message: processing");
match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await {
Some(Ok(_)) => {},
Some(Ok(_)) => {
tracing::debug!(%connection_id, "incoming rpc message: processed");
},
Some(Err(_)) => {
log::info!("incoming channel closed");
tracing::debug!(%connection_id, "incoming rpc message: channel closed");
return Ok(())
},
None => Err(anyhow!("timed out processing incoming message"))?,
None => {
tracing::debug!(%connection_id, "incoming rpc message: processing timed out");
Err(anyhow!("timed out processing incoming message"))?
},
}
}
break;
},
_ = keepalive_timer => {
tracing::debug!(%connection_id, "keepalive interval: pinging");
if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await {
tracing::debug!(%connection_id, "keepalive interval: done pinging");
result.context("failed to send keepalive")?;
tracing::debug!(%connection_id, "keepalive interval: resetting after pinging");
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
} else {
tracing::debug!(%connection_id, "keepalive interval: pinging timed out");
Err(anyhow!("timed out sending keepalive"))?;
}
}
_ = receive_timeout => {
tracing::debug!(%connection_id, "receive timeout: delay between messages too long");
Err(anyhow!("delay between messages too long"))?
}
}
@ -217,25 +241,71 @@ impl Peer {
let incoming_rx = incoming_rx.filter_map(move |incoming| {
let response_channels = response_channels.clone();
async move {
let message_id = incoming.id;
tracing::debug!(?incoming, "incoming message future: start");
let _end = util::defer(move || {
tracing::debug!(
%connection_id,
message_id,
"incoming message future: end"
);
});
if let Some(responding_to) = incoming.responding_to {
tracing::debug!(
%connection_id,
message_id,
responding_to,
"incoming response: received"
);
let channel = response_channels.lock().as_mut()?.remove(&responding_to);
if let Some(tx) = channel {
let requester_resumed = oneshot::channel();
if let Err(error) = tx.send((incoming, requester_resumed.0)) {
log::debug!(
"received RPC but request future was dropped {:?}",
error.0
tracing::debug!(
%connection_id,
message_id,
responding_to = responding_to,
?error,
"incoming response: request future dropped",
);
}
tracing::debug!(
%connection_id,
message_id,
responding_to,
"incoming response: waiting to resume requester"
);
let _ = requester_resumed.1.await;
tracing::debug!(
%connection_id,
message_id,
responding_to,
"incoming response: requester resumed"
);
} else {
log::warn!("received RPC response to unknown request {}", responding_to);
tracing::warn!(
%connection_id,
message_id,
responding_to,
"incoming response: unknown request"
);
}
None
} else {
tracing::debug!(
%connection_id,
message_id,
"incoming message: received"
);
proto::build_typed_envelope(connection_id, incoming).or_else(|| {
log::error!("unable to construct a typed envelope");
tracing::error!(
%connection_id,
message_id,
"unable to construct a typed envelope"
);
None
})
}