WIP: Send the channel name and the channel edges seperately, so we're not repeating them constantly
This commit is currently broken and includes debug data for a failed attempt at rewriting the insert_edge logic
This commit is contained in:
parent
363867c65b
commit
5f9c56c8b0
13 changed files with 437 additions and 760 deletions
|
@ -1,3 +1,4 @@
|
|||
use rpc::proto::ChannelEdge;
|
||||
use smallvec::SmallVec;
|
||||
|
||||
use super::*;
|
||||
|
@ -326,7 +327,6 @@ impl Database {
|
|||
.map(|channel| Channel {
|
||||
id: channel.id,
|
||||
name: channel.name,
|
||||
parent_id: None,
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
@ -335,12 +335,12 @@ impl Database {
|
|||
.await
|
||||
}
|
||||
|
||||
async fn get_channels_internal(
|
||||
async fn get_channel_graph(
|
||||
&self,
|
||||
parents_by_child_id: ChannelDescendants,
|
||||
trim_dangling_parents: bool,
|
||||
tx: &DatabaseTransaction,
|
||||
) -> Result<Vec<Channel>> {
|
||||
) -> Result<ChannelGraph> {
|
||||
let mut channels = Vec::with_capacity(parents_by_child_id.len());
|
||||
{
|
||||
let mut rows = channel::Entity::find()
|
||||
|
@ -349,49 +349,36 @@ impl Database {
|
|||
.await?;
|
||||
while let Some(row) = rows.next().await {
|
||||
let row = row?;
|
||||
// As these rows are pulled from the map's keys, this unwrap is safe.
|
||||
let parents = parents_by_child_id.get(&row.id).unwrap();
|
||||
if parents.len() > 0 {
|
||||
let mut added_channel = false;
|
||||
for parent in parents.iter() {
|
||||
// Trim out any dangling parent pointers.
|
||||
// That the user doesn't have access to
|
||||
if trim_dangling_parents {
|
||||
if parents_by_child_id.contains_key(parent) {
|
||||
added_channel = true;
|
||||
channels.push(Channel {
|
||||
id: row.id,
|
||||
name: row.name.clone(),
|
||||
parent_id: Some(*parent),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
added_channel = true;
|
||||
channels.push(Channel {
|
||||
id: row.id,
|
||||
name: row.name.clone(),
|
||||
parent_id: Some(*parent),
|
||||
});
|
||||
}
|
||||
}
|
||||
if !added_channel {
|
||||
channels.push(Channel {
|
||||
id: row.id,
|
||||
name: row.name,
|
||||
parent_id: None,
|
||||
channels.push(Channel {
|
||||
id: row.id,
|
||||
name: row.name,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
let mut edges = Vec::with_capacity(parents_by_child_id.len());
|
||||
for (channel, parents) in parents_by_child_id.iter() {
|
||||
for parent in parents.into_iter() {
|
||||
if trim_dangling_parents {
|
||||
if parents_by_child_id.contains_key(parent) {
|
||||
edges.push(ChannelEdge {
|
||||
channel_id: channel.to_proto(),
|
||||
parent_id: parent.to_proto(),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
channels.push(Channel {
|
||||
id: row.id,
|
||||
name: row.name,
|
||||
parent_id: None,
|
||||
edges.push(ChannelEdge {
|
||||
channel_id: channel.to_proto(),
|
||||
parent_id: parent.to_proto(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(channels)
|
||||
Ok(ChannelGraph {
|
||||
channels,
|
||||
edges,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
|
||||
|
@ -407,51 +394,74 @@ impl Database {
|
|||
.all(&*tx)
|
||||
.await?;
|
||||
|
||||
let parents_by_child_id = self
|
||||
.get_channel_descendants(channel_memberships.iter().map(|m| m.channel_id), &*tx)
|
||||
.await?;
|
||||
|
||||
let channels_with_admin_privileges = channel_memberships
|
||||
.iter()
|
||||
.filter_map(|membership| membership.admin.then_some(membership.channel_id))
|
||||
.collect();
|
||||
|
||||
let channels = self
|
||||
.get_channels_internal(parents_by_child_id, true, &tx)
|
||||
.await?;
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
|
||||
enum QueryUserIdsAndChannelIds {
|
||||
ChannelId,
|
||||
UserId,
|
||||
}
|
||||
|
||||
let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
|
||||
{
|
||||
let mut rows = room_participant::Entity::find()
|
||||
.inner_join(room::Entity)
|
||||
.filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
|
||||
.select_only()
|
||||
.column(room::Column::ChannelId)
|
||||
.column(room_participant::Column::UserId)
|
||||
.into_values::<_, QueryUserIdsAndChannelIds>()
|
||||
.stream(&*tx)
|
||||
.await?;
|
||||
while let Some(row) = rows.next().await {
|
||||
let row: (ChannelId, UserId) = row?;
|
||||
channel_participants.entry(row.0).or_default().push(row.1)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ChannelsForUser {
|
||||
channels,
|
||||
channel_participants,
|
||||
channels_with_admin_privileges,
|
||||
})
|
||||
self.get_user_channels(channel_memberships, user_id, &tx).await
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_channel_for_user(&self, channel_id: ChannelId, user_id: UserId) -> Result<ChannelsForUser> {
|
||||
self.transaction(|tx| async move {
|
||||
let tx = tx;
|
||||
|
||||
let channel_membership = channel_member::Entity::find()
|
||||
.filter(
|
||||
channel_member::Column::UserId
|
||||
.eq(user_id)
|
||||
.and(channel_member::Column::ChannelId.eq(channel_id))
|
||||
.and(channel_member::Column::Accepted.eq(true)),
|
||||
)
|
||||
.all(&*tx)
|
||||
.await?;
|
||||
|
||||
self.get_user_channels(channel_membership, user_id, &tx).await
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_user_channels(&self, channel_memberships: Vec<channel_member::Model>, user_id: UserId, tx: &DatabaseTransaction) -> Result<ChannelsForUser> {
|
||||
let parents_by_child_id = self
|
||||
.get_channel_descendants(channel_memberships.iter().map(|m| m.channel_id), &*tx)
|
||||
.await?;
|
||||
|
||||
let channels_with_admin_privileges = channel_memberships
|
||||
.iter()
|
||||
.filter_map(|membership| membership.admin.then_some(membership.channel_id))
|
||||
.collect();
|
||||
|
||||
let graph = self
|
||||
.get_channel_graph(parents_by_child_id, true, &tx)
|
||||
.await?;
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
|
||||
enum QueryUserIdsAndChannelIds {
|
||||
ChannelId,
|
||||
UserId,
|
||||
}
|
||||
|
||||
let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
|
||||
{
|
||||
let mut rows = room_participant::Entity::find()
|
||||
.inner_join(room::Entity)
|
||||
.filter(room::Column::ChannelId.is_in(graph.channels.iter().map(|c| c.id)))
|
||||
.select_only()
|
||||
.column(room::Column::ChannelId)
|
||||
.column(room_participant::Column::UserId)
|
||||
.into_values::<_, QueryUserIdsAndChannelIds>()
|
||||
.stream(&*tx)
|
||||
.await?;
|
||||
while let Some(row) = rows.next().await {
|
||||
let row: (ChannelId, UserId) = row?;
|
||||
channel_participants.entry(row.0).or_default().push(row.1)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ChannelsForUser {
|
||||
channels: graph,
|
||||
channel_participants,
|
||||
channels_with_admin_privileges,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
|
||||
self.transaction(|tx| async move { self.get_channel_members_internal(id, &*tx).await })
|
||||
.await
|
||||
|
@ -759,7 +769,6 @@ impl Database {
|
|||
Channel {
|
||||
id: channel.id,
|
||||
name: channel.name,
|
||||
parent_id: None,
|
||||
},
|
||||
is_accepted,
|
||||
)))
|
||||
|
@ -792,7 +801,7 @@ impl Database {
|
|||
user: UserId,
|
||||
channel: ChannelId,
|
||||
to: ChannelId,
|
||||
) -> Result<Vec<Channel>> {
|
||||
) -> Result<ChannelGraph> {
|
||||
self.transaction(|tx| async move {
|
||||
// Note that even with these maxed permissions, this linking operation
|
||||
// is still insecure because you can't remove someone's permissions to a
|
||||
|
@ -811,7 +820,7 @@ impl Database {
|
|||
channel: ChannelId,
|
||||
to: ChannelId,
|
||||
tx: &DatabaseTransaction,
|
||||
) -> Result<Vec<Channel>> {
|
||||
) -> Result<ChannelGraph> {
|
||||
self.check_user_is_channel_admin(to, user, &*tx).await?;
|
||||
|
||||
let to_ancestors = self.get_channel_ancestors(to, &*tx).await?;
|
||||
|
@ -881,7 +890,7 @@ impl Database {
|
|||
}
|
||||
|
||||
let channels = self
|
||||
.get_channels_internal(from_descendants, false, &*tx)
|
||||
.get_channel_graph(from_descendants, false, &*tx)
|
||||
.await?;
|
||||
|
||||
Ok(channels)
|
||||
|
@ -961,7 +970,7 @@ impl Database {
|
|||
channel: ChannelId,
|
||||
from: ChannelId,
|
||||
to: ChannelId,
|
||||
) -> Result<Vec<Channel>> {
|
||||
) -> Result<ChannelGraph> {
|
||||
self.transaction(|tx| async move {
|
||||
self.check_user_is_channel_admin(channel, user, &*tx)
|
||||
.await?;
|
||||
|
@ -982,6 +991,39 @@ enum QueryUserIds {
|
|||
UserId,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ChannelGraph {
|
||||
pub channels: Vec<Channel>,
|
||||
pub edges: Vec<ChannelEdge>,
|
||||
}
|
||||
|
||||
impl ChannelGraph {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.channels.is_empty() && self.edges.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl PartialEq for ChannelGraph {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
// Order independent comparison for tests
|
||||
let channels_set = self.channels.iter().collect::<HashSet<_>>();
|
||||
let other_channels_set = other.channels.iter().collect::<HashSet<_>>();
|
||||
let edges_set = self.edges.iter().map(|edge| (edge.channel_id, edge.parent_id)).collect::<HashSet<_>>();
|
||||
let other_edges_set = other.edges.iter().map(|edge| (edge.channel_id, edge.parent_id)).collect::<HashSet<_>>();
|
||||
|
||||
channels_set == other_channels_set && edges_set == other_edges_set
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
impl PartialEq for ChannelGraph {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.channels == other.channels && self.edges == other.edges
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
struct SmallSet<T>(SmallVec<[T; 1]>);
|
||||
|
||||
impl<T> Deref for SmallSet<T> {
|
||||
|
@ -1008,7 +1050,7 @@ impl<T> SmallSet<T> {
|
|||
Err(ix) => {
|
||||
self.0.insert(ix, value);
|
||||
true
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue