Fix notifications on channel changes
This commit is contained in:
parent
0ce1ec5d15
commit
2b11463567
6 changed files with 598 additions and 262 deletions
|
@ -2220,26 +2220,13 @@ async fn create_channel(
|
|||
return Ok(());
|
||||
};
|
||||
|
||||
let members: Vec<proto::ChannelMember> = db
|
||||
.get_channel_participant_details(parent_id, session.user_id)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|member| {
|
||||
member.role() == proto::ChannelRole::Admin
|
||||
|| member.role() == proto::ChannelRole::Member
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut updates: HashMap<UserId, proto::UpdateChannels> = HashMap::default();
|
||||
|
||||
for member in members {
|
||||
let user_id = UserId::from_proto(member.user_id);
|
||||
let channels = db.get_channel_for_user(parent_id, user_id).await?;
|
||||
updates.insert(user_id, build_initial_channels_update(channels, vec![]));
|
||||
}
|
||||
let updates = db
|
||||
.participants_to_notify_for_channel_change(parent_id, session.user_id)
|
||||
.await?;
|
||||
|
||||
let connection_pool = session.connection_pool().await;
|
||||
for (user_id, update) in updates {
|
||||
for (user_id, channels) in updates {
|
||||
let update = build_initial_channels_update(channels, vec![]);
|
||||
for connection_id in connection_pool.user_connection_ids(user_id) {
|
||||
if user_id == session.user_id {
|
||||
continue;
|
||||
|
@ -2353,31 +2340,55 @@ async fn set_channel_visibility(
|
|||
let channel_id = ChannelId::from_proto(request.channel_id);
|
||||
let visibility = request.visibility().into();
|
||||
|
||||
let channel = db
|
||||
.set_channel_visibility(channel_id, visibility, session.user_id)
|
||||
.await?;
|
||||
|
||||
let members = db
|
||||
let previous_members = db
|
||||
.get_channel_participant_details(channel_id, session.user_id)
|
||||
.await?;
|
||||
|
||||
db.set_channel_visibility(channel_id, visibility, session.user_id)
|
||||
.await?;
|
||||
|
||||
let mut updates: HashMap<UserId, ChannelsForUser> = db
|
||||
.participants_to_notify_for_channel_change(channel_id, session.user_id)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let mut participants_who_lost_access: HashSet<UserId> = HashSet::default();
|
||||
match visibility {
|
||||
ChannelVisibility::Members => {
|
||||
for member in previous_members {
|
||||
if ChannelRole::from(member.role()).can_only_see_public_descendants() {
|
||||
participants_who_lost_access.insert(UserId::from_proto(member.user_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
ChannelVisibility::Public => {
|
||||
if let Some(public_parent_id) = db.public_parent_channel_id(channel_id).await? {
|
||||
let parent_updates = db
|
||||
.participants_to_notify_for_channel_change(public_parent_id, session.user_id)
|
||||
.await?;
|
||||
|
||||
for (user_id, channels) in parent_updates {
|
||||
updates.insert(user_id, channels);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let connection_pool = session.connection_pool().await;
|
||||
// TODO: notify people who were guests and are now not allowed.
|
||||
for member in members {
|
||||
for connection_id in connection_pool.user_connection_ids(UserId::from_proto(member.user_id))
|
||||
{
|
||||
session.peer.send(
|
||||
connection_id,
|
||||
proto::UpdateChannels {
|
||||
channels: vec![proto::Channel {
|
||||
id: channel.id.to_proto(),
|
||||
name: channel.name.clone(),
|
||||
visibility: channel.visibility.into(),
|
||||
role: member.role.into(),
|
||||
}],
|
||||
..Default::default()
|
||||
},
|
||||
)?;
|
||||
for (user_id, channels) in updates {
|
||||
let update = build_initial_channels_update(channels, vec![]);
|
||||
for connection_id in connection_pool.user_connection_ids(user_id) {
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
}
|
||||
}
|
||||
for user_id in participants_who_lost_access {
|
||||
let update = proto::UpdateChannels {
|
||||
delete_channels: vec![channel_id.to_proto()],
|
||||
..Default::default()
|
||||
};
|
||||
for connection_id in connection_pool.user_connection_ids(user_id) {
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2485,42 +2496,20 @@ async fn link_channel(
|
|||
// TODO: Remove this restriction once we have symlinks
|
||||
db.assert_root_channel(channel_id).await?;
|
||||
|
||||
let channels_to_send = db.link_channel(session.user_id, channel_id, to).await?;
|
||||
let members = db.get_channel_members_and_roles(to).await?;
|
||||
db.link_channel(session.user_id, channel_id, to).await?;
|
||||
|
||||
let member_updates = db
|
||||
.participants_to_notify_for_channel_change(to, session.user_id)
|
||||
.await?;
|
||||
|
||||
dbg!(&member_updates);
|
||||
|
||||
let connection_pool = session.connection_pool().await;
|
||||
|
||||
for (member_id, role) in members {
|
||||
let build_channel_proto = |channel: &db::Channel| proto::Channel {
|
||||
id: channel.id.to_proto(),
|
||||
visibility: channel.visibility.into(),
|
||||
name: channel.name.clone(),
|
||||
role: role.into(),
|
||||
};
|
||||
|
||||
for (member_id, channels) in member_updates {
|
||||
let update = build_initial_channels_update(channels, vec![]);
|
||||
for connection_id in connection_pool.user_connection_ids(member_id) {
|
||||
let channels: Vec<_> = if role == ChannelRole::Guest {
|
||||
channels_to_send
|
||||
.channels
|
||||
.iter()
|
||||
.filter(|channel| channel.visibility != ChannelVisibility::Public)
|
||||
.map(build_channel_proto)
|
||||
.collect()
|
||||
} else {
|
||||
channels_to_send
|
||||
.channels
|
||||
.iter()
|
||||
.map(build_channel_proto)
|
||||
.collect()
|
||||
};
|
||||
|
||||
session.peer.send(
|
||||
connection_id,
|
||||
proto::UpdateChannels {
|
||||
channels,
|
||||
insert_edge: channels_to_send.edges.clone(),
|
||||
..Default::default()
|
||||
},
|
||||
)?;
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2548,11 +2537,11 @@ async fn move_channel(
|
|||
let from_parent = ChannelId::from_proto(request.from);
|
||||
let to = ChannelId::from_proto(request.to);
|
||||
|
||||
let from_public_parent = db
|
||||
.public_path_to_channel(from_parent)
|
||||
.await?
|
||||
.last()
|
||||
.copied();
|
||||
let previous_participants = db
|
||||
.get_channel_participant_details(channel_id, session.user_id)
|
||||
.await?;
|
||||
|
||||
debug_assert_eq!(db.parent_channel_id(channel_id).await?, Some(from_parent));
|
||||
|
||||
let channels_to_send = db
|
||||
.move_channel(session.user_id, channel_id, from_parent, to)
|
||||
|
@ -2563,68 +2552,42 @@ async fn move_channel(
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
let to_public_parent = db.public_path_to_channel(to).await?.last().cloned();
|
||||
let updates = db
|
||||
.participants_to_notify_for_channel_change(to, session.user_id)
|
||||
.await?;
|
||||
|
||||
let members_from = db
|
||||
.get_channel_participant_details(from_parent, session.user_id)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|member| {
|
||||
member.role() == proto::ChannelRole::Admin || member.role() == proto::ChannelRole::Guest
|
||||
});
|
||||
let mut participants_who_lost_access: HashSet<UserId> = HashSet::default();
|
||||
let mut channels_to_delete = db.get_channel_descendant_ids(channel_id).await?;
|
||||
channels_to_delete.insert(channel_id);
|
||||
|
||||
let members_to = db
|
||||
.get_channel_participant_details(to, session.user_id)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|member| {
|
||||
member.role() == proto::ChannelRole::Admin || member.role() == proto::ChannelRole::Guest
|
||||
});
|
||||
|
||||
let mut updates: HashMap<UserId, proto::UpdateChannels> = HashMap::default();
|
||||
|
||||
for member in members_to {
|
||||
let user_id = UserId::from_proto(member.user_id);
|
||||
let channels = db.get_channel_for_user(to, user_id).await?;
|
||||
updates.insert(user_id, build_initial_channels_update(channels, vec![]));
|
||||
}
|
||||
|
||||
if let Some(to_public_parent) = to_public_parent {
|
||||
// only notify guests of public channels (admins/members are notified by members_to above, and banned users don't care)
|
||||
let public_members_to = db
|
||||
.get_channel_participant_details(to, session.user_id)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|member| member.role() == proto::ChannelRole::Guest);
|
||||
|
||||
for member in public_members_to {
|
||||
let user_id = UserId::from_proto(member.user_id);
|
||||
if updates.contains_key(&user_id) {
|
||||
continue;
|
||||
}
|
||||
let channels = db.get_channel_for_user(to_public_parent, user_id).await?;
|
||||
updates.insert(user_id, build_initial_channels_update(channels, vec![]));
|
||||
for previous_participant in previous_participants.iter() {
|
||||
let user_id = UserId::from_proto(previous_participant.user_id);
|
||||
if previous_participant.kind() == proto::channel_member::Kind::AncestorMember {
|
||||
participants_who_lost_access.insert(user_id);
|
||||
}
|
||||
}
|
||||
|
||||
for member in members_from {
|
||||
let user_id = UserId::from_proto(member.user_id);
|
||||
let update = updates
|
||||
.entry(user_id)
|
||||
.or_insert(proto::UpdateChannels::default());
|
||||
update.delete_edge.push(proto::ChannelEdge {
|
||||
channel_id: channel_id.to_proto(),
|
||||
parent_id: from_parent.to_proto(),
|
||||
})
|
||||
}
|
||||
|
||||
if let Some(_from_public_parent) = from_public_parent {
|
||||
// TODO: for each guest member of the old public parent
|
||||
// delete the edge that they could see (from the from_public_parent down)
|
||||
}
|
||||
|
||||
let connection_pool = session.connection_pool().await;
|
||||
for (user_id, update) in updates {
|
||||
for (user_id, channels) in updates {
|
||||
let mut update = build_initial_channels_update(channels, vec![]);
|
||||
update.delete_channels = channels_to_delete
|
||||
.iter()
|
||||
.map(|channel_id| channel_id.to_proto())
|
||||
.collect();
|
||||
participants_who_lost_access.remove(&user_id);
|
||||
for connection_id in connection_pool.user_connection_ids(user_id) {
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
}
|
||||
}
|
||||
|
||||
for user_id in participants_who_lost_access {
|
||||
let update = proto::UpdateChannels {
|
||||
delete_channels: channels_to_delete
|
||||
.iter()
|
||||
.map(|channel_id| channel_id.to_proto())
|
||||
.collect(),
|
||||
..Default::default()
|
||||
};
|
||||
for connection_id in connection_pool.user_connection_ids(user_id) {
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue