Switch fully to Rust Livekit (redux) (#27126)

Swift bindings BEGONE

Release Notes:

- Switched from using the Swift LiveKit bindings, to the Rust bindings,
fixing https://github.com/zed-industries/zed/issues/9396, a crash when
leaving a collaboration session, and making Zed easier to build.

---------

Co-authored-by: Conrad Irwin <conrad.irwin@gmail.com>
Co-authored-by: Michael Sloan <michael@zed.dev>
This commit is contained in:
Mikayla Maki 2025-03-28 10:58:23 -07:00 committed by GitHub
parent c8fb95cd1b
commit 8a307e7b89
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
68 changed files with 2393 additions and 7579 deletions

View file

@ -18,7 +18,6 @@ test-support = [
"collections/test-support",
"gpui/test-support",
"livekit_client/test-support",
"livekit_client_macos/test-support",
"project/test-support",
"util/test-support"
]
@ -41,11 +40,7 @@ serde_derive.workspace = true
settings.workspace = true
telemetry.workspace = true
util.workspace = true
[target.'cfg(target_os = "macos")'.dependencies]
livekit_client_macos.workspace = true
[target.'cfg(not(target_os = "macos"))'.dependencies]
gpui_tokio.workspace = true
livekit_client.workspace = true
[dev-dependencies]
@ -57,9 +52,4 @@ language = { workspace = true, features = ["test-support"] }
project = { workspace = true, features = ["test-support"] }
util = { workspace = true, features = ["test-support"] }
http_client = { workspace = true, features = ["test-support"] }
[target.'cfg(target_os = "macos")'.dev-dependencies]
livekit_client_macos = { workspace = true, features = ["test-support"] }
[target.'cfg(not(target_os = "macos"))'.dev-dependencies]
livekit_client = { workspace = true, features = ["test-support"] }

View file

@ -1,13 +1,5 @@
pub mod call_settings;
#[cfg(target_os = "macos")]
mod macos;
mod call_impl;
#[cfg(target_os = "macos")]
pub use macos::*;
#[cfg(not(target_os = "macos"))]
mod cross_platform;
#[cfg(not(target_os = "macos"))]
pub use cross_platform::*;
pub use call_impl::*;

View file

@ -17,9 +17,7 @@ use room::Event;
use settings::Settings;
use std::sync::Arc;
pub use livekit_client::{
track::RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent,
};
pub use livekit_client::{RemoteVideoTrack, RemoteVideoTrackView, RemoteVideoTrackViewEvent};
pub use participant::ParticipantLocation;
pub use room::Room;
@ -28,10 +26,6 @@ struct GlobalActiveCall(Entity<ActiveCall>);
impl Global for GlobalActiveCall {}
pub fn init(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
livekit_client::init(
cx.background_executor().dispatcher.clone(),
cx.http_client(),
);
CallSettings::register(cx);
let active_call = cx.new(|cx| ActiveCall::new(client, user_store, cx));

View file

@ -1,13 +1,14 @@
use anyhow::{anyhow, Result};
use client::ParticipantIndex;
use client::{proto, User};
use client::{proto, ParticipantIndex, User};
use collections::HashMap;
use gpui::WeakEntity;
pub use livekit_client_macos::Frame;
pub use livekit_client_macos::{RemoteAudioTrack, RemoteVideoTrack};
use livekit_client::AudioStream;
use project::Project;
use std::sync::Arc;
pub use livekit_client::TrackSid;
pub use livekit_client::{RemoteAudioTrack, RemoteVideoTrack};
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ParticipantLocation {
SharedProject { project_id: u64 },
@ -48,7 +49,6 @@ impl LocalParticipant {
}
}
#[derive(Clone, Debug)]
pub struct RemoteParticipant {
pub user: Arc<User>,
pub peer_id: proto::PeerId,
@ -58,13 +58,13 @@ pub struct RemoteParticipant {
pub participant_index: ParticipantIndex,
pub muted: bool,
pub speaking: bool,
pub video_tracks: HashMap<livekit_client_macos::Sid, Arc<RemoteVideoTrack>>,
pub audio_tracks: HashMap<livekit_client_macos::Sid, Arc<RemoteAudioTrack>>,
pub video_tracks: HashMap<TrackSid, RemoteVideoTrack>,
pub audio_tracks: HashMap<TrackSid, (RemoteAudioTrack, AudioStream)>,
}
impl RemoteParticipant {
pub fn has_video_tracks(&self) -> bool {
!self.video_tracks.is_empty()
return !self.video_tracks.is_empty();
}
pub fn can_write(&self) -> bool {

View file

@ -1,5 +1,3 @@
#![cfg_attr(all(target_os = "windows", target_env = "gnu"), allow(unused))]
use crate::{
call_settings::CallSettings,
participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
@ -14,20 +12,10 @@ use collections::{BTreeMap, HashMap, HashSet};
use fs::Fs;
use futures::{FutureExt, StreamExt};
use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
use gpui_tokio::Tokio;
use language::LanguageRegistry;
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
use livekit::{
capture_local_audio_track, capture_local_video_track,
id::ParticipantIdentity,
options::{TrackPublishOptions, VideoCodec},
play_remote_audio_track,
publication::LocalTrackPublication,
track::{TrackKind, TrackSource},
RoomEvent, RoomOptions,
};
#[cfg(all(target_os = "windows", target_env = "gnu"))]
use livekit::{publication::LocalTrackPublication, RoomEvent};
use livekit_client as livekit;
use livekit::{LocalTrackPublication, ParticipantIdentity, RoomEvent};
use livekit_client::{self as livekit, TrackSid};
use postage::{sink::Sink, stream::Stream, watch};
use project::Project;
use settings::Settings as _;
@ -47,6 +35,9 @@ pub enum Event {
RemoteVideoTracksChanged {
participant_id: proto::PeerId,
},
RemoteVideoTrackUnsubscribed {
sid: TrackSid,
},
RemoteAudioTracksChanged {
participant_id: proto::PeerId,
},
@ -104,11 +95,7 @@ impl Room {
!self.shared_projects.is_empty()
}
#[cfg(all(
any(test, feature = "test-support"),
not(all(target_os = "windows", target_env = "gnu"))
))]
pub fn is_connected(&self) -> bool {
pub fn is_connected(&self, _: &App) -> bool {
if let Some(live_kit) = self.live_kit.as_ref() {
live_kit.room.connection_state() == livekit::ConnectionState::Connected
} else {
@ -477,13 +464,15 @@ impl Room {
id: worktree.id().to_proto(),
scan_id: worktree.completed_scan_id() as u64,
});
for repository in worktree.repositories().iter() {
repositories.push(proto::RejoinRepository {
id: repository.work_directory_id().to_proto(),
scan_id: worktree.completed_scan_id() as u64,
});
}
}
for (entry_id, repository) in project.repositories(cx) {
let repository = repository.read(cx);
repositories.push(proto::RejoinRepository {
id: entry_id.to_proto(),
scan_id: repository.completed_scan_id as u64,
});
}
rejoined_projects.push(proto::RejoinProject {
id: project_id,
worktrees,
@ -687,12 +676,6 @@ impl Room {
}
}
#[cfg(all(target_os = "windows", target_env = "gnu"))]
fn start_room_connection(&self, mut room: proto::Room, cx: &mut Context<Self>) -> Task<()> {
Task::ready(())
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
fn start_room_connection(&self, mut room: proto::Room, cx: &mut Context<Self>) -> Task<()> {
// Filter ourselves out from the room's participants.
let local_participant_ix = room
@ -845,7 +828,6 @@ impl Room {
muted: true,
speaking: false,
video_tracks: Default::default(),
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
audio_tracks: Default::default(),
},
);
@ -948,7 +930,6 @@ impl Room {
);
match event {
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
RoomEvent::TrackSubscribed {
track,
participant,
@ -963,18 +944,22 @@ impl Room {
)
})?;
if self.live_kit.as_ref().map_or(true, |kit| kit.deafened) {
track.rtc_track().set_enabled(false);
if matches!(track, livekit_client::RemoteTrack::Audio(_)) {
track.set_enabled(false, cx);
}
}
match track {
livekit::track::RemoteTrack::Audio(track) => {
livekit_client::RemoteTrack::Audio(track) => {
cx.emit(Event::RemoteAudioTracksChanged {
participant_id: participant.peer_id,
});
let stream = play_remote_audio_track(&track, cx.background_executor())?;
participant.audio_tracks.insert(track_id, (track, stream));
participant.muted = publication.is_muted();
if let Some(live_kit) = self.live_kit.as_ref() {
let stream = live_kit.room.play_remote_audio_track(&track, cx)?;
participant.audio_tracks.insert(track_id, (track, stream));
participant.muted = publication.is_muted();
}
}
livekit::track::RemoteTrack::Video(track) => {
livekit_client::RemoteTrack::Video(track) => {
cx.emit(Event::RemoteVideoTracksChanged {
participant_id: participant.peer_id,
});
@ -983,7 +968,6 @@ impl Room {
}
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
RoomEvent::TrackUnsubscribed {
track, participant, ..
} => {
@ -995,23 +979,23 @@ impl Room {
)
})?;
match track {
livekit::track::RemoteTrack::Audio(track) => {
livekit_client::RemoteTrack::Audio(track) => {
participant.audio_tracks.remove(&track.sid());
participant.muted = true;
cx.emit(Event::RemoteAudioTracksChanged {
participant_id: participant.peer_id,
});
}
livekit::track::RemoteTrack::Video(track) => {
livekit_client::RemoteTrack::Video(track) => {
participant.video_tracks.remove(&track.sid());
cx.emit(Event::RemoteVideoTracksChanged {
participant_id: participant.peer_id,
});
cx.emit(Event::RemoteVideoTrackUnsubscribed { sid: track.sid() });
}
}
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
RoomEvent::ActiveSpeakersChanged { speakers } => {
let mut speaker_ids = speakers
.into_iter()
@ -1028,7 +1012,6 @@ impl Room {
}
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
RoomEvent::TrackMuted {
participant,
publication,
@ -1053,7 +1036,6 @@ impl Room {
}
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
RoomEvent::LocalTrackUnpublished { publication, .. } => {
log::info!("unpublished track {}", publication.sid());
if let Some(room) = &mut self.live_kit {
@ -1076,12 +1058,10 @@ impl Room {
}
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
RoomEvent::LocalTrackPublished { publication, .. } => {
log::info!("published track {:?}", publication.sid());
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
RoomEvent::Disconnected { reason } => {
log::info!("disconnected from room: {reason:?}");
self.leave(cx).detach_and_log_err(cx);
@ -1309,13 +1289,6 @@ impl Room {
pub fn can_use_microphone(&self) -> bool {
use proto::ChannelRole::*;
#[cfg(not(any(test, feature = "test-support")))]
{
if cfg!(all(target_os = "windows", target_env = "gnu")) {
return false;
}
}
match self.local_participant.role {
Admin | Member | Talker => true,
Guest | Banned => false,
@ -1330,40 +1303,23 @@ impl Room {
}
}
#[cfg(all(target_os = "windows", target_env = "gnu"))]
pub fn share_microphone(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
Task::ready(Err(anyhow!("MinGW is not supported yet")))
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
#[track_caller]
pub fn share_microphone(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
if self.status.is_offline() {
return Task::ready(Err(anyhow!("room is offline")));
}
let (participant, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
let (room, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
let publish_id = post_inc(&mut live_kit.next_publish_id);
live_kit.microphone_track = LocalTrack::Pending { publish_id };
cx.notify();
(live_kit.room.local_participant(), publish_id)
(live_kit.room.clone(), publish_id)
} else {
return Task::ready(Err(anyhow!("live-kit was not initialized")));
};
cx.spawn(async move |this, cx| {
let (track, stream) = capture_local_audio_track(cx.background_executor())?.await;
let publication = participant
.publish_track(
livekit::track::LocalTrack::Audio(track),
TrackPublishOptions {
source: TrackSource::Microphone,
..Default::default()
},
)
.await
.map_err(|error| anyhow!("failed to publish track: {error}"));
let publication = room.publish_local_microphone_track(cx).await;
this.update(cx, |this, cx| {
let live_kit = this
.live_kit
@ -1380,15 +1336,15 @@ impl Room {
};
match publication {
Ok(publication) => {
Ok((publication, stream)) => {
if canceled {
cx.background_spawn(async move {
participant.unpublish_track(&publication.sid()).await
cx.spawn(async move |_, cx| {
room.unpublish_local_track(publication.sid(), cx).await
})
.detach_and_log_err(cx)
} else {
if live_kit.muted_by_user || live_kit.deafened {
publication.mute();
publication.mute(cx);
}
live_kit.microphone_track = LocalTrack::Published {
track_publication: publication,
@ -1412,12 +1368,6 @@ impl Room {
})
}
#[cfg(all(target_os = "windows", target_env = "gnu"))]
pub fn share_screen(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
Task::ready(Err(anyhow!("MinGW is not supported yet")))
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub fn share_screen(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
if self.status.is_offline() {
return Task::ready(Err(anyhow!("room is offline")));
@ -1441,19 +1391,7 @@ impl Room {
let sources = sources.await??;
let source = sources.first().ok_or_else(|| anyhow!("no display found"))?;
let (track, stream) = capture_local_video_track(&**source).await?;
let publication = participant
.publish_track(
livekit::track::LocalTrack::Video(track),
TrackPublishOptions {
source: TrackSource::Screenshare,
video_codec: VideoCodec::H264,
..Default::default()
},
)
.await
.map_err(|error| anyhow!("error publishing screen track {error:?}"));
let publication = participant.publish_screenshare_track(&**source, cx).await;
this.update(cx, |this, cx| {
let live_kit = this
@ -1471,10 +1409,10 @@ impl Room {
};
match publication {
Ok(publication) => {
Ok((publication, stream)) => {
if canceled {
cx.background_spawn(async move {
participant.unpublish_track(&publication.sid()).await
cx.spawn(async move |_, cx| {
participant.unpublish_track(publication.sid(), cx).await
})
.detach()
} else {
@ -1564,14 +1502,11 @@ impl Room {
LocalTrack::Published {
track_publication, ..
} => {
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
{
let local_participant = live_kit.room.local_participant();
let sid = track_publication.sid();
cx.background_spawn(
async move { local_participant.unpublish_track(&sid).await },
)
.detach_and_log_err(cx);
cx.spawn(async move |_, cx| local_participant.unpublish_track(sid, cx).await)
.detach_and_log_err(cx);
cx.notify();
}
@ -1582,14 +1517,13 @@ impl Room {
}
fn set_deafened(&mut self, deafened: bool, cx: &mut Context<Self>) -> Option<()> {
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
{
let live_kit = self.live_kit.as_mut()?;
cx.notify();
for (_, participant) in live_kit.room.remote_participants() {
for (_, publication) in participant.track_publications() {
if publication.kind() == TrackKind::Audio {
publication.set_enabled(!deafened);
if publication.is_audio() {
publication.set_enabled(!deafened, cx);
}
}
}
@ -1620,14 +1554,13 @@ impl Room {
LocalTrack::Published {
track_publication, ..
} => {
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
{
if should_mute {
track_publication.mute()
} else {
track_publication.unmute()
}
let guard = Tokio::handle(cx);
if should_mute {
track_publication.mute(cx)
} else {
track_publication.unmute(cx)
}
drop(guard);
None
}
@ -1635,30 +1568,19 @@ impl Room {
}
}
#[cfg(all(target_os = "windows", target_env = "gnu"))]
fn spawn_room_connection(
livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
cx: &mut Context<'_, Room>,
) {
}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
fn spawn_room_connection(
livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
cx: &mut Context<'_, Room>,
) {
if let Some(connection_info) = livekit_connection_info {
cx.spawn(async move |this, cx| {
let (room, mut events) = livekit::Room::connect(
&connection_info.server_url,
&connection_info.token,
RoomOptions::default(),
)
.await?;
let (room, mut events) =
livekit::Room::connect(connection_info.server_url, connection_info.token, cx)
.await?;
this.update(cx, |this, cx| {
let _handle_updates = cx.spawn(async move |this, cx| {
while let Some(event) = events.recv().await {
while let Some(event) = events.next().await {
if this
.update(cx, |this, cx| {
this.livekit_room_updated(event, cx).warn_on_err();
@ -1707,10 +1629,6 @@ struct LiveKitRoom {
}
impl LiveKitRoom {
#[cfg(all(target_os = "windows", target_env = "gnu"))]
fn stop_publishing(&mut self, _cx: &mut Context<Room>) {}
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
fn stop_publishing(&mut self, cx: &mut Context<Room>) {
let mut tracks_to_unpublish = Vec::new();
if let LocalTrack::Published {
@ -1730,9 +1648,9 @@ impl LiveKitRoom {
}
let participant = self.room.local_participant();
cx.background_spawn(async move {
cx.spawn(async move |_, cx| {
for sid in tracks_to_unpublish {
participant.unpublish_track(&sid).await.log_err();
participant.unpublish_track(sid, cx).await.log_err();
}
})
.detach();

View file

@ -1,84 +0,0 @@
#![cfg_attr(all(target_os = "windows", target_env = "gnu"), allow(unused))]
use anyhow::{anyhow, Result};
use client::{proto, ParticipantIndex, User};
use collections::HashMap;
use gpui::WeakEntity;
use livekit_client::AudioStream;
use project::Project;
use std::sync::Arc;
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub use livekit_client::id::TrackSid;
pub use livekit_client::track::{RemoteAudioTrack, RemoteVideoTrack};
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ParticipantLocation {
SharedProject { project_id: u64 },
UnsharedProject,
External,
}
impl ParticipantLocation {
pub fn from_proto(location: Option<proto::ParticipantLocation>) -> Result<Self> {
match location.and_then(|l| l.variant) {
Some(proto::participant_location::Variant::SharedProject(project)) => {
Ok(Self::SharedProject {
project_id: project.id,
})
}
Some(proto::participant_location::Variant::UnsharedProject(_)) => {
Ok(Self::UnsharedProject)
}
Some(proto::participant_location::Variant::External(_)) => Ok(Self::External),
None => Err(anyhow!("participant location was not provided")),
}
}
}
#[derive(Clone, Default)]
pub struct LocalParticipant {
pub projects: Vec<proto::ParticipantProject>,
pub active_project: Option<WeakEntity<Project>>,
pub role: proto::ChannelRole,
}
impl LocalParticipant {
pub fn can_write(&self) -> bool {
matches!(
self.role,
proto::ChannelRole::Admin | proto::ChannelRole::Member
)
}
}
pub struct RemoteParticipant {
pub user: Arc<User>,
pub peer_id: proto::PeerId,
pub role: proto::ChannelRole,
pub projects: Vec<proto::ParticipantProject>,
pub location: ParticipantLocation,
pub participant_index: ParticipantIndex,
pub muted: bool,
pub speaking: bool,
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub video_tracks: HashMap<TrackSid, RemoteVideoTrack>,
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub audio_tracks: HashMap<TrackSid, (RemoteAudioTrack, AudioStream)>,
}
impl RemoteParticipant {
pub fn has_video_tracks(&self) -> bool {
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
return !self.video_tracks.is_empty();
#[cfg(all(target_os = "windows", target_env = "gnu"))]
return false;
}
pub fn can_write(&self) -> bool {
matches!(
self.role,
proto::ChannelRole::Admin | proto::ChannelRole::Member
)
}
}

View file

@ -1,521 +0,0 @@
pub mod participant;
pub mod room;
use crate::call_settings::CallSettings;
use anyhow::{anyhow, Result};
use audio::Audio;
use client::{proto, ChannelId, Client, TypedEnvelope, User, UserStore, ZED_ALWAYS_ACTIVE};
use collections::HashSet;
use futures::{channel::oneshot, future::Shared, Future, FutureExt};
use gpui::{
App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, Subscription, Task,
WeakEntity,
};
use postage::watch;
use project::Project;
use room::Event;
use settings::Settings;
use std::sync::Arc;
pub use participant::ParticipantLocation;
pub use room::Room;
struct GlobalActiveCall(Entity<ActiveCall>);
impl Global for GlobalActiveCall {}
pub fn init(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
CallSettings::register(cx);
let active_call = cx.new(|cx| ActiveCall::new(client, user_store, cx));
cx.set_global(GlobalActiveCall(active_call));
}
pub struct OneAtATime {
cancel: Option<oneshot::Sender<()>>,
}
impl OneAtATime {
/// spawn a task in the given context.
/// if another task is spawned before that resolves, or if the OneAtATime itself is dropped, the first task will be cancelled and return Ok(None)
/// otherwise you'll see the result of the task.
fn spawn<F, Fut, R>(&mut self, cx: &mut App, f: F) -> Task<Result<Option<R>>>
where
F: 'static + FnOnce(AsyncApp) -> Fut,
Fut: Future<Output = Result<R>>,
R: 'static,
{
let (tx, rx) = oneshot::channel();
self.cancel.replace(tx);
cx.spawn(async move |cx| {
futures::select_biased! {
_ = rx.fuse() => Ok(None),
result = f(cx.clone()).fuse() => result.map(Some),
}
})
}
fn running(&self) -> bool {
self.cancel
.as_ref()
.is_some_and(|cancel| !cancel.is_canceled())
}
}
#[derive(Clone)]
pub struct IncomingCall {
pub room_id: u64,
pub calling_user: Arc<User>,
pub participants: Vec<Arc<User>>,
pub initial_project: Option<proto::ParticipantProject>,
}
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
room: Option<(Entity<Room>, Vec<Subscription>)>,
pending_room_creation: Option<Shared<Task<Result<Entity<Room>, Arc<anyhow::Error>>>>>,
location: Option<WeakEntity<Project>>,
_join_debouncer: OneAtATime,
pending_invites: HashSet<u64>,
incoming_call: (
watch::Sender<Option<IncomingCall>>,
watch::Receiver<Option<IncomingCall>>,
),
client: Arc<Client>,
user_store: Entity<UserStore>,
_subscriptions: Vec<client::Subscription>,
}
impl EventEmitter<Event> for ActiveCall {}
impl ActiveCall {
fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
Self {
room: None,
pending_room_creation: None,
location: None,
pending_invites: Default::default(),
incoming_call: watch::channel(),
_join_debouncer: OneAtATime { cancel: None },
_subscriptions: vec![
client.add_request_handler(cx.weak_entity(), Self::handle_incoming_call),
client.add_message_handler(cx.weak_entity(), Self::handle_call_canceled),
],
client,
user_store,
}
}
pub fn channel_id(&self, cx: &App) -> Option<ChannelId> {
self.room()?.read(cx).channel_id()
}
async fn handle_incoming_call(
this: Entity<Self>,
envelope: TypedEnvelope<proto::IncomingCall>,
mut cx: AsyncApp,
) -> Result<proto::Ack> {
let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
let call = IncomingCall {
room_id: envelope.payload.room_id,
participants: user_store
.update(&mut cx, |user_store, cx| {
user_store.get_users(envelope.payload.participant_user_ids, cx)
})?
.await?,
calling_user: user_store
.update(&mut cx, |user_store, cx| {
user_store.get_user(envelope.payload.calling_user_id, cx)
})?
.await?,
initial_project: envelope.payload.initial_project,
};
this.update(&mut cx, |this, _| {
*this.incoming_call.0.borrow_mut() = Some(call);
})?;
Ok(proto::Ack {})
}
async fn handle_call_canceled(
this: Entity<Self>,
envelope: TypedEnvelope<proto::CallCanceled>,
mut cx: AsyncApp,
) -> Result<()> {
this.update(&mut cx, |this, _| {
let mut incoming_call = this.incoming_call.0.borrow_mut();
if incoming_call
.as_ref()
.map_or(false, |call| call.room_id == envelope.payload.room_id)
{
incoming_call.take();
}
})?;
Ok(())
}
pub fn global(cx: &App) -> Entity<Self> {
cx.global::<GlobalActiveCall>().0.clone()
}
pub fn try_global(cx: &App) -> Option<Entity<Self>> {
cx.try_global::<GlobalActiveCall>()
.map(|call| call.0.clone())
}
pub fn invite(
&mut self,
called_user_id: u64,
initial_project: Option<Entity<Project>>,
cx: &mut Context<Self>,
) -> Task<Result<()>> {
if !self.pending_invites.insert(called_user_id) {
return Task::ready(Err(anyhow!("user was already invited")));
}
cx.notify();
if self._join_debouncer.running() {
return Task::ready(Ok(()));
}
let room = if let Some(room) = self.room().cloned() {
Some(Task::ready(Ok(room)).shared())
} else {
self.pending_room_creation.clone()
};
let invite = if let Some(room) = room {
cx.spawn(async move |_, cx| {
let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
let initial_project_id = if let Some(initial_project) = initial_project {
Some(
room.update(cx, |room, cx| room.share_project(initial_project, cx))?
.await?,
)
} else {
None
};
room.update(cx, move |room, cx| {
room.call(called_user_id, initial_project_id, cx)
})?
.await?;
anyhow::Ok(())
})
} else {
let client = self.client.clone();
let user_store = self.user_store.clone();
let room = cx
.spawn(async move |this, cx| {
let create_room = async {
let room = cx
.update(|cx| {
Room::create(
called_user_id,
initial_project,
client,
user_store,
cx,
)
})?
.await?;
this.update(cx, |this, cx| this.set_room(Some(room.clone()), cx))?
.await?;
anyhow::Ok(room)
};
let room = create_room.await;
this.update(cx, |this, _| this.pending_room_creation = None)?;
room.map_err(Arc::new)
})
.shared();
self.pending_room_creation = Some(room.clone());
cx.background_spawn(async move {
room.await.map_err(|err| anyhow!("{:?}", err))?;
anyhow::Ok(())
})
};
cx.spawn(async move |this, cx| {
let result = invite.await;
if result.is_ok() {
this.update(cx, |this, cx| {
this.report_call_event("Participant Invited", cx)
})?;
} else {
//TODO: report collaboration error
log::error!("invite failed: {:?}", result);
}
this.update(cx, |this, cx| {
this.pending_invites.remove(&called_user_id);
cx.notify();
})?;
result
})
}
pub fn cancel_invite(
&mut self,
called_user_id: u64,
cx: &mut Context<Self>,
) -> Task<Result<()>> {
let room_id = if let Some(room) = self.room() {
room.read(cx).id()
} else {
return Task::ready(Err(anyhow!("no active call")));
};
let client = self.client.clone();
cx.background_spawn(async move {
client
.request(proto::CancelCall {
room_id,
called_user_id,
})
.await?;
anyhow::Ok(())
})
}
pub fn incoming(&self) -> watch::Receiver<Option<IncomingCall>> {
self.incoming_call.1.clone()
}
pub fn accept_incoming(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
if self.room.is_some() {
return Task::ready(Err(anyhow!("cannot join while on another call")));
}
let call = if let Some(call) = self.incoming_call.0.borrow_mut().take() {
call
} else {
return Task::ready(Err(anyhow!("no incoming call")));
};
if self.pending_room_creation.is_some() {
return Task::ready(Ok(()));
}
let room_id = call.room_id;
let client = self.client.clone();
let user_store = self.user_store.clone();
let join = self._join_debouncer.spawn(cx, move |mut cx| async move {
Room::join(room_id, client, user_store, &mut cx).await
});
cx.spawn(async move |this, cx| {
let room = join.await?;
this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
.await?;
this.update(cx, |this, cx| {
this.report_call_event("Incoming Call Accepted", cx)
})?;
Ok(())
})
}
pub fn decline_incoming(&mut self, _: &mut Context<Self>) -> Result<()> {
let call = self
.incoming_call
.0
.borrow_mut()
.take()
.ok_or_else(|| anyhow!("no incoming call"))?;
telemetry::event!("Incoming Call Declined", room_id = call.room_id);
self.client.send(proto::DeclineCall {
room_id: call.room_id,
})?;
Ok(())
}
pub fn join_channel(
&mut self,
channel_id: ChannelId,
cx: &mut Context<Self>,
) -> Task<Result<Option<Entity<Room>>>> {
if let Some(room) = self.room().cloned() {
if room.read(cx).channel_id() == Some(channel_id) {
return Task::ready(Ok(Some(room)));
} else {
room.update(cx, |room, cx| room.clear_state(cx));
}
}
if self.pending_room_creation.is_some() {
return Task::ready(Ok(None));
}
let client = self.client.clone();
let user_store = self.user_store.clone();
let join = self._join_debouncer.spawn(cx, move |mut cx| async move {
Room::join_channel(channel_id, client, user_store, &mut cx).await
});
cx.spawn(async move |this, cx| {
let room = join.await?;
this.update(cx, |this, cx| this.set_room(room.clone(), cx))?
.await?;
this.update(cx, |this, cx| this.report_call_event("Channel Joined", cx))?;
Ok(room)
})
}
pub fn hang_up(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
cx.notify();
self.report_call_event("Call Ended", cx);
Audio::end_call(cx);
let channel_id = self.channel_id(cx);
if let Some((room, _)) = self.room.take() {
cx.emit(Event::RoomLeft { channel_id });
room.update(cx, |room, cx| room.leave(cx))
} else {
Task::ready(Ok(()))
}
}
pub fn share_project(
&mut self,
project: Entity<Project>,
cx: &mut Context<Self>,
) -> Task<Result<u64>> {
if let Some((room, _)) = self.room.as_ref() {
self.report_call_event("Project Shared", cx);
room.update(cx, |room, cx| room.share_project(project, cx))
} else {
Task::ready(Err(anyhow!("no active call")))
}
}
pub fn unshare_project(
&mut self,
project: Entity<Project>,
cx: &mut Context<Self>,
) -> Result<()> {
if let Some((room, _)) = self.room.as_ref() {
self.report_call_event("Project Unshared", cx);
room.update(cx, |room, cx| room.unshare_project(project, cx))
} else {
Err(anyhow!("no active call"))
}
}
pub fn location(&self) -> Option<&WeakEntity<Project>> {
self.location.as_ref()
}
pub fn set_location(
&mut self,
project: Option<&Entity<Project>>,
cx: &mut Context<Self>,
) -> Task<Result<()>> {
if project.is_some() || !*ZED_ALWAYS_ACTIVE {
self.location = project.map(|project| project.downgrade());
if let Some((room, _)) = self.room.as_ref() {
return room.update(cx, |room, cx| room.set_location(project, cx));
}
}
Task::ready(Ok(()))
}
fn set_room(&mut self, room: Option<Entity<Room>>, cx: &mut Context<Self>) -> Task<Result<()>> {
if room.as_ref() == self.room.as_ref().map(|room| &room.0) {
Task::ready(Ok(()))
} else {
cx.notify();
if let Some(room) = room {
if room.read(cx).status().is_offline() {
self.room = None;
Task::ready(Ok(()))
} else {
let subscriptions = vec![
cx.observe(&room, |this, room, cx| {
if room.read(cx).status().is_offline() {
this.set_room(None, cx).detach_and_log_err(cx);
}
cx.notify();
}),
cx.subscribe(&room, |_, _, event, cx| cx.emit(event.clone())),
];
self.room = Some((room.clone(), subscriptions));
let location = self
.location
.as_ref()
.and_then(|location| location.upgrade());
let channel_id = room.read(cx).channel_id();
cx.emit(Event::RoomJoined { channel_id });
room.update(cx, |room, cx| room.set_location(location.as_ref(), cx))
}
} else {
self.room = None;
Task::ready(Ok(()))
}
}
}
pub fn room(&self) -> Option<&Entity<Room>> {
self.room.as_ref().map(|(room, _)| room)
}
pub fn client(&self) -> Arc<Client> {
self.client.clone()
}
pub fn pending_invites(&self) -> &HashSet<u64> {
&self.pending_invites
}
pub fn report_call_event(&self, operation: &'static str, cx: &mut App) {
if let Some(room) = self.room() {
let room = room.read(cx);
telemetry::event!(
operation,
room_id = room.id(),
channel_id = room.channel_id()
);
}
}
}
#[cfg(test)]
mod test {
use gpui::TestAppContext;
use crate::OneAtATime;
#[gpui::test]
async fn test_one_at_a_time(cx: &mut TestAppContext) {
let mut one_at_a_time = OneAtATime { cancel: None };
assert_eq!(
cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(1) }))
.await
.unwrap(),
Some(1)
);
let (a, b) = cx.update(|cx| {
(
one_at_a_time.spawn(cx, |_| async {
panic!("");
}),
one_at_a_time.spawn(cx, |_| async { Ok(3) }),
)
});
assert_eq!(a.await.unwrap(), None::<u32>);
assert_eq!(b.await.unwrap(), Some(3));
let promise = cx.update(|cx| one_at_a_time.spawn(cx, |_| async { Ok(4) }));
drop(one_at_a_time);
assert_eq!(promise.await.unwrap(), None);
}
}

File diff suppressed because it is too large Load diff