Separate timeout and connection dropped errors out (#30457)

This commit is contained in:
Kirill Bulatov 2025-05-10 15:12:58 +03:00 committed by GitHub
parent 39da72161f
commit 471e02d48f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 313 additions and 115 deletions

View file

@ -49,7 +49,7 @@ use telemetry::Telemetry;
use thiserror::Error;
use tokio::net::TcpStream;
use url::Url;
use util::{ResultExt, TryFutureExt};
use util::{ConnectionResult, ResultExt};
pub use rpc::*;
pub use telemetry_events::Event;
@ -151,9 +151,19 @@ pub fn init(client: &Arc<Client>, cx: &mut App) {
let client = client.clone();
move |_: &SignIn, cx| {
if let Some(client) = client.upgrade() {
cx.spawn(async move |cx| {
client.authenticate_and_connect(true, &cx).log_err().await
})
cx.spawn(
async move |cx| match client.authenticate_and_connect(true, &cx).await {
ConnectionResult::Timeout => {
log::error!("Initial authentication timed out");
}
ConnectionResult::ConnectionReset => {
log::error!("Initial authentication connection reset");
}
ConnectionResult::Result(r) => {
r.log_err();
}
},
)
.detach();
}
}
@ -658,7 +668,7 @@ impl Client {
state._reconnect_task = None;
}
Status::ConnectionLost => {
let this = self.clone();
let client = self.clone();
state._reconnect_task = Some(cx.spawn(async move |cx| {
#[cfg(any(test, feature = "test-support"))]
let mut rng = StdRng::seed_from_u64(0);
@ -666,10 +676,25 @@ impl Client {
let mut rng = StdRng::from_entropy();
let mut delay = INITIAL_RECONNECTION_DELAY;
while let Err(error) = this.authenticate_and_connect(true, &cx).await {
log::error!("failed to connect {}", error);
if matches!(*this.status().borrow(), Status::ConnectionError) {
this.set_status(
loop {
match client.authenticate_and_connect(true, &cx).await {
ConnectionResult::Timeout => {
log::error!("client connect attempt timed out")
}
ConnectionResult::ConnectionReset => {
log::error!("client connect attempt reset")
}
ConnectionResult::Result(r) => {
if let Err(error) = r {
log::error!("failed to connect: {error}");
} else {
break;
}
}
}
if matches!(*client.status().borrow(), Status::ConnectionError) {
client.set_status(
Status::ReconnectionError {
next_reconnection: Instant::now() + delay,
},
@ -827,7 +852,7 @@ impl Client {
self: &Arc<Self>,
try_provider: bool,
cx: &AsyncApp,
) -> anyhow::Result<()> {
) -> ConnectionResult<()> {
let was_disconnected = match *self.status().borrow() {
Status::SignedOut => true,
Status::ConnectionError
@ -836,9 +861,14 @@ impl Client {
| Status::Reauthenticating { .. }
| Status::ReconnectionError { .. } => false,
Status::Connected { .. } | Status::Connecting { .. } | Status::Reconnecting { .. } => {
return Ok(());
return ConnectionResult::Result(Ok(()));
}
Status::UpgradeRequired => {
return ConnectionResult::Result(
Err(EstablishConnectionError::UpgradeRequired)
.context("client auth and connect"),
);
}
Status::UpgradeRequired => return Err(EstablishConnectionError::UpgradeRequired)?,
};
if was_disconnected {
self.set_status(Status::Authenticating, cx);
@ -862,12 +892,12 @@ impl Client {
Ok(creds) => credentials = Some(creds),
Err(err) => {
self.set_status(Status::ConnectionError, cx);
return Err(err);
return ConnectionResult::Result(Err(err));
}
}
}
_ = status_rx.next().fuse() => {
return Err(anyhow!("authentication canceled"));
return ConnectionResult::Result(Err(anyhow!("authentication canceled")));
}
}
}
@ -892,10 +922,10 @@ impl Client {
}
futures::select_biased! {
result = self.set_connection(conn, cx).fuse() => result,
result = self.set_connection(conn, cx).fuse() => ConnectionResult::Result(result.context("client auth and connect")),
_ = timeout => {
self.set_status(Status::ConnectionError, cx);
Err(anyhow!("timed out waiting on hello message from server"))
ConnectionResult::Timeout
}
}
}
@ -907,22 +937,22 @@ impl Client {
self.authenticate_and_connect(false, cx).await
} else {
self.set_status(Status::ConnectionError, cx);
Err(EstablishConnectionError::Unauthorized)?
ConnectionResult::Result(Err(EstablishConnectionError::Unauthorized).context("client auth and connect"))
}
}
Err(EstablishConnectionError::UpgradeRequired) => {
self.set_status(Status::UpgradeRequired, cx);
Err(EstablishConnectionError::UpgradeRequired)?
ConnectionResult::Result(Err(EstablishConnectionError::UpgradeRequired).context("client auth and connect"))
}
Err(error) => {
self.set_status(Status::ConnectionError, cx);
Err(error)?
ConnectionResult::Result(Err(error).context("client auth and connect"))
}
}
}
_ = &mut timeout => {
self.set_status(Status::ConnectionError, cx);
Err(anyhow!("timed out trying to establish connection"))
ConnectionResult::Timeout
}
}
}
@ -938,10 +968,7 @@ impl Client {
let peer_id = async {
log::debug!("waiting for server hello");
let message = incoming
.next()
.await
.ok_or_else(|| anyhow!("no hello message received"))?;
let message = incoming.next().await.context("no hello message received")?;
log::debug!("got server hello");
let hello_message_type_name = message.payload_type_name().to_string();
let hello = message
@ -1743,7 +1770,7 @@ mod tests {
status.next().await,
Some(Status::ConnectionError { .. })
));
auth_and_connect.await.unwrap_err();
auth_and_connect.await.into_response().unwrap_err();
// Allow the connection to be established.
let server = FakeServer::for_client(user_id, &client, cx).await;

View file

@ -107,6 +107,7 @@ impl FakeServer {
client
.authenticate_and_connect(false, &cx.to_async())
.await
.into_response()
.unwrap();
server