From 786e72468408df61810a318759928e80e4f4de71 Mon Sep 17 00:00:00 2001 From: Michael Sloan Date: Mon, 23 Jun 2025 23:14:15 -0600 Subject: [PATCH] Fetch and wait for channels when opening channel notes via URL (#33291) Release Notes: * Collaboration: Now fetches and waits for channels when opening channel notes via URL. --- Cargo.lock | 1 + crates/channel/Cargo.toml | 1 + crates/channel/src/channel_store.rs | 146 ++++++++++++++++++---------- 3 files changed, 99 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 518b790a2c..234e3998bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2697,6 +2697,7 @@ dependencies = [ "http_client", "language", "log", + "postage", "rand 0.8.5", "release_channel", "rpc", diff --git a/crates/channel/Cargo.toml b/crates/channel/Cargo.toml index 73b850a815..962847f3f1 100644 --- a/crates/channel/Cargo.toml +++ b/crates/channel/Cargo.toml @@ -24,6 +24,7 @@ futures.workspace = true gpui.workspace = true language.workspace = true log.workspace = true +postage.workspace = true rand.workspace = true release_channel.workspace = true rpc.workspace = true diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index a73734cd49..b7ba811421 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -4,13 +4,14 @@ use crate::{ChannelMessage, channel_buffer::ChannelBuffer, channel_chat::Channel use anyhow::{Context as _, Result, anyhow}; use channel_index::ChannelIndex; use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore}; -use collections::{HashMap, HashSet, hash_map}; +use collections::{HashMap, HashSet}; use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared}; use gpui::{ App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task, WeakEntity, }; use language::Capability; +use postage::{sink::Sink, watch}; use rpc::{ TypedEnvelope, proto::{self, ChannelRole, ChannelVisibility}, @@ -43,6 +44,7 @@ pub struct ChannelStore { opened_chats: HashMap>, client: Arc, did_subscribe: bool, + channels_loaded: (watch::Sender, watch::Receiver), user_store: Entity, _rpc_subscriptions: [Subscription; 2], _watch_connection_status: Task>, @@ -219,6 +221,7 @@ impl ChannelStore { }), channel_states: Default::default(), did_subscribe: false, + channels_loaded: watch::channel_with(false), } } @@ -234,6 +237,48 @@ impl ChannelStore { } } + pub fn wait_for_channels( + &mut self, + timeout: Duration, + cx: &mut Context, + ) -> Task> { + let mut channels_loaded_rx = self.channels_loaded.1.clone(); + if *channels_loaded_rx.borrow() { + return Task::ready(Ok(())); + } + + let mut status_receiver = self.client.status(); + if status_receiver.borrow().is_connected() { + self.initialize(); + } + + let mut timer = cx.background_executor().timer(timeout).fuse(); + cx.spawn(async move |this, cx| { + loop { + futures::select_biased! { + channels_loaded = channels_loaded_rx.next().fuse() => { + if let Some(true) = channels_loaded { + return Ok(()); + } + } + status = status_receiver.next().fuse() => { + if let Some(status) = status { + if status.is_connected() { + this.update(cx, |this, _cx| { + this.initialize(); + }).ok(); + } + } + continue; + } + _ = timer => { + return Err(anyhow!("{:?} elapsed without receiving channels", timeout)); + } + } + } + }) + } + pub fn client(&self) -> Arc { self.client.clone() } @@ -309,6 +354,7 @@ impl ChannelStore { let channel_store = cx.entity(); self.open_channel_resource( channel_id, + "notes", |this| &mut this.opened_buffers, async move |channel, cx| { ChannelBuffer::new(channel, client, user_store, channel_store, cx).await @@ -439,6 +485,7 @@ impl ChannelStore { let this = cx.entity(); self.open_channel_resource( channel_id, + "chat", |this| &mut this.opened_chats, async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await, cx, @@ -453,6 +500,7 @@ impl ChannelStore { fn open_channel_resource( &mut self, channel_id: ChannelId, + resource_name: &'static str, get_map: fn(&mut Self) -> &mut HashMap>, load: F, cx: &mut Context, @@ -462,58 +510,56 @@ impl ChannelStore { T: 'static, { let task = loop { - match get_map(self).entry(channel_id) { - hash_map::Entry::Occupied(e) => match e.get() { - OpenEntityHandle::Open(entity) => { - if let Some(entity) = entity.upgrade() { - break Task::ready(Ok(entity)).shared(); - } else { - get_map(self).remove(&channel_id); - continue; - } + match get_map(self).get(&channel_id) { + Some(OpenEntityHandle::Open(entity)) => { + if let Some(entity) = entity.upgrade() { + break Task::ready(Ok(entity)).shared(); + } else { + get_map(self).remove(&channel_id); + continue; } - OpenEntityHandle::Loading(task) => { - break task.clone(); - } - }, - hash_map::Entry::Vacant(e) => { - let task = cx - .spawn(async move |this, cx| { - let channel = this.read_with(cx, |this, _| { - this.channel_for_id(channel_id).cloned().ok_or_else(|| { - Arc::new(anyhow!("no channel for id: {channel_id}")) - }) - })??; - - load(channel, cx).await.map_err(Arc::new) - }) - .shared(); - - e.insert(OpenEntityHandle::Loading(task.clone())); - cx.spawn({ - let task = task.clone(); - async move |this, cx| { - let result = task.await; - this.update(cx, |this, _| match result { - Ok(model) => { - get_map(this).insert( - channel_id, - OpenEntityHandle::Open(model.downgrade()), - ); - } - Err(_) => { - get_map(this).remove(&channel_id); - } - }) - .ok(); - } - }) - .detach(); - break task; } + Some(OpenEntityHandle::Loading(task)) => break task.clone(), + None => {} } + + let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx); + let task = cx + .spawn(async move |this, cx| { + channels_ready.await?; + let channel = this.read_with(cx, |this, _| { + this.channel_for_id(channel_id) + .cloned() + .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}"))) + })??; + + load(channel, cx).await.map_err(Arc::new) + }) + .shared(); + + get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone())); + let task = cx.spawn({ + async move |this, cx| { + let result = task.await; + this.update(cx, |this, _| match &result { + Ok(model) => { + get_map(this) + .insert(channel_id, OpenEntityHandle::Open(model.downgrade())); + } + Err(_) => { + get_map(this).remove(&channel_id); + } + })?; + result + } + }); + break task.shared(); }; - cx.background_spawn(async move { task.await.map_err(|error| anyhow!("{error}")) }) + cx.background_spawn(async move { + task.await.map_err(|error| { + anyhow!("{error}").context(format!("failed to open channel {resource_name}")) + }) + }) } pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool { @@ -1147,6 +1193,8 @@ impl ChannelStore { .or_default() .update_latest_message_id(latest_channel_message.message_id); } + + self.channels_loaded.0.try_send(true).log_err(); } cx.notify();