Upgrade async-tungstenite to tokio (#26193)

We're seeing panics caused by a buggy implementation of AsyncWrite
that is being passed to rustls: 

https://github.com/rustls/rustls/issues/2316#issuecomment-2662838186

One hypothesis was that we're using (comparatively) non-standard async
tools for connecting over websockets; so this attempts to make us be
(comparitvely) more standard.

Release Notes:

- N/A
This commit is contained in:
Conrad Irwin 2025-04-08 09:17:08 -06:00 committed by GitHub
parent ea33d78ae4
commit ca4cc4764b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 84 additions and 140 deletions

View file

@ -18,7 +18,7 @@ test-support = ["clock/test-support", "collections/test-support", "gpui/test-sup
[dependencies]
anyhow.workspace = true
async-recursion = "0.3"
async-tungstenite = { workspace = true, features = ["async-std", "async-tls"] }
async-tungstenite = { workspace = true, features = ["tokio", "tokio-rustls-manual-roots"] }
chrono = { workspace = true, features = ["serde"] }
clock.workspace = true
collections.workspace = true
@ -26,6 +26,7 @@ credentials_provider.workspace = true
feature_flags.workspace = true
futures.workspace = true
gpui.workspace = true
gpui_tokio.workspace = true
http_client.workspace = true
http_client_tls.workspace = true
log.workspace = true
@ -51,6 +52,7 @@ url.workspace = true
util.workspace = true
worktree.workspace = true
telemetry.workspace = true
tokio.workspace = true
workspace-hack.workspace = true
[dev-dependencies]
@ -67,4 +69,3 @@ windows.workspace = true
[target.'cfg(target_os = "macos")'.dependencies]
cocoa.workspace = true
async-native-tls = { version = "0.5.0", features = ["vendored"] }

View file

@ -20,7 +20,7 @@ use futures::{
AsyncReadExt, FutureExt, SinkExt, Stream, StreamExt, TryFutureExt as _, TryStreamExt,
channel::oneshot, future::BoxFuture,
};
use gpui::{App, AppContext as _, AsyncApp, Entity, Global, Task, WeakEntity, actions};
use gpui::{App, AsyncApp, Entity, Global, Task, WeakEntity, actions};
use http_client::{AsyncBody, HttpClient, HttpClientWithUrl};
use parking_lot::RwLock;
use postage::watch;
@ -1086,7 +1086,7 @@ impl Client {
let rpc_url = self.rpc_url(http, release_channel);
let system_id = self.telemetry.system_id();
let metrics_id = self.telemetry.metrics_id();
cx.background_spawn(async move {
cx.spawn(async move |cx| {
use HttpOrHttps::*;
#[derive(Debug)]
@ -1105,7 +1105,12 @@ impl Client {
.host_str()
.zip(rpc_url.port_or_known_default())
.ok_or_else(|| anyhow!("missing host in rpc url"))?;
let stream = connect_socks_proxy_stream(proxy.as_ref(), rpc_host).await?;
let stream = {
let handle = cx.update(|cx| gpui_tokio::Tokio::handle(cx)).ok().unwrap();
let _guard = handle.enter();
connect_socks_proxy_stream(proxy.as_ref(), rpc_host).await?
};
log::info!("connected to rpc endpoint {}", rpc_url);
@ -1144,30 +1149,19 @@ impl Client {
request_headers.insert("x-zed-metrics-id", HeaderValue::from_str(&metrics_id)?);
}
match url_scheme {
Https => {
let (stream, _) =
async_tungstenite::async_tls::client_async_tls_with_connector(
request,
stream,
Some(http_client_tls::tls_config().into()),
)
.await?;
Ok(Connection::new(
stream
.map_err(|error| anyhow!(error))
.sink_map_err(|error| anyhow!(error)),
))
}
Http => {
let (stream, _) = async_tungstenite::client_async(request, stream).await?;
Ok(Connection::new(
stream
.map_err(|error| anyhow!(error))
.sink_map_err(|error| anyhow!(error)),
))
}
}
let (stream, _) = async_tungstenite::tokio::client_async_tls_with_connector_and_config(
request,
stream,
Some(Arc::new(http_client_tls::tls_config()).into()),
None,
)
.await?;
Ok(Connection::new(
stream
.map_err(|error| anyhow!(error))
.sink_map_err(|error| anyhow!(error)),
))
})
}
@ -1639,7 +1633,7 @@ mod tests {
use crate::test::FakeServer;
use clock::FakeSystemClock;
use gpui::{BackgroundExecutor, TestAppContext};
use gpui::{AppContext as _, BackgroundExecutor, TestAppContext};
use http_client::FakeHttpClient;
use parking_lot::Mutex;
use proto::TypedEnvelope;

View file

@ -1,11 +1,7 @@
//! socks proxy
use anyhow::{Result, anyhow};
use futures::io::{AsyncRead, AsyncWrite};
use http_client::Uri;
use tokio_socks::{
io::Compat,
tcp::{Socks4Stream, Socks5Stream},
};
use tokio_socks::tcp::{Socks4Stream, Socks5Stream};
pub(crate) async fn connect_socks_proxy_stream(
proxy: Option<&Uri>,
@ -14,7 +10,7 @@ pub(crate) async fn connect_socks_proxy_stream(
let stream = match parse_socks_proxy(proxy) {
Some((socks_proxy, SocksVersion::V4)) => {
let stream = Socks4Stream::connect_with_socket(
Compat::new(smol::net::TcpStream::connect(socks_proxy).await?),
tokio::net::TcpStream::connect(socks_proxy).await?,
rpc_host,
)
.await
@ -23,13 +19,15 @@ pub(crate) async fn connect_socks_proxy_stream(
}
Some((socks_proxy, SocksVersion::V5)) => Box::new(
Socks5Stream::connect_with_socket(
Compat::new(smol::net::TcpStream::connect(socks_proxy).await?),
tokio::net::TcpStream::connect(socks_proxy).await?,
rpc_host,
)
.await
.map_err(|err| anyhow!("error connecting to socks {}", err))?,
) as Box<dyn AsyncReadWrite>,
None => Box::new(smol::net::TcpStream::connect(rpc_host).await?) as Box<dyn AsyncReadWrite>,
None => {
Box::new(tokio::net::TcpStream::connect(rpc_host).await?) as Box<dyn AsyncReadWrite>
}
};
Ok(stream)
}
@ -60,5 +58,11 @@ enum SocksVersion {
V5,
}
pub(crate) trait AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
impl<T: AsyncRead + AsyncWrite + Unpin + Send + 'static> AsyncReadWrite for T {}
pub(crate) trait AsyncReadWrite:
tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static
{
}
impl<T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static> AsyncReadWrite
for T
{
}