From 00352aa185623974e18a2c250ae6ec5f2d3eeb88 Mon Sep 17 00:00:00 2001 From: Richard Feldman Date: Wed, 6 Aug 2025 21:28:41 -0400 Subject: [PATCH] Establish WebSocket connection to Cloud (#35734) This PR adds a new WebSocket connection to Cloud. This connection will be used to push down notifications from the server to the client. Release Notes: - N/A --------- Co-authored-by: Marshall Bowers --- Cargo.lock | 48 +++++++++++- Cargo.toml | 3 + crates/client/src/client.rs | 35 +++++++++ crates/cloud_api_client/Cargo.toml | 3 + .../cloud_api_client/src/cloud_api_client.rs | 43 +++++++++++ crates/cloud_api_client/src/websocket.rs | 73 +++++++++++++++++++ .../cloud_api_types/src/websocket_protocol.rs | 2 +- tooling/workspace-hack/Cargo.toml | 24 +++--- 8 files changed, 214 insertions(+), 17 deletions(-) create mode 100644 crates/cloud_api_client/src/websocket.rs diff --git a/Cargo.lock b/Cargo.lock index ea40cff81c..860dcfb2ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1411,7 +1411,7 @@ dependencies = [ "anyhow", "arrayvec", "log", - "nom", + "nom 7.1.3", "num-rational", "v_frame", ] @@ -2785,7 +2785,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" dependencies = [ - "nom", + "nom 7.1.3", ] [[package]] @@ -3071,10 +3071,13 @@ dependencies = [ "anyhow", "cloud_api_types", "futures 0.3.31", + "gpui", + "gpui_tokio", "http_client", "parking_lot", "serde_json", "workspace-hack", + "yawc", ] [[package]] @@ -10582,6 +10585,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + [[package]] name = "noop_proc_macro" version = "0.3.0" @@ -15403,7 +15415,7 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790" dependencies = [ - "nom", + "nom 7.1.3", "unicode_categories", ] @@ -19979,7 +19991,7 @@ dependencies = [ "naga", "nix 0.28.0", "nix 0.29.0", - "nom", + "nom 7.1.3", "num-bigint", "num-bigint-dig", "num-integer", @@ -20314,6 +20326,34 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" +[[package]] +name = "yawc" +version = "0.2.4" +source = "git+https://github.com/deviant-forks/yawc?rev=1899688f3e69ace4545aceb97b2a13881cf26142#1899688f3e69ace4545aceb97b2a13881cf26142" +dependencies = [ + "base64 0.22.1", + "bytes 1.10.1", + "flate2", + "futures 0.3.31", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "js-sys", + "nom 8.0.0", + "pin-project", + "rand 0.8.5", + "sha1", + "thiserror 1.0.69", + "tokio", + "tokio-rustls 0.26.2", + "tokio-util", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", +] + [[package]] name = "yazi" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index a60a65fcd8..a04d8f6099 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -661,6 +661,9 @@ which = "6.0.0" windows-core = "0.61" wit-component = "0.221" workspace-hack = "0.1.0" +# We can switch back to the published version once https://github.com/infinitefield/yawc/pull/16 is merged and a new +# version is released. +yawc = { git = "https://github.com/deviant-forks/yawc", rev = "1899688f3e69ace4545aceb97b2a13881cf26142" } zstd = "0.11" [workspace.dependencies.async-stripe] diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index b4894cddcf..0480ed1c3e 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -14,6 +14,7 @@ use async_tungstenite::tungstenite::{ }; use clock::SystemClock; use cloud_api_client::CloudApiClient; +use cloud_api_client::websocket_protocol::MessageToClient; use credentials_provider::CredentialsProvider; use futures::{ AsyncReadExt, FutureExt, SinkExt, Stream, StreamExt, TryFutureExt as _, TryStreamExt, @@ -933,6 +934,32 @@ impl Client { } } + /// Establishes a WebSocket connection with Cloud for receiving updates from the server. + async fn connect_to_cloud(self: &Arc, cx: &AsyncApp) -> Result<()> { + let connect_task = cx.update({ + let cloud_client = self.cloud_client.clone(); + move |cx| cloud_client.connect(cx) + })??; + let connection = connect_task.await?; + + let (mut messages, task) = cx.update(|cx| connection.spawn(cx))?; + task.detach(); + + cx.spawn({ + let this = self.clone(); + async move |cx| { + while let Some(message) = messages.next().await { + if let Some(message) = message.log_err() { + this.handle_message_to_client(message, cx); + } + } + } + }) + .detach(); + + Ok(()) + } + /// Performs a sign-in and also connects to Collab. /// /// This is called in places where we *don't* need to connect in the future. We will replace these calls with calls @@ -944,6 +971,8 @@ impl Client { ) -> Result<()> { let credentials = self.sign_in(try_provider, cx).await?; + self.connect_to_cloud(cx).await.log_err(); + let connect_result = match self.connect_with_credentials(credentials, cx).await { ConnectionResult::Timeout => Err(anyhow!("connection timed out")), ConnectionResult::ConnectionReset => Err(anyhow!("connection reset")), @@ -1622,6 +1651,12 @@ impl Client { } } + fn handle_message_to_client(self: &Arc, message: MessageToClient, _cx: &AsyncApp) { + match message { + MessageToClient::UserUpdated => {} + } + } + pub fn telemetry(&self) -> &Arc { &self.telemetry } diff --git a/crates/cloud_api_client/Cargo.toml b/crates/cloud_api_client/Cargo.toml index d56aa94c6e..8e50ccb191 100644 --- a/crates/cloud_api_client/Cargo.toml +++ b/crates/cloud_api_client/Cargo.toml @@ -15,7 +15,10 @@ path = "src/cloud_api_client.rs" anyhow.workspace = true cloud_api_types.workspace = true futures.workspace = true +gpui.workspace = true +gpui_tokio.workspace = true http_client.workspace = true parking_lot.workspace = true serde_json.workspace = true workspace-hack.workspace = true +yawc.workspace = true diff --git a/crates/cloud_api_client/src/cloud_api_client.rs b/crates/cloud_api_client/src/cloud_api_client.rs index edac051a0e..ef9a1a9a55 100644 --- a/crates/cloud_api_client/src/cloud_api_client.rs +++ b/crates/cloud_api_client/src/cloud_api_client.rs @@ -1,11 +1,19 @@ +mod websocket; + use std::sync::Arc; use anyhow::{Context, Result, anyhow}; +use cloud_api_types::websocket_protocol::{PROTOCOL_VERSION, PROTOCOL_VERSION_HEADER_NAME}; pub use cloud_api_types::*; use futures::AsyncReadExt as _; +use gpui::{App, Task}; +use gpui_tokio::Tokio; use http_client::http::request; use http_client::{AsyncBody, HttpClientWithUrl, Method, Request, StatusCode}; use parking_lot::RwLock; +use yawc::WebSocket; + +use crate::websocket::Connection; struct Credentials { user_id: u32, @@ -78,6 +86,41 @@ impl CloudApiClient { Ok(serde_json::from_str(&body)?) } + pub fn connect(&self, cx: &App) -> Result>> { + let mut connect_url = self + .http_client + .build_zed_cloud_url("/client/users/connect", &[])?; + connect_url + .set_scheme(match connect_url.scheme() { + "https" => "wss", + "http" => "ws", + scheme => Err(anyhow!("invalid URL scheme: {scheme}"))?, + }) + .map_err(|_| anyhow!("failed to set URL scheme"))?; + + let credentials = self.credentials.read(); + let credentials = credentials.as_ref().context("no credentials provided")?; + let authorization_header = format!("{} {}", credentials.user_id, credentials.access_token); + + Ok(cx.spawn(async move |cx| { + let handle = cx + .update(|cx| Tokio::handle(cx)) + .ok() + .context("failed to get Tokio handle")?; + let _guard = handle.enter(); + + let ws = WebSocket::connect(connect_url) + .with_request( + request::Builder::new() + .header("Authorization", authorization_header) + .header(PROTOCOL_VERSION_HEADER_NAME, PROTOCOL_VERSION.to_string()), + ) + .await?; + + Ok(Connection::new(ws)) + })) + } + pub async fn accept_terms_of_service(&self) -> Result { let request = self.build_request( Request::builder().method(Method::POST).uri( diff --git a/crates/cloud_api_client/src/websocket.rs b/crates/cloud_api_client/src/websocket.rs new file mode 100644 index 0000000000..48a628db78 --- /dev/null +++ b/crates/cloud_api_client/src/websocket.rs @@ -0,0 +1,73 @@ +use std::pin::Pin; +use std::time::Duration; + +use anyhow::Result; +use cloud_api_types::websocket_protocol::MessageToClient; +use futures::channel::mpsc::unbounded; +use futures::stream::{SplitSink, SplitStream}; +use futures::{FutureExt as _, SinkExt as _, Stream, StreamExt as _, TryStreamExt as _, pin_mut}; +use gpui::{App, BackgroundExecutor, Task}; +use yawc::WebSocket; +use yawc::frame::{FrameView, OpCode}; + +const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); + +pub type MessageStream = Pin>>>; + +pub struct Connection { + tx: SplitSink, + rx: SplitStream, +} + +impl Connection { + pub fn new(ws: WebSocket) -> Self { + let (tx, rx) = ws.split(); + + Self { tx, rx } + } + + pub fn spawn(self, cx: &App) -> (MessageStream, Task<()>) { + let (mut tx, rx) = (self.tx, self.rx); + + let (message_tx, message_rx) = unbounded(); + + let handle_io = |executor: BackgroundExecutor| async move { + // Send messages on this frequency so the connection isn't closed. + let keepalive_timer = executor.timer(KEEPALIVE_INTERVAL).fuse(); + futures::pin_mut!(keepalive_timer); + + let rx = rx.fuse(); + pin_mut!(rx); + + loop { + futures::select_biased! { + _ = keepalive_timer => { + let _ = tx.send(FrameView::ping(Vec::new())).await; + + keepalive_timer.set(executor.timer(KEEPALIVE_INTERVAL).fuse()); + } + frame = rx.next() => { + let Some(frame) = frame else { + break; + }; + + match frame.opcode { + OpCode::Binary => { + let message_result = MessageToClient::deserialize(&frame.payload); + message_tx.unbounded_send(message_result).ok(); + } + OpCode::Close => { + break; + } + _ => {} + } + } + } + } + }; + + let task = cx.spawn(async move |cx| handle_io(cx.background_executor().clone()).await); + + (message_rx.into_stream().boxed(), task) + } +} diff --git a/crates/cloud_api_types/src/websocket_protocol.rs b/crates/cloud_api_types/src/websocket_protocol.rs index c90d09e370..75f6a73b43 100644 --- a/crates/cloud_api_types/src/websocket_protocol.rs +++ b/crates/cloud_api_types/src/websocket_protocol.rs @@ -8,7 +8,7 @@ pub const PROTOCOL_VERSION: u32 = 0; pub const PROTOCOL_VERSION_HEADER_NAME: &str = "x-zed-protocol-version"; /// A message from Cloud to the Zed client. -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub enum MessageToClient { /// The user was updated and should be refreshed. UserUpdated, diff --git a/tooling/workspace-hack/Cargo.toml b/tooling/workspace-hack/Cargo.toml index 5678e46236..338985ed95 100644 --- a/tooling/workspace-hack/Cargo.toml +++ b/tooling/workspace-hack/Cargo.toml @@ -305,7 +305,7 @@ scopeguard = { version = "1" } security-framework = { version = "3", features = ["OSX_10_14"] } security-framework-sys = { version = "2", features = ["OSX_10_14"] } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } tower = { version = "0.5", default-features = false, features = ["timeout", "util"] } @@ -334,7 +334,7 @@ scopeguard = { version = "1" } security-framework = { version = "3", features = ["OSX_10_14"] } security-framework-sys = { version = "2", features = ["OSX_10_14"] } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } tower = { version = "0.5", default-features = false, features = ["timeout", "util"] } @@ -362,7 +362,7 @@ scopeguard = { version = "1" } security-framework = { version = "3", features = ["OSX_10_14"] } security-framework-sys = { version = "2", features = ["OSX_10_14"] } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } tower = { version = "0.5", default-features = false, features = ["timeout", "util"] } @@ -391,7 +391,7 @@ scopeguard = { version = "1" } security-framework = { version = "3", features = ["OSX_10_14"] } security-framework-sys = { version = "2", features = ["OSX_10_14"] } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } tower = { version = "0.5", default-features = false, features = ["timeout", "util"] } @@ -429,7 +429,7 @@ rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs", scopeguard = { version = "1" } syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } @@ -468,7 +468,7 @@ rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["ev rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs", "net", "process", "termios", "time"] } scopeguard = { version = "1" } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } @@ -509,7 +509,7 @@ rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs", scopeguard = { version = "1" } syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } @@ -548,7 +548,7 @@ rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["ev rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs", "net", "process", "termios", "time"] } scopeguard = { version = "1" } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } @@ -568,7 +568,7 @@ ring = { version = "0.17", features = ["std"] } rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["event"] } scopeguard = { version = "1" } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } tower = { version = "0.5", default-features = false, features = ["timeout", "util"] } @@ -592,7 +592,7 @@ ring = { version = "0.17", features = ["std"] } rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["event"] } scopeguard = { version = "1" } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } tower = { version = "0.5", default-features = false, features = ["timeout", "util"] } @@ -636,7 +636,7 @@ rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs", scopeguard = { version = "1" } syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } @@ -675,7 +675,7 @@ rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["ev rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs", "net", "process", "termios", "time"] } scopeguard = { version = "1" } sync_wrapper = { version = "1", default-features = false, features = ["futures"] } -tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] } tokio-socks = { version = "0.5", features = ["futures-io"] } tokio-stream = { version = "0.1", features = ["fs"] } toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }