Prioritize keepalive pings over incoming message handling in Peer

This commit is contained in:
Max Brunsfeld 2022-06-22 14:44:05 -07:00
parent 36f8c68099
commit 4cb68b2966

View file

@ -194,6 +194,21 @@ impl Peer {
return Ok(())
},
},
_ = keepalive_timer => {
tracing::debug!(%connection_id, "keepalive interval: pinging");
futures::select_biased! {
result = writer.write(proto::Message::Ping).fuse() => {
tracing::debug!(%connection_id, "keepalive interval: done pinging");
result.context("failed to send keepalive")?;
tracing::debug!(%connection_id, "keepalive interval: resetting after pinging");
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
}
_ = create_timer(WRITE_TIMEOUT).fuse() => {
tracing::debug!(%connection_id, "keepalive interval: pinging timed out");
Err(anyhow!("timed out sending keepalive"))?;
}
}
}
incoming = read_message => {
let incoming = incoming.context("error reading rpc message from socket")?;
tracing::debug!(%connection_id, "incoming rpc message: received");
@ -219,21 +234,6 @@ impl Peer {
}
break;
},
_ = keepalive_timer => {
tracing::debug!(%connection_id, "keepalive interval: pinging");
futures::select_biased! {
result = writer.write(proto::Message::Ping).fuse() => {
tracing::debug!(%connection_id, "keepalive interval: done pinging");
result.context("failed to send keepalive")?;
tracing::debug!(%connection_id, "keepalive interval: resetting after pinging");
keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
}
_ = create_timer(WRITE_TIMEOUT).fuse() => {
tracing::debug!(%connection_id, "keepalive interval: pinging timed out");
Err(anyhow!("timed out sending keepalive"))?;
}
}
}
_ = receive_timeout => {
tracing::debug!(%connection_id, "receive timeout: delay between messages too long");
Err(anyhow!("delay between messages too long"))?