revert single channel click (#7738)

- Revert "collab tweaks (#7706)"
- Revert "2112 (#7640)"
- Revert "single click channel (#7596)"
- Reserve protobufs
- Don't revert migrations

Release Notes:

- N/A

**or**

- N/A
This commit is contained in:
Conrad Irwin 2024-02-13 12:53:49 -07:00 committed by GitHub
parent ecd9b93cb1
commit 2294d99046
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 525 additions and 709 deletions

View file

@ -84,7 +84,6 @@ pub struct ActiveCall {
),
client: Arc<Client>,
user_store: Model<UserStore>,
pending_channel_id: Option<u64>,
_subscriptions: Vec<client::Subscription>,
}
@ -98,7 +97,6 @@ impl ActiveCall {
location: None,
pending_invites: Default::default(),
incoming_call: watch::channel(),
pending_channel_id: None,
_join_debouncer: OneAtATime { cancel: None },
_subscriptions: vec![
client.add_request_handler(cx.weak_model(), Self::handle_incoming_call),
@ -113,10 +111,6 @@ impl ActiveCall {
self.room()?.read(cx).channel_id()
}
pub fn pending_channel_id(&self) -> Option<u64> {
self.pending_channel_id
}
async fn handle_incoming_call(
this: Model<Self>,
envelope: TypedEnvelope<proto::IncomingCall>,
@ -345,13 +339,11 @@ impl ActiveCall {
channel_id: u64,
cx: &mut ModelContext<Self>,
) -> Task<Result<Option<Model<Room>>>> {
let mut leave = None;
if let Some(room) = self.room().cloned() {
if room.read(cx).channel_id() == Some(channel_id) {
return Task::ready(Ok(Some(room)));
} else {
let (room, _) = self.room.take().unwrap();
leave = room.update(cx, |room, cx| Some(room.leave(cx)));
room.update(cx, |room, cx| room.clear_state(cx));
}
}
@ -361,21 +353,14 @@ impl ActiveCall {
let client = self.client.clone();
let user_store = self.user_store.clone();
self.pending_channel_id = Some(channel_id);
let join = self._join_debouncer.spawn(cx, move |cx| async move {
if let Some(task) = leave {
task.await?
}
Room::join_channel(channel_id, client, user_store, cx).await
});
cx.spawn(|this, mut cx| async move {
let room = join.await?;
this.update(&mut cx, |this, cx| {
this.pending_channel_id.take();
this.set_room(room.clone(), cx)
})?
.await?;
this.update(&mut cx, |this, cx| this.set_room(room.clone(), cx))?
.await?;
this.update(&mut cx, |this, cx| {
this.report_call_event("join channel", cx)
})?;

View file

@ -7,6 +7,7 @@ use settings::Settings;
#[derive(Deserialize, Debug)]
pub struct CallSettings {
pub mute_on_join: bool,
pub share_on_join: bool,
}
/// Configuration of voice calls in Zed.
@ -16,6 +17,11 @@ pub struct CallSettingsContent {
///
/// Default: false
pub mute_on_join: Option<bool>,
/// Whether your current project should be shared when joining an empty channel.
///
/// Default: true
pub share_on_join: Option<bool>,
}
impl Settings for CallSettings {

View file

@ -49,7 +49,6 @@ pub struct RemoteParticipant {
pub participant_index: ParticipantIndex,
pub muted: bool,
pub speaking: bool,
pub in_call: bool,
pub video_tracks: HashMap<live_kit_client::Sid, Arc<RemoteVideoTrack>>,
pub audio_tracks: HashMap<live_kit_client::Sid, Arc<RemoteAudioTrack>>,
}

View file

@ -61,7 +61,6 @@ pub struct Room {
id: u64,
channel_id: Option<u64>,
live_kit: Option<LiveKitRoom>,
live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
status: RoomStatus,
shared_projects: HashSet<WeakModel<Project>>,
joined_projects: HashSet<WeakModel<Project>>,
@ -113,18 +112,91 @@ impl Room {
user_store: Model<UserStore>,
cx: &mut ModelContext<Self>,
) -> Self {
let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
let room = live_kit_client::Room::new();
let mut status = room.status();
// Consume the initial status of the room.
let _ = status.try_recv();
let _maintain_room = cx.spawn(|this, mut cx| async move {
while let Some(status) = status.next().await {
let this = if let Some(this) = this.upgrade() {
this
} else {
break;
};
if status == live_kit_client::ConnectionState::Disconnected {
this.update(&mut cx, |this, cx| this.leave(cx).log_err())
.ok();
break;
}
}
});
let _handle_updates = cx.spawn({
let room = room.clone();
move |this, mut cx| async move {
let mut updates = room.updates();
while let Some(update) = updates.next().await {
let this = if let Some(this) = this.upgrade() {
this
} else {
break;
};
this.update(&mut cx, |this, cx| {
this.live_kit_room_updated(update, cx).log_err()
})
.ok();
}
}
});
let connect = room.connect(&connection_info.server_url, &connection_info.token);
cx.spawn(|this, mut cx| async move {
connect.await?;
this.update(&mut cx, |this, cx| {
if !this.read_only() {
if let Some(live_kit) = &this.live_kit {
if !live_kit.muted_by_user && !live_kit.deafened {
return this.share_microphone(cx);
}
}
}
Task::ready(Ok(()))
})?
.await
})
.detach_and_log_err(cx);
Some(LiveKitRoom {
room,
screen_track: LocalTrack::None,
microphone_track: LocalTrack::None,
next_publish_id: 0,
muted_by_user: Self::mute_on_join(cx),
deafened: false,
speaking: false,
_maintain_room,
_handle_updates,
})
} else {
None
};
let maintain_connection = cx.spawn({
let client = client.clone();
move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
});
Audio::play_sound(Sound::Joined, cx);
let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
let mut this = Self {
Self {
id,
channel_id,
live_kit: None,
live_kit_connection_info,
live_kit: live_kit_room,
status: RoomStatus::Online,
shared_projects: Default::default(),
joined_projects: Default::default(),
@ -148,11 +220,7 @@ impl Room {
maintain_connection: Some(maintain_connection),
room_update_completed_tx,
room_update_completed_rx,
};
if this.live_kit_connection_info.is_some() {
this.join_call(cx).detach_and_log_err(cx);
}
this
}
pub(crate) fn create(
@ -211,7 +279,7 @@ impl Room {
cx: AsyncAppContext,
) -> Result<Model<Self>> {
Self::from_join_response(
client.request(proto::JoinChannel2 { channel_id }).await?,
client.request(proto::JoinChannel { channel_id }).await?,
client,
user_store,
cx,
@ -256,7 +324,7 @@ impl Room {
}
pub fn mute_on_join(cx: &AppContext) -> bool {
CallSettings::get_global(cx).mute_on_join
CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
}
fn from_join_response(
@ -306,9 +374,7 @@ impl Room {
}
log::info!("leaving room");
if self.live_kit.is_some() {
Audio::play_sound(Sound::Leave, cx);
}
Audio::play_sound(Sound::Leave, cx);
self.clear_state(cx);
@ -527,24 +593,6 @@ impl Room {
&self.remote_participants
}
pub fn call_participants(&self, cx: &AppContext) -> Vec<Arc<User>> {
self.remote_participants()
.values()
.filter_map(|participant| {
if participant.in_call {
Some(participant.user.clone())
} else {
None
}
})
.chain(if self.in_call() {
self.user_store.read(cx).current_user()
} else {
None
})
.collect()
}
pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
self.remote_participants
.values()
@ -569,6 +617,10 @@ impl Room {
self.local_participant.role == proto::ChannelRole::Admin
}
pub fn local_participant_is_guest(&self) -> bool {
self.local_participant.role == proto::ChannelRole::Guest
}
pub fn set_participant_role(
&mut self,
user_id: u64,
@ -776,7 +828,6 @@ impl Room {
}
let role = participant.role();
let in_call = participant.in_call;
let location = ParticipantLocation::from_proto(participant.location)
.unwrap_or(ParticipantLocation::External);
if let Some(remote_participant) =
@ -787,15 +838,9 @@ impl Room {
remote_participant.participant_index = participant_index;
if location != remote_participant.location
|| role != remote_participant.role
|| in_call != remote_participant.in_call
{
if in_call && !remote_participant.in_call {
Audio::play_sound(Sound::Joined, cx);
}
remote_participant.location = location;
remote_participant.role = role;
remote_participant.in_call = participant.in_call;
cx.emit(Event::ParticipantLocationChanged {
participant_id: peer_id,
});
@ -812,15 +857,12 @@ impl Room {
role,
muted: true,
speaking: false,
in_call: participant.in_call,
video_tracks: Default::default(),
audio_tracks: Default::default(),
},
);
if participant.in_call {
Audio::play_sound(Sound::Joined, cx);
}
Audio::play_sound(Sound::Joined, cx);
if let Some(live_kit) = this.live_kit.as_ref() {
let video_tracks =
@ -1009,6 +1051,15 @@ impl Room {
}
RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
if let Some(live_kit) = &self.live_kit {
if live_kit.deafened {
track.stop();
cx.foreground_executor()
.spawn(publication.set_enabled(false))
.detach();
}
}
let user_id = track.publisher_id().parse()?;
let track_id = track.sid().to_string();
let participant = self
@ -1155,7 +1206,7 @@ impl Room {
})
}
pub(crate) fn share_project(
pub fn share_project(
&mut self,
project: Model<Project>,
cx: &mut ModelContext<Self>,
@ -1257,14 +1308,18 @@ impl Room {
})
}
pub fn is_sharing_mic(&self) -> bool {
self.live_kit.as_ref().map_or(false, |live_kit| {
!matches!(live_kit.microphone_track, LocalTrack::None)
})
}
pub fn is_muted(&self) -> bool {
self.live_kit
.as_ref()
.map_or(true, |live_kit| match &live_kit.microphone_track {
LocalTrack::None => true,
LocalTrack::Pending { .. } => true,
LocalTrack::Published { track_publication } => track_publication.is_muted(),
})
self.live_kit.as_ref().map_or(false, |live_kit| {
matches!(live_kit.microphone_track, LocalTrack::None)
|| live_kit.muted_by_user
|| live_kit.deafened
})
}
pub fn read_only(&self) -> bool {
@ -1278,8 +1333,8 @@ impl Room {
.map_or(false, |live_kit| live_kit.speaking)
}
pub fn in_call(&self) -> bool {
self.live_kit.is_some()
pub fn is_deafened(&self) -> Option<bool> {
self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
}
#[track_caller]
@ -1332,8 +1387,12 @@ impl Room {
Ok(publication) => {
if canceled {
live_kit.room.unpublish_track(publication);
live_kit.microphone_track = LocalTrack::None;
} else {
if live_kit.muted_by_user || live_kit.deafened {
cx.background_executor()
.spawn(publication.set_mute(true))
.detach();
}
live_kit.microphone_track = LocalTrack::Published {
track_publication: publication,
};
@ -1437,140 +1496,50 @@ impl Room {
}
pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) {
let muted = !self.is_muted();
if let Some(task) = self.set_mute(muted, cx) {
task.detach_and_log_err(cx);
if let Some(live_kit) = self.live_kit.as_mut() {
// When unmuting, undeafen if the user was deafened before.
let was_deafened = live_kit.deafened;
if live_kit.muted_by_user
|| live_kit.deafened
|| matches!(live_kit.microphone_track, LocalTrack::None)
{
live_kit.muted_by_user = false;
live_kit.deafened = false;
} else {
live_kit.muted_by_user = true;
}
let muted = live_kit.muted_by_user;
let should_undeafen = was_deafened && !live_kit.deafened;
if let Some(task) = self.set_mute(muted, cx) {
task.detach_and_log_err(cx);
}
if should_undeafen {
if let Some(task) = self.set_deafened(false, cx) {
task.detach_and_log_err(cx);
}
}
}
}
pub fn join_call(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
if self.live_kit.is_some() {
return Task::ready(Ok(()));
}
pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) {
if let Some(live_kit) = self.live_kit.as_mut() {
// When deafening, mute the microphone if it was not already muted.
// When un-deafening, unmute the microphone, unless it was explicitly muted.
let deafened = !live_kit.deafened;
live_kit.deafened = deafened;
let should_change_mute = !live_kit.muted_by_user;
let room = live_kit_client::Room::new();
let mut status = room.status();
// Consume the initial status of the room.
let _ = status.try_recv();
let _maintain_room = cx.spawn(|this, mut cx| async move {
while let Some(status) = status.next().await {
let this = if let Some(this) = this.upgrade() {
this
} else {
break;
};
if let Some(task) = self.set_deafened(deafened, cx) {
task.detach_and_log_err(cx);
}
if status == live_kit_client::ConnectionState::Disconnected {
this.update(&mut cx, |this, cx| this.leave(cx).log_err())
.ok();
break;
if should_change_mute {
if let Some(task) = self.set_mute(deafened, cx) {
task.detach_and_log_err(cx);
}
}
});
let _handle_updates = cx.spawn({
let room = room.clone();
move |this, mut cx| async move {
let mut updates = room.updates();
while let Some(update) = updates.next().await {
let this = if let Some(this) = this.upgrade() {
this
} else {
break;
};
this.update(&mut cx, |this, cx| {
this.live_kit_room_updated(update, cx).log_err()
})
.ok();
}
}
});
self.live_kit = Some(LiveKitRoom {
room: room.clone(),
screen_track: LocalTrack::None,
microphone_track: LocalTrack::None,
next_publish_id: 0,
speaking: false,
_maintain_room,
_handle_updates,
});
cx.spawn({
let client = self.client.clone();
let share_microphone = !self.read_only() && !Self::mute_on_join(cx);
let connection_info = self.live_kit_connection_info.clone();
let channel_id = self.channel_id;
move |this, mut cx| async move {
let connection_info = if let Some(connection_info) = connection_info {
connection_info.clone()
} else if let Some(channel_id) = channel_id {
if let Some(connection_info) = client
.request(proto::JoinChannelCall { channel_id })
.await?
.live_kit_connection_info
{
connection_info
} else {
return Err(anyhow!("failed to get connection info from server"));
}
} else {
return Err(anyhow!(
"tried to connect to livekit without connection info"
));
};
room.connect(&connection_info.server_url, &connection_info.token)
.await?;
let track_updates = this.update(&mut cx, |this, cx| {
Audio::play_sound(Sound::Joined, cx);
let Some(live_kit) = this.live_kit.as_mut() else {
return vec![];
};
let mut track_updates = Vec::new();
for participant in this.remote_participants.values() {
for publication in live_kit
.room
.remote_audio_track_publications(&participant.user.id.to_string())
{
track_updates.push(publication.set_enabled(true));
}
for track in participant.audio_tracks.values() {
track.start();
}
}
track_updates
})?;
if share_microphone {
this.update(&mut cx, |this, cx| this.share_microphone(cx))?
.await?
};
for result in futures::future::join_all(track_updates).await {
result?;
}
anyhow::Ok(())
}
})
}
pub fn leave_call(&mut self, cx: &mut ModelContext<Self>) {
Audio::play_sound(Sound::Leave, cx);
if let Some(channel_id) = self.channel_id() {
let client = self.client.clone();
cx.background_executor()
.spawn(client.request(proto::LeaveChannelCall { channel_id }))
.detach_and_log_err(cx);
self.live_kit.take();
self.live_kit_connection_info.take();
cx.notify();
} else {
self.leave(cx).detach_and_log_err(cx)
}
}
@ -1601,6 +1570,40 @@ impl Room {
}
}
fn set_deafened(
&mut self,
deafened: bool,
cx: &mut ModelContext<Self>,
) -> Option<Task<Result<()>>> {
let live_kit = self.live_kit.as_mut()?;
cx.notify();
let mut track_updates = Vec::new();
for participant in self.remote_participants.values() {
for publication in live_kit
.room
.remote_audio_track_publications(&participant.user.id.to_string())
{
track_updates.push(publication.set_enabled(!deafened));
}
for track in participant.audio_tracks.values() {
if deafened {
track.stop();
} else {
track.start();
}
}
}
Some(cx.foreground_executor().spawn(async move {
for result in futures::future::join_all(track_updates).await {
result?;
}
Ok(())
}))
}
fn set_mute(
&mut self,
should_mute: bool,
@ -1645,6 +1648,9 @@ struct LiveKitRoom {
room: Arc<live_kit_client::Room>,
screen_track: LocalTrack,
microphone_track: LocalTrack,
/// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
muted_by_user: bool,
deafened: bool,
speaking: bool,
next_publish_id: usize,
_maintain_room: Task<()>,