This commit is contained in:
Antonio Scandurra 2022-10-17 18:00:54 +02:00
parent 81d83841ab
commit 499b8f5f55
13 changed files with 245 additions and 263 deletions

View file

@ -19,8 +19,9 @@ test-support = [
[dependencies]
client = { path = "../client" }
collections = { path = "../collections" }
live_kit_client = { path = "../live_kit_client" }
gpui = { path = "../gpui" }
live_kit_client = { path = "../live_kit_client" }
media = { path = "../media" }
project = { path = "../project" }
util = { path = "../util" }

View file

@ -132,6 +132,8 @@ impl ActiveCall {
Room::create(recipient_user_id, initial_project, client, user_store, cx)
})
.await?;
room.update(&mut cx, |room, cx| room.share_screen(cx))
.await?;
this.update(&mut cx, |this, cx| this.set_room(Some(room), cx));
};

View file

@ -1,6 +1,8 @@
use anyhow::{anyhow, Result};
use client::{proto, User};
use gpui::WeakModelHandle;
use collections::HashMap;
use gpui::{Task, WeakModelHandle};
use media::core_video::CVImageBuffer;
use project::Project;
use std::sync::Arc;
@ -34,9 +36,23 @@ pub struct LocalParticipant {
pub active_project: Option<WeakModelHandle<Project>>,
}
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct RemoteParticipant {
pub user: Arc<User>,
pub projects: Vec<proto::ParticipantProject>,
pub location: ParticipantLocation,
pub tracks: HashMap<String, RemoteVideoTrack>,
}
#[derive(Clone)]
pub struct RemoteVideoTrack {
pub(crate) frame: Option<CVImageBuffer>,
pub(crate) _live_kit_track: Arc<live_kit_client::RemoteVideoTrack>,
pub(crate) _maintain_frame: Arc<Task<()>>,
}
impl RemoteVideoTrack {
pub fn frame(&self) -> Option<&CVImageBuffer> {
self.frame.as_ref()
}
}

View file

@ -1,5 +1,5 @@
use crate::{
participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
IncomingCall,
};
use anyhow::{anyhow, Result};
@ -7,7 +7,8 @@ use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
use collections::{BTreeMap, HashSet};
use futures::StreamExt;
use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
use live_kit_client::LocalVideoTrack;
use live_kit_client::{LocalVideoTrack, RemoteVideoTrackChange};
use postage::watch;
use project::Project;
use std::sync::Arc;
use util::ResultExt;
@ -27,7 +28,7 @@ pub enum Event {
pub struct Room {
id: u64,
live_kit_room: Option<Arc<live_kit_client::Room>>,
live_kit_room: Option<(Arc<live_kit_client::Room>, Task<()>)>,
status: RoomStatus,
local_participant: LocalParticipant,
remote_participants: BTreeMap<PeerId, RemoteParticipant>,
@ -75,17 +76,23 @@ impl Room {
let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
let room = live_kit_client::Room::new();
let mut tracks = room.remote_video_tracks();
cx.foreground()
.spawn(async move {
while let Some(track) = tracks.next().await {
dbg!("received track");
}
})
.detach();
let maintain_room = cx.spawn_weak(|this, mut cx| async move {
while let Some(track_change) = tracks.next().await {
let this = if let Some(this) = this.upgrade(&cx) {
this
} else {
break;
};
this.update(&mut cx, |this, cx| {
this.remote_video_track_changed(track_change, cx).log_err()
});
}
});
cx.foreground()
.spawn(room.connect(&connection_info.server_url, &connection_info.token))
.detach_and_log_err(cx);
Some(room)
Some((room, maintain_room))
} else {
None
};
@ -318,8 +325,20 @@ impl Room {
projects: participant.projects,
location: ParticipantLocation::from_proto(participant.location)
.unwrap_or(ParticipantLocation::External),
tracks: Default::default(),
},
);
if let Some((room, _)) = this.live_kit_room.as_ref() {
for track in
room.video_tracks_for_remote_participant(peer_id.0.to_string())
{
this.remote_video_track_changed(
RemoteVideoTrackChange::Subscribed(track),
cx,
);
}
}
}
this.remote_participants.retain(|_, participant| {
@ -357,6 +376,74 @@ impl Room {
Ok(())
}
fn remote_video_track_changed(
&mut self,
change: RemoteVideoTrackChange,
cx: &mut ModelContext<Self>,
) -> Result<()> {
match change {
RemoteVideoTrackChange::Subscribed(track) => {
let peer_id = PeerId(track.publisher_id().parse()?);
let track_id = track.id().to_string();
let participant = self
.remote_participants
.get_mut(&peer_id)
.ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
let (mut tx, mut rx) = watch::channel();
track.add_renderer(move |frame| *tx.borrow_mut() = Some(frame));
participant.tracks.insert(
track_id.clone(),
RemoteVideoTrack {
frame: None,
_live_kit_track: track,
_maintain_frame: Arc::new(cx.spawn_weak(|this, mut cx| async move {
while let Some(frame) = rx.next().await {
let this = if let Some(this) = this.upgrade(&cx) {
this
} else {
break;
};
let done = this.update(&mut cx, |this, cx| {
// TODO: replace this with an emit.
cx.notify();
if let Some(track) =
this.remote_participants.get_mut(&peer_id).and_then(
|participant| participant.tracks.get_mut(&track_id),
)
{
track.frame = frame;
false
} else {
true
}
});
if done {
break;
}
}
})),
},
);
}
RemoteVideoTrackChange::Unsubscribed {
publisher_id,
track_id,
} => {
let peer_id = PeerId(publisher_id.parse()?);
let participant = self
.remote_participants
.get_mut(&peer_id)
.ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
participant.tracks.remove(&track_id);
}
}
cx.notify();
Ok(())
}
fn check_invariants(&self) {
#[cfg(any(test, feature = "test-support"))]
{
@ -502,7 +589,7 @@ impl Room {
return Task::ready(Err(anyhow!("room is offline")));
}
let room = if let Some(room) = self.live_kit_room.as_ref() {
let room = if let Some((room, _)) = self.live_kit_room.as_ref() {
room.clone()
} else {
return Task::ready(Err(anyhow!("not connected to LiveKit")));