Merge pull request #1793 from zed-industries/screen-sharing

Introduce screen-sharing
This commit is contained in:
Antonio Scandurra 2022-10-24 16:53:05 +01:00 committed by GitHub
commit fb7a92242b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
61 changed files with 3028 additions and 897 deletions

View file

@ -12,6 +12,7 @@ test-support = [
"client/test-support",
"collections/test-support",
"gpui/test-support",
"live_kit_client/test-support",
"project/test-support",
"util/test-support"
]
@ -20,10 +21,13 @@ test-support = [
client = { path = "../client" }
collections = { path = "../collections" }
gpui = { path = "../gpui" }
live_kit_client = { path = "../live_kit_client" }
media = { path = "../media" }
project = { path = "../project" }
util = { path = "../util" }
anyhow = "1.0.38"
async-broadcast = "0.4"
futures = "0.3"
postage = { version = "0.4.1", features = ["futures-traits"] }
@ -31,5 +35,6 @@ postage = { version = "0.4.1", features = ["futures-traits"] }
client = { path = "../client", features = ["test-support"] }
collections = { path = "../collections", features = ["test-support"] }
gpui = { path = "../gpui", features = ["test-support"] }
live_kit_client = { path = "../live_kit_client", features = ["test-support"] }
project = { path = "../project", features = ["test-support"] }
util = { path = "../util", features = ["test-support"] }

View file

@ -1,11 +1,11 @@
mod participant;
pub mod participant;
pub mod room;
use anyhow::{anyhow, Result};
use client::{proto, Client, TypedEnvelope, User, UserStore};
use gpui::{
AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
Subscription, Task,
Subscription, Task, WeakModelHandle,
};
pub use participant::ParticipantLocation;
use postage::watch;
@ -27,6 +27,7 @@ pub struct IncomingCall {
}
pub struct ActiveCall {
location: Option<WeakModelHandle<Project>>,
room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
incoming_call: (
watch::Sender<Option<IncomingCall>>,
@ -49,6 +50,7 @@ impl ActiveCall {
) -> Self {
Self {
room: None,
location: None,
incoming_call: watch::channel(),
_subscriptions: vec![
client.add_request_handler(cx.handle(), Self::handle_incoming_call),
@ -132,7 +134,9 @@ impl ActiveCall {
Room::create(recipient_user_id, initial_project, client, user_store, cx)
})
.await?;
this.update(&mut cx, |this, cx| this.set_room(Some(room), cx));
this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
.await?;
};
Ok(())
@ -180,7 +184,8 @@ impl ActiveCall {
let join = Room::join(&call, self.client.clone(), self.user_store.clone(), cx);
cx.spawn(|this, mut cx| async move {
let room = join.await?;
this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx));
this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
.await?;
Ok(())
})
}
@ -223,35 +228,46 @@ impl ActiveCall {
project: Option<&ModelHandle<Project>>,
cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
self.location = project.map(|project| project.downgrade());
if let Some((room, _)) = self.room.as_ref() {
room.update(cx, |room, cx| room.set_location(project, cx))
} else {
Task::ready(Err(anyhow!("no active call")))
Task::ready(Ok(()))
}
}
fn set_room(&mut self, room: Option<ModelHandle<Room>>, cx: &mut ModelContext<Self>) {
fn set_room(
&mut self,
room: Option<ModelHandle<Room>>,
cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
if room.as_ref() != self.room.as_ref().map(|room| &room.0) {
cx.notify();
if let Some(room) = room {
if room.read(cx).status().is_offline() {
self.room = None;
Task::ready(Ok(()))
} else {
let subscriptions = vec![
cx.observe(&room, |this, room, cx| {
if room.read(cx).status().is_offline() {
this.set_room(None, cx);
this.set_room(None, cx).detach_and_log_err(cx);
}
cx.notify();
}),
cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
];
self.room = Some((room, subscriptions));
self.room = Some((room.clone(), subscriptions));
let location = self.location.and_then(|location| location.upgrade(cx));
room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
}
} else {
self.room = None;
Task::ready(Ok(()))
}
cx.notify();
} else {
Task::ready(Ok(()))
}
}

View file

@ -1,6 +1,8 @@
use anyhow::{anyhow, Result};
use client::{proto, User};
use collections::HashMap;
use gpui::WeakModelHandle;
pub use live_kit_client::Frame;
use project::Project;
use std::sync::Arc;
@ -34,9 +36,21 @@ pub struct LocalParticipant {
pub active_project: Option<WeakModelHandle<Project>>,
}
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct RemoteParticipant {
pub user: Arc<User>,
pub projects: Vec<proto::ParticipantProject>,
pub location: ParticipantLocation,
pub tracks: HashMap<live_kit_client::Sid, Arc<RemoteVideoTrack>>,
}
#[derive(Clone)]
pub struct RemoteVideoTrack {
pub(crate) live_kit_track: Arc<live_kit_client::RemoteVideoTrack>,
}
impl RemoteVideoTrack {
pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
self.live_kit_track.frames()
}
}

View file

@ -1,5 +1,5 @@
use crate::{
participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
IncomingCall,
};
use anyhow::{anyhow, Result};
@ -7,12 +7,20 @@ use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
use collections::{BTreeMap, HashSet};
use futures::StreamExt;
use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
use postage::stream::Stream;
use project::Project;
use std::{os::unix::prelude::OsStrExt, sync::Arc};
use util::ResultExt;
use std::{mem, os::unix::prelude::OsStrExt, sync::Arc};
use util::{post_inc, ResultExt};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
ParticipantLocationChanged {
participant_id: PeerId,
},
RemoteVideoTracksChanged {
participant_id: PeerId,
},
RemoteProjectShared {
owner: Arc<User>,
project_id: u64,
@ -26,6 +34,7 @@ pub enum Event {
pub struct Room {
id: u64,
live_kit: Option<LiveKitRoom>,
status: RoomStatus,
local_participant: LocalParticipant,
remote_participants: BTreeMap<PeerId, RemoteParticipant>,
@ -43,13 +52,16 @@ impl Entity for Room {
type Event = Event;
fn release(&mut self, _: &mut MutableAppContext) {
self.client.send(proto::LeaveRoom { id: self.id }).log_err();
if self.status.is_online() {
self.client.send(proto::LeaveRoom { id: self.id }).log_err();
}
}
}
impl Room {
fn new(
id: u64,
live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
client: Arc<Client>,
user_store: ModelHandle<UserStore>,
cx: &mut ModelContext<Self>,
@ -69,8 +81,59 @@ impl Room {
})
.detach();
let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
let room = live_kit_client::Room::new();
let mut status = room.status();
// Consume the initial status of the room.
let _ = status.try_recv();
let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
while let Some(status) = status.next().await {
let this = if let Some(this) = this.upgrade(&cx) {
this
} else {
break;
};
if status == live_kit_client::ConnectionState::Disconnected {
this.update(&mut cx, |this, cx| this.leave(cx).log_err());
break;
}
}
});
let mut track_changes = room.remote_video_track_updates();
let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
while let Some(track_change) = track_changes.next().await {
let this = if let Some(this) = this.upgrade(&cx) {
this
} else {
break;
};
this.update(&mut cx, |this, cx| {
this.remote_video_track_updated(track_change, cx).log_err()
});
}
});
cx.foreground()
.spawn(room.connect(&connection_info.server_url, &connection_info.token))
.detach_and_log_err(cx);
Some(LiveKitRoom {
room,
screen_track: ScreenTrack::None,
next_publish_id: 0,
_maintain_room,
_maintain_tracks,
})
} else {
None
};
Self {
id,
live_kit: live_kit_room,
status: RoomStatus::Online,
participant_user_ids: Default::default(),
local_participant: Default::default(),
@ -94,7 +157,16 @@ impl Room {
) -> Task<Result<ModelHandle<Self>>> {
cx.spawn(|mut cx| async move {
let response = client.request(proto::CreateRoom {}).await?;
let room = cx.add_model(|cx| Self::new(response.id, client, user_store, cx));
let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
let room = cx.add_model(|cx| {
Self::new(
room_proto.id,
response.live_kit_connection_info,
client,
user_store,
cx,
)
});
let initial_project_id = if let Some(initial_project) = initial_project {
let initial_project_id = room
@ -130,7 +202,15 @@ impl Room {
cx.spawn(|mut cx| async move {
let response = client.request(proto::JoinRoom { id: room_id }).await?;
let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
let room = cx.add_model(|cx| Self::new(room_id, client, user_store, cx));
let room = cx.add_model(|cx| {
Self::new(
room_id,
response.live_kit_connection_info,
client,
user_store,
cx,
)
});
room.update(&mut cx, |room, cx| {
room.leave_when_empty = true;
room.apply_room_update(room_proto, cx)?;
@ -160,6 +240,7 @@ impl Room {
self.pending_participants.clear();
self.participant_user_ids.clear();
self.subscriptions.clear();
self.live_kit.take();
self.client.send(proto::LeaveRoom { id: self.id })?;
Ok(())
}
@ -272,15 +353,40 @@ impl Room {
});
}
this.remote_participants.insert(
peer_id,
RemoteParticipant {
user: user.clone(),
projects: participant.projects,
location: ParticipantLocation::from_proto(participant.location)
.unwrap_or(ParticipantLocation::External),
},
);
let location = ParticipantLocation::from_proto(participant.location)
.unwrap_or(ParticipantLocation::External);
if let Some(remote_participant) = this.remote_participants.get_mut(&peer_id)
{
remote_participant.projects = participant.projects;
if location != remote_participant.location {
remote_participant.location = location;
cx.emit(Event::ParticipantLocationChanged {
participant_id: peer_id,
});
}
} else {
this.remote_participants.insert(
peer_id,
RemoteParticipant {
user: user.clone(),
projects: participant.projects,
location,
tracks: Default::default(),
},
);
if let Some(live_kit) = this.live_kit.as_ref() {
let tracks =
live_kit.room.remote_video_tracks(&peer_id.0.to_string());
for track in tracks {
this.remote_video_track_updated(
RemoteVideoTrackUpdate::Subscribed(track),
cx,
)
.log_err();
}
}
}
}
this.remote_participants.retain(|_, participant| {
@ -318,6 +424,49 @@ impl Room {
Ok(())
}
fn remote_video_track_updated(
&mut self,
change: RemoteVideoTrackUpdate,
cx: &mut ModelContext<Self>,
) -> Result<()> {
match change {
RemoteVideoTrackUpdate::Subscribed(track) => {
let peer_id = PeerId(track.publisher_id().parse()?);
let track_id = track.sid().to_string();
let participant = self
.remote_participants
.get_mut(&peer_id)
.ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
participant.tracks.insert(
track_id.clone(),
Arc::new(RemoteVideoTrack {
live_kit_track: track,
}),
);
cx.emit(Event::RemoteVideoTracksChanged {
participant_id: peer_id,
});
}
RemoteVideoTrackUpdate::Unsubscribed {
publisher_id,
track_id,
} => {
let peer_id = PeerId(publisher_id.parse()?);
let participant = self
.remote_participants
.get_mut(&peer_id)
.ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
participant.tracks.remove(&track_id);
cx.emit(Event::RemoteVideoTracksChanged {
participant_id: peer_id,
});
}
}
cx.notify();
Ok(())
}
fn check_invariants(&self) {
#[cfg(any(test, feature = "test-support"))]
{
@ -418,7 +567,7 @@ impl Room {
})
}
pub fn set_location(
pub(crate) fn set_location(
&mut self,
project: Option<&ModelHandle<Project>>,
cx: &mut ModelContext<Self>,
@ -458,6 +607,140 @@ impl Room {
Ok(())
})
}
pub fn is_screen_sharing(&self) -> bool {
self.live_kit.as_ref().map_or(false, |live_kit| {
!matches!(live_kit.screen_track, ScreenTrack::None)
})
}
pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
if self.status.is_offline() {
return Task::ready(Err(anyhow!("room is offline")));
} else if self.is_screen_sharing() {
return Task::ready(Err(anyhow!("screen was already shared")));
}
let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
let publish_id = post_inc(&mut live_kit.next_publish_id);
live_kit.screen_track = ScreenTrack::Pending { publish_id };
cx.notify();
(live_kit.room.display_sources(), publish_id)
} else {
return Task::ready(Err(anyhow!("live-kit was not initialized")));
};
cx.spawn_weak(|this, mut cx| async move {
let publish_track = async {
let displays = displays.await?;
let display = displays
.first()
.ok_or_else(|| anyhow!("no display found"))?;
let track = LocalVideoTrack::screen_share_for_display(&display);
this.upgrade(&cx)
.ok_or_else(|| anyhow!("room was dropped"))?
.read_with(&cx, |this, _| {
this.live_kit
.as_ref()
.map(|live_kit| live_kit.room.publish_video_track(&track))
})
.ok_or_else(|| anyhow!("live-kit was not initialized"))?
.await
};
let publication = publish_track.await;
this.upgrade(&cx)
.ok_or_else(|| anyhow!("room was dropped"))?
.update(&mut cx, |this, cx| {
let live_kit = this
.live_kit
.as_mut()
.ok_or_else(|| anyhow!("live-kit was not initialized"))?;
let canceled = if let ScreenTrack::Pending {
publish_id: cur_publish_id,
} = &live_kit.screen_track
{
*cur_publish_id != publish_id
} else {
true
};
match publication {
Ok(publication) => {
if canceled {
live_kit.room.unpublish_track(publication);
} else {
live_kit.screen_track = ScreenTrack::Published(publication);
cx.notify();
}
Ok(())
}
Err(error) => {
if canceled {
Ok(())
} else {
live_kit.screen_track = ScreenTrack::None;
cx.notify();
Err(error)
}
}
}
})
})
}
pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
if self.status.is_offline() {
return Err(anyhow!("room is offline"));
}
let live_kit = self
.live_kit
.as_mut()
.ok_or_else(|| anyhow!("live-kit was not initialized"))?;
match mem::take(&mut live_kit.screen_track) {
ScreenTrack::None => Err(anyhow!("screen was not shared")),
ScreenTrack::Pending { .. } => {
cx.notify();
Ok(())
}
ScreenTrack::Published(track) => {
live_kit.room.unpublish_track(track);
cx.notify();
Ok(())
}
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
self.live_kit
.as_ref()
.unwrap()
.room
.set_display_sources(sources);
}
}
struct LiveKitRoom {
room: Arc<live_kit_client::Room>,
screen_track: ScreenTrack,
next_publish_id: usize,
_maintain_room: Task<()>,
_maintain_tracks: Task<()>,
}
enum ScreenTrack {
None,
Pending { publish_id: usize },
Published(LocalTrackPublication),
}
impl Default for ScreenTrack {
fn default() -> Self {
Self::None
}
}
#[derive(Copy, Clone, PartialEq, Eq)]
@ -470,4 +753,8 @@ impl RoomStatus {
pub fn is_offline(&self) -> bool {
matches!(self, RoomStatus::Offline)
}
pub fn is_online(&self) -> bool {
matches!(self, RoomStatus::Online)
}
}

View file

@ -1,32 +0,0 @@
[package]
name = "capture"
version = "0.1.0"
edition = "2021"
description = "An example of screen capture"
[dependencies]
gpui = { path = "../gpui" }
live_kit = { path = "../live_kit" }
media = { path = "../media" }
anyhow = "1.0.38"
block = "0.1"
bytes = "1.2"
byteorder = "1.4"
cocoa = "0.24"
core-foundation = "0.9.3"
core-graphics = "0.22.3"
foreign-types = "0.3"
futures = "0.3"
hmac = "0.12"
jwt = "0.16"
log = { version = "0.4.16", features = ["kv_unstable_serde"] }
objc = "0.2"
parking_lot = "0.11.1"
postage = { version = "0.4.1", features = ["futures-traits"] }
serde = { version = "1.0", features = ["derive", "rc"] }
sha2 = "0.10"
simplelog = "0.9"
[build-dependencies]
bindgen = "0.59.2"

View file

@ -1,7 +0,0 @@
fn main() {
// Find WebRTC.framework as a sibling of the executable when running outside of an application bundle
println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path");
// Register exported Objective-C selectors, protocols, etc
println!("cargo:rustc-link-arg=-Wl,-ObjC");
}

View file

@ -1,71 +0,0 @@
use anyhow::Result;
use hmac::{Hmac, Mac};
use jwt::SignWithKey;
use serde::Serialize;
use sha2::Sha256;
use std::{
ops::Add,
time::{Duration, SystemTime, UNIX_EPOCH},
};
static DEFAULT_TTL: Duration = Duration::from_secs(6 * 60 * 60); // 6 hours
#[derive(Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct ClaimGrants<'a> {
iss: &'a str,
sub: &'a str,
iat: u64,
exp: u64,
nbf: u64,
jwtid: &'a str,
video: VideoGrant<'a>,
}
#[derive(Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct VideoGrant<'a> {
room_create: Option<bool>,
room_join: Option<bool>,
room_list: Option<bool>,
room_record: Option<bool>,
room_admin: Option<bool>,
room: Option<&'a str>,
can_publish: Option<bool>,
can_subscribe: Option<bool>,
can_publish_data: Option<bool>,
hidden: Option<bool>,
recorder: Option<bool>,
}
pub fn create_token(
api_key: &str,
secret_key: &str,
room_name: &str,
participant_name: &str,
) -> Result<String> {
let secret_key: Hmac<Sha256> = Hmac::new_from_slice(secret_key.as_bytes())?;
let now = SystemTime::now();
let claims = ClaimGrants {
iss: api_key,
sub: participant_name,
iat: now.duration_since(UNIX_EPOCH).unwrap().as_secs(),
exp: now
.add(DEFAULT_TTL)
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
nbf: 0,
jwtid: participant_name,
video: VideoGrant {
room: Some(room_name),
room_join: Some(true),
can_publish: Some(true),
can_subscribe: Some(true),
..Default::default()
},
};
Ok(claims.sign_with_key(&secret_key)?)
}

View file

@ -1,143 +0,0 @@
mod live_kit_token;
use futures::StreamExt;
use gpui::{
actions,
elements::{Canvas, *},
keymap::Binding,
platform::current::Surface,
Menu, MenuItem, ViewContext,
};
use live_kit::{LocalVideoTrack, Room};
use log::LevelFilter;
use media::core_video::CVImageBuffer;
use postage::watch;
use simplelog::SimpleLogger;
use std::sync::Arc;
actions!(capture, [Quit]);
fn main() {
SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger");
gpui::App::new(()).unwrap().run(|cx| {
cx.platform().activate(true);
cx.add_global_action(quit);
cx.add_bindings([Binding::new("cmd-q", Quit, None)]);
cx.set_menus(vec![Menu {
name: "Zed",
items: vec![MenuItem::Action {
name: "Quit",
action: Box::new(Quit),
}],
}]);
let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap();
let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap();
let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap();
cx.spawn(|mut cx| async move {
let user1_token = live_kit_token::create_token(
&live_kit_key,
&live_kit_secret,
"test-room",
"test-participant-1",
)
.unwrap();
let room1 = Room::new();
room1.connect(&live_kit_url, &user1_token).await.unwrap();
let user2_token = live_kit_token::create_token(
&live_kit_key,
&live_kit_secret,
"test-room",
"test-participant-2",
)
.unwrap();
let room2 = Room::new();
room2.connect(&live_kit_url, &user2_token).await.unwrap();
cx.add_window(Default::default(), |cx| ScreenCaptureView::new(room2, cx));
let windows = live_kit::list_windows();
let window = windows
.iter()
.find(|w| w.owner_name.as_deref() == Some("Safari"))
.unwrap();
let track = LocalVideoTrack::screen_share_for_window(window.id);
room1.publish_video_track(&track).await.unwrap();
})
.detach();
});
}
struct ScreenCaptureView {
image_buffer: Option<CVImageBuffer>,
_room: Arc<Room>,
}
impl gpui::Entity for ScreenCaptureView {
type Event = ();
}
impl ScreenCaptureView {
pub fn new(room: Arc<Room>, cx: &mut ViewContext<Self>) -> Self {
let mut remote_video_tracks = room.remote_video_tracks();
cx.spawn_weak(|this, mut cx| async move {
if let Some(video_track) = remote_video_tracks.next().await {
let (mut frames_tx, mut frames_rx) = watch::channel_with(None);
video_track.add_renderer(move |frame| *frames_tx.borrow_mut() = Some(frame));
while let Some(frame) = frames_rx.next().await {
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
this.image_buffer = frame;
cx.notify();
});
} else {
break;
}
}
}
})
.detach();
Self {
image_buffer: None,
_room: room,
}
}
}
impl gpui::View for ScreenCaptureView {
fn ui_name() -> &'static str {
"View"
}
fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
let image_buffer = self.image_buffer.clone();
let canvas = Canvas::new(move |bounds, _, cx| {
if let Some(image_buffer) = image_buffer.clone() {
cx.scene.push_surface(Surface {
bounds,
image_buffer,
});
}
});
if let Some(image_buffer) = self.image_buffer.as_ref() {
canvas
.constrained()
.with_width(image_buffer.width() as f32)
.with_height(image_buffer.height() as f32)
.aligned()
.boxed()
} else {
canvas.boxed()
}
}
}
fn quit(_: &Quit, cx: &mut gpui::MutableAppContext) {
cx.platform().quit();
}

View file

@ -2,6 +2,9 @@ DATABASE_URL = "postgres://postgres@localhost/zed"
HTTP_PORT = 8080
API_TOKEN = "secret"
INVITE_LINK_PREFIX = "http://localhost:3000/invites/"
LIVE_KIT_SERVER = "http://localhost:7880"
LIVE_KIT_KEY = "devkey"
LIVE_KIT_SECRET = "secret"
# RUST_LOG=info
# LOG_JSON=true

View file

@ -14,8 +14,10 @@ required-features = ["seed-support"]
[dependencies]
collections = { path = "../collections" }
live_kit_server = { path = "../live_kit_server" }
rpc = { path = "../rpc" }
util = { path = "../util" }
anyhow = "1.0.40"
async-trait = "0.1.50"
async-tungstenite = "0.16"
@ -60,15 +62,17 @@ editor = { path = "../editor", features = ["test-support"] }
language = { path = "../language", features = ["test-support"] }
fs = { path = "../fs", features = ["test-support"] }
git = { path = "../git", features = ["test-support"] }
log = { version = "0.4.16", features = ["kv_unstable_serde"] }
live_kit_client = { path = "../live_kit_client", features = ["test-support"] }
lsp = { path = "../lsp", features = ["test-support"] }
project = { path = "../project", features = ["test-support"] }
rpc = { path = "../rpc", features = ["test-support"] }
settings = { path = "../settings", features = ["test-support"] }
theme = { path = "../theme" }
workspace = { path = "../workspace", features = ["test-support"] }
ctor = "0.1"
env_logger = "0.9"
log = { version = "0.4.16", features = ["kv_unstable_serde"] }
util = { path = "../util" }
lazy_static = "1.4"
serde_json = { version = "1.0", features = ["preserve_order"] }

View file

@ -70,6 +70,21 @@ spec:
secretKeyRef:
name: api
key: token
- name: LIVE_KIT_SERVER
valueFrom:
secretKeyRef:
name: livekit
key: server
- name: LIVE_KIT_KEY
valueFrom:
secretKeyRef:
name: livekit
key: key
- name: LIVE_KIT_SECRET
valueFrom:
secretKeyRef:
name: livekit
key: secret
- name: INVITE_LINK_PREFIX
value: ${INVITE_LINK_PREFIX}
- name: RUST_LOG

View file

@ -22,7 +22,7 @@ use time::OffsetDateTime;
use tower::ServiceBuilder;
use tracing::instrument;
pub fn routes(rpc_server: &Arc<rpc::Server>, state: Arc<AppState>) -> Router<Body> {
pub fn routes(rpc_server: Arc<rpc::Server>, state: Arc<AppState>) -> Router<Body> {
Router::new()
.route("/user", get(get_authenticated_user))
.route("/users", get(get_users).post(create_user))
@ -50,7 +50,7 @@ pub fn routes(rpc_server: &Arc<rpc::Server>, state: Arc<AppState>) -> Router<Bod
.layer(
ServiceBuilder::new()
.layer(Extension(state))
.layer(Extension(rpc_server.clone()))
.layer(Extension(rpc_server))
.layer(middleware::from_fn(validate_api_token)),
)
}

View file

@ -30,6 +30,7 @@ use language::{
range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
};
use live_kit_client::MacOSDisplay;
use lsp::{self, FakeLanguageServer};
use parking_lot::Mutex;
use project::{
@ -47,14 +48,14 @@ use std::{
path::{Path, PathBuf},
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering::SeqCst},
atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
Arc,
},
time::Duration,
};
use theme::ThemeRegistry;
use unindent::Unindent as _;
use workspace::{Item, SplitDirection, ToggleFollow, Workspace};
use workspace::{shared_screen::SharedScreen, Item, SplitDirection, ToggleFollow, Workspace};
#[ctor::ctor]
fn init_logger() {
@ -185,6 +186,37 @@ async fn test_basic_calls(
}
);
// User A shares their screen
let display = MacOSDisplay::new();
let events_b = active_call_events(cx_b);
active_call_a
.update(cx_a, |call, cx| {
call.room().unwrap().update(cx, |room, cx| {
room.set_display_sources(vec![display.clone()]);
room.share_screen(cx)
})
})
.await
.unwrap();
deterministic.run_until_parked();
assert_eq!(events_b.borrow().len(), 1);
let event = events_b.borrow().first().unwrap().clone();
if let call::room::Event::RemoteVideoTracksChanged { participant_id } = event {
assert_eq!(participant_id, client_a.peer_id().unwrap());
room_b.read_with(cx_b, |room, _| {
assert_eq!(
room.remote_participants()[&client_a.peer_id().unwrap()]
.tracks
.len(),
1
);
});
} else {
panic!("unexpected event")
}
// User A leaves the room.
active_call_a.update(cx_a, |call, cx| {
call.hang_up(cx).unwrap();
@ -206,12 +238,13 @@ async fn test_basic_calls(
}
);
// User B leaves the room.
active_call_b.update(cx_b, |call, cx| {
call.hang_up(cx).unwrap();
assert!(call.room().is_none());
});
deterministic.run_until_parked();
// User B gets disconnected from the LiveKit server, which causes them
// to automatically leave the room.
server
.test_live_kit_server
.disconnect_client(client_b.peer_id().unwrap().to_string())
.await;
active_call_b.update(cx_b, |call, _| assert!(call.room().is_none()));
assert_eq!(
room_participants(&room_a, cx_a),
RoomParticipants {
@ -405,6 +438,63 @@ async fn test_leaving_room_on_disconnection(
pending: Default::default()
}
);
// Call user B again from client A.
active_call_a
.update(cx_a, |call, cx| {
call.invite(client_b.user_id().unwrap(), None, cx)
})
.await
.unwrap();
let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
// User B receives the call and joins the room.
let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming());
incoming_call_b.next().await.unwrap().unwrap();
active_call_b
.update(cx_b, |call, cx| call.accept_incoming(cx))
.await
.unwrap();
let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
deterministic.run_until_parked();
assert_eq!(
room_participants(&room_a, cx_a),
RoomParticipants {
remote: vec!["user_b".to_string()],
pending: Default::default()
}
);
assert_eq!(
room_participants(&room_b, cx_b),
RoomParticipants {
remote: vec!["user_a".to_string()],
pending: Default::default()
}
);
// User B gets disconnected from the LiveKit server, which causes it
// to automatically leave the room.
server
.test_live_kit_server
.disconnect_client(client_b.peer_id().unwrap().to_string())
.await;
deterministic.run_until_parked();
active_call_a.update(cx_a, |call, _| assert!(call.room().is_none()));
active_call_b.update(cx_b, |call, _| assert!(call.room().is_none()));
assert_eq!(
room_participants(&room_a, cx_a),
RoomParticipants {
remote: Default::default(),
pending: Default::default()
}
);
assert_eq!(
room_participants(&room_b, cx_b),
RoomParticipants {
remote: Default::default(),
pending: Default::default()
}
);
}
#[gpui::test(iterations = 10)]
@ -954,21 +1044,21 @@ async fn test_active_call_events(
deterministic.run_until_parked();
assert_eq!(mem::take(&mut *events_a.borrow_mut()), vec![]);
assert_eq!(mem::take(&mut *events_b.borrow_mut()), vec![]);
}
fn active_call_events(cx: &mut TestAppContext) -> Rc<RefCell<Vec<room::Event>>> {
let events = Rc::new(RefCell::new(Vec::new()));
let active_call = cx.read(ActiveCall::global);
cx.update({
let events = events.clone();
|cx| {
cx.subscribe(&active_call, move |_, event, _| {
events.borrow_mut().push(event.clone())
})
.detach()
}
});
events
}
fn active_call_events(cx: &mut TestAppContext) -> Rc<RefCell<Vec<room::Event>>> {
let events = Rc::new(RefCell::new(Vec::new()));
let active_call = cx.read(ActiveCall::global);
cx.update({
let events = events.clone();
|cx| {
cx.subscribe(&active_call, move |_, event, _| {
events.borrow_mut().push(event.clone())
})
.detach()
}
});
events
}
#[gpui::test(iterations = 10)]
@ -984,15 +1074,9 @@ async fn test_room_location(
client_a.fs.insert_tree("/a", json!({})).await;
client_b.fs.insert_tree("/b", json!({})).await;
let (project_a, _) = client_a.build_local_project("/a", cx_a).await;
let (project_b, _) = client_b.build_local_project("/b", cx_b).await;
server
.create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
.await;
let active_call_a = cx_a.read(ActiveCall::global);
let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
let active_call_b = cx_b.read(ActiveCall::global);
let a_notified = Rc::new(Cell::new(false));
cx_a.update({
let notified = a_notified.clone();
@ -1002,8 +1086,6 @@ async fn test_room_location(
}
});
let active_call_b = cx_b.read(ActiveCall::global);
let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
let b_notified = Rc::new(Cell::new(false));
cx_b.update({
let b_notified = b_notified.clone();
@ -1013,10 +1095,18 @@ async fn test_room_location(
}
});
room_a
.update(cx_a, |room, cx| room.set_location(Some(&project_a), cx))
let (project_a, _) = client_a.build_local_project("/a", cx_a).await;
active_call_a
.update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
.await
.unwrap();
let (project_b, _) = client_b.build_local_project("/b", cx_b).await;
server
.create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
.await;
let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
deterministic.run_until_parked();
assert!(a_notified.take());
assert_eq!(
@ -1071,8 +1161,8 @@ async fn test_room_location(
)]
);
room_b
.update(cx_b, |room, cx| room.set_location(Some(&project_b), cx))
active_call_b
.update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
.await
.unwrap();
deterministic.run_until_parked();
@ -1097,8 +1187,8 @@ async fn test_room_location(
)]
);
room_b
.update(cx_b, |room, cx| room.set_location(None, cx))
active_call_b
.update(cx_b, |call, cx| call.set_location(None, cx))
.await
.unwrap();
deterministic.run_until_parked();
@ -4968,7 +5058,11 @@ async fn test_contact_requests(
}
#[gpui::test(iterations = 10)]
async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
async fn test_following(
deterministic: Arc<Deterministic>,
cx_a: &mut TestAppContext,
cx_b: &mut TestAppContext,
) {
cx_a.foreground().forbid_parking();
cx_a.update(editor::init);
cx_b.update(editor::init);
@ -4980,6 +5074,7 @@ async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
.create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
.await;
let active_call_a = cx_a.read(ActiveCall::global);
let active_call_b = cx_b.read(ActiveCall::global);
client_a
.fs
@ -4993,11 +5088,20 @@ async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
)
.await;
let (project_a, worktree_id) = client_a.build_local_project("/a", cx_a).await;
active_call_a
.update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
.await
.unwrap();
let project_id = active_call_a
.update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
.await
.unwrap();
let project_b = client_b.build_remote_project(project_id, cx_b).await;
active_call_b
.update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
.await
.unwrap();
// Client A opens some editors.
let workspace_a = client_a.build_workspace(&project_a, cx_a);
@ -5139,7 +5243,7 @@ async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
workspace_a.update(cx_a, |workspace, cx| {
workspace.activate_item(&editor_a2, cx)
});
cx_a.foreground().run_until_parked();
deterministic.run_until_parked();
assert_eq!(
workspace_b.read_with(cx_b, |workspace, cx| workspace
.active_item(cx)
@ -5169,9 +5273,62 @@ async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
editor_a1.id()
);
// Client B activates an external window, which causes a new screen-sharing item to be added to the pane.
let display = MacOSDisplay::new();
active_call_b
.update(cx_b, |call, cx| call.set_location(None, cx))
.await
.unwrap();
active_call_b
.update(cx_b, |call, cx| {
call.room().unwrap().update(cx, |room, cx| {
room.set_display_sources(vec![display.clone()]);
room.share_screen(cx)
})
})
.await
.unwrap();
deterministic.run_until_parked();
let shared_screen = workspace_a.read_with(cx_a, |workspace, cx| {
workspace
.active_item(cx)
.unwrap()
.downcast::<SharedScreen>()
.unwrap()
});
// Client B activates Zed again, which causes the previous editor to become focused again.
active_call_b
.update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
.await
.unwrap();
deterministic.run_until_parked();
assert_eq!(
workspace_a.read_with(cx_a, |workspace, cx| workspace
.active_item(cx)
.unwrap()
.id()),
editor_a1.id()
);
// Client B activates an external window again, and the previously-opened screen-sharing item
// gets activated.
active_call_b
.update(cx_b, |call, cx| call.set_location(None, cx))
.await
.unwrap();
deterministic.run_until_parked();
assert_eq!(
workspace_a.read_with(cx_a, |workspace, cx| workspace
.active_item(cx)
.unwrap()
.id()),
shared_screen.id()
);
// Following interrupts when client B disconnects.
client_b.disconnect(&cx_b.to_async()).unwrap();
cx_a.foreground().run_until_parked();
deterministic.run_until_parked();
assert_eq!(
workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
None
@ -5191,6 +5348,7 @@ async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut T
.create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
.await;
let active_call_a = cx_a.read(ActiveCall::global);
let active_call_b = cx_b.read(ActiveCall::global);
// Client A shares a project.
client_a
@ -5206,6 +5364,10 @@ async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut T
)
.await;
let (project_a, worktree_id) = client_a.build_local_project("/a", cx_a).await;
active_call_a
.update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
.await
.unwrap();
let project_id = active_call_a
.update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
.await
@ -5213,6 +5375,10 @@ async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut T
// Client B joins the project.
let project_b = client_b.build_remote_project(project_id, cx_b).await;
active_call_b
.update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
.await
.unwrap();
// Client A opens some editors.
let workspace_a = client_a.build_workspace(&project_a, cx_a);
@ -5360,6 +5526,7 @@ async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppCont
.create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
.await;
let active_call_a = cx_a.read(ActiveCall::global);
let active_call_b = cx_b.read(ActiveCall::global);
// Client A shares a project.
client_a
@ -5374,11 +5541,20 @@ async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppCont
)
.await;
let (project_a, worktree_id) = client_a.build_local_project("/a", cx_a).await;
active_call_a
.update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
.await
.unwrap();
let project_id = active_call_a
.update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
.await
.unwrap();
let project_b = client_b.build_remote_project(project_id, cx_b).await;
active_call_b
.update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
.await
.unwrap();
// Client A opens some editors.
let workspace_a = client_a.build_workspace(&project_a, cx_a);
@ -6138,6 +6314,7 @@ struct TestServer {
connection_killers: Arc<Mutex<HashMap<PeerId, Arc<AtomicBool>>>>,
forbid_connections: Arc<AtomicBool>,
_test_db: TestDb,
test_live_kit_server: Arc<live_kit_client::TestServer>,
}
impl TestServer {
@ -6145,8 +6322,18 @@ impl TestServer {
foreground: Rc<executor::Foreground>,
background: Arc<executor::Background>,
) -> Self {
static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
let test_db = TestDb::fake(background.clone());
let app_state = Self::build_app_state(&test_db).await;
let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
let live_kit_server = live_kit_client::TestServer::create(
format!("http://livekit.{}.test", live_kit_server_id),
format!("devkey-{}", live_kit_server_id),
format!("secret-{}", live_kit_server_id),
background.clone(),
)
.unwrap();
let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
let peer = Peer::new();
let notifications = mpsc::unbounded();
let server = Server::new(app_state.clone(), Some(notifications.0));
@ -6159,6 +6346,7 @@ impl TestServer {
connection_killers: Default::default(),
forbid_connections: Default::default(),
_test_db: test_db,
test_live_kit_server: live_kit_server,
}
}
@ -6354,9 +6542,13 @@ impl TestServer {
}
}
async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
async fn build_app_state(
test_db: &TestDb,
fake_server: &live_kit_client::TestServer,
) -> Arc<AppState> {
Arc::new(AppState {
db: test_db.db().clone(),
live_kit_client: Some(Arc::new(fake_server.create_api_client())),
config: Default::default(),
})
}
@ -6388,6 +6580,7 @@ impl Deref for TestServer {
impl Drop for TestServer {
fn drop(&mut self) {
self.peer.reset();
self.test_live_kit_server.teardown().unwrap();
}
}

View file

@ -9,6 +9,7 @@ mod db_tests;
#[cfg(test)]
mod integration_tests;
use crate::rpc::ResultExt as _;
use axum::{body::Body, Router};
use collab::{Error, Result};
use db::{Db, PostgresDb};
@ -18,6 +19,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::signal;
use tracing_log::LogTracer;
use tracing_subscriber::{filter::EnvFilter, fmt::format::JsonFields, Layer};
use util::ResultExt;
@ -28,20 +30,40 @@ pub struct Config {
pub database_url: String,
pub api_token: String,
pub invite_link_prefix: String,
pub live_kit_server: Option<String>,
pub live_kit_key: Option<String>,
pub live_kit_secret: Option<String>,
pub rust_log: Option<String>,
pub log_json: Option<bool>,
}
pub struct AppState {
db: Arc<dyn Db>,
live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
config: Config,
}
impl AppState {
async fn new(config: Config) -> Result<Arc<Self>> {
let db = PostgresDb::new(&config.database_url, 5).await?;
let live_kit_client = if let Some(((server, key), secret)) = config
.live_kit_server
.as_ref()
.zip(config.live_kit_key.as_ref())
.zip(config.live_kit_secret.as_ref())
{
Some(Arc::new(live_kit_server::api::LiveKitClient::new(
server.clone(),
key.clone(),
secret.clone(),
)) as Arc<dyn live_kit_server::api::Client>)
} else {
None
};
let this = Self {
db: Arc::new(db),
live_kit_client,
config,
};
Ok(Arc::new(this))
@ -68,11 +90,12 @@ async fn main() -> Result<()> {
rpc_server.start_recording_project_activity(Duration::from_secs(5 * 60), rpc::RealExecutor);
let app = Router::<Body>::new()
.merge(api::routes(&rpc_server, state.clone()))
.merge(rpc::routes(rpc_server));
.merge(api::routes(rpc_server.clone(), state.clone()))
.merge(rpc::routes(rpc_server.clone()));
axum::Server::from_tcp(listener)?
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.with_graceful_shutdown(graceful_shutdown(rpc_server, state))
.await?;
Ok(())
@ -109,3 +132,52 @@ pub fn init_tracing(config: &Config) -> Option<()> {
None
}
async fn graceful_shutdown(rpc_server: Arc<rpc::Server>, state: Arc<AppState>) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
if let Some(live_kit) = state.live_kit_client.as_ref() {
let deletions = rpc_server
.store()
.await
.rooms()
.values()
.map(|room| {
let name = room.live_kit_room.clone();
async {
live_kit.delete_room(name).await.trace_err();
}
})
.collect::<Vec<_>>();
tracing::info!("deleting all live-kit rooms");
if let Err(_) = tokio::time::timeout(
Duration::from_secs(10),
futures::future::join_all(deletions),
)
.await
{
tracing::error!("timed out waiting for live-kit room deletion");
}
}
}

View file

@ -50,6 +50,7 @@ use std::{
},
time::Duration,
};
pub use store::{Store, Worktree};
use time::OffsetDateTime;
use tokio::{
sync::{Mutex, MutexGuard},
@ -58,8 +59,6 @@ use tokio::{
use tower::ServiceBuilder;
use tracing::{info_span, instrument, Instrument};
pub use store::{Store, Worktree};
lazy_static! {
static ref METRIC_CONNECTIONS: IntGauge =
register_int_gauge!("connections", "number of connections").unwrap();
@ -477,6 +476,7 @@ impl Server {
let mut projects_to_unshare = Vec::new();
let mut contacts_to_update = HashSet::default();
let mut room_left = None;
{
let mut store = self.store().await;
@ -509,23 +509,24 @@ impl Server {
});
}
if let Some(room) = removed_connection.room {
self.room_updated(&room);
room_left = Some(self.room_left(&room, connection_id));
}
contacts_to_update.insert(removed_connection.user_id);
for connection_id in removed_connection.canceled_call_connection_ids {
self.peer
.send(connection_id, proto::CallCanceled {})
.trace_err();
contacts_to_update.extend(store.user_id_for_connection(connection_id).ok());
}
if let Some(room) = removed_connection
.room_id
.and_then(|room_id| store.room(room_id))
{
self.room_updated(room);
}
contacts_to_update.insert(removed_connection.user_id);
};
if let Some(room_left) = room_left {
room_left.await.trace_err();
}
for user_id in contacts_to_update {
self.update_user_contacts(user_id).await.trace_err();
}
@ -607,13 +608,42 @@ impl Server {
response: Response<proto::CreateRoom>,
) -> Result<()> {
let user_id;
let room_id;
let room;
{
let mut store = self.store().await;
user_id = store.user_id_for_connection(request.sender_id)?;
room_id = store.create_room(request.sender_id)?;
room = store.create_room(request.sender_id)?.clone();
}
response.send(proto::CreateRoomResponse { id: room_id })?;
let live_kit_connection_info =
if let Some(live_kit) = self.app_state.live_kit_client.as_ref() {
if let Some(_) = live_kit
.create_room(room.live_kit_room.clone())
.await
.trace_err()
{
if let Some(token) = live_kit
.room_token(&room.live_kit_room, &request.sender_id.to_string())
.trace_err()
{
Some(proto::LiveKitConnectionInfo {
server_url: live_kit.url().into(),
token,
})
} else {
None
}
} else {
None
}
} else {
None
};
response.send(proto::CreateRoomResponse {
room: Some(room),
live_kit_connection_info,
})?;
self.update_user_contacts(user_id).await?;
Ok(())
}
@ -634,8 +664,27 @@ impl Server {
.send(recipient_id, proto::CallCanceled {})
.trace_err();
}
let live_kit_connection_info =
if let Some(live_kit) = self.app_state.live_kit_client.as_ref() {
if let Some(token) = live_kit
.room_token(&room.live_kit_room, &request.sender_id.to_string())
.trace_err()
{
Some(proto::LiveKitConnectionInfo {
server_url: live_kit.url().into(),
token,
})
} else {
None
}
} else {
None
};
response.send(proto::JoinRoomResponse {
room: Some(room.clone()),
live_kit_connection_info,
})?;
self.room_updated(room);
}
@ -645,6 +694,7 @@ impl Server {
async fn leave_room(self: Arc<Server>, message: TypedEnvelope<proto::LeaveRoom>) -> Result<()> {
let mut contacts_to_update = HashSet::default();
let room_left;
{
let mut store = self.store().await;
let user_id = store.user_id_for_connection(message.sender_id)?;
@ -683,9 +733,8 @@ impl Server {
}
}
if let Some(room) = left_room.room {
self.room_updated(room);
}
self.room_updated(&left_room.room);
room_left = self.room_left(&left_room.room, message.sender_id);
for connection_id in left_room.canceled_call_connection_ids {
self.peer
@ -695,6 +744,7 @@ impl Server {
}
}
room_left.await.trace_err();
for user_id in contacts_to_update {
self.update_user_contacts(user_id).await?;
}
@ -843,6 +893,29 @@ impl Server {
}
}
fn room_left(
&self,
room: &proto::Room,
connection_id: ConnectionId,
) -> impl Future<Output = Result<()>> {
let client = self.app_state.live_kit_client.clone();
let room_name = room.live_kit_room.clone();
let participant_count = room.participants.len();
async move {
if let Some(client) = client {
client
.remove_participant(room_name.clone(), connection_id.to_string())
.await?;
if participant_count == 0 {
client.delete_room(room_name).await?;
}
}
Ok(())
}
}
async fn share_project(
self: Arc<Server>,
request: TypedEnvelope<proto::ShareProject>,

View file

@ -1,9 +1,10 @@
use crate::db::{self, ChannelId, ProjectId, UserId};
use anyhow::{anyhow, Result};
use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
use nanoid::nanoid;
use rpc::{proto, ConnectionId};
use serde::Serialize;
use std::{mem, path::PathBuf, str, time::Duration};
use std::{borrow::Cow, mem, path::PathBuf, str, time::Duration};
use time::OffsetDateTime;
use tracing::instrument;
use util::post_inc;
@ -85,12 +86,12 @@ pub struct Channel {
pub type ReplicaId = u16;
#[derive(Default)]
pub struct RemovedConnectionState {
pub struct RemovedConnectionState<'a> {
pub user_id: UserId,
pub hosted_projects: Vec<Project>,
pub guest_projects: Vec<LeftProject>,
pub contact_ids: HashSet<UserId>,
pub room_id: Option<RoomId>,
pub room: Option<Cow<'a, proto::Room>>,
pub canceled_call_connection_ids: Vec<ConnectionId>,
}
@ -103,7 +104,7 @@ pub struct LeftProject {
}
pub struct LeftRoom<'a> {
pub room: Option<&'a proto::Room>,
pub room: Cow<'a, proto::Room>,
pub unshared_projects: Vec<Project>,
pub left_projects: Vec<LeftProject>,
pub canceled_call_connection_ids: Vec<ConnectionId>,
@ -219,11 +220,11 @@ impl Store {
let left_room = self.leave_room(room_id, connection_id)?;
result.hosted_projects = left_room.unshared_projects;
result.guest_projects = left_room.left_projects;
result.room_id = Some(room_id);
result.room = Some(Cow::Owned(left_room.room.into_owned()));
result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
} else if connected_user.connection_ids.len() == 1 {
self.decline_call(room_id, connection_id)?;
result.room_id = Some(room_id);
let (room, _) = self.decline_call(room_id, connection_id)?;
result.room = Some(Cow::Owned(room.clone()));
}
}
@ -345,7 +346,7 @@ impl Store {
}
}
pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
let connection = self
.connections
.get_mut(&creator_connection_id)
@ -359,19 +360,23 @@ impl Store {
"can't create a room with an active call"
);
let mut room = proto::Room::default();
room.participants.push(proto::Participant {
user_id: connection.user_id.to_proto(),
peer_id: creator_connection_id.0,
projects: Default::default(),
location: Some(proto::ParticipantLocation {
variant: Some(proto::participant_location::Variant::External(
proto::participant_location::External {},
)),
}),
});
let room_id = post_inc(&mut self.next_room_id);
let room = proto::Room {
id: room_id,
participants: vec![proto::Participant {
user_id: connection.user_id.to_proto(),
peer_id: creator_connection_id.0,
projects: Default::default(),
location: Some(proto::ParticipantLocation {
variant: Some(proto::participant_location::Variant::External(
proto::participant_location::External {},
)),
}),
}],
pending_participant_user_ids: Default::default(),
live_kit_room: nanoid!(30),
};
self.rooms.insert(room_id, room);
connected_user.active_call = Some(Call {
caller_user_id: connection.user_id,
@ -379,7 +384,7 @@ impl Store {
connection_id: Some(creator_connection_id),
initial_project_id: None,
});
Ok(room_id)
Ok(self.rooms.get(&room_id).unwrap())
}
pub fn join_room(
@ -496,12 +501,14 @@ impl Store {
}
});
if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
self.rooms.remove(&room_id);
}
let room = if room.participants.is_empty() {
Cow::Owned(self.rooms.remove(&room_id).unwrap())
} else {
Cow::Borrowed(self.rooms.get(&room_id).unwrap())
};
Ok(LeftRoom {
room: self.rooms.get(&room_id),
room,
unshared_projects,
left_projects,
canceled_call_connection_ids,
@ -512,6 +519,10 @@ impl Store {
self.rooms.get(&room_id)
}
pub fn rooms(&self) -> &BTreeMap<RoomId, proto::Room> {
&self.rooms
}
pub fn call(
&mut self,
room_id: RoomId,

View file

@ -10,17 +10,21 @@ use gpui::{
geometry::{rect::RectF, vector::vec2f, PathBuilder},
json::{self, ToJson},
Border, CursorStyle, Entity, ModelHandle, MouseButton, MutableAppContext, RenderContext,
Subscription, View, ViewContext, ViewHandle, WeakViewHandle,
Subscription, Task, View, ViewContext, ViewHandle, WeakViewHandle,
};
use settings::Settings;
use std::ops::Range;
use theme::Theme;
use workspace::{FollowNextCollaborator, JoinProject, ToggleFollow, Workspace};
actions!(collab, [ToggleCollaborationMenu, ShareProject]);
actions!(
collab,
[ToggleCollaborationMenu, ToggleScreenSharing, ShareProject]
);
pub fn init(cx: &mut MutableAppContext) {
cx.add_action(CollabTitlebarItem::toggle_contacts_popover);
cx.add_action(CollabTitlebarItem::toggle_screen_sharing);
cx.add_action(CollabTitlebarItem::share_project);
}
@ -48,10 +52,12 @@ impl View for CollabTitlebarItem {
};
let theme = cx.global::<Settings>().theme.clone();
let project = workspace.read(cx).project().read(cx);
let mut container = Flex::row();
container.add_children(self.render_toggle_screen_sharing_button(&theme, cx));
if workspace.read(cx).client().status().borrow().is_connected() {
let project = workspace.read(cx).project().read(cx);
if project.is_shared()
|| project.is_remote()
|| ActiveCall::global(cx).read(cx).room().is_none()
@ -114,19 +120,15 @@ impl CollabTitlebarItem {
}
fn window_activation_changed(&mut self, active: bool, cx: &mut ViewContext<Self>) {
let workspace = self.workspace.upgrade(cx);
let room = ActiveCall::global(cx).read(cx).room().cloned();
if let Some((workspace, room)) = workspace.zip(room) {
let workspace = workspace.read(cx);
if let Some(workspace) = self.workspace.upgrade(cx) {
let project = if active {
Some(workspace.project().clone())
Some(workspace.read(cx).project().clone())
} else {
None
};
room.update(cx, |room, cx| {
room.set_location(project.as_ref(), cx)
.detach_and_log_err(cx);
});
ActiveCall::global(cx)
.update(cx, |call, cx| call.set_location(project.as_ref(), cx))
.detach_and_log_err(cx);
}
}
@ -169,6 +171,19 @@ impl CollabTitlebarItem {
cx.notify();
}
pub fn toggle_screen_sharing(&mut self, _: &ToggleScreenSharing, cx: &mut ViewContext<Self>) {
if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() {
let toggle_screen_sharing = room.update(cx, |room, cx| {
if room.is_screen_sharing() {
Task::ready(room.unshare_screen(cx))
} else {
room.share_screen(cx)
}
});
toggle_screen_sharing.detach_and_log_err(cx);
}
}
fn render_toggle_contacts_button(
&self,
theme: &Theme,
@ -237,6 +252,56 @@ impl CollabTitlebarItem {
.boxed()
}
fn render_toggle_screen_sharing_button(
&self,
theme: &Theme,
cx: &mut RenderContext<Self>,
) -> Option<ElementBox> {
let active_call = ActiveCall::global(cx);
let room = active_call.read(cx).room().cloned()?;
let icon;
let tooltip;
if room.read(cx).is_screen_sharing() {
icon = "icons/disable_screen_sharing_12.svg";
tooltip = "Stop Sharing Screen"
} else {
icon = "icons/enable_screen_sharing_12.svg";
tooltip = "Share Screen";
}
let titlebar = &theme.workspace.titlebar;
Some(
MouseEventHandler::<ToggleScreenSharing>::new(0, cx, |state, _| {
let style = titlebar.call_control.style_for(state, false);
Svg::new(icon)
.with_color(style.color)
.constrained()
.with_width(style.icon_width)
.aligned()
.constrained()
.with_width(style.button_width)
.with_height(style.button_width)
.contained()
.with_style(style.container)
.boxed()
})
.with_cursor_style(CursorStyle::PointingHand)
.on_click(MouseButton::Left, move |_, cx| {
cx.dispatch_action(ToggleScreenSharing);
})
.with_tooltip::<ToggleScreenSharing, _>(
0,
tooltip.into(),
Some(Box::new(ToggleScreenSharing)),
theme.tooltip.clone(),
cx,
)
.aligned()
.boxed(),
)
}
fn render_share_button(&self, theme: &Theme, cx: &mut RenderContext<Self>) -> ElementBox {
enum Share {}

View file

@ -17,7 +17,7 @@ use serde::Deserialize;
use settings::Settings;
use theme::IconButton;
use util::ResultExt;
use workspace::JoinProject;
use workspace::{JoinProject, OpenSharedScreen};
impl_actions!(contact_list, [RemoveContact, RespondToContactRequest]);
impl_internal_actions!(contact_list, [ToggleExpanded, Call, LeaveCall]);
@ -67,6 +67,10 @@ enum ContactEntry {
host_user_id: u64,
is_last: bool,
},
ParticipantScreen {
peer_id: PeerId,
is_last: bool,
},
IncomingRequest(Arc<User>),
OutgoingRequest(Arc<User>),
Contact(Arc<Contact>),
@ -97,6 +101,16 @@ impl PartialEq for ContactEntry {
return project_id_1 == project_id_2;
}
}
ContactEntry::ParticipantScreen {
peer_id: peer_id_1, ..
} => {
if let ContactEntry::ParticipantScreen {
peer_id: peer_id_2, ..
} = other
{
return peer_id_1 == peer_id_2;
}
}
ContactEntry::IncomingRequest(user_1) => {
if let ContactEntry::IncomingRequest(user_2) = other {
return user_1.id == user_2.id;
@ -216,6 +230,15 @@ impl ContactList {
&theme.contact_list,
cx,
),
ContactEntry::ParticipantScreen { peer_id, is_last } => {
Self::render_participant_screen(
*peer_id,
*is_last,
is_selected,
&theme.contact_list,
cx,
)
}
ContactEntry::IncomingRequest(user) => Self::render_contact_request(
user.clone(),
this.user_store.clone(),
@ -347,6 +370,9 @@ impl ContactList {
follow_user_id: *host_user_id,
});
}
ContactEntry::ParticipantScreen { peer_id, .. } => {
cx.dispatch_action(OpenSharedScreen { peer_id: *peer_id });
}
_ => {}
}
}
@ -430,11 +456,10 @@ impl ContactList {
executor.clone(),
));
for mat in matches {
let participant = &room.remote_participants()[&PeerId(mat.candidate_id as u32)];
let peer_id = PeerId(mat.candidate_id as u32);
let participant = &room.remote_participants()[&peer_id];
participant_entries.push(ContactEntry::CallParticipant {
user: room.remote_participants()[&PeerId(mat.candidate_id as u32)]
.user
.clone(),
user: participant.user.clone(),
is_pending: false,
});
let mut projects = participant.projects.iter().peekable();
@ -443,7 +468,13 @@ impl ContactList {
project_id: project.id,
worktree_root_names: project.worktree_root_names.clone(),
host_user_id: participant.user.id,
is_last: projects.peek().is_none(),
is_last: projects.peek().is_none() && participant.tracks.is_empty(),
});
}
if !participant.tracks.is_empty() {
participant_entries.push(ContactEntry::ParticipantScreen {
peer_id,
is_last: true,
});
}
}
@ -763,6 +794,102 @@ impl ContactList {
.boxed()
}
fn render_participant_screen(
peer_id: PeerId,
is_last: bool,
is_selected: bool,
theme: &theme::ContactList,
cx: &mut RenderContext<Self>,
) -> ElementBox {
let font_cache = cx.font_cache();
let host_avatar_height = theme
.contact_avatar
.width
.or(theme.contact_avatar.height)
.unwrap_or(0.);
let row = &theme.project_row.default;
let tree_branch = theme.tree_branch;
let line_height = row.name.text.line_height(font_cache);
let cap_height = row.name.text.cap_height(font_cache);
let baseline_offset =
row.name.text.baseline_offset(font_cache) + (theme.row_height - line_height) / 2.;
MouseEventHandler::<OpenSharedScreen>::new(peer_id.0 as usize, cx, |mouse_state, _| {
let tree_branch = *tree_branch.style_for(mouse_state, is_selected);
let row = theme.project_row.style_for(mouse_state, is_selected);
Flex::row()
.with_child(
Stack::new()
.with_child(
Canvas::new(move |bounds, _, cx| {
let start_x = bounds.min_x() + (bounds.width() / 2.)
- (tree_branch.width / 2.);
let end_x = bounds.max_x();
let start_y = bounds.min_y();
let end_y = bounds.min_y() + baseline_offset - (cap_height / 2.);
cx.scene.push_quad(gpui::Quad {
bounds: RectF::from_points(
vec2f(start_x, start_y),
vec2f(
start_x + tree_branch.width,
if is_last { end_y } else { bounds.max_y() },
),
),
background: Some(tree_branch.color),
border: gpui::Border::default(),
corner_radius: 0.,
});
cx.scene.push_quad(gpui::Quad {
bounds: RectF::from_points(
vec2f(start_x, end_y),
vec2f(end_x, end_y + tree_branch.width),
),
background: Some(tree_branch.color),
border: gpui::Border::default(),
corner_radius: 0.,
});
})
.boxed(),
)
.constrained()
.with_width(host_avatar_height)
.boxed(),
)
.with_child(
Svg::new("icons/disable_screen_sharing_12.svg")
.with_color(row.icon.color)
.constrained()
.with_width(row.icon.width)
.aligned()
.left()
.contained()
.with_style(row.icon.container)
.boxed(),
)
.with_child(
Label::new("Screen".into(), row.name.text.clone())
.aligned()
.left()
.contained()
.with_style(row.name.container)
.flex(1., false)
.boxed(),
)
.constrained()
.with_height(theme.row_height)
.contained()
.with_style(row.container)
.boxed()
})
.with_cursor_style(CursorStyle::PointingHand)
.on_click(MouseButton::Left, move |_, cx| {
cx.dispatch_action(OpenSharedScreen { peer_id });
})
.boxed()
}
fn render_header(
section: Section,
theme: &theme::ContactList,
@ -1035,25 +1162,11 @@ impl ContactList {
fn call(&mut self, action: &Call, cx: &mut ViewContext<Self>) {
let recipient_user_id = action.recipient_user_id;
let initial_project = action.initial_project.clone();
let window_id = cx.window_id();
let active_call = ActiveCall::global(cx);
cx.spawn_weak(|_, mut cx| async move {
active_call
.update(&mut cx, |active_call, cx| {
active_call.invite(recipient_user_id, initial_project.clone(), cx)
})
.await?;
if cx.update(|cx| cx.window_is_active(window_id)) {
active_call
.update(&mut cx, |call, cx| {
call.set_location(initial_project.as_ref(), cx)
})
.await?;
}
anyhow::Ok(())
})
.detach_and_log_err(cx);
ActiveCall::global(cx)
.update(cx, |call, cx| {
call.invite(recipient_user_id, initial_project.clone(), cx)
})
.detach_and_log_err(cx);
}
fn leave_call(&mut self, _: &LeaveCall, cx: &mut ViewContext<Self>) {

View file

@ -62,6 +62,7 @@ pub fn init(cx: &mut MutableAppContext) {
cx.remove_window(window_id);
}
}
_ => {}
})
.detach();
}

View file

@ -57,7 +57,7 @@ fn compile_metal_shaders() {
"macosx",
"metal",
"-gline-tables-only",
"-mmacosx-version-min=10.14",
"-mmacosx-version-min=10.15.7",
"-MO",
"-c",
shader_path,

View file

@ -3835,6 +3835,11 @@ impl<'a, T: View> ViewContext<'a, T> {
self.app.notify_view(self.window_id, self.view_id);
}
pub fn dispatch_action(&mut self, action: impl Action) {
self.app
.dispatch_action_at(self.window_id, self.view_id, action)
}
pub fn dispatch_any_action(&mut self, action: Box<dyn Action>) {
self.app
.dispatch_any_action_at(self.window_id, self.view_id, action)

View file

@ -464,7 +464,7 @@ pub trait ParentElement<'a>: Extend<ElementBox> + Sized {
impl<'a, T> ParentElement<'a> for T where T: Extend<ElementBox> {}
fn constrain_size_preserving_aspect_ratio(max_size: Vector2F, size: Vector2F) -> Vector2F {
pub fn constrain_size_preserving_aspect_ratio(max_size: Vector2F, size: Vector2F) -> Vector2F {
if max_size.x().is_infinite() && max_size.y().is_infinite() {
size
} else if max_size.x().is_infinite() || max_size.x() / max_size.y() > size.x() / size.y() {

View file

@ -36,7 +36,7 @@ text = { path = "../text" }
theme = { path = "../theme" }
util = { path = "../util" }
anyhow = "1.0.38"
async-broadcast = "0.3.4"
async-broadcast = "0.4"
async-trait = "0.1"
futures = "0.3"
lazy_static = "1.4"

View file

@ -1,22 +0,0 @@
[package]
name = "live_kit"
version = "0.1.0"
edition = "2021"
description = "Bindings to LiveKit Swift client SDK"
[lib]
path = "src/live_kit.rs"
doctest = false
[dependencies]
media = { path = "../media" }
anyhow = "1.0.38"
core-foundation = "0.9.3"
core-graphics = "0.22.3"
futures = "0.3"
parking_lot = "0.11.1"
[build-dependencies]
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = { version = "1.0", features = ["preserve_order"] }

View file

@ -1,105 +0,0 @@
import Foundation
import LiveKit
import WebRTC
class LKRoomDelegate: RoomDelegate {
var data: UnsafeRawPointer
var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, UnsafeRawPointer) -> Void
init(data: UnsafeRawPointer, onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer) -> Void) {
self.data = data
self.onDidSubscribeToRemoteVideoTrack = onDidSubscribeToRemoteVideoTrack
}
func room(_ room: Room, participant: RemoteParticipant, didSubscribe publication: RemoteTrackPublication, track: Track) {
if track.kind == .video {
self.onDidSubscribeToRemoteVideoTrack(self.data, Unmanaged.passRetained(track).toOpaque())
}
}
}
class LKVideoRenderer: NSObject, VideoRenderer {
var data: UnsafeRawPointer
var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void
var onDrop: @convention(c) (UnsafeRawPointer) -> Void
var adaptiveStreamIsEnabled: Bool = false
var adaptiveStreamSize: CGSize = .zero
init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) {
self.data = data
self.onFrame = onFrame
self.onDrop = onDrop
}
deinit {
self.onDrop(self.data)
}
func setSize(_ size: CGSize) {
print("Called setSize", size);
}
func renderFrame(_ frame: RTCVideoFrame?) {
let buffer = frame?.buffer as? RTCCVPixelBuffer
if let pixelBuffer = buffer?.pixelBuffer {
self.onFrame(self.data, pixelBuffer)
}
}
}
@_cdecl("LKRelease")
public func LKRelease(ptr: UnsafeRawPointer) {
let _ = Unmanaged<AnyObject>.fromOpaque(ptr).takeRetainedValue()
}
@_cdecl("LKRoomDelegateCreate")
public func LKRoomDelegateCreate(data: UnsafeRawPointer, onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer {
let delegate = LKRoomDelegate(data: data, onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack)
return Unmanaged.passRetained(delegate).toOpaque()
}
@_cdecl("LKRoomCreate")
public func LKRoomCreate(delegate: UnsafeRawPointer) -> UnsafeMutableRawPointer {
let delegate = Unmanaged<LKRoomDelegate>.fromOpaque(delegate).takeUnretainedValue()
return Unmanaged.passRetained(Room(delegate: delegate)).toOpaque()
}
@_cdecl("LKRoomConnect")
public func LKRoomConnect(room: UnsafeRawPointer, url: CFString, token: CFString, callback: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer) {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
room.connect(url as String, token as String).then { _ in
callback(callback_data, UnsafeRawPointer(nil) as! CFString?)
}.catch { error in
callback(callback_data, error.localizedDescription as CFString)
}
}
@_cdecl("LKRoomPublishVideoTrack")
public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer) {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
let track = Unmanaged<LocalVideoTrack>.fromOpaque(track).takeUnretainedValue()
room.localParticipant?.publishVideoTrack(track: track).then { _ in
callback(callback_data, UnsafeRawPointer(nil) as! CFString?)
}.catch { error in
callback(callback_data, error.localizedDescription as CFString)
}
}
@_cdecl("LKCreateScreenShareTrackForWindow")
public func LKCreateScreenShareTrackForWindow(windowId: uint32) -> UnsafeMutableRawPointer {
let track = LocalVideoTrack.createMacOSScreenShareTrack(source: .window(id: windowId))
return Unmanaged.passRetained(track).toOpaque()
}
@_cdecl("LKVideoRendererCreate")
public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer {
Unmanaged.passRetained(LKVideoRenderer(data: data, onFrame: onFrame, onDrop: onDrop)).toOpaque()
}
@_cdecl("LKVideoTrackAddRenderer")
public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRawPointer) {
let track = Unmanaged<Track>.fromOpaque(track).takeUnretainedValue() as! VideoTrack
let renderer = Unmanaged<LKVideoRenderer>.fromOpaque(renderer).takeRetainedValue()
track.add(videoRenderer: renderer)
}

View file

@ -1,276 +0,0 @@
use anyhow::{anyhow, Context, Result};
use core_foundation::{
array::CFArray,
base::{TCFType, TCFTypeRef},
dictionary::CFDictionary,
number::CFNumber,
string::{CFString, CFStringRef},
};
use core_graphics::window::{
kCGNullWindowID, kCGWindowListOptionExcludeDesktopElements, kCGWindowListOptionOnScreenOnly,
kCGWindowNumber, kCGWindowOwnerName, kCGWindowOwnerPID, CGWindowListCopyWindowInfo,
};
use futures::{
channel::{mpsc, oneshot},
Future,
};
use media::core_video::{CVImageBuffer, CVImageBufferRef};
use parking_lot::Mutex;
use std::{
ffi::c_void,
sync::{Arc, Weak},
};
extern "C" {
fn LKRelease(object: *const c_void);
fn LKRoomDelegateCreate(
callback_data: *mut c_void,
on_did_subscribe_to_remote_video_track: extern "C" fn(
callback_data: *mut c_void,
remote_track: *const c_void,
),
) -> *const c_void;
fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
fn LKRoomConnect(
room: *const c_void,
url: CFStringRef,
token: CFStringRef,
callback: extern "C" fn(*mut c_void, CFStringRef),
callback_data: *mut c_void,
);
fn LKRoomPublishVideoTrack(
room: *const c_void,
track: *const c_void,
callback: extern "C" fn(*mut c_void, CFStringRef),
callback_data: *mut c_void,
);
fn LKVideoRendererCreate(
callback_data: *mut c_void,
on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef),
on_drop: extern "C" fn(callback_data: *mut c_void),
) -> *const c_void;
fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
fn LKCreateScreenShareTrackForWindow(windowId: u32) -> *const c_void;
}
pub struct Room {
native_room: *const c_void,
remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<Arc<RemoteVideoTrack>>>>,
_delegate: RoomDelegate,
}
impl Room {
pub fn new() -> Arc<Self> {
Arc::new_cyclic(|weak_room| {
let delegate = RoomDelegate::new(weak_room.clone());
Self {
native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
remote_video_track_subscribers: Default::default(),
_delegate: delegate,
}
})
}
pub fn connect(&self, url: &str, token: &str) -> impl Future<Output = Result<()>> {
let url = CFString::new(url);
let token = CFString::new(token);
let (did_connect, tx, rx) = Self::build_done_callback();
unsafe {
LKRoomConnect(
self.native_room,
url.as_concrete_TypeRef(),
token.as_concrete_TypeRef(),
did_connect,
tx,
)
}
async { rx.await.unwrap().context("error connecting to room") }
}
pub fn publish_video_track(&self, track: &LocalVideoTrack) -> impl Future<Output = Result<()>> {
let (did_publish, tx, rx) = Self::build_done_callback();
unsafe {
LKRoomPublishVideoTrack(self.native_room, track.0, did_publish, tx);
}
async { rx.await.unwrap().context("error publishing video track") }
}
pub fn remote_video_tracks(&self) -> mpsc::UnboundedReceiver<Arc<RemoteVideoTrack>> {
let (tx, rx) = mpsc::unbounded();
self.remote_video_track_subscribers.lock().push(tx);
rx
}
fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
let track = Arc::new(track);
self.remote_video_track_subscribers
.lock()
.retain(|tx| tx.unbounded_send(track.clone()).is_ok());
}
fn build_done_callback() -> (
extern "C" fn(*mut c_void, CFStringRef),
*mut c_void,
oneshot::Receiver<Result<()>>,
) {
let (tx, rx) = oneshot::channel();
extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
if error.is_null() {
let _ = tx.send(Ok(()));
} else {
let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
let _ = tx.send(Err(anyhow!(error)));
}
}
(
done_callback,
Box::into_raw(Box::new(tx)) as *mut c_void,
rx,
)
}
}
impl Drop for Room {
fn drop(&mut self) {
unsafe { LKRelease(self.native_room) }
}
}
struct RoomDelegate {
native_delegate: *const c_void,
weak_room: *const Room,
}
impl RoomDelegate {
fn new(weak_room: Weak<Room>) -> Self {
let weak_room = Weak::into_raw(weak_room);
let native_delegate = unsafe {
LKRoomDelegateCreate(
weak_room as *mut c_void,
Self::on_did_subscribe_to_remote_video_track,
)
};
Self {
native_delegate,
weak_room,
}
}
extern "C" fn on_did_subscribe_to_remote_video_track(room: *mut c_void, track: *const c_void) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
let track = RemoteVideoTrack(track);
if let Some(room) = room.upgrade() {
room.did_subscribe_to_remote_video_track(track);
}
let _ = Weak::into_raw(room);
}
}
impl Drop for RoomDelegate {
fn drop(&mut self) {
unsafe {
LKRelease(self.native_delegate);
let _ = Weak::from_raw(self.weak_room);
}
}
}
pub struct LocalVideoTrack(*const c_void);
impl LocalVideoTrack {
pub fn screen_share_for_window(window_id: u32) -> Self {
Self(unsafe { LKCreateScreenShareTrackForWindow(window_id) })
}
}
impl Drop for LocalVideoTrack {
fn drop(&mut self) {
unsafe { LKRelease(self.0) }
}
}
pub struct RemoteVideoTrack(*const c_void);
impl RemoteVideoTrack {
pub fn add_renderer<F>(&self, callback: F)
where
F: 'static + FnMut(CVImageBuffer),
{
extern "C" fn on_frame<F>(callback_data: *mut c_void, frame: CVImageBufferRef)
where
F: FnMut(CVImageBuffer),
{
unsafe {
let buffer = CVImageBuffer::wrap_under_get_rule(frame);
let callback = &mut *(callback_data as *mut F);
callback(buffer);
}
}
extern "C" fn on_drop<F>(callback_data: *mut c_void) {
unsafe {
let _ = Box::from_raw(callback_data as *mut F);
}
}
let callback_data = Box::into_raw(Box::new(callback));
unsafe {
let renderer =
LKVideoRendererCreate(callback_data as *mut c_void, on_frame::<F>, on_drop::<F>);
LKVideoTrackAddRenderer(self.0, renderer);
}
}
}
impl Drop for RemoteVideoTrack {
fn drop(&mut self) {
unsafe { LKRelease(self.0) }
}
}
#[derive(Debug)]
pub struct WindowInfo {
pub id: u32,
pub owner_pid: i32,
pub owner_name: Option<String>,
}
pub fn list_windows() -> Vec<WindowInfo> {
unsafe {
let dicts = CFArray::<CFDictionary>::wrap_under_get_rule(CGWindowListCopyWindowInfo(
kCGWindowListOptionOnScreenOnly | kCGWindowListOptionExcludeDesktopElements,
kCGNullWindowID,
));
dicts
.iter()
.map(|dict| {
let id =
CFNumber::wrap_under_get_rule(*dict.get(kCGWindowNumber.as_void_ptr()) as _)
.to_i64()
.unwrap() as u32;
let owner_pid =
CFNumber::wrap_under_get_rule(*dict.get(kCGWindowOwnerPID.as_void_ptr()) as _)
.to_i32()
.unwrap();
let owner_name = dict
.find(kCGWindowOwnerName.as_void_ptr())
.map(|name| CFString::wrap_under_get_rule(*name as _).to_string());
WindowInfo {
id,
owner_pid,
owner_name,
}
})
.collect()
}
}

View file

@ -0,0 +1,2 @@
[live_kit_client_test]
rustflags = ["-C", "link-args=-ObjC"]

View file

@ -0,0 +1,70 @@
[package]
name = "live_kit_client"
version = "0.1.0"
edition = "2021"
description = "Bindings to LiveKit Swift client SDK"
[lib]
path = "src/live_kit_client.rs"
doctest = false
[[example]]
name = "test_app"
[features]
test-support = [
"async-trait",
"collections/test-support",
"gpui/test-support",
"lazy_static",
"live_kit_server",
"nanoid",
]
[dependencies]
collections = { path = "../collections", optional = true }
gpui = { path = "../gpui", optional = true }
live_kit_server = { path = "../live_kit_server", optional = true }
media = { path = "../media" }
anyhow = "1.0.38"
async-broadcast = "0.4"
core-foundation = "0.9.3"
core-graphics = "0.22.3"
futures = "0.3"
log = { version = "0.4.16", features = ["kv_unstable_serde"] }
parking_lot = "0.11.1"
postage = { version = "0.4.1", features = ["futures-traits"] }
async-trait = { version = "0.1", optional = true }
lazy_static = { version = "1.4", optional = true }
nanoid = { version ="0.4", optional = true}
[dev-dependencies]
collections = { path = "../collections", features = ["test-support"] }
gpui = { path = "../gpui", features = ["test-support"] }
live_kit_server = { path = "../live_kit_server" }
media = { path = "../media" }
anyhow = "1.0.38"
async-trait = "0.1"
block = "0.1"
bytes = "1.2"
byteorder = "1.4"
cocoa = "0.24"
core-foundation = "0.9.3"
core-graphics = "0.22.3"
foreign-types = "0.3"
futures = "0.3"
hmac = "0.12"
jwt = "0.16"
lazy_static = "1.4"
objc = "0.2"
parking_lot = "0.11.1"
serde = { version = "1.0", features = ["derive", "rc"] }
sha2 = "0.10"
simplelog = "0.9"
[build-dependencies]
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = { version = "1.0", features = ["preserve_order"] }

View file

@ -6,7 +6,7 @@
"repositoryURL": "https://github.com/livekit/client-sdk-swift.git",
"state": {
"branch": null,
"revision": "5cc3c001779ab147199ce3ea0dce465b846368b4",
"revision": "f6ca534eb334e99acb8e82cc99b491717df28d8a",
"version": null
}
},
@ -24,8 +24,8 @@
"repositoryURL": "https://github.com/webrtc-sdk/Specs.git",
"state": {
"branch": null,
"revision": "5225f2de4b6d0098803b3a0e55b255a41f293dad",
"version": "104.5112.2"
"revision": "38ac06261e62f980652278c69b70284324c769e0",
"version": "104.5112.5"
}
},
{
@ -42,8 +42,8 @@
"repositoryURL": "https://github.com/apple/swift-protobuf.git",
"state": {
"branch": null,
"revision": "b8230909dedc640294d7324d37f4c91ad3dcf177",
"version": "1.20.1"
"revision": "88c7d15e1242fdb6ecbafbc7926426a19be1e98a",
"version": "1.20.2"
}
}
]

View file

@ -15,7 +15,7 @@ let package = Package(
targets: ["LiveKitBridge"]),
],
dependencies: [
.package(url: "https://github.com/livekit/client-sdk-swift.git", revision: "5cc3c001779ab147199ce3ea0dce465b846368b4"),
.package(url: "https://github.com/livekit/client-sdk-swift.git", revision: "f6ca534eb334e99acb8e82cc99b491717df28d8a"),
],
targets: [
// Targets are the basic building blocks of a package. A target can define a module or a test suite.

View file

@ -0,0 +1,179 @@
import Foundation
import LiveKit
import WebRTC
import ScreenCaptureKit
class LKRoomDelegate: RoomDelegate {
var data: UnsafeRawPointer
var onDidDisconnect: @convention(c) (UnsafeRawPointer) -> Void
var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void
var onDidUnsubscribeFromRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void
init(
data: UnsafeRawPointer,
onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void,
onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void,
onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void)
{
self.data = data
self.onDidDisconnect = onDidDisconnect
self.onDidSubscribeToRemoteVideoTrack = onDidSubscribeToRemoteVideoTrack
self.onDidUnsubscribeFromRemoteVideoTrack = onDidUnsubscribeFromRemoteVideoTrack
}
func room(_ room: Room, didUpdate connectionState: ConnectionState, oldValue: ConnectionState) {
if connectionState.isDisconnected {
self.onDidDisconnect(self.data)
}
}
func room(_ room: Room, participant: RemoteParticipant, didSubscribe publication: RemoteTrackPublication, track: Track) {
if track.kind == .video {
self.onDidSubscribeToRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString, Unmanaged.passUnretained(track).toOpaque())
}
}
func room(_ room: Room, participant: RemoteParticipant, didUnsubscribe publication: RemoteTrackPublication, track: Track) {
if track.kind == .video {
self.onDidUnsubscribeFromRemoteVideoTrack(self.data, participant.identity as CFString, track.sid! as CFString)
}
}
}
class LKVideoRenderer: NSObject, VideoRenderer {
var data: UnsafeRawPointer
var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool
var onDrop: @convention(c) (UnsafeRawPointer) -> Void
var adaptiveStreamIsEnabled: Bool = false
var adaptiveStreamSize: CGSize = .zero
weak var track: VideoTrack?
init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) {
self.data = data
self.onFrame = onFrame
self.onDrop = onDrop
}
deinit {
self.onDrop(self.data)
}
func setSize(_ size: CGSize) {
}
func renderFrame(_ frame: RTCVideoFrame?) {
let buffer = frame?.buffer as? RTCCVPixelBuffer
if let pixelBuffer = buffer?.pixelBuffer {
if !self.onFrame(self.data, pixelBuffer) {
DispatchQueue.main.async {
self.track?.remove(videoRenderer: self)
}
}
}
}
}
@_cdecl("LKRoomDelegateCreate")
public func LKRoomDelegateCreate(
data: UnsafeRawPointer,
onDidDisconnect: @escaping @convention(c) (UnsafeRawPointer) -> Void,
onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void,
onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void
) -> UnsafeMutableRawPointer {
let delegate = LKRoomDelegate(
data: data,
onDidDisconnect: onDidDisconnect,
onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack,
onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack
)
return Unmanaged.passRetained(delegate).toOpaque()
}
@_cdecl("LKRoomCreate")
public func LKRoomCreate(delegate: UnsafeRawPointer) -> UnsafeMutableRawPointer {
let delegate = Unmanaged<LKRoomDelegate>.fromOpaque(delegate).takeUnretainedValue()
return Unmanaged.passRetained(Room(delegate: delegate)).toOpaque()
}
@_cdecl("LKRoomConnect")
public func LKRoomConnect(room: UnsafeRawPointer, url: CFString, token: CFString, callback: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer) {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
room.connect(url as String, token as String).then { _ in
callback(callback_data, UnsafeRawPointer(nil) as! CFString?)
}.catch { error in
callback(callback_data, error.localizedDescription as CFString)
}
}
@_cdecl("LKRoomDisconnect")
public func LKRoomDisconnect(room: UnsafeRawPointer) {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
room.disconnect()
}
@_cdecl("LKRoomPublishVideoTrack")
public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
let track = Unmanaged<LocalVideoTrack>.fromOpaque(track).takeUnretainedValue()
room.localParticipant?.publishVideoTrack(track: track).then { publication in
callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil)
}.catch { error in
callback(callback_data, nil, error.localizedDescription as CFString)
}
}
@_cdecl("LKRoomUnpublishTrack")
public func LKRoomUnpublishTrack(room: UnsafeRawPointer, publication: UnsafeRawPointer) {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
let publication = Unmanaged<LocalTrackPublication>.fromOpaque(publication).takeUnretainedValue()
let _ = room.localParticipant?.unpublish(publication: publication)
}
@_cdecl("LKRoomVideoTracksForRemoteParticipant")
public func LKRoomVideoTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? {
let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
for (_, participant) in room.remoteParticipants {
if participant.identity == participantId as String {
return participant.videoTracks.compactMap { $0.track as? RemoteVideoTrack } as CFArray?
}
}
return nil;
}
@_cdecl("LKCreateScreenShareTrackForDisplay")
public func LKCreateScreenShareTrackForDisplay(display: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer {
let display = Unmanaged<MacOSDisplay>.fromOpaque(display).takeUnretainedValue()
let track = LocalVideoTrack.createMacOSScreenShareTrack(source: display, preferredMethod: .legacy)
return Unmanaged.passRetained(track).toOpaque()
}
@_cdecl("LKVideoRendererCreate")
public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer {
Unmanaged.passRetained(LKVideoRenderer(data: data, onFrame: onFrame, onDrop: onDrop)).toOpaque()
}
@_cdecl("LKVideoTrackAddRenderer")
public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRawPointer) {
let track = Unmanaged<Track>.fromOpaque(track).takeUnretainedValue() as! VideoTrack
let renderer = Unmanaged<LKVideoRenderer>.fromOpaque(renderer).takeRetainedValue()
renderer.track = track
track.add(videoRenderer: renderer)
}
@_cdecl("LKRemoteVideoTrackGetSid")
public func LKRemoteVideoTrackGetSid(track: UnsafeRawPointer) -> CFString {
let track = Unmanaged<RemoteVideoTrack>.fromOpaque(track).takeUnretainedValue()
return track.sid! as CFString
}
@_cdecl("LKDisplaySources")
public func LKDisplaySources(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFArray?, CFString?) -> Void) {
MacOSScreenCapturer.sources(for: .display, includeCurrentApplication: false, preferredMethod: .legacy).then { displaySources in
callback(data, displaySources as CFArray, nil)
}.catch { error in
callback(data, nil, error.localizedDescription as CFString)
}
}

View file

@ -32,17 +32,23 @@ pub struct SwiftTarget {
pub paths: SwiftPaths,
}
const MACOS_TARGET_VERSION: &str = "10.15";
const MACOS_TARGET_VERSION: &str = "10.15.7";
fn main() {
let swift_target = get_swift_target();
if cfg!(not(any(test, feature = "test-support"))) {
let swift_target = get_swift_target();
build_bridge(&swift_target);
link_swift_stdlib(&swift_target);
link_webrtc_framework(&swift_target);
build_bridge(&swift_target);
link_swift_stdlib(&swift_target);
link_webrtc_framework(&swift_target);
// Register exported Objective-C selectors, protocols, etc when building example binaries.
println!("cargo:rustc-link-arg=-Wl,-ObjC");
}
}
fn build_bridge(swift_target: &SwiftTarget) {
println!("cargo:rerun-if-env-changed=MACOSX_DEPLOYMENT_TARGET");
println!("cargo:rerun-if-changed={}/Sources", SWIFT_PACKAGE_NAME);
println!(
"cargo:rerun-if-changed={}/Package.swift",
@ -76,13 +82,9 @@ fn build_bridge(swift_target: &SwiftTarget) {
}
fn link_swift_stdlib(swift_target: &SwiftTarget) {
swift_target
.paths
.runtime_library_paths
.iter()
.for_each(|path| {
println!("cargo:rustc-link-search=native={}", path);
});
for path in &swift_target.paths.runtime_library_paths {
println!("cargo:rustc-link-search=native={}", path);
}
}
fn link_webrtc_framework(swift_target: &SwiftTarget) {
@ -94,6 +96,8 @@ fn link_webrtc_framework(swift_target: &SwiftTarget) {
);
// Find WebRTC.framework as a sibling of the executable when running tests.
println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path");
// Find WebRTC.framework in parent directory of the executable when running examples.
println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path/..");
let source_path = swift_out_dir_path.join("WebRTC.framework");
let deps_dir_path =
@ -125,9 +129,20 @@ fn swift_package_root() -> PathBuf {
}
fn copy_dir(source: &Path, destination: &Path) {
assert!(
Command::new("rm")
.arg("-rf")
.arg(destination)
.status()
.unwrap()
.success(),
"could not remove {:?} before copying",
destination
);
assert!(
Command::new("cp")
.arg("-r")
.arg("-R")
.args(&[source, destination])
.status()
.unwrap()

View file

@ -0,0 +1,93 @@
use futures::StreamExt;
use gpui::{actions, keymap::Binding, Menu, MenuItem};
use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate, Room};
use live_kit_server::token::{self, VideoGrant};
use log::LevelFilter;
use simplelog::SimpleLogger;
actions!(capture, [Quit]);
fn main() {
SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger");
gpui::App::new(()).unwrap().run(|cx| {
cx.platform().activate(true);
cx.add_global_action(quit);
cx.add_bindings([Binding::new("cmd-q", Quit, None)]);
cx.set_menus(vec![Menu {
name: "Zed",
items: vec![MenuItem::Action {
name: "Quit",
action: Box::new(Quit),
}],
}]);
let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap_or("http://localhost:7880".into());
let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap_or("devkey".into());
let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap_or("secret".into());
cx.spawn(|cx| async move {
let user_a_token = token::create(
&live_kit_key,
&live_kit_secret,
Some("test-participant-1"),
VideoGrant::to_join("test-room"),
)
.unwrap();
let room_a = Room::new();
room_a.connect(&live_kit_url, &user_a_token).await.unwrap();
let user2_token = token::create(
&live_kit_key,
&live_kit_secret,
Some("test-participant-2"),
VideoGrant::to_join("test-room"),
)
.unwrap();
let room_b = Room::new();
room_b.connect(&live_kit_url, &user2_token).await.unwrap();
let mut track_changes = room_b.remote_video_track_updates();
let displays = room_a.display_sources().await.unwrap();
let display = displays.into_iter().next().unwrap();
let track_a = LocalVideoTrack::screen_share_for_display(&display);
let track_a_publication = room_a.publish_video_track(&track_a).await.unwrap();
if let RemoteVideoTrackUpdate::Subscribed(track) = track_changes.next().await.unwrap() {
let remote_tracks = room_b.remote_video_tracks("test-participant-1");
assert_eq!(remote_tracks.len(), 1);
assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1");
assert_eq!(track.publisher_id(), "test-participant-1");
} else {
panic!("unexpected message");
}
let remote_track = room_b
.remote_video_tracks("test-participant-1")
.pop()
.unwrap();
room_a.unpublish_track(track_a_publication);
if let RemoteVideoTrackUpdate::Unsubscribed {
publisher_id,
track_id,
} = track_changes.next().await.unwrap()
{
assert_eq!(publisher_id, "test-participant-1");
assert_eq!(remote_track.sid(), track_id);
assert_eq!(room_b.remote_video_tracks("test-participant-1").len(), 0);
} else {
panic!("unexpected message");
}
cx.platform().quit();
})
.detach();
});
}
fn quit(_: &Quit, cx: &mut gpui::MutableAppContext) {
cx.platform().quit();
}

View file

@ -0,0 +1,10 @@
pub mod prod;
#[cfg(not(any(test, feature = "test-support")))]
pub use prod::*;
#[cfg(any(test, feature = "test-support"))]
mod test;
#[cfg(any(test, feature = "test-support"))]
pub use test::*;

View file

@ -0,0 +1,493 @@
use anyhow::{anyhow, Context, Result};
use core_foundation::{
array::{CFArray, CFArrayRef},
base::{CFRelease, CFRetain, TCFType},
string::{CFString, CFStringRef},
};
use futures::{
channel::{mpsc, oneshot},
Future,
};
pub use media::core_video::CVImageBuffer;
use media::core_video::CVImageBufferRef;
use parking_lot::Mutex;
use postage::watch;
use std::{
ffi::c_void,
sync::{Arc, Weak},
};
extern "C" {
fn LKRoomDelegateCreate(
callback_data: *mut c_void,
on_did_disconnect: extern "C" fn(callback_data: *mut c_void),
on_did_subscribe_to_remote_video_track: extern "C" fn(
callback_data: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
remote_track: *const c_void,
),
on_did_unsubscribe_from_remote_video_track: extern "C" fn(
callback_data: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
),
) -> *const c_void;
fn LKRoomCreate(delegate: *const c_void) -> *const c_void;
fn LKRoomConnect(
room: *const c_void,
url: CFStringRef,
token: CFStringRef,
callback: extern "C" fn(*mut c_void, CFStringRef),
callback_data: *mut c_void,
);
fn LKRoomDisconnect(room: *const c_void);
fn LKRoomPublishVideoTrack(
room: *const c_void,
track: *const c_void,
callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
callback_data: *mut c_void,
);
fn LKRoomUnpublishTrack(room: *const c_void, publication: *const c_void);
fn LKRoomVideoTracksForRemoteParticipant(
room: *const c_void,
participant_id: CFStringRef,
) -> CFArrayRef;
fn LKVideoRendererCreate(
callback_data: *mut c_void,
on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
on_drop: extern "C" fn(callback_data: *mut c_void),
) -> *const c_void;
fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
fn LKDisplaySources(
callback_data: *mut c_void,
callback: extern "C" fn(
callback_data: *mut c_void,
sources: CFArrayRef,
error: CFStringRef,
),
);
fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void;
}
pub type Sid = String;
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
Disconnected,
Connected { url: String, token: String },
}
pub struct Room {
native_room: *const c_void,
connection: Mutex<(
watch::Sender<ConnectionState>,
watch::Receiver<ConnectionState>,
)>,
remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
_delegate: RoomDelegate,
}
impl Room {
pub fn new() -> Arc<Self> {
Arc::new_cyclic(|weak_room| {
let delegate = RoomDelegate::new(weak_room.clone());
Self {
native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
remote_video_track_subscribers: Default::default(),
_delegate: delegate,
}
})
}
pub fn status(&self) -> watch::Receiver<ConnectionState> {
self.connection.lock().1.clone()
}
pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
let url = CFString::new(url);
let token = CFString::new(token);
let (did_connect, tx, rx) = Self::build_done_callback();
unsafe {
LKRoomConnect(
self.native_room,
url.as_concrete_TypeRef(),
token.as_concrete_TypeRef(),
did_connect,
tx,
)
}
let this = self.clone();
let url = url.to_string();
let token = token.to_string();
async move {
match rx.await.unwrap().context("error connecting to room") {
Ok(()) => {
*this.connection.lock().0.borrow_mut() =
ConnectionState::Connected { url, token };
Ok(())
}
Err(err) => Err(err),
}
}
}
fn did_disconnect(&self) {
*self.connection.lock().0.borrow_mut() = ConnectionState::Disconnected;
}
pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
unsafe {
let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
if sources.is_null() {
let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
} else {
let sources = CFArray::wrap_under_get_rule(sources)
.into_iter()
.map(|source| MacOSDisplay::new(*source))
.collect();
let _ = tx.send(Ok(sources));
}
}
}
let (tx, rx) = oneshot::channel();
unsafe {
LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
}
async move { rx.await.unwrap() }
}
pub fn publish_video_track(
self: &Arc<Self>,
track: &LocalVideoTrack,
) -> impl Future<Output = Result<LocalTrackPublication>> {
let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
let tx =
unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
if error.is_null() {
let _ = tx.send(Ok(LocalTrackPublication(publication)));
} else {
let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
let _ = tx.send(Err(anyhow!(error)));
}
}
unsafe {
LKRoomPublishVideoTrack(
self.native_room,
track.0,
callback,
Box::into_raw(Box::new(tx)) as *mut c_void,
);
}
async { rx.await.unwrap().context("error publishing video track") }
}
pub fn unpublish_track(&self, publication: LocalTrackPublication) {
unsafe {
LKRoomUnpublishTrack(self.native_room, publication.0);
}
}
pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
unsafe {
let tracks = LKRoomVideoTracksForRemoteParticipant(
self.native_room,
CFString::new(participant_id).as_concrete_TypeRef(),
);
if tracks.is_null() {
Vec::new()
} else {
let tracks = CFArray::wrap_under_get_rule(tracks);
tracks
.into_iter()
.map(|native_track| {
let native_track = *native_track;
let id =
CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
.to_string();
Arc::new(RemoteVideoTrack::new(
native_track,
id,
participant_id.into(),
))
})
.collect()
}
}
}
pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
let (tx, rx) = mpsc::unbounded();
self.remote_video_track_subscribers.lock().push(tx);
rx
}
fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
let track = Arc::new(track);
self.remote_video_track_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
.is_ok()
});
}
fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
self.remote_video_track_subscribers.lock().retain(|tx| {
tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
publisher_id: publisher_id.clone(),
track_id: track_id.clone(),
})
.is_ok()
});
}
fn build_done_callback() -> (
extern "C" fn(*mut c_void, CFStringRef),
*mut c_void,
oneshot::Receiver<Result<()>>,
) {
let (tx, rx) = oneshot::channel();
extern "C" fn done_callback(tx: *mut c_void, error: CFStringRef) {
let tx = unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<()>>) };
if error.is_null() {
let _ = tx.send(Ok(()));
} else {
let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
let _ = tx.send(Err(anyhow!(error)));
}
}
(
done_callback,
Box::into_raw(Box::new(tx)) as *mut c_void,
rx,
)
}
}
impl Drop for Room {
fn drop(&mut self) {
unsafe {
LKRoomDisconnect(self.native_room);
CFRelease(self.native_room);
}
}
}
struct RoomDelegate {
native_delegate: *const c_void,
weak_room: *const Room,
}
impl RoomDelegate {
fn new(weak_room: Weak<Room>) -> Self {
let weak_room = Weak::into_raw(weak_room);
let native_delegate = unsafe {
LKRoomDelegateCreate(
weak_room as *mut c_void,
Self::on_did_disconnect,
Self::on_did_subscribe_to_remote_video_track,
Self::on_did_unsubscribe_from_remote_video_track,
)
};
Self {
native_delegate,
weak_room,
}
}
extern "C" fn on_did_disconnect(room: *mut c_void) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
if let Some(room) = room.upgrade() {
room.did_disconnect();
}
let _ = Weak::into_raw(room);
}
extern "C" fn on_did_subscribe_to_remote_video_track(
room: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
track: *const c_void,
) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
let track = RemoteVideoTrack::new(track, track_id, publisher_id);
if let Some(room) = room.upgrade() {
room.did_subscribe_to_remote_video_track(track);
}
let _ = Weak::into_raw(room);
}
extern "C" fn on_did_unsubscribe_from_remote_video_track(
room: *mut c_void,
publisher_id: CFStringRef,
track_id: CFStringRef,
) {
let room = unsafe { Weak::from_raw(room as *mut Room) };
let publisher_id = unsafe { CFString::wrap_under_get_rule(publisher_id).to_string() };
let track_id = unsafe { CFString::wrap_under_get_rule(track_id).to_string() };
if let Some(room) = room.upgrade() {
room.did_unsubscribe_from_remote_video_track(publisher_id, track_id);
}
let _ = Weak::into_raw(room);
}
}
impl Drop for RoomDelegate {
fn drop(&mut self) {
unsafe {
CFRelease(self.native_delegate);
let _ = Weak::from_raw(self.weak_room);
}
}
}
pub struct LocalVideoTrack(*const c_void);
impl LocalVideoTrack {
pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
Self(unsafe { LKCreateScreenShareTrackForDisplay(display.0) })
}
}
impl Drop for LocalVideoTrack {
fn drop(&mut self) {
unsafe { CFRelease(self.0) }
}
}
pub struct LocalTrackPublication(*const c_void);
impl Drop for LocalTrackPublication {
fn drop(&mut self) {
unsafe { CFRelease(self.0) }
}
}
#[derive(Debug)]
pub struct RemoteVideoTrack {
native_track: *const c_void,
sid: Sid,
publisher_id: String,
}
impl RemoteVideoTrack {
fn new(native_track: *const c_void, sid: Sid, publisher_id: String) -> Self {
unsafe {
CFRetain(native_track);
}
Self {
native_track,
sid,
publisher_id,
}
}
pub fn sid(&self) -> &str {
&self.sid
}
pub fn publisher_id(&self) -> &str {
&self.publisher_id
}
pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
unsafe {
let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
let buffer = CVImageBuffer::wrap_under_get_rule(frame);
let result = tx.try_broadcast(Frame(buffer));
let _ = Box::into_raw(tx);
match result {
Ok(_) => true,
Err(async_broadcast::TrySendError::Closed(_))
| Err(async_broadcast::TrySendError::Inactive(_)) => {
log::warn!("no active receiver for frame");
false
}
Err(async_broadcast::TrySendError::Full(_)) => {
log::warn!("skipping frame as receiver is not keeping up");
true
}
}
}
}
extern "C" fn on_drop(callback_data: *mut c_void) {
unsafe {
let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
}
}
let (tx, rx) = async_broadcast::broadcast(64);
unsafe {
let renderer = LKVideoRendererCreate(
Box::into_raw(Box::new(tx)) as *mut c_void,
on_frame,
on_drop,
);
LKVideoTrackAddRenderer(self.native_track, renderer);
rx
}
}
}
impl Drop for RemoteVideoTrack {
fn drop(&mut self) {
unsafe { CFRelease(self.native_track) }
}
}
pub enum RemoteVideoTrackUpdate {
Subscribed(Arc<RemoteVideoTrack>),
Unsubscribed { publisher_id: Sid, track_id: Sid },
}
pub struct MacOSDisplay(*const c_void);
impl MacOSDisplay {
fn new(ptr: *const c_void) -> Self {
unsafe {
CFRetain(ptr);
}
Self(ptr)
}
}
impl Drop for MacOSDisplay {
fn drop(&mut self) {
unsafe { CFRelease(self.0) }
}
}
#[derive(Clone)]
pub struct Frame(CVImageBuffer);
impl Frame {
pub fn width(&self) -> usize {
self.0.width()
}
pub fn height(&self) -> usize {
self.0.height()
}
pub fn image(&self) -> CVImageBuffer {
self.0.clone()
}
}

View file

