diff --git a/gpui/src/executor.rs b/gpui/src/executor.rs index b135f5034d..9c7681e19e 100644 --- a/gpui/src/executor.rs +++ b/gpui/src/executor.rs @@ -3,8 +3,9 @@ use async_task::Runnable; pub use async_task::Task; use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString}; use parking_lot::Mutex; +use postage::{barrier, prelude::Stream as _}; use rand::prelude::*; -use smol::{channel, prelude::*, Executor}; +use smol::{channel, prelude::*, Executor, Timer}; use std::{ fmt::{self, Debug}, marker::PhantomData, @@ -18,7 +19,7 @@ use std::{ }, task::{Context, Poll}, thread, - time::Duration, + time::{Duration, Instant}, }; use waker_fn::waker_fn; @@ -49,6 +50,8 @@ struct DeterministicState { spawned_from_foreground: Vec<(Runnable, Backtrace)>, forbid_parking: bool, block_on_ticks: RangeInclusive, + now: Instant, + pending_timers: Vec<(Instant, barrier::Sender)>, } pub struct Deterministic { @@ -67,6 +70,8 @@ impl Deterministic { spawned_from_foreground: Default::default(), forbid_parking: false, block_on_ticks: 0..=1000, + now: Instant::now(), + pending_timers: Default::default(), })), parker: Default::default(), } @@ -119,17 +124,39 @@ impl Deterministic { T: 'static, F: Future + 'static, { - smol::pin!(future); - - let unparker = self.parker.lock().unparker(); let woken = Arc::new(AtomicBool::new(false)); - let waker = { - let woken = woken.clone(); - waker_fn(move || { - woken.store(true, SeqCst); - unparker.unpark(); - }) - }; + let mut future = Box::pin(future); + loop { + if let Some(result) = self.run_internal(woken.clone(), &mut future) { + return result; + } + + if !woken.load(SeqCst) && self.state.lock().forbid_parking { + panic!("deterministic executor parked after a call to forbid_parking"); + } + + woken.store(false, SeqCst); + self.parker.lock().park(); + } + } + + fn run_until_parked(&self) { + let woken = Arc::new(AtomicBool::new(false)); + let future = std::future::pending::<()>(); + smol::pin!(future); + self.run_internal(woken, future); + } + + pub fn run_internal(&self, woken: Arc, mut future: F) -> Option + where + T: 'static, + F: Future + Unpin, + { + let unparker = self.parker.lock().unparker(); + let waker = waker_fn(move || { + woken.store(true, SeqCst); + unparker.unpark(); + }); let mut cx = Context::from_waker(&waker); let mut trace = Trace::default(); @@ -163,23 +190,17 @@ impl Deterministic { runnable.run(); } else { drop(state); - if let Poll::Ready(result) = future.as_mut().poll(&mut cx) { - return result; + if let Poll::Ready(result) = future.poll(&mut cx) { + return Some(result); } + let state = self.state.lock(); if state.scheduled_from_foreground.is_empty() && state.scheduled_from_background.is_empty() && state.spawned_from_foreground.is_empty() { - if state.forbid_parking && !woken.load(SeqCst) { - panic!("deterministic executor parked after a call to forbid_parking"); - } - drop(state); - woken.store(false, SeqCst); - self.parker.lock().park(); + return None; } - - continue; } } } @@ -407,6 +428,41 @@ impl Foreground { } } + pub async fn timer(&self, duration: Duration) { + match self { + Self::Deterministic(executor) => { + let (tx, mut rx) = barrier::channel(); + { + let mut state = executor.state.lock(); + let wakeup_at = state.now + duration; + state.pending_timers.push((wakeup_at, tx)); + } + rx.recv().await; + } + _ => { + Timer::after(duration).await; + } + } + } + + pub fn advance_clock(&self, duration: Duration) { + match self { + Self::Deterministic(executor) => { + executor.run_until_parked(); + + let mut state = executor.state.lock(); + state.now += duration; + let now = state.now; + let mut pending_timers = mem::take(&mut state.pending_timers); + drop(state); + + pending_timers.retain(|(wakeup, _)| *wakeup > now); + executor.state.lock().pending_timers.extend(pending_timers); + } + _ => panic!("this method can only be called on a deterministic executor"), + } + } + pub fn set_block_on_ticks(&self, range: RangeInclusive) { match self { Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range, diff --git a/server/src/rpc.rs b/server/src/rpc.rs index 34f5f378d9..2bd0eac625 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -5,10 +5,7 @@ use super::{ }; use anyhow::anyhow; use async_std::{sync::RwLock, task}; -use async_tungstenite::{ - tungstenite::{protocol::Role, Error as WebSocketError, Message as WebSocketMessage}, - WebSocketStream, -}; +use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream}; use futures::{future::BoxFuture, FutureExt}; use postage::{mpsc, prelude::Sink as _, prelude::Stream as _}; use sha1::{Digest as _, Sha1}; @@ -30,7 +27,7 @@ use time::OffsetDateTime; use zrpc::{ auth::random_token, proto::{self, AnyTypedEnvelope, EnvelopedMessage}, - ConnectionId, Peer, TypedEnvelope, + Conn, ConnectionId, Peer, TypedEnvelope, }; type ReplicaId = u16; @@ -95,6 +92,7 @@ impl Server { }; server + .add_handler(Server::ping) .add_handler(Server::share_worktree) .add_handler(Server::join_worktree) .add_handler(Server::update_worktree) @@ -133,19 +131,12 @@ impl Server { self } - pub fn handle_connection( + pub fn handle_connection( self: &Arc, connection: Conn, addr: String, user_id: UserId, - ) -> impl Future - where - Conn: 'static - + futures::Sink - + futures::Stream> - + Send - + Unpin, - { + ) -> impl Future { let this = self.clone(); async move { let (connection_id, handle_io, mut incoming_rx) = @@ -254,6 +245,11 @@ impl Server { worktree_ids } + async fn ping(self: Arc, request: TypedEnvelope) -> tide::Result<()> { + self.peer.respond(request.receipt(), proto::Ack {}).await?; + Ok(()) + } + async fn share_worktree( self: Arc, mut request: TypedEnvelope, @@ -503,7 +499,9 @@ impl Server { request: TypedEnvelope, ) -> tide::Result<()> { self.broadcast_in_worktree(request.payload.worktree_id, &request) - .await + .await?; + self.peer.respond(request.receipt(), proto::Ack {}).await?; + Ok(()) } async fn buffer_saved( @@ -974,8 +972,7 @@ pub fn add_routes(app: &mut tide::Server>, rpc: &Arc) { let user_id = user_id.ok_or_else(|| anyhow!("user_id is not present on request. ensure auth::VerifyToken middleware is present"))?; task::spawn(async move { if let Some(stream) = upgrade_receiver.await { - let stream = WebSocketStream::from_raw_socket(stream, Role::Server, None).await; - server.handle_connection(stream, addr, user_id).await; + server.handle_connection(Conn::new(WebSocketStream::from_raw_socket(stream, Role::Server, None).await), addr, user_id).await; } }); @@ -1009,17 +1006,25 @@ mod tests { }; use async_std::{sync::RwLockReadGuard, task}; use gpui::TestAppContext; - use postage::mpsc; + use parking_lot::Mutex; + use postage::{mpsc, watch}; use serde_json::json; use sqlx::types::time::OffsetDateTime; - use std::{path::Path, sync::Arc, time::Duration}; + use std::{ + path::Path, + sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }, + time::Duration, + }; use zed::{ channel::{Channel, ChannelDetails, ChannelList}, editor::{Editor, Insert}, fs::{FakeFs, Fs as _}, language::LanguageRegistry, - rpc::Client, - settings, test, + rpc::{self, Client}, + settings, user::UserStore, worktree::Worktree, }; @@ -1469,7 +1474,7 @@ mod tests { .await; // Drop client B's connection and ensure client A observes client B leaving the worktree. - client_b.disconnect().await.unwrap(); + client_b.disconnect(&cx_b.to_async()).await.unwrap(); worktree_a .condition(&cx_a, |tree, _| tree.peers().len() == 0) .await; @@ -1675,11 +1680,206 @@ mod tests { ); } + #[gpui::test] + async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { + cx_a.foreground().forbid_parking(); + + // Connect to a server as 2 clients. + let mut server = TestServer::start().await; + let (user_id_a, client_a) = server.create_client(&mut cx_a, "user_a").await; + let (user_id_b, client_b) = server.create_client(&mut cx_b, "user_b").await; + let mut status_b = client_b.status(); + + // Create an org that includes these 2 users. + let db = &server.app_state.db; + let org_id = db.create_org("Test Org", "test-org").await.unwrap(); + db.add_org_member(org_id, user_id_a, false).await.unwrap(); + db.add_org_member(org_id, user_id_b, false).await.unwrap(); + + // Create a channel that includes all the users. + let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap(); + db.add_channel_member(channel_id, user_id_a, false) + .await + .unwrap(); + db.add_channel_member(channel_id, user_id_b, false) + .await + .unwrap(); + db.create_channel_message( + channel_id, + user_id_b, + "hello A, it's B.", + OffsetDateTime::now_utc(), + ) + .await + .unwrap(); + + let user_store_a = Arc::new(UserStore::new(client_a.clone())); + let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx)); + channels_a + .condition(&mut cx_a, |list, _| list.available_channels().is_some()) + .await; + + channels_a.read_with(&cx_a, |list, _| { + assert_eq!( + list.available_channels().unwrap(), + &[ChannelDetails { + id: channel_id.to_proto(), + name: "test-channel".to_string() + }] + ) + }); + let channel_a = channels_a.update(&mut cx_a, |this, cx| { + this.get_channel(channel_id.to_proto(), cx).unwrap() + }); + channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty())); + channel_a + .condition(&cx_a, |channel, _| { + channel_messages(channel) + == [("user_b".to_string(), "hello A, it's B.".to_string())] + }) + .await; + + let user_store_b = Arc::new(UserStore::new(client_b.clone())); + let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b, client_b, cx)); + channels_b + .condition(&mut cx_b, |list, _| list.available_channels().is_some()) + .await; + channels_b.read_with(&cx_b, |list, _| { + assert_eq!( + list.available_channels().unwrap(), + &[ChannelDetails { + id: channel_id.to_proto(), + name: "test-channel".to_string() + }] + ) + }); + + let channel_b = channels_b.update(&mut cx_b, |this, cx| { + this.get_channel(channel_id.to_proto(), cx).unwrap() + }); + channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty())); + channel_b + .condition(&cx_b, |channel, _| { + channel_messages(channel) + == [("user_b".to_string(), "hello A, it's B.".to_string())] + }) + .await; + + // Disconnect client B, ensuring we can still access its cached channel data. + server.forbid_connections(); + server.disconnect_client(user_id_b); + while !matches!( + status_b.recv().await, + Some(rpc::Status::ReconnectionError { .. }) + ) {} + + channels_b.read_with(&cx_b, |channels, _| { + assert_eq!( + channels.available_channels().unwrap(), + [ChannelDetails { + id: channel_id.to_proto(), + name: "test-channel".to_string() + }] + ) + }); + channel_b.read_with(&cx_b, |channel, _| { + assert_eq!( + channel_messages(channel), + [("user_b".to_string(), "hello A, it's B.".to_string())] + ) + }); + + // Send a message from client A while B is disconnected. + channel_a + .update(&mut cx_a, |channel, cx| { + channel + .send_message("oh, hi B.".to_string(), cx) + .unwrap() + .detach(); + let task = channel.send_message("sup".to_string(), cx).unwrap(); + assert_eq!( + channel + .pending_messages() + .iter() + .map(|m| &m.body) + .collect::>(), + &["oh, hi B.", "sup"] + ); + task + }) + .await + .unwrap(); + + // Give client B a chance to reconnect. + server.allow_connections(); + cx_b.foreground().advance_clock(Duration::from_secs(10)); + + // Verify that B sees the new messages upon reconnection. + channel_b + .condition(&cx_b, |channel, _| { + channel_messages(channel) + == [ + ("user_b".to_string(), "hello A, it's B.".to_string()), + ("user_a".to_string(), "oh, hi B.".to_string()), + ("user_a".to_string(), "sup".to_string()), + ] + }) + .await; + + // Ensure client A and B can communicate normally after reconnection. + channel_a + .update(&mut cx_a, |channel, cx| { + channel.send_message("you online?".to_string(), cx).unwrap() + }) + .await + .unwrap(); + channel_b + .condition(&cx_b, |channel, _| { + channel_messages(channel) + == [ + ("user_b".to_string(), "hello A, it's B.".to_string()), + ("user_a".to_string(), "oh, hi B.".to_string()), + ("user_a".to_string(), "sup".to_string()), + ("user_a".to_string(), "you online?".to_string()), + ] + }) + .await; + + channel_b + .update(&mut cx_b, |channel, cx| { + channel.send_message("yep".to_string(), cx).unwrap() + }) + .await + .unwrap(); + channel_a + .condition(&cx_a, |channel, _| { + channel_messages(channel) + == [ + ("user_b".to_string(), "hello A, it's B.".to_string()), + ("user_a".to_string(), "oh, hi B.".to_string()), + ("user_a".to_string(), "sup".to_string()), + ("user_a".to_string(), "you online?".to_string()), + ("user_b".to_string(), "yep".to_string()), + ] + }) + .await; + + fn channel_messages(channel: &Channel) -> Vec<(String, String)> { + channel + .messages() + .cursor::<(), ()>() + .map(|m| (m.sender.github_login.clone(), m.body.clone())) + .collect() + } + } + struct TestServer { peer: Arc, app_state: Arc, server: Arc, notifications: mpsc::Receiver<()>, + connection_killers: Arc>>>>, + forbid_connections: Arc, _test_db: TestDb, } @@ -1695,6 +1895,8 @@ mod tests { app_state, server, notifications: notifications.1, + connection_killers: Default::default(), + forbid_connections: Default::default(), _test_db: test_db, } } @@ -1704,20 +1906,67 @@ mod tests { cx: &mut TestAppContext, name: &str, ) -> (UserId, Arc) { - let user_id = self.app_state.db.create_user(name, false).await.unwrap(); - let client = Client::new(); - let (client_conn, server_conn) = test::Channel::bidirectional(); - cx.background() - .spawn( - self.server - .handle_connection(server_conn, name.to_string(), user_id), - ) - .detach(); + let client_user_id = self.app_state.db.create_user(name, false).await.unwrap(); + let client_name = name.to_string(); + let mut client = Client::new(); + let server = self.server.clone(); + let connection_killers = self.connection_killers.clone(); + let forbid_connections = self.forbid_connections.clone(); + Arc::get_mut(&mut client) + .unwrap() + .set_login_and_connect_callbacks( + move |cx| { + cx.spawn(|_| async move { + let access_token = "the-token".to_string(); + Ok((client_user_id.0 as u64, access_token)) + }) + }, + move |user_id, access_token, cx| { + assert_eq!(user_id, client_user_id.0 as u64); + assert_eq!(access_token, "the-token"); + + let server = server.clone(); + let connection_killers = connection_killers.clone(); + let forbid_connections = forbid_connections.clone(); + let client_name = client_name.clone(); + cx.spawn(move |cx| async move { + if forbid_connections.load(SeqCst) { + Err(anyhow!("server is forbidding connections")) + } else { + let (client_conn, server_conn, kill_conn) = Conn::in_memory(); + connection_killers.lock().insert(client_user_id, kill_conn); + cx.background() + .spawn(server.handle_connection( + server_conn, + client_name, + client_user_id, + )) + .detach(); + Ok(client_conn) + } + }) + }, + ); + client - .add_connection(user_id.to_proto(), client_conn, &cx.to_async()) + .authenticate_and_connect(&cx.to_async()) .await .unwrap(); - (user_id, client) + (client_user_id, client) + } + + fn disconnect_client(&self, user_id: UserId) { + if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) { + let _ = kill_conn.try_send(Some(())); + } + } + + fn forbid_connections(&self) { + self.forbid_connections.store(true, SeqCst); + } + + fn allow_connections(&self) { + self.forbid_connections.store(false, SeqCst); } async fn build_app_state(test_db: &TestDb) -> Arc { diff --git a/zed/src/channel.rs b/zed/src/channel.rs index 24997d4964..aa182c0540 100644 --- a/zed/src/channel.rs +++ b/zed/src/channel.rs @@ -11,6 +11,7 @@ use gpui::{ use postage::prelude::Stream; use std::{ collections::{HashMap, HashSet}, + mem, ops::Range, sync::Arc, }; @@ -71,7 +72,7 @@ pub enum ChannelListEvent {} #[derive(Clone, Debug, PartialEq)] pub enum ChannelEvent { - MessagesAdded { + MessagesUpdated { old_range: Range, new_count: usize, }, @@ -87,36 +88,47 @@ impl ChannelList { rpc: Arc, cx: &mut ModelContext, ) -> Self { - let _task = cx.spawn(|this, mut cx| { + let _task = cx.spawn_weak(|this, mut cx| { let rpc = rpc.clone(); async move { - let mut user_id = rpc.user_id(); - loop { - let available_channels = if user_id.recv().await.unwrap().is_some() { - Some( - rpc.request(proto::GetChannels {}) + let mut status = rpc.status(); + while let Some((status, this)) = status.recv().await.zip(this.upgrade(&cx)) { + match status { + rpc::Status::Connected { .. } => { + let response = rpc + .request(proto::GetChannels {}) .await - .context("failed to fetch available channels")? - .channels - .into_iter() - .map(Into::into) - .collect(), - ) - } else { - None - }; + .context("failed to fetch available channels")?; + this.update(&mut cx, |this, cx| { + this.available_channels = + Some(response.channels.into_iter().map(Into::into).collect()); - this.update(&mut cx, |this, cx| { - if available_channels.is_none() { - if this.available_channels.is_none() { - return; - } - this.channels.clear(); + let mut to_remove = Vec::new(); + for (channel_id, channel) in &this.channels { + if let Some(channel) = channel.upgrade(cx) { + channel.update(cx, |channel, cx| channel.rejoin(cx)) + } else { + to_remove.push(*channel_id); + } + } + + for channel_id in to_remove { + this.channels.remove(&channel_id); + } + cx.notify(); + }); } - this.available_channels = available_channels; - cx.notify(); - }); + rpc::Status::Disconnected { .. } => { + this.update(&mut cx, |this, cx| { + this.available_channels = None; + this.channels.clear(); + cx.notify(); + }); + } + _ => {} + } } + Ok(()) } .log_err() }); @@ -285,6 +297,43 @@ impl Channel { false } + pub fn rejoin(&mut self, cx: &mut ModelContext) { + let user_store = self.user_store.clone(); + let rpc = self.rpc.clone(); + let channel_id = self.details.id; + cx.spawn(|channel, mut cx| { + async move { + let response = rpc.request(proto::JoinChannel { channel_id }).await?; + let messages = messages_from_proto(response.messages, &user_store).await?; + let loaded_all_messages = response.done; + + channel.update(&mut cx, |channel, cx| { + if let Some((first_new_message, last_old_message)) = + messages.first().zip(channel.messages.last()) + { + if first_new_message.id > last_old_message.id { + let old_messages = mem::take(&mut channel.messages); + cx.emit(ChannelEvent::MessagesUpdated { + old_range: 0..old_messages.summary().count, + new_count: 0, + }); + channel.loaded_all_messages = loaded_all_messages; + } + } + + channel.insert_messages(messages, cx); + if loaded_all_messages { + channel.loaded_all_messages = loaded_all_messages; + } + }); + + Ok(()) + } + .log_err() + }) + .detach(); + } + pub fn message_count(&self) -> usize { self.messages.summary().count } @@ -350,7 +399,7 @@ impl Channel { drop(old_cursor); self.messages = new_messages; - cx.emit(ChannelEvent::MessagesAdded { + cx.emit(ChannelEvent::MessagesUpdated { old_range: start_ix..end_ix, new_count, }); @@ -446,22 +495,21 @@ impl<'a> sum_tree::SeekDimension<'a, ChannelMessageSummary> for Count { #[cfg(test)] mod tests { use super::*; + use crate::test::FakeServer; use gpui::TestAppContext; - use postage::mpsc::Receiver; - use zrpc::{test::Channel, ConnectionId, Peer, Receipt}; #[gpui::test] async fn test_channel_messages(mut cx: TestAppContext) { let user_id = 5; - let client = Client::new(); - let mut server = FakeServer::for_client(user_id, &client, &cx).await; + let mut client = Client::new(); + let server = FakeServer::for_client(user_id, &mut client, &cx).await; let user_store = Arc::new(UserStore::new(client.clone())); let channel_list = cx.add_model(|cx| ChannelList::new(user_store, client.clone(), cx)); channel_list.read_with(&cx, |list, _| assert_eq!(list.available_channels(), None)); // Get the available channels. - let get_channels = server.receive::().await; + let get_channels = server.receive::().await.unwrap(); server .respond( get_channels.receipt(), @@ -492,7 +540,7 @@ mod tests { }) .unwrap(); channel.read_with(&cx, |channel, _| assert!(channel.messages().is_empty())); - let join_channel = server.receive::().await; + let join_channel = server.receive::().await.unwrap(); server .respond( join_channel.receipt(), @@ -517,7 +565,7 @@ mod tests { .await; // Client requests all users for the received messages - let mut get_users = server.receive::().await; + let mut get_users = server.receive::().await.unwrap(); get_users.payload.user_ids.sort(); assert_eq!(get_users.payload.user_ids, vec![5, 6]); server @@ -542,7 +590,7 @@ mod tests { assert_eq!( channel.next_event(&cx).await, - ChannelEvent::MessagesAdded { + ChannelEvent::MessagesUpdated { old_range: 0..0, new_count: 2, } @@ -574,7 +622,7 @@ mod tests { .await; // Client requests user for message since they haven't seen them yet - let get_users = server.receive::().await; + let get_users = server.receive::().await.unwrap(); assert_eq!(get_users.payload.user_ids, vec![7]); server .respond( @@ -591,7 +639,7 @@ mod tests { assert_eq!( channel.next_event(&cx).await, - ChannelEvent::MessagesAdded { + ChannelEvent::MessagesUpdated { old_range: 2..2, new_count: 1, } @@ -610,7 +658,7 @@ mod tests { channel.update(&mut cx, |channel, cx| { assert!(channel.load_more_messages(cx)); }); - let get_messages = server.receive::().await; + let get_messages = server.receive::().await.unwrap(); assert_eq!(get_messages.payload.channel_id, 5); assert_eq!(get_messages.payload.before_message_id, 10); server @@ -638,7 +686,7 @@ mod tests { assert_eq!( channel.next_event(&cx).await, - ChannelEvent::MessagesAdded { + ChannelEvent::MessagesUpdated { old_range: 0..0, new_count: 2, } @@ -656,53 +704,4 @@ mod tests { ); }); } - - struct FakeServer { - peer: Arc, - incoming: Receiver>, - connection_id: ConnectionId, - } - - impl FakeServer { - async fn for_client(user_id: u64, client: &Arc, cx: &TestAppContext) -> Self { - let (client_conn, server_conn) = Channel::bidirectional(); - let peer = Peer::new(); - let (connection_id, io, incoming) = peer.add_connection(server_conn).await; - cx.background().spawn(io).detach(); - - client - .add_connection(user_id, client_conn, &cx.to_async()) - .await - .unwrap(); - - Self { - peer, - incoming, - connection_id, - } - } - - async fn send(&self, message: T) { - self.peer.send(self.connection_id, message).await.unwrap(); - } - - async fn receive(&mut self) -> TypedEnvelope { - *self - .incoming - .recv() - .await - .unwrap() - .into_any() - .downcast::>() - .unwrap() - } - - async fn respond( - &self, - receipt: Receipt, - response: T::Response, - ) { - self.peer.respond(receipt, response).await.unwrap() - } - } } diff --git a/zed/src/chat_panel.rs b/zed/src/chat_panel.rs index 18b737b2d8..200a35fcca 100644 --- a/zed/src/chat_panel.rs +++ b/zed/src/chat_panel.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use crate::{ channel::{Channel, ChannelEvent, ChannelList, ChannelMessage}, editor::Editor, - rpc::Client, + rpc::{self, Client}, theme, util::{ResultExt, TryFutureExt}, Settings, @@ -14,10 +14,10 @@ use gpui::{ keymap::Binding, platform::CursorStyle, views::{ItemType, Select, SelectStyle}, - AppContext, Entity, ModelHandle, MutableAppContext, RenderContext, Subscription, View, + AppContext, Entity, ModelHandle, MutableAppContext, RenderContext, Subscription, Task, View, ViewContext, ViewHandle, }; -use postage::watch; +use postage::{prelude::Stream, watch}; use time::{OffsetDateTime, UtcOffset}; const MESSAGE_LOADING_THRESHOLD: usize = 50; @@ -31,6 +31,7 @@ pub struct ChatPanel { channel_select: ViewHandle