Identify LiveKit room participants by user id, not peer id

This way, their participant id can remain the same when they reconnect.
This commit is contained in:
Max Brunsfeld 2022-12-15 17:19:32 -08:00
parent c2f5381e5a
commit ad37034960
9 changed files with 55 additions and 42 deletions

View file

@ -39,6 +39,7 @@ pub struct LocalParticipant {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RemoteParticipant { pub struct RemoteParticipant {
pub user: Arc<User>, pub user: Arc<User>,
pub peer_id: proto::PeerId,
pub projects: Vec<proto::ParticipantProject>, pub projects: Vec<proto::ParticipantProject>,
pub location: ParticipantLocation, pub location: ParticipantLocation,
pub tracks: HashMap<live_kit_client::Sid, Arc<RemoteVideoTrack>>, pub tracks: HashMap<live_kit_client::Sid, Arc<RemoteVideoTrack>>,

View file

@ -3,7 +3,10 @@ use crate::{
IncomingCall, IncomingCall,
}; };
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use client::{proto, Client, TypedEnvelope, User, UserStore}; use client::{
proto::{self, PeerId},
Client, TypedEnvelope, User, UserStore,
};
use collections::{BTreeMap, HashSet}; use collections::{BTreeMap, HashSet};
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use gpui::{ use gpui::{
@ -41,7 +44,7 @@ pub struct Room {
live_kit: Option<LiveKitRoom>, live_kit: Option<LiveKitRoom>,
status: RoomStatus, status: RoomStatus,
local_participant: LocalParticipant, local_participant: LocalParticipant,
remote_participants: BTreeMap<proto::PeerId, RemoteParticipant>, remote_participants: BTreeMap<u64, RemoteParticipant>,
pending_participants: Vec<Arc<User>>, pending_participants: Vec<Arc<User>>,
participant_user_ids: HashSet<u64>, participant_user_ids: HashSet<u64>,
pending_call_count: usize, pending_call_count: usize,
@ -349,10 +352,16 @@ impl Room {
&self.local_participant &self.local_participant
} }
pub fn remote_participants(&self) -> &BTreeMap<proto::PeerId, RemoteParticipant> { pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
&self.remote_participants &self.remote_participants
} }
pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
self.remote_participants
.values()
.find(|p| p.peer_id == peer_id)
}
pub fn pending_participants(&self) -> &[Arc<User>] { pub fn pending_participants(&self) -> &[Arc<User>] {
&self.pending_participants &self.pending_participants
} }
@ -421,11 +430,11 @@ impl Room {
for (participant, user) in room.participants.into_iter().zip(participants) { for (participant, user) in room.participants.into_iter().zip(participants) {
let Some(peer_id) = participant.peer_id else { continue }; let Some(peer_id) = participant.peer_id else { continue };
this.participant_user_ids.insert(participant.user_id); this.participant_user_ids.insert(participant.user_id);
participant_peer_ids.insert(peer_id); participant_peer_ids.insert(participant.user_id);
let old_projects = this let old_projects = this
.remote_participants .remote_participants
.get(&peer_id) .get(&participant.user_id)
.into_iter() .into_iter()
.flat_map(|existing| &existing.projects) .flat_map(|existing| &existing.projects)
.map(|project| project.id) .map(|project| project.id)
@ -454,7 +463,8 @@ impl Room {
let location = ParticipantLocation::from_proto(participant.location) let location = ParticipantLocation::from_proto(participant.location)
.unwrap_or(ParticipantLocation::External); .unwrap_or(ParticipantLocation::External);
if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id) if let Some(remote_participant) =
this.remote_participants.get_mut(&participant.user_id)
{ {
remote_participant.projects = participant.projects; remote_participant.projects = participant.projects;
if location != remote_participant.location { if location != remote_participant.location {
@ -465,9 +475,10 @@ impl Room {
} }
} else { } else {
this.remote_participants.insert( this.remote_participants.insert(
peer_id, participant.user_id,
RemoteParticipant { RemoteParticipant {
user: user.clone(), user: user.clone(),
peer_id,
projects: participant.projects, projects: participant.projects,
location, location,
tracks: Default::default(), tracks: Default::default(),
@ -488,8 +499,8 @@ impl Room {
} }
} }
this.remote_participants.retain(|peer_id, participant| { this.remote_participants.retain(|user_id, participant| {
if participant_peer_ids.contains(peer_id) { if participant_peer_ids.contains(user_id) {
true true
} else { } else {
for project in &participant.projects { for project in &participant.projects {
@ -531,11 +542,11 @@ impl Room {
) -> Result<()> { ) -> Result<()> {
match change { match change {
RemoteVideoTrackUpdate::Subscribed(track) => { RemoteVideoTrackUpdate::Subscribed(track) => {
let peer_id = track.publisher_id().parse()?; let user_id = track.publisher_id().parse()?;
let track_id = track.sid().to_string(); let track_id = track.sid().to_string();
let participant = self let participant = self
.remote_participants .remote_participants
.get_mut(&peer_id) .get_mut(&user_id)
.ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?; .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
participant.tracks.insert( participant.tracks.insert(
track_id.clone(), track_id.clone(),
@ -544,21 +555,21 @@ impl Room {
}), }),
); );
cx.emit(Event::RemoteVideoTracksChanged { cx.emit(Event::RemoteVideoTracksChanged {
participant_id: peer_id, participant_id: participant.peer_id,
}); });
} }
RemoteVideoTrackUpdate::Unsubscribed { RemoteVideoTrackUpdate::Unsubscribed {
publisher_id, publisher_id,
track_id, track_id,
} => { } => {
let peer_id = publisher_id.parse()?; let user_id = publisher_id.parse()?;
let participant = self let participant = self
.remote_participants .remote_participants
.get_mut(&peer_id) .get_mut(&user_id)
.ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?; .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
participant.tracks.remove(&track_id); participant.tracks.remove(&track_id);
cx.emit(Event::RemoteVideoTracksChanged { cx.emit(Event::RemoteVideoTracksChanged {
participant_id: peer_id, participant_id: participant.peer_id,
}); });
} }
} }

View file

@ -199,7 +199,7 @@ async fn test_basic_calls(
assert_eq!(participant_id, client_a.peer_id().unwrap()); assert_eq!(participant_id, client_a.peer_id().unwrap());
room_b.read_with(cx_b, |room, _| { room_b.read_with(cx_b, |room, _| {
assert_eq!( assert_eq!(
room.remote_participants()[&client_a.peer_id().unwrap()] room.remote_participants()[&client_a.user_id().unwrap()]
.tracks .tracks
.len(), .len(),
1 1

View file

@ -850,7 +850,7 @@ async fn create_room(
.trace_err() .trace_err()
{ {
if let Some(token) = live_kit if let Some(token) = live_kit
.room_token(&live_kit_room, &session.connection_id.to_string()) .room_token(&live_kit_room, &session.user_id.to_string())
.trace_err() .trace_err()
{ {
Some(proto::LiveKitConnectionInfo { Some(proto::LiveKitConnectionInfo {
@ -918,7 +918,7 @@ async fn join_room(
let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() { let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
if let Some(token) = live_kit if let Some(token) = live_kit
.room_token(&room.live_kit_room, &session.connection_id.to_string()) .room_token(&room.live_kit_room, &session.user_id.to_string())
.trace_err() .trace_err()
{ {
Some(proto::LiveKitConnectionInfo { Some(proto::LiveKitConnectionInfo {
@ -2066,7 +2066,7 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
if let Some(live_kit) = session.live_kit_client.as_ref() { if let Some(live_kit) = session.live_kit_client.as_ref() {
live_kit live_kit
.remove_participant(live_kit_room.clone(), session.connection_id.to_string()) .remove_participant(live_kit_room.clone(), session.user_id.to_string())
.await .await
.trace_err(); .trace_err();

View file

@ -342,24 +342,27 @@ impl CollabTitlebarItem {
let mut participants = room let mut participants = room
.read(cx) .read(cx)
.remote_participants() .remote_participants()
.iter() .values()
.map(|(peer_id, collaborator)| (*peer_id, collaborator.clone())) .cloned()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
participants participants.sort_by_key(|p| Some(project.collaborators().get(&p.peer_id)?.replica_id));
.sort_by_key(|(peer_id, _)| Some(project.collaborators().get(peer_id)?.replica_id));
participants participants
.into_iter() .into_iter()
.filter_map(|(peer_id, participant)| { .filter_map(|participant| {
let project = workspace.read(cx).project().read(cx); let project = workspace.read(cx).project().read(cx);
let replica_id = project let replica_id = project
.collaborators() .collaborators()
.get(&peer_id) .get(&participant.peer_id)
.map(|collaborator| collaborator.replica_id); .map(|collaborator| collaborator.replica_id);
let user = participant.user.clone(); let user = participant.user.clone();
Some(self.render_avatar( Some(self.render_avatar(
&user, &user,
replica_id, replica_id,
Some((peer_id, &user.github_login, participant.location)), Some((
participant.peer_id,
&user.github_login,
participant.location,
)),
workspace, workspace,
theme, theme,
cx, cx,

View file

@ -73,7 +73,7 @@ pub fn init(app_state: Arc<AppState>, cx: &mut MutableAppContext) {
.remote_participants() .remote_participants()
.iter() .iter()
.find(|(_, participant)| participant.user.id == follow_user_id) .find(|(_, participant)| participant.user.id == follow_user_id)
.map(|(peer_id, _)| *peer_id) .map(|(_, p)| p.peer_id)
.or_else(|| { .or_else(|| {
// If we couldn't follow the given user, follow the host instead. // If we couldn't follow the given user, follow the host instead.
let collaborator = workspace let collaborator = workspace

View file

@ -461,15 +461,13 @@ impl ContactList {
// Populate remote participants. // Populate remote participants.
self.match_candidates.clear(); self.match_candidates.clear();
self.match_candidates self.match_candidates
.extend( .extend(room.remote_participants().iter().map(|(_, participant)| {
room.remote_participants() StringMatchCandidate {
.iter() id: participant.user.id as usize,
.map(|(peer_id, participant)| StringMatchCandidate { string: participant.user.github_login.clone(),
id: peer_id.as_u64() as usize, char_bag: participant.user.github_login.chars().collect(),
string: participant.user.github_login.clone(), }
char_bag: participant.user.github_login.chars().collect(), }));
}),
);
let matches = executor.block(match_strings( let matches = executor.block(match_strings(
&self.match_candidates, &self.match_candidates,
&query, &query,
@ -479,8 +477,8 @@ impl ContactList {
executor.clone(), executor.clone(),
)); ));
for mat in matches { for mat in matches {
let peer_id = PeerId::from_u64(mat.candidate_id as u64); let user_id = mat.candidate_id as u64;
let participant = &room.remote_participants()[&peer_id]; let participant = &room.remote_participants()[&user_id];
participant_entries.push(ContactEntry::CallParticipant { participant_entries.push(ContactEntry::CallParticipant {
user: participant.user.clone(), user: participant.user.clone(),
is_pending: false, is_pending: false,
@ -496,7 +494,7 @@ impl ContactList {
} }
if !participant.tracks.is_empty() { if !participant.tracks.is_empty() {
participant_entries.push(ContactEntry::ParticipantScreen { participant_entries.push(ContactEntry::ParticipantScreen {
peer_id, peer_id: participant.peer_id,
is_last: true, is_last: true,
}); });
} }

View file

@ -148,7 +148,7 @@ impl Member {
.and_then(|leader_id| { .and_then(|leader_id| {
let room = active_call?.read(cx).room()?.read(cx); let room = active_call?.read(cx).room()?.read(cx);
let collaborator = project.read(cx).collaborators().get(leader_id)?; let collaborator = project.read(cx).collaborators().get(leader_id)?;
let participant = room.remote_participants().get(&leader_id)?; let participant = room.remote_participant_for_peer_id(*leader_id)?;
Some((collaborator.replica_id, participant)) Some((collaborator.replica_id, participant))
}); });

View file

@ -2141,7 +2141,7 @@ impl Workspace {
let call = self.active_call()?; let call = self.active_call()?;
let room = call.read(cx).room()?.read(cx); let room = call.read(cx).room()?.read(cx);
let participant = room.remote_participants().get(&leader_id)?; let participant = room.remote_participant_for_peer_id(leader_id)?;
let mut items_to_add = Vec::new(); let mut items_to_add = Vec::new();
match participant.location { match participant.location {
@ -2190,7 +2190,7 @@ impl Workspace {
) -> Option<ViewHandle<SharedScreen>> { ) -> Option<ViewHandle<SharedScreen>> {
let call = self.active_call()?; let call = self.active_call()?;
let room = call.read(cx).room()?.read(cx); let room = call.read(cx).room()?.read(cx);
let participant = room.remote_participants().get(&peer_id)?; let participant = room.remote_participant_for_peer_id(peer_id)?;
let track = participant.tracks.values().next()?.clone(); let track = participant.tracks.values().next()?.clone();
let user = participant.user.clone(); let user = participant.user.clone();