From 67ad75a376bf77b173cf3cdf58a0c89eb79b4230 Mon Sep 17 00:00:00 2001 From: Mikayla Date: Fri, 15 Sep 2023 00:38:02 -0700 Subject: [PATCH] Clean up implementation of channel index, get simple channel moving test cases working --- crates/channel/src/channel_store.rs | 17 ++-- .../src/channel_store/channel_index.rs | 88 +++++++------------ crates/collab/src/db/queries/channels.rs | 35 ++++++-- crates/collab/src/rpc.rs | 4 +- crates/collab/src/tests/channel_tests.rs | 79 ++++++++--------- crates/rpc/proto/zed.proto | 2 +- 6 files changed, 114 insertions(+), 111 deletions(-) diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index 08ff11f85d..b37f918052 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -747,7 +747,7 @@ impl ChannelStore { } } - let channels_changed = !payload.channels.is_empty() || !payload.delete_channels.is_empty(); + let channels_changed = !payload.channels.is_empty() || !payload.delete_channels.is_empty() || !payload.delete_edge.is_empty(); if channels_changed { if !payload.delete_channels.is_empty() { self.channel_index.delete_channels(&payload.delete_channels); @@ -768,16 +768,19 @@ impl ChannelStore { } } - let mut channel_index = self.channel_index.start_upsert(); + let mut index_edit = self.channel_index.bulk_edit(); + for channel in payload.channels { - channel_index.upsert(channel) + index_edit.upsert(channel) + } + + for edge in payload.delete_edge { + index_edit + .delete_edge(edge.parent_id, edge.channel_id); } } - for edge in payload.delete_channel_edge { - self.channel_index - .delete_edge(edge.parent_id, edge.channel_id); - } + for permission in payload.channel_permissions { if permission.is_admin { diff --git a/crates/channel/src/channel_store/channel_index.rs b/crates/channel/src/channel_store/channel_index.rs index 08c5060196..36e02f4134 100644 --- a/crates/channel/src/channel_store/channel_index.rs +++ b/crates/channel/src/channel_store/channel_index.rs @@ -4,7 +4,6 @@ use collections::HashMap; use rpc::proto; use serde_derive::{Deserialize, Serialize}; - use crate::{Channel, ChannelId}; pub type ChannelsById = HashMap>; @@ -48,18 +47,40 @@ impl ChannelIndex { self.channels_by_id.clear(); } - pub fn len(&self) -> usize { - self.paths.len() + /// Delete the given channels from this index. + pub fn delete_channels(&mut self, channels: &[ChannelId]) { + self.channels_by_id + .retain(|channel_id, _| !channels.contains(channel_id)); + self.paths.retain(|path| { + path.iter() + .all(|channel_id| self.channels_by_id.contains_key(channel_id)) + }); } - pub fn get(&self, idx: usize) -> Option<&ChannelPath> { - self.paths.get(idx) + pub fn bulk_edit(&mut self) -> ChannelPathsEditGuard { + ChannelPathsEditGuard { + paths: &mut self.paths, + channels_by_id: &mut self.channels_by_id, + } } +} - pub fn iter(&self) -> impl Iterator { - self.paths.iter() +impl Deref for ChannelIndex { + type Target = [ChannelPath]; + + fn deref(&self) -> &Self::Target { + &self.paths } +} +/// A guard for ensuring that the paths index maintains its sort and uniqueness +/// invariants after a series of insertions +pub struct ChannelPathsEditGuard<'a> { + paths: &'a mut Vec, + channels_by_id: &'a mut ChannelsById, +} + +impl<'a> ChannelPathsEditGuard<'a> { /// Remove the given edge from this index. This will not remove the channel. /// If this operation would result in a dangling edge, re-insert it. pub fn delete_edge(&mut self, parent_id: ChannelId, channel_id: ChannelId) { @@ -69,56 +90,16 @@ impl ChannelIndex { .any(|window| window == [parent_id, channel_id]) }); - // Ensure that there is at least one channel path in the index if !self .paths .iter() .any(|path| path.iter().any(|id| id == &channel_id)) { - let path = ChannelPath(Arc::from([channel_id])); - let current_item: Vec<_> = - channel_path_sorting_key(&path, &self.channels_by_id).collect(); - match self.paths.binary_search_by(|channel_path| { - current_item - .iter() - .copied() - .cmp(channel_path_sorting_key(channel_path, &self.channels_by_id)) - }) { - Ok(ix) => self.paths.insert(ix, path), - Err(ix) => self.paths.insert(ix, path), - } + self.insert_root(channel_id); } } - /// Delete the given channels from this index. - pub fn delete_channels(&mut self, channels: &[ChannelId]) { - self.channels_by_id - .retain(|channel_id, _| !channels.contains(channel_id)); - self.paths.retain(|channel_path| { - !channel_path - .iter() - .any(|channel_id| channels.contains(channel_id)) - }) - } - - /// Upsert one or more channels into this index. - pub fn start_upsert(&mut self) -> ChannelPathsUpsertGuard { - ChannelPathsUpsertGuard { - paths: &mut self.paths, - channels_by_id: &mut self.channels_by_id, - } - } -} - -/// A guard for ensuring that the paths index maintains its sort and uniqueness -/// invariants after a series of insertions -pub struct ChannelPathsUpsertGuard<'a> { - paths: &'a mut Vec, - channels_by_id: &'a mut ChannelsById, -} - -impl<'a> ChannelPathsUpsertGuard<'a> { pub fn upsert(&mut self, channel_proto: proto::Channel) { if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) { Arc::make_mut(existing_channel).name = channel_proto.name; @@ -149,9 +130,12 @@ impl<'a> ChannelPathsUpsertGuard<'a> { let mut new_path = path.to_vec(); new_path.push(channel_id); self.paths.insert(ix + 1, ChannelPath(new_path.into())); + ix += 2; + } else if path.len() == 1 && path[0] == channel_id { + self.paths.swap_remove(ix); + } else { ix += 1; } - ix += 1; } } @@ -160,7 +144,7 @@ impl<'a> ChannelPathsUpsertGuard<'a> { } } -impl<'a> Drop for ChannelPathsUpsertGuard<'a> { +impl<'a> Drop for ChannelPathsEditGuard<'a> { fn drop(&mut self) { self.paths.sort_by(|a, b| { let a = channel_path_sorting_key(a, &self.channels_by_id); @@ -168,10 +152,6 @@ impl<'a> Drop for ChannelPathsUpsertGuard<'a> { a.cmp(b) }); self.paths.dedup(); - self.paths.retain(|path| { - path.iter() - .all(|channel_id| self.channels_by_id.contains_key(channel_id)) - }); } } diff --git a/crates/collab/src/db/queries/channels.rs b/crates/collab/src/db/queries/channels.rs index f8ad453632..90374e76b2 100644 --- a/crates/collab/src/db/queries/channels.rs +++ b/crates/collab/src/db/queries/channels.rs @@ -333,9 +333,10 @@ impl Database { .await } - async fn get_all_channels( + async fn get_channels_internal( &self, parents_by_child_id: ChannelDescendants, + trim_dangling_parents: bool, tx: &DatabaseTransaction, ) -> Result> { let mut channels = Vec::with_capacity(parents_by_child_id.len()); @@ -346,15 +347,36 @@ impl Database { .await?; while let Some(row) = rows.next().await { let row = row?; - // As these rows are pulled from the map's keys, this unwrap is safe. let parents = parents_by_child_id.get(&row.id).unwrap(); if parents.len() > 0 { + let mut added_channel = false; for parent in parents { + // Trim out any dangling parent pointers. + // That the user doesn't have access to + if trim_dangling_parents { + if parents_by_child_id.contains_key(parent) { + added_channel = true; + channels.push(Channel { + id: row.id, + name: row.name.clone(), + parent_id: Some(*parent), + }); + } + } else { + added_channel = true; + channels.push(Channel { + id: row.id, + name: row.name.clone(), + parent_id: Some(*parent), + }); + } + } + if !added_channel { channels.push(Channel { id: row.id, - name: row.name.clone(), - parent_id: Some(*parent), + name: row.name, + parent_id: None, }); } } else { @@ -392,7 +414,8 @@ impl Database { .filter_map(|membership| membership.admin.then_some(membership.channel_id)) .collect(); - let channels = self.get_all_channels(parents_by_child_id, &tx).await?; + let channels = self.get_channels_internal(parents_by_child_id, true, &tx).await?; + #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] enum QueryUserIdsAndChannelIds { @@ -854,7 +877,7 @@ impl Database { channel.insert(to); } - let channels = self.get_all_channels(from_descendants, &*tx).await?; + let channels = self.get_channels_internal(from_descendants, false, &*tx).await?; Ok(channels) } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 6a5ba3e5d4..1d89c6fe42 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2441,7 +2441,7 @@ async fn unlink_channel( let members = db.get_channel_members(from).await?; let update = proto::UpdateChannels { - delete_channel_edge: vec![proto::ChannelEdge { + delete_edge: vec![proto::ChannelEdge { channel_id: channel_id.to_proto(), parent_id: from.to_proto(), }], @@ -2478,7 +2478,7 @@ async fn move_channel( let members_to = db.get_channel_members(channel_id).await?; let update = proto::UpdateChannels { - delete_channel_edge: vec![proto::ChannelEdge { + delete_edge: vec![proto::ChannelEdge { channel_id: channel_id.to_proto(), parent_id: from_parent.to_proto(), }], diff --git a/crates/collab/src/tests/channel_tests.rs b/crates/collab/src/tests/channel_tests.rs index 09583fda29..aeb2f1adcc 100644 --- a/crates/collab/src/tests/channel_tests.rs +++ b/crates/collab/src/tests/channel_tests.rs @@ -910,11 +910,6 @@ async fn test_channel_moving( let channel_c_id = channels[2]; let channel_d_id = channels[3]; - dbg!(channel_a_id); - dbg!(channel_b_id); - dbg!(channel_c_id); - dbg!(channel_d_id); - // Current shape: // a - b - c - d assert_channels_list_shape( @@ -987,6 +982,7 @@ async fn test_channel_moving( let channel_ga_id = b_channels[1]; let channel_ep_id = b_channels[2]; + // Current shape for B: // /- ep // mu -- ga @@ -995,8 +991,8 @@ async fn test_channel_moving( cx_b, &[ (channel_mu_id, 0), + (channel_ep_id, 1), (channel_ga_id, 1), - (channel_ep_id, 1) ], ); @@ -1006,6 +1002,36 @@ async fn test_channel_moving( // mu -- ga // /---------\ // b -- c -- d + assert_channels_list_shape( + client_b.channel_store(), + cx_b, + &[ + // New channels from a + (channel_b_id, 0), + (channel_c_id, 1), + (channel_d_id, 2), + (channel_d_id, 1), + + // B's old channels + (channel_mu_id, 0), + (channel_ep_id, 1), + (channel_ga_id, 1), + + ], + ); + + client_b + .channel_store() + .update(cx_b, |channel_store, cx| { + channel_store.link_channel(channel_b_id, channel_ep_id, cx) + }) + .await + .unwrap(); + + // Current shape for B: + // /---------\ + // /- ep -- b -- c -- d + // mu -- ga assert_channels_list_shape( client_b.channel_store(), cx_b, @@ -1015,46 +1041,17 @@ async fn test_channel_moving( (channel_ga_id, 1), (channel_ep_id, 1), - // New channels from a - (channel_b_id, 0), - (channel_c_id, 1), - (channel_d_id, 1), - (channel_d_id, 2), + // New channels from a, now under epsilon + (channel_b_id, 2), + (channel_c_id, 3), + (channel_d_id, 3), + (channel_d_id, 4), ], ); - // client_b - // .channel_store() - // .update(cx_a, |channel_store, cx| { - // channel_store.move_channel(channel_a_b_id, None, channel_b_epsilon_id, cx) - // }) - // .await - // .unwrap(); - - // // Current shape for B: - // // /---------\ - // // /- ep -- b -- c -- d - // // mu -- ga - // assert_channels_list_shape( - // client_b.channel_store(), - // cx_b, - // &[ - // // B's old channels - // (channel_b_mu_id, 0), - // (channel_b_gamma_id, 1), - // (channel_b_epsilon_id, 1), - - // // New channels from a, now under epsilon - // (channel_a_b_id, 2), - // (channel_a_c_id, 3), - // (channel_a_d_id, 3), - // (channel_a_d_id, 4), - // ], - // ); - client_b .channel_store() - .update(cx_a, |channel_store, cx| { + .update(cx_b, |channel_store, cx| { channel_store.link_channel(channel_ga_id, channel_b_id, cx) }) .await diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 54414f38f4..294f9a9706 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -959,7 +959,7 @@ message LspDiskBasedDiagnosticsUpdated {} message UpdateChannels { repeated Channel channels = 1; - repeated ChannelEdge delete_channel_edge = 2; + repeated ChannelEdge delete_edge = 2; repeated uint64 delete_channels = 3; repeated Channel channel_invitations = 4; repeated uint64 remove_channel_invitations = 5;