Clean up implementation of channel index, get simple channel moving test cases working
This commit is contained in:
parent
9afb67f2cf
commit
67ad75a376
6 changed files with 114 additions and 111 deletions
|
@ -747,7 +747,7 @@ impl ChannelStore {
|
|||
}
|
||||
}
|
||||
|
||||
let channels_changed = !payload.channels.is_empty() || !payload.delete_channels.is_empty();
|
||||
let channels_changed = !payload.channels.is_empty() || !payload.delete_channels.is_empty() || !payload.delete_edge.is_empty();
|
||||
if channels_changed {
|
||||
if !payload.delete_channels.is_empty() {
|
||||
self.channel_index.delete_channels(&payload.delete_channels);
|
||||
|
@ -768,16 +768,19 @@ impl ChannelStore {
|
|||
}
|
||||
}
|
||||
|
||||
let mut channel_index = self.channel_index.start_upsert();
|
||||
let mut index_edit = self.channel_index.bulk_edit();
|
||||
|
||||
for channel in payload.channels {
|
||||
channel_index.upsert(channel)
|
||||
index_edit.upsert(channel)
|
||||
}
|
||||
|
||||
for edge in payload.delete_edge {
|
||||
index_edit
|
||||
.delete_edge(edge.parent_id, edge.channel_id);
|
||||
}
|
||||
}
|
||||
|
||||
for edge in payload.delete_channel_edge {
|
||||
self.channel_index
|
||||
.delete_edge(edge.parent_id, edge.channel_id);
|
||||
}
|
||||
|
||||
|
||||
for permission in payload.channel_permissions {
|
||||
if permission.is_admin {
|
||||
|
|
|
@ -4,7 +4,6 @@ use collections::HashMap;
|
|||
use rpc::proto;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
|
||||
|
||||
use crate::{Channel, ChannelId};
|
||||
|
||||
pub type ChannelsById = HashMap<ChannelId, Arc<Channel>>;
|
||||
|
@ -48,18 +47,40 @@ impl ChannelIndex {
|
|||
self.channels_by_id.clear();
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.paths.len()
|
||||
/// Delete the given channels from this index.
|
||||
pub fn delete_channels(&mut self, channels: &[ChannelId]) {
|
||||
self.channels_by_id
|
||||
.retain(|channel_id, _| !channels.contains(channel_id));
|
||||
self.paths.retain(|path| {
|
||||
path.iter()
|
||||
.all(|channel_id| self.channels_by_id.contains_key(channel_id))
|
||||
});
|
||||
}
|
||||
|
||||
pub fn get(&self, idx: usize) -> Option<&ChannelPath> {
|
||||
self.paths.get(idx)
|
||||
pub fn bulk_edit(&mut self) -> ChannelPathsEditGuard {
|
||||
ChannelPathsEditGuard {
|
||||
paths: &mut self.paths,
|
||||
channels_by_id: &mut self.channels_by_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = &ChannelPath> {
|
||||
self.paths.iter()
|
||||
impl Deref for ChannelIndex {
|
||||
type Target = [ChannelPath];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.paths
|
||||
}
|
||||
}
|
||||
|
||||
/// A guard for ensuring that the paths index maintains its sort and uniqueness
|
||||
/// invariants after a series of insertions
|
||||
pub struct ChannelPathsEditGuard<'a> {
|
||||
paths: &'a mut Vec<ChannelPath>,
|
||||
channels_by_id: &'a mut ChannelsById,
|
||||
}
|
||||
|
||||
impl<'a> ChannelPathsEditGuard<'a> {
|
||||
/// Remove the given edge from this index. This will not remove the channel.
|
||||
/// If this operation would result in a dangling edge, re-insert it.
|
||||
pub fn delete_edge(&mut self, parent_id: ChannelId, channel_id: ChannelId) {
|
||||
|
@ -69,56 +90,16 @@ impl ChannelIndex {
|
|||
.any(|window| window == [parent_id, channel_id])
|
||||
});
|
||||
|
||||
|
||||
// Ensure that there is at least one channel path in the index
|
||||
if !self
|
||||
.paths
|
||||
.iter()
|
||||
.any(|path| path.iter().any(|id| id == &channel_id))
|
||||
{
|
||||
let path = ChannelPath(Arc::from([channel_id]));
|
||||
let current_item: Vec<_> =
|
||||
channel_path_sorting_key(&path, &self.channels_by_id).collect();
|
||||
match self.paths.binary_search_by(|channel_path| {
|
||||
current_item
|
||||
.iter()
|
||||
.copied()
|
||||
.cmp(channel_path_sorting_key(channel_path, &self.channels_by_id))
|
||||
}) {
|
||||
Ok(ix) => self.paths.insert(ix, path),
|
||||
Err(ix) => self.paths.insert(ix, path),
|
||||
}
|
||||
self.insert_root(channel_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete the given channels from this index.
|
||||
pub fn delete_channels(&mut self, channels: &[ChannelId]) {
|
||||
self.channels_by_id
|
||||
.retain(|channel_id, _| !channels.contains(channel_id));
|
||||
self.paths.retain(|channel_path| {
|
||||
!channel_path
|
||||
.iter()
|
||||
.any(|channel_id| channels.contains(channel_id))
|
||||
})
|
||||
}
|
||||
|
||||
/// Upsert one or more channels into this index.
|
||||
pub fn start_upsert(&mut self) -> ChannelPathsUpsertGuard {
|
||||
ChannelPathsUpsertGuard {
|
||||
paths: &mut self.paths,
|
||||
channels_by_id: &mut self.channels_by_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A guard for ensuring that the paths index maintains its sort and uniqueness
|
||||
/// invariants after a series of insertions
|
||||
pub struct ChannelPathsUpsertGuard<'a> {
|
||||
paths: &'a mut Vec<ChannelPath>,
|
||||
channels_by_id: &'a mut ChannelsById,
|
||||
}
|
||||
|
||||
impl<'a> ChannelPathsUpsertGuard<'a> {
|
||||
pub fn upsert(&mut self, channel_proto: proto::Channel) {
|
||||
if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
|
||||
Arc::make_mut(existing_channel).name = channel_proto.name;
|
||||
|
@ -149,9 +130,12 @@ impl<'a> ChannelPathsUpsertGuard<'a> {
|
|||
let mut new_path = path.to_vec();
|
||||
new_path.push(channel_id);
|
||||
self.paths.insert(ix + 1, ChannelPath(new_path.into()));
|
||||
ix += 2;
|
||||
} else if path.len() == 1 && path[0] == channel_id {
|
||||
self.paths.swap_remove(ix);
|
||||
} else {
|
||||
ix += 1;
|
||||
}
|
||||
ix += 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -160,7 +144,7 @@ impl<'a> ChannelPathsUpsertGuard<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for ChannelPathsUpsertGuard<'a> {
|
||||
impl<'a> Drop for ChannelPathsEditGuard<'a> {
|
||||
fn drop(&mut self) {
|
||||
self.paths.sort_by(|a, b| {
|
||||
let a = channel_path_sorting_key(a, &self.channels_by_id);
|
||||
|
@ -168,10 +152,6 @@ impl<'a> Drop for ChannelPathsUpsertGuard<'a> {
|
|||
a.cmp(b)
|
||||
});
|
||||
self.paths.dedup();
|
||||
self.paths.retain(|path| {
|
||||
path.iter()
|
||||
.all(|channel_id| self.channels_by_id.contains_key(channel_id))
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -333,9 +333,10 @@ impl Database {
|
|||
.await
|
||||
}
|
||||
|
||||
async fn get_all_channels(
|
||||
async fn get_channels_internal(
|
||||
&self,
|
||||
parents_by_child_id: ChannelDescendants,
|
||||
trim_dangling_parents: bool,
|
||||
tx: &DatabaseTransaction,
|
||||
) -> Result<Vec<Channel>> {
|
||||
let mut channels = Vec::with_capacity(parents_by_child_id.len());
|
||||
|
@ -346,17 +347,38 @@ impl Database {
|
|||
.await?;
|
||||
while let Some(row) = rows.next().await {
|
||||
let row = row?;
|
||||
|
||||
// As these rows are pulled from the map's keys, this unwrap is safe.
|
||||
let parents = parents_by_child_id.get(&row.id).unwrap();
|
||||
if parents.len() > 0 {
|
||||
let mut added_channel = false;
|
||||
for parent in parents {
|
||||
// Trim out any dangling parent pointers.
|
||||
// That the user doesn't have access to
|
||||
if trim_dangling_parents {
|
||||
if parents_by_child_id.contains_key(parent) {
|
||||
added_channel = true;
|
||||
channels.push(Channel {
|
||||
id: row.id,
|
||||
name: row.name.clone(),
|
||||
parent_id: Some(*parent),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
added_channel = true;
|
||||
channels.push(Channel {
|
||||
id: row.id,
|
||||
name: row.name.clone(),
|
||||
parent_id: Some(*parent),
|
||||
});
|
||||
}
|
||||
}
|
||||
if !added_channel {
|
||||
channels.push(Channel {
|
||||
id: row.id,
|
||||
name: row.name,
|
||||
parent_id: None,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
channels.push(Channel {
|
||||
id: row.id,
|
||||
|
@ -392,7 +414,8 @@ impl Database {
|
|||
.filter_map(|membership| membership.admin.then_some(membership.channel_id))
|
||||
.collect();
|
||||
|
||||
let channels = self.get_all_channels(parents_by_child_id, &tx).await?;
|
||||
let channels = self.get_channels_internal(parents_by_child_id, true, &tx).await?;
|
||||
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
|
||||
enum QueryUserIdsAndChannelIds {
|
||||
|
@ -854,7 +877,7 @@ impl Database {
|
|||
channel.insert(to);
|
||||
}
|
||||
|
||||
let channels = self.get_all_channels(from_descendants, &*tx).await?;
|
||||
let channels = self.get_channels_internal(from_descendants, false, &*tx).await?;
|
||||
|
||||
Ok(channels)
|
||||
}
|
||||
|
|
|
@ -2441,7 +2441,7 @@ async fn unlink_channel(
|
|||
let members = db.get_channel_members(from).await?;
|
||||
|
||||
let update = proto::UpdateChannels {
|
||||
delete_channel_edge: vec![proto::ChannelEdge {
|
||||
delete_edge: vec![proto::ChannelEdge {
|
||||
channel_id: channel_id.to_proto(),
|
||||
parent_id: from.to_proto(),
|
||||
}],
|
||||
|
@ -2478,7 +2478,7 @@ async fn move_channel(
|
|||
let members_to = db.get_channel_members(channel_id).await?;
|
||||
|
||||
let update = proto::UpdateChannels {
|
||||
delete_channel_edge: vec![proto::ChannelEdge {
|
||||
delete_edge: vec![proto::ChannelEdge {
|
||||
channel_id: channel_id.to_proto(),
|
||||
parent_id: from_parent.to_proto(),
|
||||
}],
|
||||
|
|
|
@ -910,11 +910,6 @@ async fn test_channel_moving(
|
|||
let channel_c_id = channels[2];
|
||||
let channel_d_id = channels[3];
|
||||
|
||||
dbg!(channel_a_id);
|
||||
dbg!(channel_b_id);
|
||||
dbg!(channel_c_id);
|
||||
dbg!(channel_d_id);
|
||||
|
||||
// Current shape:
|
||||
// a - b - c - d
|
||||
assert_channels_list_shape(
|
||||
|
@ -987,6 +982,7 @@ async fn test_channel_moving(
|
|||
let channel_ga_id = b_channels[1];
|
||||
let channel_ep_id = b_channels[2];
|
||||
|
||||
|
||||
// Current shape for B:
|
||||
// /- ep
|
||||
// mu -- ga
|
||||
|
@ -995,8 +991,8 @@ async fn test_channel_moving(
|
|||
cx_b,
|
||||
&[
|
||||
(channel_mu_id, 0),
|
||||
(channel_ep_id, 1),
|
||||
(channel_ga_id, 1),
|
||||
(channel_ep_id, 1)
|
||||
],
|
||||
);
|
||||
|
||||
|
@ -1006,6 +1002,36 @@ async fn test_channel_moving(
|
|||
// mu -- ga
|
||||
// /---------\
|
||||
// b -- c -- d
|
||||
assert_channels_list_shape(
|
||||
client_b.channel_store(),
|
||||
cx_b,
|
||||
&[
|
||||
// New channels from a
|
||||
(channel_b_id, 0),
|
||||
(channel_c_id, 1),
|
||||
(channel_d_id, 2),
|
||||
(channel_d_id, 1),
|
||||
|
||||
// B's old channels
|
||||
(channel_mu_id, 0),
|
||||
(channel_ep_id, 1),
|
||||
(channel_ga_id, 1),
|
||||
|
||||
],
|
||||
);
|
||||
|
||||
client_b
|
||||
.channel_store()
|
||||
.update(cx_b, |channel_store, cx| {
|
||||
channel_store.link_channel(channel_b_id, channel_ep_id, cx)
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Current shape for B:
|
||||
// /---------\
|
||||
// /- ep -- b -- c -- d
|
||||
// mu -- ga
|
||||
assert_channels_list_shape(
|
||||
client_b.channel_store(),
|
||||
cx_b,
|
||||
|
@ -1015,46 +1041,17 @@ async fn test_channel_moving(
|
|||
(channel_ga_id, 1),
|
||||
(channel_ep_id, 1),
|
||||
|
||||
// New channels from a
|
||||
(channel_b_id, 0),
|
||||
(channel_c_id, 1),
|
||||
(channel_d_id, 1),
|
||||
(channel_d_id, 2),
|
||||
// New channels from a, now under epsilon
|
||||
(channel_b_id, 2),
|
||||
(channel_c_id, 3),
|
||||
(channel_d_id, 3),
|
||||
(channel_d_id, 4),
|
||||
],
|
||||
);
|
||||
|
||||
// client_b
|
||||
// .channel_store()
|
||||
// .update(cx_a, |channel_store, cx| {
|
||||
// channel_store.move_channel(channel_a_b_id, None, channel_b_epsilon_id, cx)
|
||||
// })
|
||||
// .await
|
||||
// .unwrap();
|
||||
|
||||
// // Current shape for B:
|
||||
// // /---------\
|
||||
// // /- ep -- b -- c -- d
|
||||
// // mu -- ga
|
||||
// assert_channels_list_shape(
|
||||
// client_b.channel_store(),
|
||||
// cx_b,
|
||||
// &[
|
||||
// // B's old channels
|
||||
// (channel_b_mu_id, 0),
|
||||
// (channel_b_gamma_id, 1),
|
||||
// (channel_b_epsilon_id, 1),
|
||||
|
||||
// // New channels from a, now under epsilon
|
||||
// (channel_a_b_id, 2),
|
||||
// (channel_a_c_id, 3),
|
||||
// (channel_a_d_id, 3),
|
||||
// (channel_a_d_id, 4),
|
||||
// ],
|
||||
// );
|
||||
|
||||
client_b
|
||||
.channel_store()
|
||||
.update(cx_a, |channel_store, cx| {
|
||||
.update(cx_b, |channel_store, cx| {
|
||||
channel_store.link_channel(channel_ga_id, channel_b_id, cx)
|
||||
})
|
||||
.await
|
||||
|
|
|
@ -959,7 +959,7 @@ message LspDiskBasedDiagnosticsUpdated {}
|
|||
|
||||
message UpdateChannels {
|
||||
repeated Channel channels = 1;
|
||||
repeated ChannelEdge delete_channel_edge = 2;
|
||||
repeated ChannelEdge delete_edge = 2;
|
||||
repeated uint64 delete_channels = 3;
|
||||
repeated Channel channel_invitations = 4;
|
||||
repeated uint64 remove_channel_invitations = 5;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue