Refactor to avoid some (mostly hypothetical) races
Tidy up added code to reduce duplicity of X and X_internals.
This commit is contained in:
parent
2b11463567
commit
3853009d92
13 changed files with 715 additions and 765 deletions
|
@ -3,8 +3,9 @@ mod connection_pool;
|
|||
use crate::{
|
||||
auth,
|
||||
db::{
|
||||
self, BufferId, ChannelId, ChannelRole, ChannelVisibility, ChannelsForUser, Database,
|
||||
MessageId, ProjectId, RoomId, ServerId, User, UserId,
|
||||
self, BufferId, ChannelId, ChannelsForUser, CreateChannelResult, Database, MessageId,
|
||||
MoveChannelResult, ProjectId, RenameChannelResult, RoomId, ServerId,
|
||||
SetChannelVisibilityResult, User, UserId,
|
||||
},
|
||||
executor::Executor,
|
||||
AppState, Result,
|
||||
|
@ -590,7 +591,7 @@ impl Server {
|
|||
let mut pool = this.connection_pool.lock();
|
||||
pool.add_connection(connection_id, user_id, user.admin);
|
||||
this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
|
||||
this.peer.send(connection_id, build_initial_channels_update(
|
||||
this.peer.send(connection_id, build_channels_update(
|
||||
channels_for_user,
|
||||
channel_invites
|
||||
))?;
|
||||
|
@ -2202,31 +2203,21 @@ async fn create_channel(
|
|||
let db = session.db().await;
|
||||
|
||||
let parent_id = request.parent_id.map(|id| ChannelId::from_proto(id));
|
||||
let id = db
|
||||
let CreateChannelResult {
|
||||
channel,
|
||||
participants_to_update,
|
||||
} = db
|
||||
.create_channel(&request.name, parent_id, session.user_id)
|
||||
.await?;
|
||||
|
||||
response.send(proto::CreateChannelResponse {
|
||||
channel: Some(proto::Channel {
|
||||
id: id.to_proto(),
|
||||
name: request.name,
|
||||
visibility: proto::ChannelVisibility::Members as i32,
|
||||
role: proto::ChannelRole::Admin.into(),
|
||||
}),
|
||||
channel: Some(channel.to_proto()),
|
||||
parent_id: request.parent_id,
|
||||
})?;
|
||||
|
||||
let Some(parent_id) = parent_id else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let updates = db
|
||||
.participants_to_notify_for_channel_change(parent_id, session.user_id)
|
||||
.await?;
|
||||
|
||||
let connection_pool = session.connection_pool().await;
|
||||
for (user_id, channels) in updates {
|
||||
let update = build_initial_channels_update(channels, vec![]);
|
||||
for (user_id, channels) in participants_to_update {
|
||||
let update = build_channels_update(channels, vec![]);
|
||||
for connection_id in connection_pool.user_connection_ids(user_id) {
|
||||
if user_id == session.user_id {
|
||||
continue;
|
||||
|
@ -2340,49 +2331,21 @@ async fn set_channel_visibility(
|
|||
let channel_id = ChannelId::from_proto(request.channel_id);
|
||||
let visibility = request.visibility().into();
|
||||
|
||||
let previous_members = db
|
||||
.get_channel_participant_details(channel_id, session.user_id)
|
||||
let SetChannelVisibilityResult {
|
||||
participants_to_update,
|
||||
participants_to_remove,
|
||||
} = db
|
||||
.set_channel_visibility(channel_id, visibility, session.user_id)
|
||||
.await?;
|
||||
|
||||
db.set_channel_visibility(channel_id, visibility, session.user_id)
|
||||
.await?;
|
||||
|
||||
let mut updates: HashMap<UserId, ChannelsForUser> = db
|
||||
.participants_to_notify_for_channel_change(channel_id, session.user_id)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let mut participants_who_lost_access: HashSet<UserId> = HashSet::default();
|
||||
match visibility {
|
||||
ChannelVisibility::Members => {
|
||||
for member in previous_members {
|
||||
if ChannelRole::from(member.role()).can_only_see_public_descendants() {
|
||||
participants_who_lost_access.insert(UserId::from_proto(member.user_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
ChannelVisibility::Public => {
|
||||
if let Some(public_parent_id) = db.public_parent_channel_id(channel_id).await? {
|
||||
let parent_updates = db
|
||||
.participants_to_notify_for_channel_change(public_parent_id, session.user_id)
|
||||
.await?;
|
||||
|
||||
for (user_id, channels) in parent_updates {
|
||||
updates.insert(user_id, channels);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let connection_pool = session.connection_pool().await;
|
||||
for (user_id, channels) in updates {
|
||||
let update = build_initial_channels_update(channels, vec![]);
|
||||
for (user_id, channels) in participants_to_update {
|
||||
let update = build_channels_update(channels, vec![]);
|
||||
for connection_id in connection_pool.user_connection_ids(user_id) {
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
}
|
||||
}
|
||||
for user_id in participants_who_lost_access {
|
||||
for user_id in participants_to_remove {
|
||||
let update = proto::UpdateChannels {
|
||||
delete_channels: vec![channel_id.to_proto()],
|
||||
..Default::default()
|
||||
|
@ -2416,7 +2379,7 @@ async fn set_channel_member_role(
|
|||
let mut update = proto::UpdateChannels::default();
|
||||
if channel_member.accepted {
|
||||
let channels = db.get_channel_for_user(channel_id, member_id).await?;
|
||||
update = build_initial_channels_update(channels, vec![]);
|
||||
update = build_channels_update(channels, vec![]);
|
||||
} else {
|
||||
let channel = db.get_channel(channel_id, session.user_id).await?;
|
||||
update.channel_invitations.push(proto::Channel {
|
||||
|
@ -2446,34 +2409,24 @@ async fn rename_channel(
|
|||
) -> Result<()> {
|
||||
let db = session.db().await;
|
||||
let channel_id = ChannelId::from_proto(request.channel_id);
|
||||
let channel = db
|
||||
let RenameChannelResult {
|
||||
channel,
|
||||
participants_to_update,
|
||||
} = db
|
||||
.rename_channel(channel_id, session.user_id, &request.name)
|
||||
.await?;
|
||||
|
||||
response.send(proto::RenameChannelResponse {
|
||||
channel: Some(proto::Channel {
|
||||
id: channel.id.to_proto(),
|
||||
name: channel.name.clone(),
|
||||
visibility: channel.visibility.into(),
|
||||
role: proto::ChannelRole::Admin.into(),
|
||||
}),
|
||||
channel: Some(channel.to_proto()),
|
||||
})?;
|
||||
|
||||
let members = db
|
||||
.get_channel_participant_details(channel_id, session.user_id)
|
||||
.await?;
|
||||
|
||||
let connection_pool = session.connection_pool().await;
|
||||
for member in members {
|
||||
for connection_id in connection_pool.user_connection_ids(UserId::from_proto(member.user_id))
|
||||
{
|
||||
let mut update = proto::UpdateChannels::default();
|
||||
update.channels.push(proto::Channel {
|
||||
id: channel.id.to_proto(),
|
||||
name: channel.name.clone(),
|
||||
visibility: channel.visibility.into(),
|
||||
role: member.role.into(),
|
||||
});
|
||||
for (user_id, channel) in participants_to_update {
|
||||
for connection_id in connection_pool.user_connection_ids(user_id) {
|
||||
let update = proto::UpdateChannels {
|
||||
channels: vec![channel.to_proto()],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
}
|
||||
|
@ -2493,25 +2446,12 @@ async fn link_channel(
|
|||
let channel_id = ChannelId::from_proto(request.channel_id);
|
||||
let to = ChannelId::from_proto(request.to);
|
||||
|
||||
// TODO: Remove this restriction once we have symlinks
|
||||
db.assert_root_channel(channel_id).await?;
|
||||
|
||||
db.link_channel(session.user_id, channel_id, to).await?;
|
||||
|
||||
let member_updates = db
|
||||
.participants_to_notify_for_channel_change(to, session.user_id)
|
||||
let result = db
|
||||
.move_channel(channel_id, None, to, session.user_id)
|
||||
.await?;
|
||||
drop(db);
|
||||
|
||||
dbg!(&member_updates);
|
||||
|
||||
let connection_pool = session.connection_pool().await;
|
||||
|
||||
for (member_id, channels) in member_updates {
|
||||
let update = build_initial_channels_update(channels, vec![]);
|
||||
for connection_id in connection_pool.user_connection_ids(member_id) {
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
}
|
||||
}
|
||||
notify_channel_moved(result, session).await?;
|
||||
|
||||
response.send(Ack {})?;
|
||||
|
||||
|
@ -2537,64 +2477,46 @@ async fn move_channel(
|
|||
let from_parent = ChannelId::from_proto(request.from);
|
||||
let to = ChannelId::from_proto(request.to);
|
||||
|
||||
let previous_participants = db
|
||||
.get_channel_participant_details(channel_id, session.user_id)
|
||||
let result = db
|
||||
.move_channel(channel_id, Some(from_parent), to, session.user_id)
|
||||
.await?;
|
||||
drop(db);
|
||||
|
||||
debug_assert_eq!(db.parent_channel_id(channel_id).await?, Some(from_parent));
|
||||
notify_channel_moved(result, session).await?;
|
||||
|
||||
let channels_to_send = db
|
||||
.move_channel(session.user_id, channel_id, from_parent, to)
|
||||
.await?;
|
||||
response.send(Ack {})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
if channels_to_send.is_empty() {
|
||||
response.send(Ack {})?;
|
||||
async fn notify_channel_moved(result: Option<MoveChannelResult>, session: Session) -> Result<()> {
|
||||
let Some(MoveChannelResult {
|
||||
participants_to_remove,
|
||||
participants_to_update,
|
||||
moved_channels,
|
||||
}) = result
|
||||
else {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let updates = db
|
||||
.participants_to_notify_for_channel_change(to, session.user_id)
|
||||
.await?;
|
||||
|
||||
let mut participants_who_lost_access: HashSet<UserId> = HashSet::default();
|
||||
let mut channels_to_delete = db.get_channel_descendant_ids(channel_id).await?;
|
||||
channels_to_delete.insert(channel_id);
|
||||
|
||||
for previous_participant in previous_participants.iter() {
|
||||
let user_id = UserId::from_proto(previous_participant.user_id);
|
||||
if previous_participant.kind() == proto::channel_member::Kind::AncestorMember {
|
||||
participants_who_lost_access.insert(user_id);
|
||||
}
|
||||
}
|
||||
};
|
||||
let moved_channels: Vec<u64> = moved_channels.iter().map(|id| id.to_proto()).collect();
|
||||
|
||||
let connection_pool = session.connection_pool().await;
|
||||
for (user_id, channels) in updates {
|
||||
let mut update = build_initial_channels_update(channels, vec![]);
|
||||
update.delete_channels = channels_to_delete
|
||||
.iter()
|
||||
.map(|channel_id| channel_id.to_proto())
|
||||
.collect();
|
||||
participants_who_lost_access.remove(&user_id);
|
||||
for (user_id, channels) in participants_to_update {
|
||||
let mut update = build_channels_update(channels, vec![]);
|
||||
update.delete_channels = moved_channels.clone();
|
||||
for connection_id in connection_pool.user_connection_ids(user_id) {
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
}
|
||||
}
|
||||
|
||||
for user_id in participants_who_lost_access {
|
||||
for user_id in participants_to_remove {
|
||||
let update = proto::UpdateChannels {
|
||||
delete_channels: channels_to_delete
|
||||
.iter()
|
||||
.map(|channel_id| channel_id.to_proto())
|
||||
.collect(),
|
||||
delete_channels: moved_channels.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
for connection_id in connection_pool.user_connection_ids(user_id) {
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
}
|
||||
}
|
||||
|
||||
response.send(Ack {})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -2641,38 +2563,12 @@ async fn channel_membership_updated(
|
|||
channel_id: ChannelId,
|
||||
session: &Session,
|
||||
) -> Result<(), crate::Error> {
|
||||
let mut update = proto::UpdateChannels::default();
|
||||
let result = db.get_channel_for_user(channel_id, session.user_id).await?;
|
||||
let mut update = build_channels_update(result, vec![]);
|
||||
update
|
||||
.remove_channel_invitations
|
||||
.push(channel_id.to_proto());
|
||||
|
||||
let result = db.get_channel_for_user(channel_id, session.user_id).await?;
|
||||
update.channels.extend(
|
||||
result
|
||||
.channels
|
||||
.channels
|
||||
.into_iter()
|
||||
.map(|channel| proto::Channel {
|
||||
id: channel.id.to_proto(),
|
||||
visibility: channel.visibility.into(),
|
||||
role: channel.role.into(),
|
||||
name: channel.name,
|
||||
}),
|
||||
);
|
||||
update.unseen_channel_messages = result.channel_messages;
|
||||
update.unseen_channel_buffer_changes = result.unseen_buffer_changes;
|
||||
update.insert_edge = result.channels.edges;
|
||||
update
|
||||
.channel_participants
|
||||
.extend(
|
||||
result
|
||||
.channel_participants
|
||||
.into_iter()
|
||||
.map(|(channel_id, user_ids)| proto::ChannelParticipants {
|
||||
channel_id: channel_id.to_proto(),
|
||||
participant_user_ids: user_ids.into_iter().map(UserId::to_proto).collect(),
|
||||
}),
|
||||
);
|
||||
session.peer.send(session.connection_id, update)?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -3155,7 +3051,7 @@ fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
|
|||
}
|
||||
}
|
||||
|
||||
fn build_initial_channels_update(
|
||||
fn build_channels_update(
|
||||
channels: ChannelsForUser,
|
||||
channel_invites: Vec<db::Channel>,
|
||||
) -> proto::UpdateChannels {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue