diff --git a/Cargo.lock b/Cargo.lock
index d5d0493936..6d5fdc67d7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1347,6 +1347,43 @@ dependencies = [
"uuid 1.4.1",
]
+[[package]]
+name = "channel2"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "client2",
+ "clock",
+ "collections",
+ "db2",
+ "feature_flags2",
+ "futures 0.3.28",
+ "gpui2",
+ "image",
+ "language2",
+ "lazy_static",
+ "log",
+ "parking_lot 0.11.2",
+ "postage",
+ "rand 0.8.5",
+ "rpc2",
+ "schemars",
+ "serde",
+ "serde_derive",
+ "settings2",
+ "smallvec",
+ "smol",
+ "sum_tree",
+ "tempfile",
+ "text",
+ "thiserror",
+ "time",
+ "tiny_http",
+ "url",
+ "util",
+ "uuid 1.4.1",
+]
+
[[package]]
name = "chrono"
version = "0.4.31"
@@ -2422,8 +2459,10 @@ dependencies = [
"client",
"collections",
"editor",
+ "futures 0.3.28",
"gpui",
"language",
+ "log",
"lsp",
"postage",
"project",
@@ -9690,7 +9729,7 @@ dependencies = [
[[package]]
name = "tree-sitter-vue"
version = "0.0.1"
-source = "git+https://github.com/zed-industries/tree-sitter-vue?rev=95b2890#95b28908d90e928c308866f7631e73ef6e1d4b5f"
+source = "git+https://github.com/zed-industries/tree-sitter-vue?rev=9b6cb221ccb8d0b956fcb17e9a1efac2feefeb58#9b6cb221ccb8d0b956fcb17e9a1efac2feefeb58"
dependencies = [
"cc",
"tree-sitter",
diff --git a/Cargo.toml b/Cargo.toml
index 998ea081a6..6245889530 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@ members = [
"crates/call",
"crates/call2",
"crates/channel",
+ "crates/channel2",
"crates/cli",
"crates/client",
"crates/client2",
@@ -175,7 +176,7 @@ tree-sitter-yaml = { git = "https://github.com/zed-industries/tree-sitter-yaml",
tree-sitter-lua = "0.0.14"
tree-sitter-nix = { git = "https://github.com/nix-community/tree-sitter-nix", rev = "66e3e9ce9180ae08fc57372061006ef83f0abde7" }
tree-sitter-nu = { git = "https://github.com/nushell/tree-sitter-nu", rev = "786689b0562b9799ce53e824cb45a1a2a04dc673"}
-tree-sitter-vue = {git = "https://github.com/zed-industries/tree-sitter-vue", rev = "95b2890"}
+tree-sitter-vue = {git = "https://github.com/zed-industries/tree-sitter-vue", rev = "9b6cb221ccb8d0b956fcb17e9a1efac2feefeb58"}
[patch.crates-io]
tree-sitter = { git = "https://github.com/tree-sitter/tree-sitter", rev = "35a6052fbcafc5e5fc0f9415b8652be7dcaf7222" }
async-task = { git = "https://github.com/zed-industries/async-task", rev = "341b57d6de98cdfd7b418567b8de2022ca993a6e" }
diff --git a/assets/icons/dash.svg b/assets/icons/dash.svg
new file mode 100644
index 0000000000..efff9eab5e
--- /dev/null
+++ b/assets/icons/dash.svg
@@ -0,0 +1 @@
+
diff --git a/crates/channel2/Cargo.toml b/crates/channel2/Cargo.toml
new file mode 100644
index 0000000000..c292d4e8dd
--- /dev/null
+++ b/crates/channel2/Cargo.toml
@@ -0,0 +1,54 @@
+[package]
+name = "channel2"
+version = "0.1.0"
+edition = "2021"
+publish = false
+
+[lib]
+path = "src/channel2.rs"
+doctest = false
+
+[features]
+test-support = ["collections/test-support", "gpui/test-support", "rpc/test-support"]
+
+[dependencies]
+client = { package = "client2", path = "../client2" }
+collections = { path = "../collections" }
+db = { package = "db2", path = "../db2" }
+gpui = { package = "gpui2", path = "../gpui2" }
+util = { path = "../util" }
+rpc = { package = "rpc2", path = "../rpc2" }
+text = { path = "../text" }
+language = { package = "language2", path = "../language2" }
+settings = { package = "settings2", path = "../settings2" }
+feature_flags = { package = "feature_flags2", path = "../feature_flags2" }
+sum_tree = { path = "../sum_tree" }
+clock = { path = "../clock" }
+
+anyhow.workspace = true
+futures.workspace = true
+image = "0.23"
+lazy_static.workspace = true
+smallvec.workspace = true
+log.workspace = true
+parking_lot.workspace = true
+postage.workspace = true
+rand.workspace = true
+schemars.workspace = true
+smol.workspace = true
+thiserror.workspace = true
+time.workspace = true
+tiny_http = "0.8"
+uuid.workspace = true
+url = "2.2"
+serde.workspace = true
+serde_derive.workspace = true
+tempfile = "3"
+
+[dev-dependencies]
+collections = { path = "../collections", features = ["test-support"] }
+gpui = { package = "gpui2", path = "../gpui2", features = ["test-support"] }
+rpc = { package = "rpc2", path = "../rpc2", features = ["test-support"] }
+client = { package = "client2", path = "../client2", features = ["test-support"] }
+settings = { package = "settings2", path = "../settings2", features = ["test-support"] }
+util = { path = "../util", features = ["test-support"] }
diff --git a/crates/channel2/src/channel2.rs b/crates/channel2/src/channel2.rs
new file mode 100644
index 0000000000..f38ae4078a
--- /dev/null
+++ b/crates/channel2/src/channel2.rs
@@ -0,0 +1,23 @@
+mod channel_buffer;
+mod channel_chat;
+mod channel_store;
+
+use client::{Client, UserStore};
+use gpui::{AppContext, Model};
+use std::sync::Arc;
+
+pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent, ACKNOWLEDGE_DEBOUNCE_INTERVAL};
+pub use channel_chat::{
+ mentions_to_proto, ChannelChat, ChannelChatEvent, ChannelMessage, ChannelMessageId,
+ MessageParams,
+};
+pub use channel_store::{Channel, ChannelEvent, ChannelId, ChannelMembership, ChannelStore};
+
+#[cfg(test)]
+mod channel_store_tests;
+
+pub fn init(client: &Arc, user_store: Model, cx: &mut AppContext) {
+ channel_store::init(client, user_store, cx);
+ channel_buffer::init(client);
+ channel_chat::init(client);
+}
diff --git a/crates/channel2/src/channel_buffer.rs b/crates/channel2/src/channel_buffer.rs
new file mode 100644
index 0000000000..4c321a8fbc
--- /dev/null
+++ b/crates/channel2/src/channel_buffer.rs
@@ -0,0 +1,259 @@
+use crate::{Channel, ChannelId, ChannelStore};
+use anyhow::Result;
+use client::{Client, Collaborator, UserStore};
+use collections::HashMap;
+use gpui::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task};
+use language::proto::serialize_version;
+use rpc::{
+ proto::{self, PeerId},
+ TypedEnvelope,
+};
+use std::{sync::Arc, time::Duration};
+use util::ResultExt;
+
+pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
+
+pub(crate) fn init(client: &Arc) {
+ client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
+ client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
+}
+
+pub struct ChannelBuffer {
+ pub channel_id: ChannelId,
+ connected: bool,
+ collaborators: HashMap,
+ user_store: Model,
+ channel_store: Model,
+ buffer: Model,
+ buffer_epoch: u64,
+ client: Arc,
+ subscription: Option,
+ acknowledge_task: Option>>,
+}
+
+pub enum ChannelBufferEvent {
+ CollaboratorsChanged,
+ Disconnected,
+ BufferEdited,
+ ChannelChanged,
+}
+
+impl EventEmitter for ChannelBuffer {
+ type Event = ChannelBufferEvent;
+}
+
+impl ChannelBuffer {
+ pub(crate) async fn new(
+ channel: Arc,
+ client: Arc,
+ user_store: Model,
+ channel_store: Model,
+ mut cx: AsyncAppContext,
+ ) -> Result> {
+ let response = client
+ .request(proto::JoinChannelBuffer {
+ channel_id: channel.id,
+ })
+ .await?;
+
+ let base_text = response.base_text;
+ let operations = response
+ .operations
+ .into_iter()
+ .map(language::proto::deserialize_operation)
+ .collect::, _>>()?;
+
+ let buffer = cx.build_model(|_| {
+ language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
+ })?;
+ buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))??;
+
+ let subscription = client.subscribe_to_entity(channel.id)?;
+
+ anyhow::Ok(cx.build_model(|cx| {
+ cx.subscribe(&buffer, Self::on_buffer_update).detach();
+ cx.on_release(Self::release).detach();
+ let mut this = Self {
+ buffer,
+ buffer_epoch: response.epoch,
+ client,
+ connected: true,
+ collaborators: Default::default(),
+ acknowledge_task: None,
+ channel_id: channel.id,
+ subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
+ user_store,
+ channel_store,
+ };
+ this.replace_collaborators(response.collaborators, cx);
+ this
+ })?)
+ }
+
+ fn release(&mut self, _: &mut AppContext) {
+ if self.connected {
+ if let Some(task) = self.acknowledge_task.take() {
+ task.detach();
+ }
+ self.client
+ .send(proto::LeaveChannelBuffer {
+ channel_id: self.channel_id,
+ })
+ .log_err();
+ }
+ }
+
+ pub fn remote_id(&self, cx: &AppContext) -> u64 {
+ self.buffer.read(cx).remote_id()
+ }
+
+ pub fn user_store(&self) -> &Model {
+ &self.user_store
+ }
+
+ pub(crate) fn replace_collaborators(
+ &mut self,
+ collaborators: Vec,
+ cx: &mut ModelContext,
+ ) {
+ let mut new_collaborators = HashMap::default();
+ for collaborator in collaborators {
+ if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
+ new_collaborators.insert(collaborator.peer_id, collaborator);
+ }
+ }
+
+ for (_, old_collaborator) in &self.collaborators {
+ if !new_collaborators.contains_key(&old_collaborator.peer_id) {
+ self.buffer.update(cx, |buffer, cx| {
+ buffer.remove_peer(old_collaborator.replica_id as u16, cx)
+ });
+ }
+ }
+ self.collaborators = new_collaborators;
+ cx.emit(ChannelBufferEvent::CollaboratorsChanged);
+ cx.notify();
+ }
+
+ async fn handle_update_channel_buffer(
+ this: Model,
+ update_channel_buffer: TypedEnvelope,
+ _: Arc,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ let ops = update_channel_buffer
+ .payload
+ .operations
+ .into_iter()
+ .map(language::proto::deserialize_operation)
+ .collect::, _>>()?;
+
+ this.update(&mut cx, |this, cx| {
+ cx.notify();
+ this.buffer
+ .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
+ })??;
+
+ Ok(())
+ }
+
+ async fn handle_update_channel_buffer_collaborators(
+ this: Model,
+ message: TypedEnvelope,
+ _: Arc,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ this.update(&mut cx, |this, cx| {
+ this.replace_collaborators(message.payload.collaborators, cx);
+ cx.emit(ChannelBufferEvent::CollaboratorsChanged);
+ cx.notify();
+ })
+ }
+
+ fn on_buffer_update(
+ &mut self,
+ _: Model,
+ event: &language::Event,
+ cx: &mut ModelContext,
+ ) {
+ match event {
+ language::Event::Operation(operation) => {
+ let operation = language::proto::serialize_operation(operation);
+ self.client
+ .send(proto::UpdateChannelBuffer {
+ channel_id: self.channel_id,
+ operations: vec![operation],
+ })
+ .log_err();
+ }
+ language::Event::Edited => {
+ cx.emit(ChannelBufferEvent::BufferEdited);
+ }
+ _ => {}
+ }
+ }
+
+ pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) {
+ let buffer = self.buffer.read(cx);
+ let version = buffer.version();
+ let buffer_id = buffer.remote_id();
+ let client = self.client.clone();
+ let epoch = self.epoch();
+
+ self.acknowledge_task = Some(cx.spawn(move |_, cx| async move {
+ cx.background_executor()
+ .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
+ .await;
+ client
+ .send(proto::AckBufferOperation {
+ buffer_id,
+ epoch,
+ version: serialize_version(&version),
+ })
+ .ok();
+ Ok(())
+ }));
+ }
+
+ pub fn epoch(&self) -> u64 {
+ self.buffer_epoch
+ }
+
+ pub fn buffer(&self) -> Model {
+ self.buffer.clone()
+ }
+
+ pub fn collaborators(&self) -> &HashMap {
+ &self.collaborators
+ }
+
+ pub fn channel(&self, cx: &AppContext) -> Option> {
+ self.channel_store
+ .read(cx)
+ .channel_for_id(self.channel_id)
+ .cloned()
+ }
+
+ pub(crate) fn disconnect(&mut self, cx: &mut ModelContext) {
+ log::info!("channel buffer {} disconnected", self.channel_id);
+ if self.connected {
+ self.connected = false;
+ self.subscription.take();
+ cx.emit(ChannelBufferEvent::Disconnected);
+ cx.notify()
+ }
+ }
+
+ pub(crate) fn channel_changed(&mut self, cx: &mut ModelContext) {
+ cx.emit(ChannelBufferEvent::ChannelChanged);
+ cx.notify()
+ }
+
+ pub fn is_connected(&self) -> bool {
+ self.connected
+ }
+
+ pub fn replica_id(&self, cx: &AppContext) -> u16 {
+ self.buffer.read(cx).replica_id()
+ }
+}
diff --git a/crates/channel2/src/channel_chat.rs b/crates/channel2/src/channel_chat.rs
new file mode 100644
index 0000000000..a5b5249853
--- /dev/null
+++ b/crates/channel2/src/channel_chat.rs
@@ -0,0 +1,647 @@
+use crate::{Channel, ChannelId, ChannelStore};
+use anyhow::{anyhow, Result};
+use client::{
+ proto,
+ user::{User, UserStore},
+ Client, Subscription, TypedEnvelope, UserId,
+};
+use futures::lock::Mutex;
+use gpui::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task};
+use rand::prelude::*;
+use std::{
+ collections::HashSet,
+ mem,
+ ops::{ControlFlow, Range},
+ sync::Arc,
+};
+use sum_tree::{Bias, SumTree};
+use time::OffsetDateTime;
+use util::{post_inc, ResultExt as _, TryFutureExt};
+
+pub struct ChannelChat {
+ pub channel_id: ChannelId,
+ messages: SumTree,
+ acknowledged_message_ids: HashSet,
+ channel_store: Model,
+ loaded_all_messages: bool,
+ last_acknowledged_id: Option,
+ next_pending_message_id: usize,
+ user_store: Model,
+ rpc: Arc,
+ outgoing_messages_lock: Arc>,
+ rng: StdRng,
+ _subscription: Subscription,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct MessageParams {
+ pub text: String,
+ pub mentions: Vec<(Range, UserId)>,
+}
+
+#[derive(Clone, Debug)]
+pub struct ChannelMessage {
+ pub id: ChannelMessageId,
+ pub body: String,
+ pub timestamp: OffsetDateTime,
+ pub sender: Arc,
+ pub nonce: u128,
+ pub mentions: Vec<(Range, UserId)>,
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum ChannelMessageId {
+ Saved(u64),
+ Pending(usize),
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct ChannelMessageSummary {
+ max_id: ChannelMessageId,
+ count: usize,
+}
+
+#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+struct Count(usize);
+
+#[derive(Clone, Debug, PartialEq)]
+pub enum ChannelChatEvent {
+ MessagesUpdated {
+ old_range: Range,
+ new_count: usize,
+ },
+ NewMessage {
+ channel_id: ChannelId,
+ message_id: u64,
+ },
+}
+
+impl EventEmitter for ChannelChat {
+ type Event = ChannelChatEvent;
+}
+pub fn init(client: &Arc) {
+ client.add_model_message_handler(ChannelChat::handle_message_sent);
+ client.add_model_message_handler(ChannelChat::handle_message_removed);
+}
+
+impl ChannelChat {
+ pub async fn new(
+ channel: Arc,
+ channel_store: Model,
+ user_store: Model,
+ client: Arc,
+ mut cx: AsyncAppContext,
+ ) -> Result> {
+ let channel_id = channel.id;
+ let subscription = client.subscribe_to_entity(channel_id).unwrap();
+
+ let response = client
+ .request(proto::JoinChannelChat { channel_id })
+ .await?;
+ let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
+ let loaded_all_messages = response.done;
+
+ Ok(cx.build_model(|cx| {
+ cx.on_release(Self::release).detach();
+ let mut this = Self {
+ channel_id: channel.id,
+ user_store,
+ channel_store,
+ rpc: client,
+ outgoing_messages_lock: Default::default(),
+ messages: Default::default(),
+ acknowledged_message_ids: Default::default(),
+ loaded_all_messages,
+ next_pending_message_id: 0,
+ last_acknowledged_id: None,
+ rng: StdRng::from_entropy(),
+ _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
+ };
+ this.insert_messages(messages, cx);
+ this
+ })?)
+ }
+
+ fn release(&mut self, _: &mut AppContext) {
+ self.rpc
+ .send(proto::LeaveChannelChat {
+ channel_id: self.channel_id,
+ })
+ .log_err();
+ }
+
+ pub fn channel(&self, cx: &AppContext) -> Option> {
+ self.channel_store
+ .read(cx)
+ .channel_for_id(self.channel_id)
+ .cloned()
+ }
+
+ pub fn client(&self) -> &Arc {
+ &self.rpc
+ }
+
+ pub fn send_message(
+ &mut self,
+ message: MessageParams,
+ cx: &mut ModelContext,
+ ) -> Result>> {
+ if message.text.is_empty() {
+ Err(anyhow!("message body can't be empty"))?;
+ }
+
+ let current_user = self
+ .user_store
+ .read(cx)
+ .current_user()
+ .ok_or_else(|| anyhow!("current_user is not present"))?;
+
+ let channel_id = self.channel_id;
+ let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
+ let nonce = self.rng.gen();
+ self.insert_messages(
+ SumTree::from_item(
+ ChannelMessage {
+ id: pending_id,
+ body: message.text.clone(),
+ sender: current_user,
+ timestamp: OffsetDateTime::now_utc(),
+ mentions: message.mentions.clone(),
+ nonce,
+ },
+ &(),
+ ),
+ cx,
+ );
+ let user_store = self.user_store.clone();
+ let rpc = self.rpc.clone();
+ let outgoing_messages_lock = self.outgoing_messages_lock.clone();
+ Ok(cx.spawn(move |this, mut cx| async move {
+ let outgoing_message_guard = outgoing_messages_lock.lock().await;
+ let request = rpc.request(proto::SendChannelMessage {
+ channel_id,
+ body: message.text,
+ nonce: Some(nonce.into()),
+ mentions: mentions_to_proto(&message.mentions),
+ });
+ let response = request.await?;
+ drop(outgoing_message_guard);
+ let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
+ let id = response.id;
+ let message = ChannelMessage::from_proto(response, &user_store, &mut cx).await?;
+ this.update(&mut cx, |this, cx| {
+ this.insert_messages(SumTree::from_item(message, &()), cx);
+ })?;
+ Ok(id)
+ }))
+ }
+
+ pub fn remove_message(&mut self, id: u64, cx: &mut ModelContext) -> Task> {
+ let response = self.rpc.request(proto::RemoveChannelMessage {
+ channel_id: self.channel_id,
+ message_id: id,
+ });
+ cx.spawn(move |this, mut cx| async move {
+ response.await?;
+ this.update(&mut cx, |this, cx| {
+ this.message_removed(id, cx);
+ })?;
+ Ok(())
+ })
+ }
+
+ pub fn load_more_messages(&mut self, cx: &mut ModelContext) -> Option>> {
+ if self.loaded_all_messages {
+ return None;
+ }
+
+ let rpc = self.rpc.clone();
+ let user_store = self.user_store.clone();
+ let channel_id = self.channel_id;
+ let before_message_id = self.first_loaded_message_id()?;
+ Some(cx.spawn(move |this, mut cx| {
+ async move {
+ let response = rpc
+ .request(proto::GetChannelMessages {
+ channel_id,
+ before_message_id,
+ })
+ .await?;
+ let loaded_all_messages = response.done;
+ let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
+ this.update(&mut cx, |this, cx| {
+ this.loaded_all_messages = loaded_all_messages;
+ this.insert_messages(messages, cx);
+ })?;
+ anyhow::Ok(())
+ }
+ .log_err()
+ }))
+ }
+
+ pub fn first_loaded_message_id(&mut self) -> Option {
+ self.messages.first().and_then(|message| match message.id {
+ ChannelMessageId::Saved(id) => Some(id),
+ ChannelMessageId::Pending(_) => None,
+ })
+ }
+
+ /// Load all of the chat messages since a certain message id.
+ ///
+ /// For now, we always maintain a suffix of the channel's messages.
+ pub async fn load_history_since_message(
+ chat: Model,
+ message_id: u64,
+ mut cx: AsyncAppContext,
+ ) -> Option {
+ loop {
+ let step = chat
+ .update(&mut cx, |chat, cx| {
+ if let Some(first_id) = chat.first_loaded_message_id() {
+ if first_id <= message_id {
+ let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>();
+ let message_id = ChannelMessageId::Saved(message_id);
+ cursor.seek(&message_id, Bias::Left, &());
+ return ControlFlow::Break(
+ if cursor
+ .item()
+ .map_or(false, |message| message.id == message_id)
+ {
+ Some(cursor.start().1 .0)
+ } else {
+ None
+ },
+ );
+ }
+ }
+ ControlFlow::Continue(chat.load_more_messages(cx))
+ })
+ .log_err()?;
+ match step {
+ ControlFlow::Break(ix) => return ix,
+ ControlFlow::Continue(task) => task?.await?,
+ }
+ }
+ }
+
+ pub fn acknowledge_last_message(&mut self, cx: &mut ModelContext) {
+ if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id {
+ if self
+ .last_acknowledged_id
+ .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id)
+ {
+ self.rpc
+ .send(proto::AckChannelMessage {
+ channel_id: self.channel_id,
+ message_id: latest_message_id,
+ })
+ .ok();
+ self.last_acknowledged_id = Some(latest_message_id);
+ self.channel_store.update(cx, |store, cx| {
+ store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
+ });
+ }
+ }
+ }
+
+ pub fn rejoin(&mut self, cx: &mut ModelContext) {
+ let user_store = self.user_store.clone();
+ let rpc = self.rpc.clone();
+ let channel_id = self.channel_id;
+ cx.spawn(move |this, mut cx| {
+ async move {
+ let response = rpc.request(proto::JoinChannelChat { channel_id }).await?;
+ let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
+ let loaded_all_messages = response.done;
+
+ let pending_messages = this.update(&mut cx, |this, cx| {
+ if let Some((first_new_message, last_old_message)) =
+ messages.first().zip(this.messages.last())
+ {
+ if first_new_message.id > last_old_message.id {
+ let old_messages = mem::take(&mut this.messages);
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: 0..old_messages.summary().count,
+ new_count: 0,
+ });
+ this.loaded_all_messages = loaded_all_messages;
+ }
+ }
+
+ this.insert_messages(messages, cx);
+ if loaded_all_messages {
+ this.loaded_all_messages = loaded_all_messages;
+ }
+
+ this.pending_messages().cloned().collect::>()
+ })?;
+
+ for pending_message in pending_messages {
+ let request = rpc.request(proto::SendChannelMessage {
+ channel_id,
+ body: pending_message.body,
+ mentions: mentions_to_proto(&pending_message.mentions),
+ nonce: Some(pending_message.nonce.into()),
+ });
+ let response = request.await?;
+ let message = ChannelMessage::from_proto(
+ response.message.ok_or_else(|| anyhow!("invalid message"))?,
+ &user_store,
+ &mut cx,
+ )
+ .await?;
+ this.update(&mut cx, |this, cx| {
+ this.insert_messages(SumTree::from_item(message, &()), cx);
+ })?;
+ }
+
+ anyhow::Ok(())
+ }
+ .log_err()
+ })
+ .detach();
+ }
+
+ pub fn message_count(&self) -> usize {
+ self.messages.summary().count
+ }
+
+ pub fn messages(&self) -> &SumTree {
+ &self.messages
+ }
+
+ pub fn message(&self, ix: usize) -> &ChannelMessage {
+ let mut cursor = self.messages.cursor::();
+ cursor.seek(&Count(ix), Bias::Right, &());
+ cursor.item().unwrap()
+ }
+
+ pub fn acknowledge_message(&mut self, id: u64) {
+ if self.acknowledged_message_ids.insert(id) {
+ self.rpc
+ .send(proto::AckChannelMessage {
+ channel_id: self.channel_id,
+ message_id: id,
+ })
+ .ok();
+ }
+ }
+
+ pub fn messages_in_range(&self, range: Range) -> impl Iterator- {
+ let mut cursor = self.messages.cursor::();
+ cursor.seek(&Count(range.start), Bias::Right, &());
+ cursor.take(range.len())
+ }
+
+ pub fn pending_messages(&self) -> impl Iterator
- {
+ let mut cursor = self.messages.cursor::();
+ cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
+ cursor
+ }
+
+ async fn handle_message_sent(
+ this: Model,
+ message: TypedEnvelope,
+ _: Arc,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
+ let message = message
+ .payload
+ .message
+ .ok_or_else(|| anyhow!("empty message"))?;
+ let message_id = message.id;
+
+ let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
+ this.update(&mut cx, |this, cx| {
+ this.insert_messages(SumTree::from_item(message, &()), cx);
+ cx.emit(ChannelChatEvent::NewMessage {
+ channel_id: this.channel_id,
+ message_id,
+ })
+ })?;
+
+ Ok(())
+ }
+
+ async fn handle_message_removed(
+ this: Model,
+ message: TypedEnvelope,
+ _: Arc,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ this.update(&mut cx, |this, cx| {
+ this.message_removed(message.payload.message_id, cx)
+ })?;
+ Ok(())
+ }
+
+ fn insert_messages(&mut self, messages: SumTree, cx: &mut ModelContext) {
+ if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
+ let nonces = messages
+ .cursor::<()>()
+ .map(|m| m.nonce)
+ .collect::>();
+
+ let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>();
+ let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
+ let start_ix = old_cursor.start().1 .0;
+ let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
+ let removed_count = removed_messages.summary().count;
+ let new_count = messages.summary().count;
+ let end_ix = start_ix + removed_count;
+
+ new_messages.append(messages, &());
+
+ let mut ranges = Vec::>::new();
+ if new_messages.last().unwrap().is_pending() {
+ new_messages.append(old_cursor.suffix(&()), &());
+ } else {
+ new_messages.append(
+ old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
+ &(),
+ );
+
+ while let Some(message) = old_cursor.item() {
+ let message_ix = old_cursor.start().1 .0;
+ if nonces.contains(&message.nonce) {
+ if ranges.last().map_or(false, |r| r.end == message_ix) {
+ ranges.last_mut().unwrap().end += 1;
+ } else {
+ ranges.push(message_ix..message_ix + 1);
+ }
+ } else {
+ new_messages.push(message.clone(), &());
+ }
+ old_cursor.next(&());
+ }
+ }
+
+ drop(old_cursor);
+ self.messages = new_messages;
+
+ for range in ranges.into_iter().rev() {
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: range,
+ new_count: 0,
+ });
+ }
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: start_ix..end_ix,
+ new_count,
+ });
+
+ cx.notify();
+ }
+ }
+
+ fn message_removed(&mut self, id: u64, cx: &mut ModelContext) {
+ let mut cursor = self.messages.cursor::();
+ let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
+ if let Some(item) = cursor.item() {
+ if item.id == ChannelMessageId::Saved(id) {
+ let ix = messages.summary().count;
+ cursor.next(&());
+ messages.append(cursor.suffix(&()), &());
+ drop(cursor);
+ self.messages = messages;
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: ix..ix + 1,
+ new_count: 0,
+ });
+ }
+ }
+ }
+}
+
+async fn messages_from_proto(
+ proto_messages: Vec,
+ user_store: &Model,
+ cx: &mut AsyncAppContext,
+) -> Result> {
+ let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
+ let mut result = SumTree::new();
+ result.extend(messages, &());
+ Ok(result)
+}
+
+impl ChannelMessage {
+ pub async fn from_proto(
+ message: proto::ChannelMessage,
+ user_store: &Model,
+ cx: &mut AsyncAppContext,
+ ) -> Result {
+ let sender = user_store
+ .update(cx, |user_store, cx| {
+ user_store.get_user(message.sender_id, cx)
+ })?
+ .await?;
+ Ok(ChannelMessage {
+ id: ChannelMessageId::Saved(message.id),
+ body: message.body,
+ mentions: message
+ .mentions
+ .into_iter()
+ .filter_map(|mention| {
+ let range = mention.range?;
+ Some((range.start as usize..range.end as usize, mention.user_id))
+ })
+ .collect(),
+ timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
+ sender,
+ nonce: message
+ .nonce
+ .ok_or_else(|| anyhow!("nonce is required"))?
+ .into(),
+ })
+ }
+
+ pub fn is_pending(&self) -> bool {
+ matches!(self.id, ChannelMessageId::Pending(_))
+ }
+
+ pub async fn from_proto_vec(
+ proto_messages: Vec,
+ user_store: &Model,
+ cx: &mut AsyncAppContext,
+ ) -> Result> {
+ let unique_user_ids = proto_messages
+ .iter()
+ .map(|m| m.sender_id)
+ .collect::>()
+ .into_iter()
+ .collect();
+ user_store
+ .update(cx, |user_store, cx| {
+ user_store.get_users(unique_user_ids, cx)
+ })?
+ .await?;
+
+ let mut messages = Vec::with_capacity(proto_messages.len());
+ for message in proto_messages {
+ messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
+ }
+ Ok(messages)
+ }
+}
+
+pub fn mentions_to_proto(mentions: &[(Range, UserId)]) -> Vec {
+ mentions
+ .iter()
+ .map(|(range, user_id)| proto::ChatMention {
+ range: Some(proto::Range {
+ start: range.start as u64,
+ end: range.end as u64,
+ }),
+ user_id: *user_id as u64,
+ })
+ .collect()
+}
+
+impl sum_tree::Item for ChannelMessage {
+ type Summary = ChannelMessageSummary;
+
+ fn summary(&self) -> Self::Summary {
+ ChannelMessageSummary {
+ max_id: self.id,
+ count: 1,
+ }
+ }
+}
+
+impl Default for ChannelMessageId {
+ fn default() -> Self {
+ Self::Saved(0)
+ }
+}
+
+impl sum_tree::Summary for ChannelMessageSummary {
+ type Context = ();
+
+ fn add_summary(&mut self, summary: &Self, _: &()) {
+ self.max_id = summary.max_id;
+ self.count += summary.count;
+ }
+}
+
+impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
+ fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
+ debug_assert!(summary.max_id > *self);
+ *self = summary.max_id;
+ }
+}
+
+impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
+ fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
+ self.0 += summary.count;
+ }
+}
+
+impl<'a> From<&'a str> for MessageParams {
+ fn from(value: &'a str) -> Self {
+ Self {
+ text: value.into(),
+ mentions: Vec::new(),
+ }
+ }
+}
diff --git a/crates/channel2/src/channel_store.rs b/crates/channel2/src/channel_store.rs
new file mode 100644
index 0000000000..3c9abd59e2
--- /dev/null
+++ b/crates/channel2/src/channel_store.rs
@@ -0,0 +1,1021 @@
+mod channel_index;
+
+use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
+use anyhow::{anyhow, Result};
+use channel_index::ChannelIndex;
+use client::{Client, Subscription, User, UserId, UserStore};
+use collections::{hash_map, HashMap, HashSet};
+use db::RELEASE_CHANNEL;
+use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
+use gpui::{
+ AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
+};
+use rpc::{
+ proto::{self, ChannelVisibility},
+ TypedEnvelope,
+};
+use std::{mem, sync::Arc, time::Duration};
+use util::{async_maybe, ResultExt};
+
+pub fn init(client: &Arc, user_store: Model, cx: &mut AppContext) {
+ let channel_store =
+ cx.build_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
+ cx.set_global(channel_store);
+}
+
+pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
+
+pub type ChannelId = u64;
+
+pub struct ChannelStore {
+ pub channel_index: ChannelIndex,
+ channel_invitations: Vec>,
+ channel_participants: HashMap>>,
+ outgoing_invites: HashSet<(ChannelId, UserId)>,
+ update_channels_tx: mpsc::UnboundedSender,
+ opened_buffers: HashMap>,
+ opened_chats: HashMap>,
+ client: Arc,
+ user_store: Model,
+ _rpc_subscription: Subscription,
+ _watch_connection_status: Task