@ -0,0 +1,433 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use collections::HashMap;
use futures::Stream;
use gpui::executor::Background;
use lazy_static::lazy_static;
use live_kit_server::token;
use media::core_video::CVImageBuffer;
use parking_lot::Mutex;
use postage::watch;
use std::{future::Future, mem, sync::Arc};
lazy_static! {
static ref SERVERS: Mutex<HashMap<String, Arc<TestServer>>> = Default::default();
}
pub struct TestServer {
pub url: String,
pub api_key: String,
pub secret_key: String,
rooms: Mutex<HashMap<String, TestServerRoom>>,
background: Arc<Background>,
}
impl TestServer {
pub fn create(
url: String,
api_key: String,
secret_key: String,
background: Arc<Background>,
) -> Result<Arc<TestServer>> {
let mut servers = SERVERS.lock();
if servers.contains_key(&url) {
Err(anyhow!("a server with url {:?} already exists", url))
} else {
let server = Arc::new(TestServer {
url: url.clone(),
api_key,
secret_key,
rooms: Default::default(),
background,
});
servers.insert(url, server.clone());
Ok(server)
}
}
fn get(url: &str) -> Result<Arc<TestServer>> {
Ok(SERVERS
.lock()
.get(url)
.ok_or_else(|| anyhow!("no server found for url"))?
.clone())
}
pub fn teardown(&self) -> Result<()> {
SERVERS
.lock()
.remove(&self.url)
.ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?;
Ok(())
}
pub fn create_api_client(&self) -> TestApiClient {
TestApiClient {
url: self.url.clone(),
}
}
async fn create_room(&self, room: String) -> Result<()> {
self.background.simulate_random_delay().await;
let mut server_rooms = self.rooms.lock();
if server_rooms.contains_key(&room) {
Err(anyhow!("room {:?} already exists", room))
} else {
server_rooms.insert(room, Default::default());
Ok(())
}
}
async fn delete_room(&self, room: String) -> Result<()> {
// TODO: clear state associated with all `Room`s.
self.background.simulate_random_delay().await;
let mut server_rooms = self.rooms.lock();
server_rooms
.remove(&room)
.ok_or_else(|| anyhow!("room {:?} does not exist", room))?;
Ok(())
}
async fn join_room(&self, token: String, client_room: Arc<Room>) -> Result<()> {
self.background.simulate_random_delay().await;
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let identity = claims.sub.unwrap().to_string();
let room_name = claims.video.room.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
if room.client_rooms.contains_key(&identity) {
Err(anyhow!(
"{:?} attempted to join room {:?} twice",
identity,
room_name
))
} else {
room.client_rooms.insert(identity, client_room);
Ok(())
}
}
async fn leave_room(&self, token: String) -> Result<()> {
self.background.simulate_random_delay().await;
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let identity = claims.sub.unwrap().to_string();
let room_name = claims.video.room.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
room.client_rooms.remove(&identity).ok_or_else(|| {
anyhow!(
"{:?} attempted to leave room {:?} before joining it",
identity,
room_name
)
})?;
Ok(())
}
async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> {
// TODO: clear state associated with the `Room`.
self.background.simulate_random_delay().await;
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
room.client_rooms.remove(&identity).ok_or_else(|| {
anyhow!(
"participant {:?} did not join room {:?}",
identity,
room_name
)
})?;
Ok(())
}
pub async fn disconnect_client(&self, client_identity: String) {
self.background.simulate_random_delay().await;
let mut server_rooms = self.rooms.lock();
for room in server_rooms.values_mut() {
if let Some(room) = room.client_rooms.remove(&client_identity) {
*room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected;
}
}
}
async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
self.background.simulate_random_delay().await;
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let identity = claims.sub.unwrap().to_string();
let room_name = claims.video.room.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
let update = RemoteVideoTrackUpdate::Subscribed(Arc::new(RemoteVideoTrack {
sid: nanoid::nanoid!(17),
publisher_id: identity.clone(),
frames_rx: local_track.frames_rx.clone(),
}));
for (id, client_room) in &room.client_rooms {
if *id != identity {
let _ = client_room
.0
.lock()
.video_track_updates
.0
.try_broadcast(update.clone())
.unwrap();
}
}
Ok(())
}
}
#[derive(Default)]
struct TestServerRoom {
client_rooms: HashMap<Sid, Arc<Room>>,
}
impl TestServerRoom {}
pub struct TestApiClient {
url: String,
}
#[async_trait]
impl live_kit_server::api::Client for TestApiClient {
fn url(&self) -> &str {
&self.url
}
async fn create_room(&self, name: String) -> Result<()> {
let server = TestServer::get(&self.url)?;
server.create_room(name).await?;
Ok(())
}
async fn delete_room(&self, name: String) -> Result<()> {
let server = TestServer::get(&self.url)?;
server.delete_room(name).await?;
Ok(())
}
async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
let server = TestServer::get(&self.url)?;
server.remove_participant(room, identity).await?;
Ok(())
}
fn room_token(&self, room: &str, identity: &str) -> Result<String> {
let server = TestServer::get(&self.url)?;
token::create(
&server.api_key,
&server.secret_key,
Some(identity),
token::VideoGrant::to_join(room),
)
}
}
pub type Sid = String;
struct RoomState {
connection: (
watch::Sender<ConnectionState>,
watch::Receiver<ConnectionState>,
),
display_sources: Vec<MacOSDisplay>,
video_track_updates: (
async_broadcast::Sender<RemoteVideoTrackUpdate>,
async_broadcast::Receiver<RemoteVideoTrackUpdate>,
),
}
#[derive(Clone, Eq, PartialEq)]
pub enum ConnectionState {
Disconnected,
Connected { url: String, token: String },
}
pub struct Room(Mutex<RoomState>);
impl Room {
pub fn new() -> Arc<Self> {
Arc::new(Self(Mutex::new(RoomState {
connection: watch::channel_with(ConnectionState::Disconnected),
display_sources: Default::default(),
video_track_updates: async_broadcast::broadcast(128),
})))
}
pub fn status(&self) -> watch::Receiver<ConnectionState> {
self.0.lock().connection.1.clone()
}
pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
let this = self.clone();
let url = url.to_string();
let token = token.to_string();
async move {
let server = TestServer::get(&url)?;
server.join_room(token.clone(), this.clone()).await?;
*this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
Ok(())
}
}
pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
let this = self.clone();
async move {
let server = this.test_server();
server.background.simulate_random_delay().await;
Ok(this.0.lock().display_sources.clone())
}
}
pub fn publish_video_track(
self: &Arc<Self>,
track: &LocalVideoTrack,
) -> impl Future<Output = Result<LocalTrackPublication>> {
let this = self.clone();
let track = track.clone();
async move {
this.test_server()
.publish_video_track(this.token(), track)
.await?;
Ok(LocalTrackPublication)
}
}
pub fn unpublish_track(&self, _: LocalTrackPublication) {}
pub fn remote_video_tracks(&self, _: &str) -> Vec<Arc<RemoteVideoTrack>> {
Default::default()
}
pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
self.0.lock().video_track_updates.1.clone()
}
pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
self.0.lock().display_sources = sources;
}
fn test_server(&self) -> Arc<TestServer> {
match self.0.lock().connection.1.borrow().clone() {
ConnectionState::Disconnected => panic!("must be connected to call this method"),
ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(),
}
}
fn token(&self) -> String {
match self.0.lock().connection.1.borrow().clone() {
ConnectionState::Disconnected => panic!("must be connected to call this method"),
ConnectionState::Connected { token, .. } => token,
}
}
}
impl Drop for Room {
fn drop(&mut self) {
if let ConnectionState::Connected { token, .. } = mem::replace(
&mut *self.0.lock().connection.0.borrow_mut(),
ConnectionState::Disconnected,
) {
if let Ok(server) = TestServer::get(&token) {
let background = server.background.clone();
background
.spawn(async move { server.leave_room(token).await.unwrap() })
.detach();
}
}
}
}
pub struct LocalTrackPublication;
#[derive(Clone)]
pub struct LocalVideoTrack {
frames_rx: async_broadcast::Receiver<Frame>,
}
impl LocalVideoTrack {
pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
Self {
frames_rx: display.frames.1.clone(),
}
}
}
pub struct RemoteVideoTrack {
sid: Sid,
publisher_id: Sid,
frames_rx: async_broadcast::Receiver<Frame>,
}
impl RemoteVideoTrack {
pub fn sid(&self) -> &str {
&self.sid
}
pub fn publisher_id(&self) -> &str {
&self.publisher_id
}
pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
self.frames_rx.clone()
}
}
#[derive(Clone)]
pub enum RemoteVideoTrackUpdate {
Subscribed(Arc<RemoteVideoTrack>),
Unsubscribed { publisher_id: Sid, track_id: Sid },
}
#[derive(Clone)]
pub struct MacOSDisplay {
frames: (
async_broadcast::Sender<Frame>,
async_broadcast::Receiver<Frame>,
),
}
impl MacOSDisplay {
pub fn new() -> Self {
Self {
frames: async_broadcast::broadcast(128),
}
}
pub fn send_frame(&self, frame: Frame) {
self.frames.0.try_broadcast(frame).unwrap();
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
pub label: String,
pub width: usize,
pub height: usize,
}
impl Frame {
pub fn width(&self) -> usize {
self.width
}
pub fn height(&self) -> usize {
self.height
}
pub fn image(&self) -> CVImageBuffer {
unimplemented!("you can't call this in test mode")
}
}

View file

@ -0,0 +1,25 @@
[package]
name = "live_kit_server"
version = "0.1.0"
edition = "2021"
description = "SDK for the LiveKit server API"
[lib]
path = "src/live_kit_server.rs"
doctest = false
[dependencies]
anyhow = "1.0.38"
async-trait = "0.1"
futures = "0.3"
hmac = "0.12"
log = "0.4"
jwt = "0.16"
prost = "0.8"
prost-types = "0.8"
reqwest = "0.11"
serde = { version = "1.0", features = ["derive", "rc"] }
sha2 = "0.10"
[build-dependencies]
prost-build = "0.9"

View file

@ -0,0 +1,5 @@
fn main() {
prost_build::Config::new()
.compile_protos(&["protocol/livekit_room.proto"], &["protocol"])
.unwrap();
}

@ -0,0 +1 @@
Subproject commit 8645a138fb2ea72c4dab13e739b1f3c9ea29ac84

View file

@ -0,0 +1,141 @@
use crate::{proto, token};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use prost::Message;
use reqwest::header::CONTENT_TYPE;
use std::{future::Future, sync::Arc, time::Duration};
#[async_trait]
pub trait Client: Send + Sync {
fn url(&self) -> &str;
async fn create_room(&self, name: String) -> Result<()>;
async fn delete_room(&self, name: String) -> Result<()>;
async fn remove_participant(&self, room: String, identity: String) -> Result<()>;
fn room_token(&self, room: &str, identity: &str) -> Result<String>;
}
#[derive(Clone)]
pub struct LiveKitClient {
http: reqwest::Client,
url: Arc<str>,
key: Arc<str>,
secret: Arc<str>,
}
impl LiveKitClient {
pub fn new(mut url: String, key: String, secret: String) -> Self {
if url.ends_with('/') {
url.pop();
}
Self {
http: reqwest::ClientBuilder::new()
.timeout(Duration::from_secs(5))
.build()
.unwrap(),
url: url.into(),
key: key.into(),
secret: secret.into(),
}
}
fn request<Req, Res>(
&self,
path: &str,
grant: token::VideoGrant,
body: Req,
) -> impl Future<Output = Result<Res>>
where
Req: Message,
Res: Default + Message,
{
let client = self.http.clone();
let token = token::create(&self.key, &self.secret, None, grant);
let url = format!("{}/{}", self.url, path);
log::info!("Request {}: {:?}", url, body);
async move {
let token = token?;
let response = client
.post(&url)
.header(CONTENT_TYPE, "application/protobuf")
.bearer_auth(token)
.body(body.encode_to_vec())
.send()
.await?;
if response.status().is_success() {
log::info!("Response {}: {:?}", url, response.status());
Ok(Res::decode(response.bytes().await?)?)
} else {
log::error!("Response {}: {:?}", url, response.status());
Err(anyhow!(
"POST {} failed with status code {:?}, {:?}",
url,
response.status(),
response.text().await
))
}
}
}
}
#[async_trait]
impl Client for LiveKitClient {
fn url(&self) -> &str {
&self.url
}
async fn create_room(&self, name: String) -> Result<()> {
let _: proto::Room = self
.request(
"twirp/livekit.RoomService/CreateRoom",
token::VideoGrant {
room_create: Some(true),
..Default::default()
},
proto::CreateRoomRequest {
name,
..Default::default()
},
)
.await?;
Ok(())
}
async fn delete_room(&self, name: String) -> Result<()> {
let _: proto::DeleteRoomResponse = self
.request(
"twirp/livekit.RoomService/DeleteRoom",
token::VideoGrant {
room_create: Some(true),
..Default::default()
},
proto::DeleteRoomRequest { room: name },
)
.await?;
Ok(())
}
async fn remove_participant(&self, room: String, identity: String) -> Result<()> {
let _: proto::RemoveParticipantResponse = self
.request(
"twirp/livekit.RoomService/RemoveParticipant",
token::VideoGrant::to_admin(&room),
proto::RoomParticipantIdentity {
room: room.clone(),
identity,
},
)
.await?;
Ok(())
}
fn room_token(&self, room: &str, identity: &str) -> Result<String> {
token::create(
&self.key,
&self.secret,
Some(identity),
token::VideoGrant::to_join(room),
)
}
}

View file

@ -0,0 +1,3 @@
pub mod api;
mod proto;
pub mod token;

View file

@ -0,0 +1 @@
include!(concat!(env!("OUT_DIR"), "/livekit.rs"));

View file

@ -0,0 +1,97 @@
use anyhow::{anyhow, Result};
use hmac::{Hmac, Mac};
use jwt::{SignWithKey, VerifyWithKey};
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::{
borrow::Cow,
ops::Add,
time::{Duration, SystemTime, UNIX_EPOCH},
};
static DEFAULT_TTL: Duration = Duration::from_secs(6 * 60 * 60); // 6 hours
#[derive(Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ClaimGrants<'a> {
pub iss: Cow<'a, str>,
pub sub: Option<Cow<'a, str>>,
pub iat: u64,
pub exp: u64,
pub nbf: u64,
pub jwtid: Option<Cow<'a, str>>,
pub video: VideoGrant<'a>,
}
#[derive(Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct VideoGrant<'a> {
pub room_create: Option<bool>,
pub room_join: Option<bool>,
pub room_list: Option<bool>,
pub room_record: Option<bool>,
pub room_admin: Option<bool>,
pub room: Option<Cow<'a, str>>,
pub can_publish: Option<bool>,
pub can_subscribe: Option<bool>,
pub can_publish_data: Option<bool>,
pub hidden: Option<bool>,
pub recorder: Option<bool>,
}
impl<'a> VideoGrant<'a> {
pub fn to_admin(room: &'a str) -> Self {
Self {
room_admin: Some(true),
room: Some(Cow::Borrowed(room)),
..Default::default()
}
}
pub fn to_join(room: &'a str) -> Self {
Self {
room: Some(Cow::Borrowed(room)),
room_join: Some(true),
can_publish: Some(true),
can_subscribe: Some(true),
..Default::default()
}
}
}
pub fn create(
api_key: &str,
secret_key: &str,
identity: Option<&str>,
video_grant: VideoGrant,
) -> Result<String> {
if video_grant.room_join.is_some() && identity.is_none() {
Err(anyhow!(
"identity is required for room_join grant, but it is none"
))?;
}
let secret_key: Hmac<Sha256> = Hmac::new_from_slice(secret_key.as_bytes())?;
let now = SystemTime::now();
let claims = ClaimGrants {
iss: Cow::Borrowed(api_key),
sub: identity.map(Cow::Borrowed),
iat: now.duration_since(UNIX_EPOCH).unwrap().as_secs(),
exp: now
.add(DEFAULT_TTL)
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
nbf: 0,
jwtid: identity.map(Cow::Borrowed),
video: video_grant,
};
Ok(claims.sign_with_key(&secret_key)?)
}
pub fn validate<'a>(token: &'a str, secret_key: &str) -> Result<ClaimGrants<'a>> {
let secret_key: Hmac<Sha256> = Hmac::new_from_slice(secret_key.as_bytes())?;
Ok(token.verify_with_key(&secret_key)?)
}

View file

