diff --git a/Cargo.lock b/Cargo.lock index 6063530e9f..61f6f42498 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7539,6 +7539,7 @@ dependencies = [ name = "gpui_tokio" version = "0.1.0" dependencies = [ + "anyhow", "gpui", "tokio", "util", diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index ed3f114943..f9b8a10610 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1290,19 +1290,21 @@ impl Client { "http" => Http, _ => Err(anyhow!("invalid rpc url: {}", rpc_url))?, }; - let rpc_host = rpc_url - .host_str() - .zip(rpc_url.port_or_known_default()) - .context("missing host in rpc url")?; - let stream = { - let handle = cx.update(|cx| gpui_tokio::Tokio::handle(cx)).ok().unwrap(); - let _guard = handle.enter(); - match proxy { - Some(proxy) => connect_proxy_stream(&proxy, rpc_host).await?, - None => Box::new(TcpStream::connect(rpc_host).await?), + let stream = gpui_tokio::Tokio::spawn_result(cx, { + let rpc_url = rpc_url.clone(); + async move { + let rpc_host = rpc_url + .host_str() + .zip(rpc_url.port_or_known_default()) + .context("missing host in rpc url")?; + Ok(match proxy { + Some(proxy) => connect_proxy_stream(&proxy, rpc_host).await?, + None => Box::new(TcpStream::connect(rpc_host).await?), + }) } - }; + })? + .await?; log::info!("connected to rpc endpoint {}", rpc_url); diff --git a/crates/cloud_api_client/src/cloud_api_client.rs b/crates/cloud_api_client/src/cloud_api_client.rs index 92417d8319..205f3e2432 100644 --- a/crates/cloud_api_client/src/cloud_api_client.rs +++ b/crates/cloud_api_client/src/cloud_api_client.rs @@ -102,13 +102,7 @@ impl CloudApiClient { let credentials = credentials.as_ref().context("no credentials provided")?; let authorization_header = format!("{} {}", credentials.user_id, credentials.access_token); - Ok(cx.spawn(async move |cx| { - let handle = cx - .update(|cx| Tokio::handle(cx)) - .ok() - .context("failed to get Tokio handle")?; - let _guard = handle.enter(); - + Ok(Tokio::spawn_result(cx, async move { let ws = WebSocket::connect(connect_url) .with_request( request::Builder::new() diff --git a/crates/gpui_tokio/Cargo.toml b/crates/gpui_tokio/Cargo.toml index 46d5eafd5a..2d4abf4063 100644 --- a/crates/gpui_tokio/Cargo.toml +++ b/crates/gpui_tokio/Cargo.toml @@ -13,6 +13,7 @@ path = "src/gpui_tokio.rs" doctest = false [dependencies] +anyhow.workspace = true util.workspace = true gpui.workspace = true tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } diff --git a/crates/gpui_tokio/src/gpui_tokio.rs b/crates/gpui_tokio/src/gpui_tokio.rs index fffe18a616..8384f2a88e 100644 --- a/crates/gpui_tokio/src/gpui_tokio.rs +++ b/crates/gpui_tokio/src/gpui_tokio.rs @@ -52,6 +52,28 @@ impl Tokio { }) } + /// Spawns the given future on Tokio's thread pool, and returns it via a GPUI task + /// Note that the Tokio task will be cancelled if the GPUI task is dropped + pub fn spawn_result(cx: &C, f: Fut) -> C::Result>> + where + C: AppContext, + Fut: Future> + Send + 'static, + R: Send + 'static, + { + cx.read_global(|tokio: &GlobalTokio, cx| { + let join_handle = tokio.runtime.spawn(f); + let abort_handle = join_handle.abort_handle(); + let cancel = defer(move || { + abort_handle.abort(); + }); + cx.background_spawn(async move { + let result = join_handle.await?; + drop(cancel); + result + }) + }) + } + pub fn handle(cx: &App) -> tokio::runtime::Handle { GlobalTokio::global(cx).runtime.handle().clone() }