This commit is contained in:
Max Brunsfeld 2023-08-02 12:15:06 -07:00
parent 61a6892b8c
commit a9de73739a
8 changed files with 289 additions and 100 deletions

View file

@ -2,7 +2,7 @@ mod connection_pool;
use crate::{
auth,
db::{self, ChannelId, ChannelRoom, Database, ProjectId, RoomId, ServerId, User, UserId},
db::{self, ChannelId, Database, ProjectId, RoomId, ServerId, User, UserId},
executor::Executor,
AppState, Result,
};
@ -296,6 +296,15 @@ impl Server {
"refreshed room"
);
room_updated(&refreshed_room.room, &peer);
if let Some(channel_id) = refreshed_room.channel_id {
channel_updated(
channel_id,
&refreshed_room.room,
&refreshed_room.channel_members,
&peer,
&*pool.lock(),
);
}
contacts_to_update
.extend(refreshed_room.stale_participant_user_ids.iter().copied());
contacts_to_update
@ -517,7 +526,7 @@ impl Server {
this.app_state.db.set_user_connected_once(user_id, true).await?;
}
let (contacts, invite_code, channels, channel_invites) = future::try_join4(
let (contacts, invite_code, (channels, channel_participants), channel_invites) = future::try_join4(
this.app_state.db.get_contacts(user_id),
this.app_state.db.get_invite_code_for_user(user_id),
this.app_state.db.get_channels(user_id),
@ -528,7 +537,7 @@ impl Server {
let mut pool = this.connection_pool.lock();
pool.add_connection(connection_id, user_id, user.admin);
this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
this.peer.send(connection_id, build_initial_channels_update(channels, channel_invites))?;
this.peer.send(connection_id, build_initial_channels_update(channels, channel_participants, channel_invites))?;
if let Some((code, count)) = invite_code {
this.peer.send(connection_id, proto::UpdateInviteInfo {
@ -921,8 +930,8 @@ async fn join_room(
.await
.join_room(room_id, session.user_id, None, session.connection_id)
.await?;
room_updated(&room, &session.peer);
room.clone()
room_updated(&room.room, &session.peer);
room.room.clone()
};
for connection_id in session
@ -971,6 +980,9 @@ async fn rejoin_room(
response: Response<proto::RejoinRoom>,
session: Session,
) -> Result<()> {
let room;
let channel_id;
let channel_members;
{
let mut rejoined_room = session
.db()
@ -1132,6 +1144,21 @@ async fn rejoin_room(
)?;
}
}
room = mem::take(&mut rejoined_room.room);
channel_id = rejoined_room.channel_id;
channel_members = mem::take(&mut rejoined_room.channel_members);
}
//TODO: move this into the room guard
if let Some(channel_id) = channel_id {
channel_updated(
channel_id,
&room,
&channel_members,
&session.peer,
&*session.connection_pool().await,
);
}
update_user_contacts(session.user_id, &session).await?;
@ -2202,9 +2229,9 @@ async fn invite_channel_member(
}
async fn remove_channel_member(
request: proto::RemoveChannelMember,
response: Response<proto::RemoveChannelMember>,
session: Session,
_request: proto::RemoveChannelMember,
_response: Response<proto::RemoveChannelMember>,
_session: Session,
) -> Result<()> {
Ok(())
}
@ -2247,11 +2274,11 @@ async fn join_channel(
) -> Result<()> {
let channel_id = ChannelId::from_proto(request.channel_id);
{
let joined_room = {
let db = session.db().await;
let room_id = db.get_channel_room(channel_id).await?;
let room_id = db.room_id_for_channel(channel_id).await?;
let room = db
let joined_room = db
.join_room(
room_id,
session.user_id,
@ -2262,7 +2289,10 @@ async fn join_channel(
let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
let token = live_kit
.room_token(&room.live_kit_room, &session.user_id.to_string())
.room_token(
&joined_room.room.live_kit_room,
&session.user_id.to_string(),
)
.trace_err()?;
Some(LiveKitConnectionInfo {
@ -2272,12 +2302,25 @@ async fn join_channel(
});
response.send(proto::JoinRoomResponse {
room: Some(room.clone()),
room: Some(joined_room.room.clone()),
live_kit_connection_info,
})?;
room_updated(&room, &session.peer);
}
room_updated(&joined_room.room, &session.peer);
joined_room.clone()
};
// TODO - do this while still holding the room guard,
// currently there's a possible race condition if someone joins the channel
// after we've dropped the lock but before we finish sending these updates
channel_updated(
channel_id,
&joined_room.room,
&joined_room.channel_members,
&session.peer,
&*session.connection_pool().await,
);
update_user_contacts(session.user_id, &session).await?;
@ -2356,6 +2399,7 @@ fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
fn build_initial_channels_update(
channels: Vec<db::Channel>,
channel_participants: HashMap<db::ChannelId, Vec<UserId>>,
channel_invites: Vec<db::Channel>,
) -> proto::UpdateChannels {
let mut update = proto::UpdateChannels::default();
@ -2426,10 +2470,7 @@ fn contact_for_user(
}
}
fn room_updated(room: &ChannelRoom, peer: &Peer, pool: &ConnectionPool) {
let channel_ids = &room.channel_participants;
let room = &room.room;
fn room_updated(room: &proto::Room, peer: &Peer) {
broadcast(
None,
room.participants
@ -2444,17 +2485,41 @@ fn room_updated(room: &ChannelRoom, peer: &Peer, pool: &ConnectionPool) {
)
},
);
}
fn channel_updated(
channel_id: ChannelId,
room: &proto::Room,
channel_members: &[UserId],
peer: &Peer,
pool: &ConnectionPool,
) {
let participants = room
.participants
.iter()
.map(|p| p.user_id)
.collect::<Vec<_>>();
broadcast(
None,
channel_ids
channel_members
.iter()
.filter(|user_id| {
!room
.participants
.iter()
.any(|p| p.user_id == user_id.to_proto())
})
.flat_map(|user_id| pool.user_connection_ids(*user_id)),
|peer_id| {
peer.send(
peer_id.into(),
proto::RoomUpdated {
room: Some(room.clone()),
proto::UpdateChannels {
channel_participants: vec![proto::ChannelParticipants {
channel_id: channel_id.to_proto(),
participant_user_ids: participants.clone(),
}],
..Default::default()
},
)
},
@ -2502,6 +2567,10 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
let canceled_calls_to_user_ids;
let live_kit_room;
let delete_live_kit_room;
let room;
let channel_members;
let channel_id;
if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? {
contacts_to_update.insert(session.user_id);
@ -2509,19 +2578,30 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
project_left(project, session);
}
{
let connection_pool = session.connection_pool().await;
room_updated(&left_room.room, &session.peer, &connection_pool);
}
room_id = RoomId::from_proto(left_room.room.id);
canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
live_kit_room = mem::take(&mut left_room.room.live_kit_room);
delete_live_kit_room = left_room.deleted;
room = mem::take(&mut left_room.room);
channel_members = mem::take(&mut left_room.channel_members);
channel_id = left_room.channel_id;
room_updated(&room, &session.peer);
} else {
return Ok(());
}
// TODO - do this while holding the room guard.
if let Some(channel_id) = channel_id {
channel_updated(
channel_id,
&room,
&channel_members,
&session.peer,
&*session.connection_pool().await,
);
}
{
let pool = session.connection_pool().await;
for canceled_user_id in canceled_calls_to_user_ids {