@ -145,7 +145,8 @@ message Test {
message CreateRoom {}
message CreateRoomResponse {
uint64 id = 1;
Room room = 1;
optional LiveKitConnectionInfo live_kit_connection_info = 2;
}
message JoinRoom {
@ -154,6 +155,7 @@ message JoinRoom {
message JoinRoomResponse {
Room room = 1;
optional LiveKitConnectionInfo live_kit_connection_info = 2;
}
message LeaveRoom {
@ -161,8 +163,10 @@ message LeaveRoom {
}
message Room {
repeated Participant participants = 1;
repeated uint64 pending_participant_user_ids = 2;
uint64 id = 1;
repeated Participant participants = 2;
repeated uint64 pending_participant_user_ids = 3;
string live_kit_room = 4;
}
message Participant {
@ -226,6 +230,11 @@ message RoomUpdated {
Room room = 1;
}
message LiveKitConnectionInfo {
string server_url = 1;
string token = 2;
}
message ShareProject {
uint64 room_id = 1;
repeated WorktreeMetadata worktrees = 2;

View file

@ -6,4 +6,4 @@ pub use conn::Connection;
pub use peer::*;
mod macros;
pub const PROTOCOL_VERSION: u32 = 38;
pub const PROTOCOL_VERSION: u32 = 39;

View file

@ -79,6 +79,7 @@ pub struct Titlebar {
pub sign_in_prompt: Interactive<ContainedText>,
pub outdated_warning: ContainedText,
pub share_button: Interactive<ContainedText>,
pub call_control: Interactive<IconButton>,
pub toggle_contacts_button: Interactive<IconButton>,
pub toggle_contacts_badge: ContainerStyle,
}
@ -119,6 +120,7 @@ pub struct ContactList {
pub struct ProjectRow {
#[serde(flatten)]
pub container: ContainerStyle,
pub icon: Icon,
pub name: ContainedText,
}
@ -380,7 +382,6 @@ pub struct Icon {
pub container: ContainerStyle,
pub color: Color,
pub width: f32,
pub path: String,
}
#[derive(Deserialize, Clone, Copy, Default)]

View file

@ -1,6 +1,6 @@
use crate::{FollowerStatesByLeader, JoinProject, Pane, Workspace};
use anyhow::{anyhow, Result};
use call::ActiveCall;
use call::{ActiveCall, ParticipantLocation};
use gpui::{
elements::*, Axis, Border, CursorStyle, ModelHandle, MouseButton, RenderContext, ViewHandle,
};
@ -130,18 +130,21 @@ impl Member {
Some((collaborator.replica_id, participant))
});
let mut border = Border::default();
let prompt = if let Some((replica_id, leader)) = leader {
let leader_color = theme.editor.replica_selection_style(replica_id).cursor;
border = Border::all(theme.workspace.leader_border_width, leader_color);
let border = if let Some((replica_id, _)) = leader.as_ref() {
let leader_color = theme.editor.replica_selection_style(*replica_id).cursor;
let mut border = Border::all(theme.workspace.leader_border_width, leader_color);
border
.color
.fade_out(1. - theme.workspace.leader_border_opacity);
border.overlay = true;
border
} else {
Border::default()
};
let prompt = if let Some((_, leader)) = leader {
match leader.location {
call::ParticipantLocation::SharedProject {
ParticipantLocation::SharedProject {
project_id: leader_project_id,
} => {
if Some(leader_project_id) == project.read(cx).remote_id() {
@ -186,7 +189,7 @@ impl Member {
)
}
}
call::ParticipantLocation::UnsharedProject => Some(
ParticipantLocation::UnsharedProject => Some(
Label::new(
format!(
"{} is viewing an unshared Zed project",
@ -201,7 +204,7 @@ impl Member {
.right()
.boxed(),
),
call::ParticipantLocation::External => Some(
ParticipantLocation::External => Some(
Label::new(
format!(
"{} is viewing a window outside of Zed",

View file

@ -0,0 +1,181 @@
use crate::{Item, ItemNavHistory};
use anyhow::{anyhow, Result};
use call::participant::{Frame, RemoteVideoTrack};
use client::{PeerId, User};
use futures::StreamExt;
use gpui::{
elements::*,
geometry::{rect::RectF, vector::vec2f},
Entity, ModelHandle, MouseButton, RenderContext, Task, View, ViewContext,
};
use smallvec::SmallVec;
use std::{
path::PathBuf,
sync::{Arc, Weak},
};
pub enum Event {
Close,
}
pub struct SharedScreen {
track: Weak<RemoteVideoTrack>,
frame: Option<Frame>,
pub peer_id: PeerId,
user: Arc<User>,
nav_history: Option<ItemNavHistory>,
_maintain_frame: Task<()>,
}
impl SharedScreen {
pub fn new(
track: &Arc<RemoteVideoTrack>,
peer_id: PeerId,
user: Arc<User>,
cx: &mut ViewContext<Self>,
) -> Self {
let mut frames = track.frames();
Self {
track: Arc::downgrade(track),
frame: None,
peer_id,
user,
nav_history: Default::default(),
_maintain_frame: cx.spawn(|this, mut cx| async move {
while let Some(frame) = frames.next().await {
this.update(&mut cx, |this, cx| {
this.frame = Some(frame);
cx.notify();
})
}
this.update(&mut cx, |_, cx| cx.emit(Event::Close));
}),
}
}
}
impl Entity for SharedScreen {
type Event = Event;
}
impl View for SharedScreen {
fn ui_name() -> &'static str {
"SharedScreen"
}
fn render(&mut self, cx: &mut RenderContext<Self>) -> ElementBox {
enum Focus {}
let frame = self.frame.clone();
MouseEventHandler::<Focus>::new(0, cx, |_, _| {
Canvas::new(move |bounds, _, cx| {
if let Some(frame) = frame.clone() {
let size = constrain_size_preserving_aspect_ratio(
bounds.size(),
vec2f(frame.width() as f32, frame.height() as f32),
);
let origin = bounds.origin() + (bounds.size() / 2.) - size / 2.;
cx.scene.push_surface(gpui::mac::Surface {
bounds: RectF::new(origin, size),
image_buffer: frame.image(),
});
}
})
.boxed()
})
.on_down(MouseButton::Left, |_, cx| cx.focus_parent_view())
.boxed()
}
}
impl Item for SharedScreen {
fn deactivated(&mut self, cx: &mut ViewContext<Self>) {
if let Some(nav_history) = self.nav_history.as_ref() {
nav_history.push::<()>(None, cx);
}
}
fn tab_content(
&self,
_: Option<usize>,
style: &theme::Tab,
_: &gpui::AppContext,
) -> gpui::ElementBox {
Flex::row()
.with_child(
Svg::new("icons/disable_screen_sharing_12.svg")
.with_color(style.label.text.color)
.constrained()
.with_width(style.icon_width)
.aligned()
.contained()
.with_margin_right(style.spacing)
.boxed(),
)
.with_child(
Label::new(
format!("{}'s screen", self.user.github_login),
style.label.clone(),
)
.aligned()
.boxed(),
)
.boxed()
}
fn project_path(&self, _: &gpui::AppContext) -> Option<project::ProjectPath> {
Default::default()
}
fn project_entry_ids(&self, _: &gpui::AppContext) -> SmallVec<[project::ProjectEntryId; 3]> {
Default::default()
}
fn is_singleton(&self, _: &gpui::AppContext) -> bool {
false
}
fn set_nav_history(&mut self, history: ItemNavHistory, _: &mut ViewContext<Self>) {
self.nav_history = Some(history);
}
fn clone_on_split(&self, cx: &mut ViewContext<Self>) -> Option<Self> {
let track = self.track.upgrade()?;
Some(Self::new(&track, self.peer_id, self.user.clone(), cx))
}
fn can_save(&self, _: &gpui::AppContext) -> bool {
false
}
fn save(
&mut self,
_: ModelHandle<project::Project>,
_: &mut ViewContext<Self>,
) -> Task<Result<()>> {
Task::ready(Err(anyhow!("Item::save called on SharedScreen")))
}
fn save_as(
&mut self,
_: ModelHandle<project::Project>,
_: PathBuf,
_: &mut ViewContext<Self>,
) -> Task<Result<()>> {
Task::ready(Err(anyhow!("Item::save_as called on SharedScreen")))
}
fn reload(
&mut self,
_: ModelHandle<project::Project>,
_: &mut ViewContext<Self>,
) -> Task<Result<()>> {
Task::ready(Err(anyhow!("Item::reload called on SharedScreen")))
}
fn to_item_events(event: &Self::Event) -> Vec<crate::ItemEvent> {
match event {
Event::Close => vec![crate::ItemEvent::CloseItem],
}
}
}

View file

@ -6,6 +6,7 @@ pub mod dock;
pub mod pane;
pub mod pane_group;
pub mod searchable;
pub mod shared_screen;
pub mod sidebar;
mod status_bar;
mod toolbar;
@ -36,6 +37,7 @@ use project::{Project, ProjectEntryId, ProjectPath, ProjectStore, Worktree, Work
use searchable::SearchableItemHandle;
use serde::Deserialize;
use settings::{Autosave, DockAnchor, Settings};
use shared_screen::SharedScreen;
use sidebar::{Sidebar, SidebarButtons, SidebarSide, ToggleSidebarItem};
use smallvec::SmallVec;
use status_bar::StatusBar;
@ -119,12 +121,18 @@ pub struct JoinProject {
pub follow_user_id: u64,
}
#[derive(Clone, PartialEq)]
pub struct OpenSharedScreen {
pub peer_id: PeerId,
}
impl_internal_actions!(
workspace,
[
OpenPaths,
ToggleFollow,
JoinProject,
OpenSharedScreen,
RemoveWorktreeFromProject
]
);
@ -164,6 +172,7 @@ pub fn init(app_state: Arc<AppState>, cx: &mut MutableAppContext) {
cx.add_async_action(Workspace::follow_next_collaborator);
cx.add_async_action(Workspace::close);
cx.add_async_action(Workspace::save_all);
cx.add_action(Workspace::open_shared_screen);
cx.add_action(Workspace::add_folder_to_project);
cx.add_action(Workspace::remove_folder_from_project);
cx.add_action(
@ -983,9 +992,8 @@ pub struct Workspace {
follower_states_by_leader: FollowerStatesByLeader,
last_leaders_by_pane: HashMap<WeakViewHandle<Pane>, PeerId>,
window_edited: bool,
active_call: Option<ModelHandle<ActiveCall>>,
active_call: Option<(ModelHandle<ActiveCall>, Vec<gpui::Subscription>)>,
_observe_current_user: Task<()>,
_active_call_observation: Option<gpui::Subscription>,
}
#[derive(Default)]
@ -1095,11 +1103,11 @@ impl Workspace {
});
let mut active_call = None;
let mut active_call_observation = None;
if cx.has_global::<ModelHandle<ActiveCall>>() {
let call = cx.global::<ModelHandle<ActiveCall>>().clone();
active_call_observation = Some(cx.observe(&call, |_, _, cx| cx.notify()));
active_call = Some(call);
let mut subscriptions = Vec::new();
subscriptions.push(cx.subscribe(&call, Self::on_active_call_event));
active_call = Some((call, subscriptions));
}
let mut this = Workspace {
@ -1130,7 +1138,6 @@ impl Workspace {
window_edited: false,
active_call,
_observe_current_user,
_active_call_observation: active_call_observation,
};
this.project_remote_id_changed(this.project.read(cx).remote_id(), cx);
cx.defer(|this, cx| this.update_window_title(cx));
@ -1265,7 +1272,7 @@ impl Workspace {
quitting: bool,
cx: &mut ViewContext<Self>,
) -> Task<Result<bool>> {
let active_call = self.active_call.clone();
let active_call = self.active_call().cloned();
let window_id = cx.window_id();
let workspace_count = cx
.window_ids()
@ -1788,6 +1795,15 @@ impl Workspace {
item
}
pub fn open_shared_screen(&mut self, action: &OpenSharedScreen, cx: &mut ViewContext<Self>) {
if let Some(shared_screen) =
self.shared_screen_for_peer(action.peer_id, &self.active_pane, cx)
{
let pane = self.active_pane.clone();
Pane::add_item(self, &pane, Box::new(shared_screen), false, true, None, cx);
}
}
pub fn activate_item(&mut self, item: &dyn ItemHandle, cx: &mut ViewContext<Self>) -> bool {
let result = self.panes.iter().find_map(|pane| {
pane.read(cx)
@ -2512,13 +2528,33 @@ impl Workspace {
}
fn leader_updated(&mut self, leader_id: PeerId, cx: &mut ViewContext<Self>) -> Option<()> {
cx.notify();
let call = self.active_call()?;
let room = call.read(cx).room()?.read(cx);
let participant = room.remote_participants().get(&leader_id)?;
let mut items_to_add = Vec::new();
for (pane, state) in self.follower_states_by_leader.get(&leader_id)? {
if let Some(FollowerItem::Loaded(item)) = state
.active_view_id
.and_then(|id| state.items_by_leader_view_id.get(&id))
{
items_to_add.push((pane.clone(), item.boxed_clone()));
match participant.location {
call::ParticipantLocation::SharedProject { project_id } => {
if Some(project_id) == self.project.read(cx).remote_id() {
for (pane, state) in self.follower_states_by_leader.get(&leader_id)? {
if let Some(FollowerItem::Loaded(item)) = state
.active_view_id
.and_then(|id| state.items_by_leader_view_id.get(&id))
{
items_to_add.push((pane.clone(), item.boxed_clone()));
}
}
}
}
call::ParticipantLocation::UnsharedProject => {}
call::ParticipantLocation::External => {
for (pane, _) in self.follower_states_by_leader.get(&leader_id)? {
if let Some(shared_screen) = self.shared_screen_for_peer(leader_id, pane, cx) {
items_to_add.push((pane.clone(), Box::new(shared_screen)));
}
}
}
}
@ -2527,11 +2563,32 @@ impl Workspace {
if pane == self.active_pane {
pane.update(cx, |pane, cx| pane.focus_active_item(cx));
}
cx.notify();
}
None
}
fn shared_screen_for_peer(
&self,
peer_id: PeerId,
pane: &ViewHandle<Pane>,
cx: &mut ViewContext<Self>,
) -> Option<ViewHandle<SharedScreen>> {
let call = self.active_call()?;
let room = call.read(cx).room()?.read(cx);
let participant = room.remote_participants().get(&peer_id)?;
let track = participant.tracks.values().next()?.clone();
let user = participant.user.clone();
for item in pane.read(cx).items_of_type::<SharedScreen>() {
if item.read(cx).peer_id == peer_id {
return Some(item);
}
}
Some(cx.add_view(|cx| SharedScreen::new(&track, peer_id, user.clone(), cx)))
}
pub fn on_window_activation_changed(&mut self, active: bool, cx: &mut ViewContext<Self>) {
if !active {
for pane in &self.panes {
@ -2552,6 +2609,25 @@ impl Workspace {
}
}
}
fn active_call(&self) -> Option<&ModelHandle<ActiveCall>> {
self.active_call.as_ref().map(|(call, _)| call)
}
fn on_active_call_event(
&mut self,
_: ModelHandle<ActiveCall>,
event: &call::room::Event,
cx: &mut ViewContext<Self>,
) {
match event {
call::room::Event::ParticipantLocationChanged { participant_id }
| call::room::Event::RemoteVideoTracksChanged { participant_id } => {
self.leader_updated(*participant_id, cx);
}
_ => {}
}
}
}
impl Entity for Workspace {
@ -2593,7 +2669,7 @@ impl View for Workspace {
&project,
&theme,
&self.follower_states_by_leader,
self.active_call.as_ref(),
self.active_call(),
cx,
))
.flex(1., true)

View file

@ -127,4 +127,4 @@ unindent = "0.1.7"
icon = ["app-icon@2x.png", "app-icon.png"]
identifier = "dev.zed.Zed"
name = "Zed"
osx_minimum_system_version = "10.14"
osx_minimum_system_version = "10.15.7"

View file

@ -1,7 +1,7 @@
use std::process::Command;
fn main() {
println!("cargo:rustc-env=MACOSX_DEPLOYMENT_TARGET=10.14");
println!("cargo:rustc-env=MACOSX_DEPLOYMENT_TARGET=10.15.7");
if let Ok(api_key) = std::env::var("ZED_MIXPANEL_TOKEN") {
println!("cargo:rustc-env=ZED_MIXPANEL_TOKEN={api_key}");
@ -10,6 +10,20 @@ fn main() {
println!("cargo:rustc-env=ZED_AMPLITUDE_API_KEY={api_key}");
}
if std::env::var("ZED_BUNDLE").ok().as_deref() == Some("true") {
// Find WebRTC.framework in the Frameworks folder when running as part of an application bundle.
println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path/../Frameworks");
} else {
// Find WebRTC.framework as a sibling of the executable when running outside of an application bundle.
println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path");
}
// Seems to be required to enable Swift concurrency
println!("cargo:rustc-link-arg=-Wl,-rpath,/usr/lib/swift");
// Register exported Objective-C selectors, protocols, etc
println!("cargo:rustc-link-arg=-Wl,-ObjC");
let output = Command::new("npm")
.current_dir("../../styles")
.args(["install", "--no-save"])