diff --git a/Cargo.lock b/Cargo.lock index ffdc736830..18950cdb82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1217,6 +1217,7 @@ version = "0.1.0" dependencies = [ "anyhow", "client", + "clock", "collections", "db", "feature_flags", @@ -1530,6 +1531,7 @@ dependencies = [ "tracing-subscriber", "unindent", "util", + "uuid 1.4.1", "workspace", ] diff --git a/crates/channel/src/channel_buffer.rs b/crates/channel/src/channel_buffer.rs index a03eb1f1b5..a097cc5467 100644 --- a/crates/channel/src/channel_buffer.rs +++ b/crates/channel/src/channel_buffer.rs @@ -2,14 +2,17 @@ use crate::Channel; use anyhow::Result; use client::{Client, Collaborator, UserStore}; use collections::HashMap; -use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle}; +use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task}; +use language::proto::serialize_version; use rpc::{ proto::{self, PeerId}, TypedEnvelope, }; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use util::ResultExt; +const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250); + pub(crate) fn init(client: &Arc) { client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer); client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators); @@ -24,11 +27,13 @@ pub struct ChannelBuffer { buffer_epoch: u64, client: Arc, subscription: Option, + acknowledge_task: Option>>, } pub enum ChannelBufferEvent { CollaboratorsChanged, Disconnected, + BufferEdited, } impl Entity for ChannelBuffer { @@ -36,6 +41,9 @@ impl Entity for ChannelBuffer { fn release(&mut self, _: &mut AppContext) { if self.connected { + if let Some(task) = self.acknowledge_task.take() { + task.detach(); + } self.client .send(proto::LeaveChannelBuffer { channel_id: self.channel.id, @@ -81,6 +89,7 @@ impl ChannelBuffer { client, connected: true, collaborators: Default::default(), + acknowledge_task: None, channel, subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())), user_store, @@ -159,19 +168,45 @@ impl ChannelBuffer { &mut self, _: ModelHandle, event: &language::Event, - _: &mut ModelContext, + cx: &mut ModelContext, ) { - if let language::Event::Operation(operation) = event { - let operation = language::proto::serialize_operation(operation); - self.client - .send(proto::UpdateChannelBuffer { - channel_id: self.channel.id, - operations: vec![operation], - }) - .log_err(); + match event { + language::Event::Operation(operation) => { + let operation = language::proto::serialize_operation(operation); + self.client + .send(proto::UpdateChannelBuffer { + channel_id: self.channel.id, + operations: vec![operation], + }) + .log_err(); + } + language::Event::Edited => { + cx.emit(ChannelBufferEvent::BufferEdited); + } + _ => {} } } + pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) { + let buffer = self.buffer.read(cx); + let version = buffer.version(); + let buffer_id = buffer.remote_id(); + let client = self.client.clone(); + let epoch = self.epoch(); + + self.acknowledge_task = Some(cx.spawn_weak(|_, cx| async move { + cx.background().timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL).await; + client + .send(proto::AckBufferOperation { + buffer_id, + epoch, + version: serialize_version(&version), + }) + .ok(); + Ok(()) + })); + } + pub fn epoch(&self) -> u64 { self.buffer_epoch } diff --git a/crates/channel/src/channel_chat.rs b/crates/channel/src/channel_chat.rs index 8e03a3b6fd..5b32d67f63 100644 --- a/crates/channel/src/channel_chat.rs +++ b/crates/channel/src/channel_chat.rs @@ -1,4 +1,4 @@ -use crate::Channel; +use crate::{Channel, ChannelStore}; use anyhow::{anyhow, Result}; use client::{ proto, @@ -16,7 +16,9 @@ use util::{post_inc, ResultExt as _, TryFutureExt}; pub struct ChannelChat { channel: Arc, messages: SumTree, + channel_store: ModelHandle, loaded_all_messages: bool, + last_acknowledged_id: Option, next_pending_message_id: usize, user_store: ModelHandle, rpc: Arc, @@ -77,6 +79,7 @@ impl Entity for ChannelChat { impl ChannelChat { pub async fn new( channel: Arc, + channel_store: ModelHandle, user_store: ModelHandle, client: Arc, mut cx: AsyncAppContext, @@ -94,11 +97,13 @@ impl ChannelChat { let mut this = Self { channel, user_store, + channel_store, rpc: client, outgoing_messages_lock: Default::default(), messages: Default::default(), loaded_all_messages, next_pending_message_id: 0, + last_acknowledged_id: None, rng: StdRng::from_entropy(), _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()), }; @@ -219,6 +224,26 @@ impl ChannelChat { false } + pub fn acknowledge_last_message(&mut self, cx: &mut ModelContext) { + if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id { + if self + .last_acknowledged_id + .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id) + { + self.rpc + .send(proto::AckChannelMessage { + channel_id: self.channel.id, + message_id: latest_message_id, + }) + .ok(); + self.last_acknowledged_id = Some(latest_message_id); + self.channel_store.update(cx, |store, cx| { + store.acknowledge_message_id(self.channel.id, latest_message_id, cx); + }); + } + } + } + pub fn rejoin(&mut self, cx: &mut ModelContext) { let user_store = self.user_store.clone(); let rpc = self.rpc.clone(); diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index f0f66f4839..db6d5f42c5 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -43,8 +43,8 @@ pub type ChannelData = (Channel, ChannelPath); pub struct Channel { pub id: ChannelId, pub name: String, - pub has_note_changed: bool, - pub has_new_messages: bool, + pub unseen_note_version: Option<(u64, clock::Global)>, + pub unseen_message_id: Option, } #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)] @@ -201,34 +201,60 @@ impl ChannelStore { ) -> Task>> { let client = self.client.clone(); let user_store = self.user_store.clone(); - let open_channel_buffer = self.open_channel_resource( + self.open_channel_resource( channel_id, |this| &mut this.opened_buffers, |channel, cx| ChannelBuffer::new(channel, client, user_store, cx), cx, - ); - cx.spawn(|this, mut cx| async move { - let buffer = open_channel_buffer.await?; - this.update(&mut cx, |this, cx| { - this.channel_index.clear_note_changed(channel_id); - cx.notify(); - }); - Ok(buffer) - }) + ) } pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option { self.channel_index .by_id() .get(&channel_id) - .map(|channel| channel.has_note_changed) + .map(|channel| channel.unseen_note_version.is_some()) } pub fn has_new_messages(&self, channel_id: ChannelId) -> Option { self.channel_index .by_id() .get(&channel_id) - .map(|channel| channel.has_new_messages) + .map(|channel| channel.unseen_message_id.is_some()) + } + + pub fn notes_changed( + &mut self, + channel_id: ChannelId, + epoch: u64, + version: &clock::Global, + cx: &mut ModelContext, + ) { + self.channel_index.note_changed(channel_id, epoch, version); + cx.notify(); + } + + pub fn acknowledge_message_id( + &mut self, + channel_id: ChannelId, + message_id: u64, + cx: &mut ModelContext, + ) { + self.channel_index + .acknowledge_message_id(channel_id, message_id); + cx.notify(); + } + + pub fn acknowledge_notes_version( + &mut self, + channel_id: ChannelId, + epoch: u64, + version: &clock::Global, + cx: &mut ModelContext, + ) { + self.channel_index + .acknowledge_note_version(channel_id, epoch, version); + cx.notify(); } pub fn open_channel_chat( @@ -238,20 +264,13 @@ impl ChannelStore { ) -> Task>> { let client = self.client.clone(); let user_store = self.user_store.clone(); - let open_channel_chat = self.open_channel_resource( + let this = cx.handle(); + self.open_channel_resource( channel_id, |this| &mut this.opened_chats, - |channel, cx| ChannelChat::new(channel, user_store, client, cx), + |channel, cx| ChannelChat::new(channel, this, user_store, client, cx), cx, - ); - cx.spawn(|this, mut cx| async move { - let chat = open_channel_chat.await?; - this.update(&mut cx, |this, cx| { - this.channel_index.clear_message_changed(channel_id); - cx.notify(); - }); - Ok(chat) - }) + ) } /// Asynchronously open a given resource associated with a channel. @@ -811,8 +830,8 @@ impl ChannelStore { Arc::new(Channel { id: channel.id, name: channel.name, - has_note_changed: false, - has_new_messages: false, + unseen_note_version: None, + unseen_message_id: None, }), ), } @@ -822,8 +841,8 @@ impl ChannelStore { || !payload.delete_channels.is_empty() || !payload.insert_edge.is_empty() || !payload.delete_edge.is_empty() - || !payload.notes_changed.is_empty() - || !payload.new_messages.is_empty(); + || !payload.unseen_channel_messages.is_empty() + || !payload.unseen_channel_buffer_changes.is_empty(); if channels_changed { if !payload.delete_channels.is_empty() { @@ -850,12 +869,20 @@ impl ChannelStore { index.insert(channel) } - for id_changed in payload.notes_changed { - index.note_changed(id_changed); + for unseen_buffer_change in payload.unseen_channel_buffer_changes { + let version = language::proto::deserialize_version(&unseen_buffer_change.version); + index.note_changed( + unseen_buffer_change.channel_id, + unseen_buffer_change.epoch, + &version, + ); } - for id_changed in payload.new_messages { - index.new_messages(id_changed); + for unseen_channel_message in payload.unseen_channel_messages { + index.new_messages( + unseen_channel_message.channel_id, + unseen_channel_message.message_id, + ); } for edge in payload.insert_edge { diff --git a/crates/channel/src/channel_store/channel_index.rs b/crates/channel/src/channel_store/channel_index.rs index 513e20e3a7..2a93ca6573 100644 --- a/crates/channel/src/channel_store/channel_index.rs +++ b/crates/channel/src/channel_store/channel_index.rs @@ -39,17 +39,38 @@ impl ChannelIndex { } } - pub fn clear_note_changed(&mut self, channel_id: ChannelId) { + pub fn acknowledge_note_version( + &mut self, + channel_id: ChannelId, + epoch: u64, + version: &clock::Global, + ) { if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { - Arc::make_mut(channel).has_note_changed = false; + let channel = Arc::make_mut(channel); + if let Some((unseen_epoch, unseen_version)) = &channel.unseen_note_version { + if epoch > *unseen_epoch + || epoch == *unseen_epoch && version.observed_all(unseen_version) + { + channel.unseen_note_version = None; + } + } } } - pub fn clear_message_changed(&mut self, channel_id: ChannelId) { + pub fn acknowledge_message_id(&mut self, channel_id: ChannelId, message_id: u64) { if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { - Arc::make_mut(channel).has_new_messages = false; + let channel = Arc::make_mut(channel); + if let Some(unseen_message_id) = channel.unseen_message_id { + if message_id >= unseen_message_id { + channel.unseen_message_id = None; + } + } } } + + pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) { + insert_note_changed(&mut self.channels_by_id, channel_id, epoch, version); + } } impl Deref for ChannelIndex { @@ -88,15 +109,14 @@ impl<'a> ChannelPathsInsertGuard<'a> { } } - pub fn note_changed(&mut self, channel_id: ChannelId) { - if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { - Arc::make_mut(channel).has_note_changed = true; - } + pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) { + insert_note_changed(&mut self.channels_by_id, channel_id, epoch, &version); } - pub fn new_messages(&mut self, channel_id: ChannelId) { + pub fn new_messages(&mut self, channel_id: ChannelId, message_id: u64) { if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { - Arc::make_mut(channel).has_new_messages = true; + let unseen_message_id = Arc::make_mut(channel).unseen_message_id.get_or_insert(0); + *unseen_message_id = message_id.max(*unseen_message_id); } } @@ -109,8 +129,8 @@ impl<'a> ChannelPathsInsertGuard<'a> { Arc::new(Channel { id: channel_proto.id, name: channel_proto.name, - has_note_changed: false, - has_new_messages: false, + unseen_note_version: None, + unseen_message_id: None, }), ); self.insert_root(channel_proto.id); @@ -186,3 +206,21 @@ fn channel_path_sorting_key<'a>( path.iter() .map(|id| Some(channels_by_id.get(id)?.name.as_str())) } + +fn insert_note_changed( + channels_by_id: &mut BTreeMap>, + channel_id: u64, + epoch: u64, + version: &clock::Global, +) { + if let Some(channel) = channels_by_id.get_mut(&channel_id) { + let unseen_version = Arc::make_mut(channel) + .unseen_note_version + .get_or_insert((0, clock::Global::new())); + if epoch > unseen_version.0 { + *unseen_version = (epoch, version.clone()); + } else { + unseen_version.1.join(&version); + } + } +} diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 5eae700404..5767ac54b7 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -34,7 +34,7 @@ use std::{ future::Future, marker::PhantomData, path::PathBuf, - sync::{Arc, Weak}, + sync::{atomic::AtomicU64, Arc, Weak}, time::{Duration, Instant}, }; use telemetry::Telemetry; @@ -105,7 +105,7 @@ pub fn init(client: &Arc, cx: &mut AppContext) { } pub struct Client { - id: usize, + id: AtomicU64, peer: Arc, http: Arc, telemetry: Arc, @@ -374,7 +374,7 @@ impl settings::Setting for TelemetrySettings { impl Client { pub fn new(http: Arc, cx: &AppContext) -> Arc { Arc::new(Self { - id: 0, + id: AtomicU64::new(0), peer: Peer::new(0), telemetry: Telemetry::new(http.clone(), cx), http, @@ -387,17 +387,16 @@ impl Client { }) } - pub fn id(&self) -> usize { - self.id + pub fn id(&self) -> u64 { + self.id.load(std::sync::atomic::Ordering::SeqCst) } pub fn http_client(&self) -> Arc { self.http.clone() } - #[cfg(any(test, feature = "test-support"))] - pub fn set_id(&mut self, id: usize) -> &Self { - self.id = id; + pub fn set_id(&self, id: u64) -> &Self { + self.id.store(id, std::sync::atomic::Ordering::SeqCst); self } @@ -454,7 +453,7 @@ impl Client { } fn set_status(self: &Arc, status: Status, cx: &AsyncAppContext) { - log::info!("set status on client {}: {:?}", self.id, status); + log::info!("set status on client {}: {:?}", self.id(), status); let mut state = self.state.write(); *state.status.0.borrow_mut() = status; @@ -805,6 +804,7 @@ impl Client { } } let credentials = credentials.unwrap(); + self.set_id(credentials.user_id); if was_disconnected { self.set_status(Status::Connecting, cx); @@ -1221,7 +1221,7 @@ impl Client { } pub fn send(&self, message: T) -> Result<()> { - log::debug!("rpc send. client_id:{}, name:{}", self.id, T::NAME); + log::debug!("rpc send. client_id:{}, name:{}", self.id(), T::NAME); self.peer.send(self.connection_id()?, message) } @@ -1237,7 +1237,7 @@ impl Client { &self, request: T, ) -> impl Future>> { - let client_id = self.id; + let client_id = self.id(); log::debug!( "rpc request start. client_id:{}. name:{}", client_id, @@ -1258,7 +1258,7 @@ impl Client { } fn respond(&self, receipt: Receipt, response: T::Response) -> Result<()> { - log::debug!("rpc respond. client_id:{}. name:{}", self.id, T::NAME); + log::debug!("rpc respond. client_id:{}. name:{}", self.id(), T::NAME); self.peer.respond(receipt, response) } @@ -1267,7 +1267,7 @@ impl Client { receipt: Receipt, error: proto::Error, ) -> Result<()> { - log::debug!("rpc respond. client_id:{}. name:{}", self.id, T::NAME); + log::debug!("rpc respond. client_id:{}. name:{}", self.id(), T::NAME); self.peer.respond_with_error(receipt, error) } @@ -1336,7 +1336,7 @@ impl Client { if let Some(handler) = handler { let future = handler(subscriber, message, &self, cx.clone()); - let client_id = self.id; + let client_id = self.id(); log::debug!( "rpc message received. client_id:{}, sender_id:{:?}, type:{}", client_id, diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 13bb3c06e8..e60b7cc33d 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -439,8 +439,8 @@ pub struct ChannelsForUser { pub channels: ChannelGraph, pub channel_participants: HashMap>, pub channels_with_admin_privileges: HashSet, - pub channels_with_changed_notes: HashSet, - pub channels_with_new_messages: HashSet, + pub unseen_buffer_changes: Vec, + pub channel_messages: Vec, } #[derive(Debug)] diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index 78ccd9e54a..c85432f2bb 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -432,7 +432,12 @@ impl Database { channel_id: ChannelId, user: UserId, operations: &[proto::Operation], - ) -> Result<(Vec, Vec)> { + ) -> Result<( + Vec, + Vec, + i32, + Vec, + )> { self.transaction(move |tx| async move { self.check_user_is_channel_member(channel_id, user, &*tx) .await?; @@ -453,6 +458,7 @@ impl Database { .collect::>(); let mut channel_members; + let max_version; if !operations.is_empty() { let max_operation = operations @@ -460,6 +466,11 @@ impl Database { .max_by_key(|op| (op.lamport_timestamp.as_ref(), op.replica_id.as_ref())) .unwrap(); + max_version = vec![proto::VectorClockEntry { + replica_id: *max_operation.replica_id.as_ref() as u32, + timestamp: *max_operation.lamport_timestamp.as_ref() as u32, + }]; + // get current channel participants and save the max operation above self.save_max_operation( user, @@ -492,6 +503,7 @@ impl Database { .await?; } else { channel_members = Vec::new(); + max_version = Vec::new(); } let mut connections = Vec::new(); @@ -510,7 +522,7 @@ impl Database { }); } - Ok((connections, channel_members)) + Ok((connections, channel_members, buffer.epoch, max_version)) }) .await } @@ -712,12 +724,12 @@ impl Database { .await } - pub async fn channels_with_changed_notes( + pub async fn unseen_channel_buffer_changes( &self, user_id: UserId, channel_ids: &[ChannelId], tx: &DatabaseTransaction, - ) -> Result> { + ) -> Result> { #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)] enum QueryIds { ChannelId, @@ -750,37 +762,45 @@ impl Database { } drop(rows); - let last_operations = self - .get_last_operations_for_buffers(channel_ids_by_buffer_id.keys().copied(), &*tx) + let latest_operations = self + .get_latest_operations_for_buffers(channel_ids_by_buffer_id.keys().copied(), &*tx) .await?; - let mut channels_with_new_changes = HashSet::default(); - for last_operation in last_operations { - if let Some(observed_edit) = observed_edits_by_buffer_id.get(&last_operation.buffer_id) - { - if observed_edit.epoch == last_operation.epoch - && observed_edit.lamport_timestamp == last_operation.lamport_timestamp - && observed_edit.replica_id == last_operation.replica_id + let mut changes = Vec::default(); + for latest in latest_operations { + if let Some(observed) = observed_edits_by_buffer_id.get(&latest.buffer_id) { + if ( + observed.epoch, + observed.lamport_timestamp, + observed.replica_id, + ) >= (latest.epoch, latest.lamport_timestamp, latest.replica_id) { continue; } } - if let Some(channel_id) = channel_ids_by_buffer_id.get(&last_operation.buffer_id) { - channels_with_new_changes.insert(*channel_id); + if let Some(channel_id) = channel_ids_by_buffer_id.get(&latest.buffer_id) { + changes.push(proto::UnseenChannelBufferChange { + channel_id: channel_id.to_proto(), + epoch: latest.epoch as u64, + version: vec![proto::VectorClockEntry { + replica_id: latest.replica_id as u32, + timestamp: latest.lamport_timestamp as u32, + }], + }); } } - Ok(channels_with_new_changes) + Ok(changes) } - pub async fn get_last_operations_for_buffers( + pub async fn get_latest_operations_for_buffers( &self, - channel_ids: impl IntoIterator, + buffer_ids: impl IntoIterator, tx: &DatabaseTransaction, ) -> Result> { let mut values = String::new(); - for id in channel_ids { + for id in buffer_ids { if !values.is_empty() { values.push_str(", "); } @@ -795,13 +815,10 @@ impl Database { r#" SELECT * - FROM ( + FROM + ( SELECT - buffer_id, - epoch, - lamport_timestamp, - replica_id, - value, + *, row_number() OVER ( PARTITION BY buffer_id ORDER BY @@ -812,17 +829,17 @@ impl Database { FROM buffer_operations WHERE buffer_id in ({values}) - ) AS operations + ) AS last_operations WHERE row_number = 1 "#, ); let stmt = Statement::from_string(self.pool.get_database_backend(), sql); - let operations = buffer_operation::Model::find_by_statement(stmt) + Ok(buffer_operation::Entity::find() + .from_raw_sql(stmt) .all(&*tx) - .await?; - Ok(operations) + .await?) } } diff --git a/crates/collab/src/db/queries/channels.rs b/crates/collab/src/db/queries/channels.rs index 207cca7657..ab31f59541 100644 --- a/crates/collab/src/db/queries/channels.rs +++ b/crates/collab/src/db/queries/channels.rs @@ -463,20 +463,20 @@ impl Database { } let channel_ids = graph.channels.iter().map(|c| c.id).collect::>(); - let channels_with_changed_notes = self - .channels_with_changed_notes(user_id, &channel_ids, &*tx) + let channel_buffer_changes = self + .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx) .await?; - let channels_with_new_messages = self - .channels_with_new_messages(user_id, &channel_ids, &*tx) + let unseen_messages = self + .unseen_channel_messages(user_id, &channel_ids, &*tx) .await?; Ok(ChannelsForUser { channels: graph, channel_participants, channels_with_admin_privileges, - channels_with_changed_notes, - channels_with_new_messages, + unseen_buffer_changes: channel_buffer_changes, + channel_messages: unseen_messages, }) } diff --git a/crates/collab/src/db/queries/messages.rs b/crates/collab/src/db/queries/messages.rs index 893c1726da..db1252230e 100644 --- a/crates/collab/src/db/queries/messages.rs +++ b/crates/collab/src/db/queries/messages.rs @@ -279,12 +279,12 @@ impl Database { Ok(()) } - pub async fn channels_with_new_messages( + pub async fn unseen_channel_messages( &self, user_id: UserId, channel_ids: &[ChannelId], tx: &DatabaseTransaction, - ) -> Result> { + ) -> Result> { let mut observed_messages_by_channel_id = HashMap::default(); let mut rows = observed_channel_messages::Entity::find() .filter(observed_channel_messages::Column::UserId.eq(user_id)) @@ -334,7 +334,7 @@ impl Database { .all(&*tx) .await?; - let mut channels_with_new_changes = HashSet::default(); + let mut changes = Vec::new(); for last_message in last_messages { if let Some(observed_message) = observed_messages_by_channel_id.get(&last_message.channel_id) @@ -343,10 +343,13 @@ impl Database { continue; } } - channels_with_new_changes.insert(last_message.channel_id); + changes.push(proto::UnseenChannelMessage { + channel_id: last_message.channel_id.to_proto(), + message_id: last_message.id.to_proto(), + }); } - Ok(channels_with_new_changes) + Ok(changes) } pub async fn remove_channel_message( diff --git a/crates/collab/src/db/tests/buffer_tests.rs b/crates/collab/src/db/tests/buffer_tests.rs index 407cc22108..e5d8ab8cf9 100644 --- a/crates/collab/src/db/tests/buffer_tests.rs +++ b/crates/collab/src/db/tests/buffer_tests.rs @@ -235,7 +235,7 @@ async fn test_channel_buffers_last_operations(db: &Database) { .transaction(|tx| { let buffers = &buffers; async move { - db.get_last_operations_for_buffers([buffers[0].id, buffers[2].id], &*tx) + db.get_latest_operations_for_buffers([buffers[0].id, buffers[2].id], &*tx) .await } }) @@ -299,7 +299,7 @@ async fn test_channel_buffers_last_operations(db: &Database) { .transaction(|tx| { let buffers = &buffers; async move { - db.get_last_operations_for_buffers([buffers[1].id, buffers[2].id], &*tx) + db.get_latest_operations_for_buffers([buffers[1].id, buffers[2].id], &*tx) .await } }) @@ -317,7 +317,7 @@ async fn test_channel_buffers_last_operations(db: &Database) { .transaction(|tx| { let buffers = &buffers; async move { - db.get_last_operations_for_buffers([buffers[0].id, buffers[1].id], &*tx) + db.get_latest_operations_for_buffers([buffers[0].id, buffers[1].id], &*tx) .await } }) @@ -331,11 +331,11 @@ async fn test_channel_buffers_last_operations(db: &Database) { ], ); - let changed_channels = db + let buffer_changes = db .transaction(|tx| { let buffers = &buffers; async move { - db.channels_with_changed_notes( + db.unseen_channel_buffer_changes( observer_id, &[ buffers[0].channel_id, @@ -349,31 +349,42 @@ async fn test_channel_buffers_last_operations(db: &Database) { }) .await .unwrap(); + assert_eq!( - changed_channels, + buffer_changes, [ - buffers[0].channel_id, - buffers[1].channel_id, - buffers[2].channel_id, + rpc::proto::UnseenChannelBufferChange { + channel_id: buffers[0].channel_id.to_proto(), + epoch: 0, + version: serialize_version(&text_buffers[0].version()), + }, + rpc::proto::UnseenChannelBufferChange { + channel_id: buffers[1].channel_id.to_proto(), + epoch: 1, + version: serialize_version(&text_buffers[1].version()), + }, + rpc::proto::UnseenChannelBufferChange { + channel_id: buffers[2].channel_id.to_proto(), + epoch: 0, + version: serialize_version(&text_buffers[2].version()), + }, ] - .into_iter() - .collect::>() ); db.observe_buffer_version( buffers[1].id, observer_id, 1, - &serialize_version(&text_buffers[1].version()), + serialize_version(&text_buffers[1].version()).as_slice(), ) .await .unwrap(); - let changed_channels = db + let buffer_changes = db .transaction(|tx| { let buffers = &buffers; async move { - db.channels_with_changed_notes( + db.unseen_channel_buffer_changes( observer_id, &[ buffers[0].channel_id, @@ -387,11 +398,21 @@ async fn test_channel_buffers_last_operations(db: &Database) { }) .await .unwrap(); + assert_eq!( - changed_channels, - [buffers[0].channel_id, buffers[2].channel_id,] - .into_iter() - .collect::>() + buffer_changes, + [ + rpc::proto::UnseenChannelBufferChange { + channel_id: buffers[0].channel_id.to_proto(), + epoch: 0, + version: serialize_version(&text_buffers[0].version()), + }, + rpc::proto::UnseenChannelBufferChange { + channel_id: buffers[2].channel_id.to_proto(), + epoch: 0, + version: serialize_version(&text_buffers[2].version()), + }, + ] ); // Observe an earlier version of the buffer. @@ -407,11 +428,11 @@ async fn test_channel_buffers_last_operations(db: &Database) { .await .unwrap(); - let changed_channels = db + let buffer_changes = db .transaction(|tx| { let buffers = &buffers; async move { - db.channels_with_changed_notes( + db.unseen_channel_buffer_changes( observer_id, &[ buffers[0].channel_id, @@ -425,11 +446,21 @@ async fn test_channel_buffers_last_operations(db: &Database) { }) .await .unwrap(); + assert_eq!( - changed_channels, - [buffers[0].channel_id, buffers[2].channel_id,] - .into_iter() - .collect::>() + buffer_changes, + [ + rpc::proto::UnseenChannelBufferChange { + channel_id: buffers[0].channel_id.to_proto(), + epoch: 0, + version: serialize_version(&text_buffers[0].version()), + }, + rpc::proto::UnseenChannelBufferChange { + channel_id: buffers[2].channel_id.to_proto(), + epoch: 0, + version: serialize_version(&text_buffers[2].version()), + }, + ] ); } diff --git a/crates/collab/src/db/tests/message_tests.rs b/crates/collab/src/db/tests/message_tests.rs index f3d385e4a0..4966ef1bda 100644 --- a/crates/collab/src/db/tests/message_tests.rs +++ b/crates/collab/src/db/tests/message_tests.rs @@ -144,25 +144,32 @@ async fn test_channel_message_new_notification(db: &Arc) { .await .unwrap(); - let _ = db + let (fourth_message, _, _) = db .create_channel_message(channel_2, user, "2_1", OffsetDateTime::now_utc(), 4) .await .unwrap(); // Check that observer has new messages - let channels_with_new_messages = db + let unseen_messages = db .transaction(|tx| async move { - db.channels_with_new_messages(observer, &[channel_1, channel_2], &*tx) + db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx) .await }) .await .unwrap(); assert_eq!( - channels_with_new_messages, - [channel_1, channel_2] - .into_iter() - .collect::>() + unseen_messages, + [ + rpc::proto::UnseenChannelMessage { + channel_id: channel_1.to_proto(), + message_id: third_message.to_proto(), + }, + rpc::proto::UnseenChannelMessage { + channel_id: channel_2.to_proto(), + message_id: fourth_message.to_proto(), + }, + ] ); // Observe the second message @@ -171,18 +178,25 @@ async fn test_channel_message_new_notification(db: &Arc) { .unwrap(); // Make sure the observer still has a new message - let channels_with_new_messages = db + let unseen_messages = db .transaction(|tx| async move { - db.channels_with_new_messages(observer, &[channel_1, channel_2], &*tx) + db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx) .await }) .await .unwrap(); assert_eq!( - channels_with_new_messages, - [channel_1, channel_2] - .into_iter() - .collect::>() + unseen_messages, + [ + rpc::proto::UnseenChannelMessage { + channel_id: channel_1.to_proto(), + message_id: third_message.to_proto(), + }, + rpc::proto::UnseenChannelMessage { + channel_id: channel_2.to_proto(), + message_id: fourth_message.to_proto(), + }, + ] ); // Observe the third message, @@ -191,16 +205,20 @@ async fn test_channel_message_new_notification(db: &Arc) { .unwrap(); // Make sure the observer does not have a new method - let channels_with_new_messages = db + let unseen_messages = db .transaction(|tx| async move { - db.channels_with_new_messages(observer, &[channel_1, channel_2], &*tx) + db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx) .await }) .await .unwrap(); + assert_eq!( - channels_with_new_messages, - [channel_2].into_iter().collect::>() + unseen_messages, + [rpc::proto::UnseenChannelMessage { + channel_id: channel_2.to_proto(), + message_id: fourth_message.to_proto(), + }] ); // Observe the second message again, should not regress our observed state @@ -208,16 +226,19 @@ async fn test_channel_message_new_notification(db: &Arc) { .await .unwrap(); - // Make sure the observer does not have a new method - let channels_with_new_messages = db + // Make sure the observer does not have a new message + let unseen_messages = db .transaction(|tx| async move { - db.channels_with_new_messages(observer, &[channel_1, channel_2], &*tx) + db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx) .await }) .await .unwrap(); assert_eq!( - channels_with_new_messages, - [channel_2].into_iter().collect::>() + unseen_messages, + [rpc::proto::UnseenChannelMessage { + channel_id: channel_2.to_proto(), + message_id: fourth_message.to_proto(), + }] ); } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 371d0466c1..1f0ecce2bf 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -274,7 +274,8 @@ impl Server { .add_message_handler(unfollow) .add_message_handler(update_followers) .add_message_handler(update_diff_base) - .add_request_handler(get_private_user_info); + .add_request_handler(get_private_user_info) + .add_message_handler(acknowledge_channel_message); Arc::new(server) } @@ -2568,16 +2569,8 @@ async fn respond_to_channel_invite( name: channel.name, }), ); - update.notes_changed = result - .channels_with_changed_notes - .iter() - .map(|id| id.to_proto()) - .collect(); - update.new_messages = result - .channels_with_new_messages - .iter() - .map(|id| id.to_proto()) - .collect(); + update.unseen_channel_messages = result.channel_messages; + update.unseen_channel_buffer_changes = result.unseen_buffer_changes; update.insert_edge = result.channels.edges; update .channel_participants @@ -2701,7 +2694,7 @@ async fn update_channel_buffer( let db = session.db().await; let channel_id = ChannelId::from_proto(request.channel_id); - let (collaborators, non_collaborators) = db + let (collaborators, non_collaborators, epoch, version) = db .update_channel_buffer(channel_id, session.user_id, &request.operations) .await?; @@ -2726,7 +2719,11 @@ async fn update_channel_buffer( session.peer.send( peer_id.into(), proto::UpdateChannels { - notes_changed: vec![channel_id.to_proto()], + unseen_channel_buffer_changes: vec![proto::UnseenChannelBufferChange { + channel_id: channel_id.to_proto(), + epoch: epoch as u64, + version: version.clone(), + }], ..Default::default() }, ) @@ -2859,9 +2856,7 @@ async fn send_channel_message( message: Some(message), })?; - dbg!(&non_participants); let pool = &*session.connection_pool().await; - broadcast( None, non_participants @@ -2871,7 +2866,10 @@ async fn send_channel_message( session.peer.send( peer_id.into(), proto::UpdateChannels { - new_messages: vec![channel_id.to_proto()], + unseen_channel_messages: vec![proto::UnseenChannelMessage { + channel_id: channel_id.to_proto(), + message_id: message_id.to_proto(), + }], ..Default::default() }, ) @@ -2900,6 +2898,20 @@ async fn remove_channel_message( Ok(()) } +async fn acknowledge_channel_message( + request: proto::AckChannelMessage, + session: Session, +) -> Result<()> { + let channel_id = ChannelId::from_proto(request.channel_id); + let message_id = MessageId::from_proto(request.message_id); + session + .db() + .await + .observe_channel_message(channel_id, session.user_id, message_id) + .await?; + Ok(()) +} + async fn join_channel_chat( request: proto::JoinChannelChat, response: Response, @@ -3035,18 +3047,8 @@ fn build_initial_channels_update( }); } - update.notes_changed = channels - .channels_with_changed_notes - .iter() - .map(|channel_id| channel_id.to_proto()) - .collect(); - - update.new_messages = channels - .channels_with_new_messages - .iter() - .map(|channel_id| channel_id.to_proto()) - .collect(); - + update.unseen_channel_buffer_changes = channels.unseen_buffer_changes; + update.unseen_channel_messages = channels.channel_messages; update.insert_edge = channels.channels.edges; for (channel_id, participants) in channels.channel_participants { diff --git a/crates/collab/src/tests/channel_buffer_tests.rs b/crates/collab/src/tests/channel_buffer_tests.rs index 68acffacf8..d9d12d485b 100644 --- a/crates/collab/src/tests/channel_buffer_tests.rs +++ b/crates/collab/src/tests/channel_buffer_tests.rs @@ -445,8 +445,8 @@ fn channel(id: u64, name: &'static str) -> Channel { Channel { id, name: name.to_string(), - has_note_changed: false, - has_new_messages: false, + unseen_note_version: None, + unseen_message_id: None, } } diff --git a/crates/collab/src/tests/test_server.rs b/crates/collab/src/tests/test_server.rs index a56df311bd..cf5b58703c 100644 --- a/crates/collab/src/tests/test_server.rs +++ b/crates/collab/src/tests/test_server.rs @@ -151,12 +151,12 @@ impl TestServer { Arc::get_mut(&mut client) .unwrap() - .set_id(user_id.0 as usize) + .set_id(user_id.to_proto()) .override_authenticate(move |cx| { cx.spawn(|_| async move { let access_token = "the-token".to_string(); Ok(Credentials { - user_id: user_id.0 as u64, + user_id: user_id.to_proto(), access_token, }) }) diff --git a/crates/collab_ui/src/channel_view.rs b/crates/collab_ui/src/channel_view.rs index 1d9e409748..a955768050 100644 --- a/crates/collab_ui/src/channel_view.rs +++ b/crates/collab_ui/src/channel_view.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Result}; use call::report_call_event_for_channel; -use channel::{Channel, ChannelBuffer, ChannelBufferEvent, ChannelId}; +use channel::{Channel, ChannelBuffer, ChannelBufferEvent, ChannelId, ChannelStore}; use client::{ proto::{self, PeerId}, Collaborator, ParticipantIndex, @@ -36,6 +36,7 @@ pub fn init(cx: &mut AppContext) { pub struct ChannelView { pub editor: ViewHandle, project: ModelHandle, + channel_store: ModelHandle, channel_buffer: ModelHandle, remote_id: Option, _editor_event_subscription: Subscription, @@ -94,7 +95,13 @@ impl ChannelView { pane.update(&mut cx, |pane, cx| { pane.items_of_type::() .find(|channel_view| channel_view.read(cx).channel_buffer == channel_buffer) - .unwrap_or_else(|| cx.add_view(|cx| Self::new(project, channel_buffer, cx))) + .unwrap_or_else(|| { + cx.add_view(|cx| { + let mut this = Self::new(project, channel_store, channel_buffer, cx); + this.acknowledge_buffer_version(cx); + this + }) + }) }) .ok_or_else(|| anyhow!("pane was dropped")) }) @@ -102,6 +109,7 @@ impl ChannelView { pub fn new( project: ModelHandle, + channel_store: ModelHandle, channel_buffer: ModelHandle, cx: &mut ViewContext, ) -> Self { @@ -121,6 +129,7 @@ impl ChannelView { Self { editor, project, + channel_store, channel_buffer, remote_id: None, _editor_event_subscription, @@ -137,13 +146,44 @@ impl ChannelView { event: &ChannelBufferEvent, cx: &mut ViewContext, ) { - if let ChannelBufferEvent::Disconnected = event { - self.editor.update(cx, |editor, cx| { + match event { + ChannelBufferEvent::Disconnected => self.editor.update(cx, |editor, cx| { editor.set_read_only(true); cx.notify(); - }) + }), + ChannelBufferEvent::BufferEdited => { + if cx.is_self_focused() || self.editor.is_focused(cx) { + self.acknowledge_buffer_version(cx); + } else { + self.channel_store.update(cx, |store, cx| { + let channel_buffer = self.channel_buffer.read(cx); + store.notes_changed( + channel_buffer.channel().id, + channel_buffer.epoch(), + &channel_buffer.buffer().read(cx).version(), + cx, + ) + }); + } + } + _ => {} } } + + fn acknowledge_buffer_version(&mut self, cx: &mut ViewContext<'_, '_, ChannelView>) { + self.channel_store.update(cx, |store, cx| { + let channel_buffer = self.channel_buffer.read(cx); + store.acknowledge_notes_version( + channel_buffer.channel().id, + channel_buffer.epoch(), + &channel_buffer.buffer().read(cx).version(), + cx, + ) + }); + self.channel_buffer.update(cx, |buffer, cx| { + buffer.acknowledge_buffer_version(cx); + }); + } } impl Entity for ChannelView { @@ -161,6 +201,7 @@ impl View for ChannelView { fn focus_in(&mut self, _: AnyViewHandle, cx: &mut ViewContext) { if cx.is_self_focused() { + self.acknowledge_buffer_version(cx); cx.focus(self.editor.as_any()) } } @@ -200,6 +241,7 @@ impl Item for ChannelView { fn clone_on_split(&self, _: WorkspaceId, cx: &mut ViewContext) -> Option { Some(Self::new( self.project.clone(), + self.channel_store.clone(), self.channel_buffer.clone(), cx, )) diff --git a/crates/collab_ui/src/chat_panel.rs b/crates/collab_ui/src/chat_panel.rs index 81a421e8d9..626b3004d7 100644 --- a/crates/collab_ui/src/chat_panel.rs +++ b/crates/collab_ui/src/chat_panel.rs @@ -42,6 +42,7 @@ pub struct ChatPanel { local_timezone: UtcOffset, fs: Arc, width: Option, + active: bool, pending_serialization: Task>, subscriptions: Vec, workspace: WeakViewHandle, @@ -138,6 +139,7 @@ impl ChatPanel { has_focus: false, subscriptions: Vec::new(), workspace: workspace_handle, + active: false, width: None, }; @@ -154,9 +156,9 @@ impl ChatPanel { }), ); - this.init_active_channel(cx); + this.update_channel_count(cx); cx.observe(&this.channel_store, |this, _, cx| { - this.init_active_channel(cx); + this.update_channel_count(cx) }) .detach(); @@ -225,10 +227,8 @@ impl ChatPanel { ); } - fn init_active_channel(&mut self, cx: &mut ViewContext) { + fn update_channel_count(&mut self, cx: &mut ViewContext) { let channel_count = self.channel_store.read(cx).channel_count(); - self.message_list.reset(0); - self.active_chat = None; self.channel_select.update(cx, |select, cx| { select.set_item_count(channel_count, cx); }); @@ -247,6 +247,7 @@ impl ChatPanel { } let subscription = cx.subscribe(&chat, Self::channel_did_change); self.active_chat = Some((chat, subscription)); + self.acknowledge_last_message(cx); self.channel_select.update(cx, |select, cx| { if let Some(ix) = self.channel_store.read(cx).index_of_channel(id) { select.set_selected_index(ix, cx); @@ -268,11 +269,22 @@ impl ChatPanel { new_count, } => { self.message_list.splice(old_range.clone(), *new_count); + self.acknowledge_last_message(cx); } } cx.notify(); } + fn acknowledge_last_message(&mut self, cx: &mut ViewContext<'_, '_, ChatPanel>) { + if self.active { + if let Some((chat, _)) = &self.active_chat { + chat.update(cx, |chat, cx| { + chat.acknowledge_last_message(cx); + }); + } + } + } + fn render_channel(&self, cx: &mut ViewContext) -> AnyElement { let theme = theme::current(cx); Flex::column() @@ -627,8 +639,12 @@ impl Panel for ChatPanel { } fn set_active(&mut self, active: bool, cx: &mut ViewContext) { - if active && !is_chat_feature_enabled(cx) { - cx.emit(Event::Dismissed); + self.active = active; + if active { + self.acknowledge_last_message(cx); + if !is_chat_feature_enabled(cx) { + cx.emit(Event::Dismissed); + } } } diff --git a/crates/collab_ui/src/collab_panel.rs b/crates/collab_ui/src/collab_panel.rs index 53d5140a12..5e90d438fc 100644 --- a/crates/collab_ui/src/collab_panel.rs +++ b/crates/collab_ui/src/collab_panel.rs @@ -1821,7 +1821,7 @@ impl CollabPanel { channel.name.clone(), theme .channel_name - .in_state(channel.has_new_messages) + .in_state(channel.unseen_message_id.is_some()) .text .clone(), ) @@ -1880,7 +1880,7 @@ impl CollabPanel { let participants = self.channel_store.read(cx).channel_participants(channel_id); if participants.is_empty() { - if channel.has_note_changed { + if channel.unseen_note_version.is_some() { Svg::new("icons/terminal.svg") .with_color(theme.channel_note_active_color) .constrained() diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 0f289edcf8..3501e70e6a 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -957,8 +957,19 @@ message UpdateChannels { repeated uint64 remove_channel_invitations = 6; repeated ChannelParticipants channel_participants = 7; repeated ChannelPermission channel_permissions = 8; - repeated uint64 notes_changed = 9; - repeated uint64 new_messages = 10; + repeated UnseenChannelMessage unseen_channel_messages = 9; + repeated UnseenChannelBufferChange unseen_channel_buffer_changes = 10; +} + +message UnseenChannelMessage { + uint64 channel_id = 1; + uint64 message_id = 2; +} + +message UnseenChannelBufferChange { + uint64 channel_id = 1; + uint64 epoch = 2; + repeated VectorClockEntry version = 3; } message ChannelEdge { @@ -1127,8 +1138,7 @@ message RejoinChannelBuffersResponse { message AckBufferOperation { uint64 buffer_id = 1; uint64 epoch = 2; - uint64 lamport_timestamp = 3; - uint64 replica_id = 4; + repeated VectorClockEntry version = 3; } message JoinChannelBufferResponse {