From dadad397ef0014f859fd7cae8cb3c69d14846d33 Mon Sep 17 00:00:00 2001 From: Mikayla Date: Mon, 18 Sep 2023 20:24:33 -0700 Subject: [PATCH] Finish optimizing channel representations and operations --- Cargo.lock | 1 + crates/channel/Cargo.toml | 1 + crates/channel/src/channel_store.rs | 11 +- .../src/channel_store/channel_index.rs | 110 +++++++----------- crates/channel/src/channel_store_tests.rs | 1 - crates/collab/src/db.rs | 5 +- crates/collab/src/db/queries/channels.rs | 54 +++++---- crates/collab/src/db/tests/channel_tests.rs | 2 +- crates/collab/src/rpc.rs | 37 +++--- crates/collab_ui/src/collab_panel.rs | 28 ++++- 10 files changed, 132 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 327ca26937..e927ae5bf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1222,6 +1222,7 @@ dependencies = [ "serde", "serde_derive", "settings", + "smallvec", "smol", "sum_tree", "tempfile", diff --git a/crates/channel/Cargo.toml b/crates/channel/Cargo.toml index 00e9135bc1..16a1d418d5 100644 --- a/crates/channel/Cargo.toml +++ b/crates/channel/Cargo.toml @@ -28,6 +28,7 @@ anyhow.workspace = true futures.workspace = true image = "0.23" lazy_static.workspace = true +smallvec.workspace = true log.workspace = true parking_lot.workspace = true postage.workspace = true diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index 8a3875b2b9..02eddb9900 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -805,21 +805,18 @@ impl ChannelStore { } } - let mut index_edit = self.channel_index.bulk_edit(); - dbg!(&index_edit); + let mut index = self.channel_index.bulk_insert(); for channel in payload.channels { - index_edit.insert(channel) + index.insert(channel) } for edge in payload.insert_edge { - index_edit.insert_edge(edge.parent_id, edge.channel_id); + index.insert_edge(edge.channel_id, edge.parent_id); } for edge in payload.delete_edge { - index_edit.delete_edge(edge.parent_id, edge.channel_id); + index.delete_edge(edge.parent_id, edge.channel_id); } - drop(index_edit); - dbg!(&self.channel_index); } for permission in payload.channel_permissions { diff --git a/crates/channel/src/channel_store/channel_index.rs b/crates/channel/src/channel_store/channel_index.rs index b1b205a941..8fe2607f9e 100644 --- a/crates/channel/src/channel_store/channel_index.rs +++ b/crates/channel/src/channel_store/channel_index.rs @@ -1,10 +1,9 @@ use std::{ops::Deref, sync::Arc}; +use crate::{Channel, ChannelId}; use collections::HashMap; use rpc::proto; -use crate::{Channel, ChannelId}; - use super::ChannelPath; pub type ChannelsById = HashMap>; @@ -35,8 +34,8 @@ impl ChannelIndex { }); } - pub fn bulk_edit(&mut self) -> ChannelPathsEditGuard { - ChannelPathsEditGuard { + pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard { + ChannelPathsInsertGuard { paths: &mut self.paths, channels_by_id: &mut self.channels_by_id, } @@ -54,12 +53,12 @@ impl Deref for ChannelIndex { /// A guard for ensuring that the paths index maintains its sort and uniqueness /// invariants after a series of insertions #[derive(Debug)] -pub struct ChannelPathsEditGuard<'a> { +pub struct ChannelPathsInsertGuard<'a> { paths: &'a mut Vec, channels_by_id: &'a mut ChannelsById, } -impl<'a> ChannelPathsEditGuard<'a> { +impl<'a> ChannelPathsInsertGuard<'a> { /// Remove the given edge from this index. This will not remove the channel. /// If this operation would result in a dangling edge, re-insert it. pub fn delete_edge(&mut self, parent_id: ChannelId, channel_id: ChannelId) { @@ -94,69 +93,50 @@ impl<'a> ChannelPathsEditGuard<'a> { } } - pub fn insert_edge(&mut self, parent_id: ChannelId, channel_id: ChannelId) { - debug_assert!(self.channels_by_id.contains_key(&parent_id)); - let mut ix = 0; - println!("*********** INSERTING EDGE {}, {} ***********", channel_id, parent_id); - dbg!(&self.paths); - while ix < self.paths.len() { - let path = &self.paths[ix]; - println!("*********"); - dbg!(path); + pub fn insert_edge(&mut self, channel_id: ChannelId, parent_id: ChannelId) { + let mut parents = Vec::new(); + let mut descendants = Vec::new(); + let mut ixs_to_remove = Vec::new(); + for (ix, path) in self.paths.iter().enumerate() { + if path + .windows(2) + .any(|window| window[0] == parent_id && window[1] == channel_id) + { + // We already have this edge in the index + return; + } if path.ends_with(&[parent_id]) { - dbg!("Appending to parent path"); - let mut new_path = Vec::with_capacity(path.len() + 1); - new_path.extend_from_slice(path); - new_path.push(channel_id); - self.paths.insert(ix + 1, dbg!(ChannelPath::new(new_path.into()))); - ix += 2; - } else if let Some(path_ix) = path.iter().position(|c| c == &channel_id) { - if path.contains(&parent_id) { - dbg!("Doing nothing"); - ix += 1; - continue; + parents.push(path); + } else if let Some(position) = path.iter().position(|id| id == &channel_id) { + if position == 0 { + ixs_to_remove.push(ix); } - if path_ix == 0 && path.len() == 1 { - dbg!("Removing path that is just this"); - self.paths.swap_remove(ix); - continue; - } - // This is the busted section rn - // We're trying to do this weird, unsorted context - // free insertion thing, but we can't insert 'parent_id', - // we have to _prepend_ with _parent path to_, - // or something like that. - // It's a bit busted rn, I think I need to keep this whole thing - // sorted now, as this is a huge mess. - // Basically, we want to do the exact thing we do in the - // server, except explicitly. - // Also, rethink the bulk edit abstraction, it's use may no longer - // be as needed with the channel names and edges seperated. - dbg!("Expanding path which contains"); - let (left, right) = path.split_at(path_ix); - let mut new_path = Vec::with_capacity(left.len() + right.len() + 1); - - /// WRONG WRONG WRONG - new_path.extend_from_slice(left); - new_path.push(parent_id); - /// WRONG WRONG WRONG - - new_path.extend_from_slice(right); - if path_ix == 0 { - dbg!("Replacing path that starts with this"); - self.paths[ix] = dbg!(ChannelPath::new(new_path.into())); - } else { - dbg!("inserting new path"); - self.paths.insert(ix + 1, dbg!(ChannelPath::new(new_path.into()))); - ix += 1; - } - ix += 1; - } else { - dbg!("Doing nothing"); - ix += 1; + descendants.push(path.split_at(position).1); } } + + let mut new_paths = Vec::new(); + for parent in parents.iter() { + if descendants.is_empty() { + let mut new_path = Vec::with_capacity(parent.len() + 1); + new_path.extend_from_slice(parent); + new_path.push(channel_id); + new_paths.push(ChannelPath(new_path.into())); + } else { + for descendant in descendants.iter() { + let mut new_path = Vec::with_capacity(parent.len() + descendant.len()); + new_path.extend_from_slice(parent); + new_path.extend_from_slice(descendant); + new_paths.push(ChannelPath(new_path.into())); + } + } + } + + for ix in ixs_to_remove.into_iter().rev() { + self.paths.swap_remove(ix); + } + self.paths.extend(new_paths) } fn insert_root(&mut self, channel_id: ChannelId) { @@ -164,7 +144,7 @@ impl<'a> ChannelPathsEditGuard<'a> { } } -impl<'a> Drop for ChannelPathsEditGuard<'a> { +impl<'a> Drop for ChannelPathsInsertGuard<'a> { fn drop(&mut self) { self.paths.sort_by(|a, b| { let a = channel_path_sorting_key(a, &self.channels_by_id); diff --git a/crates/channel/src/channel_store_tests.rs b/crates/channel/src/channel_store_tests.rs index 59bfe341aa..775bf29425 100644 --- a/crates/channel/src/channel_store_tests.rs +++ b/crates/channel/src/channel_store_tests.rs @@ -22,7 +22,6 @@ fn test_update_channels(cx: &mut AppContext) { proto::Channel { id: 2, name: "a".to_string(), - }, ], channel_permissions: vec![proto::ChannelPermission { diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 11c9c98614..527c4faaa5 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -14,7 +14,10 @@ use collections::{BTreeMap, HashMap, HashSet}; use dashmap::DashMap; use futures::StreamExt; use rand::{prelude::StdRng, Rng, SeedableRng}; -use rpc::{proto::{self}, ConnectionId}; +use rpc::{ + proto::{self}, + ConnectionId, +}; use sea_orm::{ entity::prelude::*, ActiveValue, Condition, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, diff --git a/crates/collab/src/db/queries/channels.rs b/crates/collab/src/db/queries/channels.rs index ca5e6f0df3..2d65139a98 100644 --- a/crates/collab/src/db/queries/channels.rs +++ b/crates/collab/src/db/queries/channels.rs @@ -375,10 +375,7 @@ impl Database { } } - Ok(ChannelGraph { - channels, - edges, - }) + Ok(ChannelGraph { channels, edges }) } pub async fn get_channels_for_user(&self, user_id: UserId) -> Result { @@ -394,12 +391,16 @@ impl Database { .all(&*tx) .await?; - self.get_user_channels(channel_memberships, user_id, &tx).await + self.get_user_channels(channel_memberships, &tx).await }) .await } - pub async fn get_channel_for_user(&self, channel_id: ChannelId, user_id: UserId) -> Result { + pub async fn get_channel_for_user( + &self, + channel_id: ChannelId, + user_id: UserId, + ) -> Result { self.transaction(|tx| async move { let tx = tx; @@ -413,12 +414,16 @@ impl Database { .all(&*tx) .await?; - self.get_user_channels(channel_membership, user_id, &tx).await + self.get_user_channels(channel_membership, &tx).await }) .await } - pub async fn get_user_channels(&self, channel_memberships: Vec, user_id: UserId, tx: &DatabaseTransaction) -> Result { + pub async fn get_user_channels( + &self, + channel_memberships: Vec, + tx: &DatabaseTransaction, + ) -> Result { let parents_by_child_id = self .get_channel_descendants(channel_memberships.iter().map(|m| m.channel_id), &*tx) .await?; @@ -824,9 +829,9 @@ impl Database { self.check_user_is_channel_admin(to, user, &*tx).await?; let to_ancestors = self.get_channel_ancestors(to, &*tx).await?; - let mut from_descendants = self.get_channel_descendants([channel], &*tx).await?; + let mut channel_descendants = self.get_channel_descendants([channel], &*tx).await?; for ancestor in to_ancestors { - if from_descendants.contains_key(&ancestor) { + if channel_descendants.contains_key(&ancestor) { return Err(anyhow!("Cannot create a channel cycle").into()); } } @@ -853,15 +858,17 @@ impl Database { ], ); tx.execute(channel_paths_stmt).await?; - for (from_id, to_ids) in from_descendants.iter().filter(|(id, _)| id != &&channel) { - for to_id in to_ids.iter() { + for (descdenant_id, descendant_parent_ids) in + channel_descendants.iter().filter(|(id, _)| id != &&channel) + { + for descendant_parent_id in descendant_parent_ids.iter() { let channel_paths_stmt = Statement::from_sql_and_values( self.pool.get_database_backend(), sql, [ - from_id.to_proto().into(), - from_id.to_proto().into(), - to_id.to_proto().into(), + descdenant_id.to_proto().into(), + descdenant_id.to_proto().into(), + descendant_parent_id.to_proto().into(), ], ); tx.execute(channel_paths_stmt).await?; @@ -883,14 +890,14 @@ impl Database { tx.execute(channel_paths_stmt).await?; } - if let Some(channel) = from_descendants.get_mut(&channel) { + if let Some(channel) = channel_descendants.get_mut(&channel) { // Remove the other parents channel.clear(); channel.insert(to); } let channels = self - .get_channel_graph(from_descendants, false, &*tx) + .get_channel_graph(channel_descendants, false, &*tx) .await?; Ok(channels) @@ -1009,8 +1016,16 @@ impl PartialEq for ChannelGraph { // Order independent comparison for tests let channels_set = self.channels.iter().collect::>(); let other_channels_set = other.channels.iter().collect::>(); - let edges_set = self.edges.iter().map(|edge| (edge.channel_id, edge.parent_id)).collect::>(); - let other_edges_set = other.edges.iter().map(|edge| (edge.channel_id, edge.parent_id)).collect::>(); + let edges_set = self + .edges + .iter() + .map(|edge| (edge.channel_id, edge.parent_id)) + .collect::>(); + let other_edges_set = other + .edges + .iter() + .map(|edge| (edge.channel_id, edge.parent_id)) + .collect::>(); channels_set == other_channels_set && edges_set == other_edges_set } @@ -1023,7 +1038,6 @@ impl PartialEq for ChannelGraph { } } - struct SmallSet(SmallVec<[T; 1]>); impl Deref for SmallSet { diff --git a/crates/collab/src/db/tests/channel_tests.rs b/crates/collab/src/db/tests/channel_tests.rs index d8870cbd93..8a03014cf9 100644 --- a/crates/collab/src/db/tests/channel_tests.rs +++ b/crates/collab/src/db/tests/channel_tests.rs @@ -5,7 +5,7 @@ use rpc::{ }; use crate::{ - db::{queries::channels::ChannelGraph, ChannelId, Database, NewUserParams, tests::graph}, + db::{queries::channels::ChannelGraph, tests::graph, ChannelId, Database, NewUserParams}, test_both_dbs, }; use std::sync::Arc; diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 60f4216f64..87b7720235 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -3,8 +3,8 @@ mod connection_pool; use crate::{ auth, db::{ - self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, - ServerId, User, UserId, + self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User, + UserId, }, executor::Executor, AppState, Result, @@ -38,8 +38,8 @@ use lazy_static::lazy_static; use prometheus::{register_int_gauge, IntGauge}; use rpc::{ proto::{ - self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, - LiveKitConnectionInfo, RequestMessage, ChannelEdge, + self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, ChannelEdge, EntityMessage, + EnvelopedMessage, LiveKitConnectionInfo, RequestMessage, }, Connection, ConnectionId, Peer, Receipt, TypedEnvelope, }; @@ -2213,17 +2213,14 @@ async fn create_channel( let update = proto::UpdateChannels { channels: vec![channel], - insert_edge: vec![ - ChannelEdge { - parent_id: parent_id.to_proto(), - channel_id: id.to_proto(), - } - ], + insert_edge: vec![ChannelEdge { + parent_id: parent_id.to_proto(), + channel_id: id.to_proto(), + }], ..Default::default() }; - let user_ids_to_notify = - db.get_channel_members(parent_id).await?; + let user_ids_to_notify = db.get_channel_members(parent_id).await?; let connection_pool = session.connection_pool().await; for user_id in user_ids_to_notify { @@ -2549,10 +2546,16 @@ async fn respond_to_channel_invite( let result = db.get_channel_for_user(channel_id, session.user_id).await?; update .channels - .extend(result.channels.channels.into_iter().map(|channel| proto::Channel { - id: channel.id.to_proto(), - name: channel.name, - })); + .extend( + result + .channels + .channels + .into_iter() + .map(|channel| proto::Channel { + id: channel.id.to_proto(), + name: channel.name, + }), + ); update.insert_edge = result.channels.edges; update .channel_participants @@ -2971,7 +2974,7 @@ fn build_initial_channels_update( ) -> proto::UpdateChannels { let mut update = proto::UpdateChannels::default(); - for channel in channels.channels.channels{ + for channel in channels.channels.channels { update.channels.push(proto::Channel { id: channel.id.to_proto(), name: channel.name, diff --git a/crates/collab_ui/src/collab_panel.rs b/crates/collab_ui/src/collab_panel.rs index db69ce5460..70137407a1 100644 --- a/crates/collab_ui/src/collab_panel.rs +++ b/crates/collab_ui/src/collab_panel.rs @@ -2024,11 +2024,15 @@ impl CollabPanel { if let Some(channel_name) = channel_name { items.push(ContextMenuItem::action( format!("Move '#{}' here", channel_name), - MoveChannel { to: path.channel_id() }, + MoveChannel { + to: path.channel_id(), + }, )); items.push(ContextMenuItem::action( format!("Link '#{}' here", channel_name), - LinkChannel { to: path.channel_id() }, + LinkChannel { + to: path.channel_id(), + }, )); items.push(ContextMenuItem::Separator) } @@ -2046,7 +2050,12 @@ impl CollabPanel { location: path.clone(), }, ), - ContextMenuItem::action("Open Notes", OpenChannelBuffer { channel_id: path.channel_id() }), + ContextMenuItem::action( + "Open Notes", + OpenChannelBuffer { + channel_id: path.channel_id(), + }, + ), ]); if self.channel_store.read(cx).is_user_admin(path.channel_id()) { @@ -2306,7 +2315,6 @@ impl CollabPanel { } self.toggle_channel_collapsed(&path.clone(), cx); - } fn expand_selected_channel(&mut self, _: &ExpandSelectedChannel, cx: &mut ViewContext) { @@ -2324,11 +2332,19 @@ impl CollabPanel { self.toggle_channel_collapsed(path.to_owned(), cx) } - fn toggle_channel_collapsed_action(&mut self, action: &ToggleCollapse, cx: &mut ViewContext) { + fn toggle_channel_collapsed_action( + &mut self, + action: &ToggleCollapse, + cx: &mut ViewContext, + ) { self.toggle_channel_collapsed(&action.location, cx); } - fn toggle_channel_collapsed<'a>(&mut self, path: impl Into>, cx: &mut ViewContext) { + fn toggle_channel_collapsed<'a>( + &mut self, + path: impl Into>, + cx: &mut ViewContext, + ) { let path = path.into(); match self.collapsed_channels.binary_search(&path) { Ok(ix) => {