Leave room automatically on disconnection
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
parent
f0c45cbceb
commit
6aa0f0b200
6 changed files with 302 additions and 123 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -4459,6 +4459,7 @@ dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"client",
|
"client",
|
||||||
"collections",
|
"collections",
|
||||||
|
"futures",
|
||||||
"gpui",
|
"gpui",
|
||||||
"project",
|
"project",
|
||||||
]
|
]
|
||||||
|
|
|
@ -86,7 +86,7 @@ async fn test_basic_calls(
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
participants(&room_a, &client_a, cx_a).await,
|
room_participants(&room_a, &client_a, cx_a).await,
|
||||||
RoomParticipants {
|
RoomParticipants {
|
||||||
remote: Default::default(),
|
remote: Default::default(),
|
||||||
pending: Default::default()
|
pending: Default::default()
|
||||||
|
@ -104,7 +104,7 @@ async fn test_basic_calls(
|
||||||
|
|
||||||
deterministic.run_until_parked();
|
deterministic.run_until_parked();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
participants(&room_a, &client_a, cx_a).await,
|
room_participants(&room_a, &client_a, cx_a).await,
|
||||||
RoomParticipants {
|
RoomParticipants {
|
||||||
remote: Default::default(),
|
remote: Default::default(),
|
||||||
pending: vec!["user_b".to_string()]
|
pending: vec!["user_b".to_string()]
|
||||||
|
@ -121,14 +121,14 @@ async fn test_basic_calls(
|
||||||
|
|
||||||
deterministic.run_until_parked();
|
deterministic.run_until_parked();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
participants(&room_a, &client_a, cx_a).await,
|
room_participants(&room_a, &client_a, cx_a).await,
|
||||||
RoomParticipants {
|
RoomParticipants {
|
||||||
remote: vec!["user_b".to_string()],
|
remote: vec!["user_b".to_string()],
|
||||||
pending: Default::default()
|
pending: Default::default()
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
participants(&room_b, &client_b, cx_b).await,
|
room_participants(&room_b, &client_b, cx_b).await,
|
||||||
RoomParticipants {
|
RoomParticipants {
|
||||||
remote: vec!["user_a".to_string()],
|
remote: vec!["user_a".to_string()],
|
||||||
pending: Default::default()
|
pending: Default::default()
|
||||||
|
@ -146,14 +146,14 @@ async fn test_basic_calls(
|
||||||
|
|
||||||
deterministic.run_until_parked();
|
deterministic.run_until_parked();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
participants(&room_a, &client_a, cx_a).await,
|
room_participants(&room_a, &client_a, cx_a).await,
|
||||||
RoomParticipants {
|
RoomParticipants {
|
||||||
remote: vec!["user_b".to_string()],
|
remote: vec!["user_b".to_string()],
|
||||||
pending: vec!["user_c".to_string()]
|
pending: vec!["user_c".to_string()]
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
participants(&room_b, &client_b, cx_b).await,
|
room_participants(&room_b, &client_b, cx_b).await,
|
||||||
RoomParticipants {
|
RoomParticipants {
|
||||||
remote: vec!["user_a".to_string()],
|
remote: vec!["user_a".to_string()],
|
||||||
pending: vec!["user_c".to_string()]
|
pending: vec!["user_c".to_string()]
|
||||||
|
@ -170,14 +170,14 @@ async fn test_basic_calls(
|
||||||
|
|
||||||
deterministic.run_until_parked();
|
deterministic.run_until_parked();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
participants(&room_a, &client_a, cx_a).await,
|
room_participants(&room_a, &client_a, cx_a).await,
|
||||||
RoomParticipants {
|
RoomParticipants {
|
||||||
remote: vec!["user_b".to_string()],
|
remote: vec!["user_b".to_string()],
|
||||||
pending: Default::default()
|
pending: Default::default()
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
participants(&room_b, &client_b, cx_b).await,
|
room_participants(&room_b, &client_b, cx_b).await,
|
||||||
RoomParticipants {
|
RoomParticipants {
|
||||||
remote: vec!["user_a".to_string()],
|
remote: vec!["user_a".to_string()],
|
||||||
pending: Default::default()
|
pending: Default::default()
|
||||||
|
@ -185,61 +185,90 @@ async fn test_basic_calls(
|
||||||
);
|
);
|
||||||
|
|
||||||
// User A leaves the room.
|
// User A leaves the room.
|
||||||
cx_a.update(|_| drop(room_a));
|
room_a.update(cx_a, |room, cx| room.leave(cx)).unwrap();
|
||||||
deterministic.run_until_parked();
|
deterministic.run_until_parked();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
participants(&room_b, &client_b, cx_b).await,
|
room_participants(&room_a, &client_a, cx_a).await,
|
||||||
RoomParticipants {
|
RoomParticipants {
|
||||||
remote: Default::default(),
|
remote: Default::default(),
|
||||||
pending: Default::default()
|
pending: Default::default()
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
assert_eq!(
|
||||||
#[derive(Debug, Eq, PartialEq)]
|
room_participants(&room_b, &client_b, cx_b).await,
|
||||||
struct RoomParticipants {
|
|
||||||
remote: Vec<String>,
|
|
||||||
pending: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn participants(
|
|
||||||
room: &ModelHandle<Room>,
|
|
||||||
client: &TestClient,
|
|
||||||
cx: &mut TestAppContext,
|
|
||||||
) -> RoomParticipants {
|
|
||||||
let remote_users = room.update(cx, |room, cx| {
|
|
||||||
room.remote_participants()
|
|
||||||
.values()
|
|
||||||
.map(|participant| {
|
|
||||||
client
|
|
||||||
.user_store
|
|
||||||
.update(cx, |users, cx| users.get_user(participant.user_id, cx))
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
});
|
|
||||||
let remote_users = futures::future::try_join_all(remote_users).await.unwrap();
|
|
||||||
let pending_users = room.update(cx, |room, cx| {
|
|
||||||
room.pending_user_ids()
|
|
||||||
.iter()
|
|
||||||
.map(|user_id| {
|
|
||||||
client
|
|
||||||
.user_store
|
|
||||||
.update(cx, |users, cx| users.get_user(*user_id, cx))
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
});
|
|
||||||
let pending_users = futures::future::try_join_all(pending_users).await.unwrap();
|
|
||||||
|
|
||||||
RoomParticipants {
|
RoomParticipants {
|
||||||
remote: remote_users
|
remote: Default::default(),
|
||||||
.into_iter()
|
pending: Default::default()
|
||||||
.map(|user| user.github_login.clone())
|
|
||||||
.collect(),
|
|
||||||
pending: pending_users
|
|
||||||
.into_iter()
|
|
||||||
.map(|user| user.github_login.clone())
|
|
||||||
.collect(),
|
|
||||||
}
|
}
|
||||||
}
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[gpui::test(iterations = 10)]
|
||||||
|
async fn test_leaving_room_on_disconnection(
|
||||||
|
deterministic: Arc<Deterministic>,
|
||||||
|
cx_a: &mut TestAppContext,
|
||||||
|
cx_b: &mut TestAppContext,
|
||||||
|
) {
|
||||||
|
deterministic.forbid_parking();
|
||||||
|
let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
|
||||||
|
let client_a = server.create_client(cx_a, "user_a").await;
|
||||||
|
let client_b = server.create_client(cx_b, "user_b").await;
|
||||||
|
server
|
||||||
|
.make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let room_a = cx_a
|
||||||
|
.update(|cx| Room::create(client_a.clone(), cx))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Call user B from client A.
|
||||||
|
let mut incoming_call_b = client_b
|
||||||
|
.user_store
|
||||||
|
.update(cx_b, |user, _| user.incoming_call());
|
||||||
|
room_a
|
||||||
|
.update(cx_a, |room, cx| room.call(client_b.user_id().unwrap(), cx))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// User B receives the call and joins the room.
|
||||||
|
let call_b = incoming_call_b.next().await.unwrap().unwrap();
|
||||||
|
let room_b = cx_b
|
||||||
|
.update(|cx| Room::join(&call_b, client_b.clone(), cx))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
deterministic.run_until_parked();
|
||||||
|
assert_eq!(
|
||||||
|
room_participants(&room_a, &client_a, cx_a).await,
|
||||||
|
RoomParticipants {
|
||||||
|
remote: vec!["user_b".to_string()],
|
||||||
|
pending: Default::default()
|
||||||
|
}
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
room_participants(&room_b, &client_b, cx_b).await,
|
||||||
|
RoomParticipants {
|
||||||
|
remote: vec!["user_a".to_string()],
|
||||||
|
pending: Default::default()
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
server.disconnect_client(client_a.current_user_id(cx_a));
|
||||||
|
cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
|
||||||
|
assert_eq!(
|
||||||
|
room_participants(&room_a, &client_a, cx_a).await,
|
||||||
|
RoomParticipants {
|
||||||
|
remote: Default::default(),
|
||||||
|
pending: Default::default()
|
||||||
|
}
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
room_participants(&room_b, &client_b, cx_b).await,
|
||||||
|
RoomParticipants {
|
||||||
|
remote: Default::default(),
|
||||||
|
pending: Default::default()
|
||||||
|
}
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[gpui::test(iterations = 10)]
|
#[gpui::test(iterations = 10)]
|
||||||
|
@ -6169,3 +6198,49 @@ fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Eq, PartialEq)]
|
||||||
|
struct RoomParticipants {
|
||||||
|
remote: Vec<String>,
|
||||||
|
pending: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn room_participants(
|
||||||
|
room: &ModelHandle<Room>,
|
||||||
|
client: &TestClient,
|
||||||
|
cx: &mut TestAppContext,
|
||||||
|
) -> RoomParticipants {
|
||||||
|
let remote_users = room.update(cx, |room, cx| {
|
||||||
|
room.remote_participants()
|
||||||
|
.values()
|
||||||
|
.map(|participant| {
|
||||||
|
client
|
||||||
|
.user_store
|
||||||
|
.update(cx, |users, cx| users.get_user(participant.user_id, cx))
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
});
|
||||||
|
let remote_users = futures::future::try_join_all(remote_users).await.unwrap();
|
||||||
|
let pending_users = room.update(cx, |room, cx| {
|
||||||
|
room.pending_user_ids()
|
||||||
|
.iter()
|
||||||
|
.map(|user_id| {
|
||||||
|
client
|
||||||
|
.user_store
|
||||||
|
.update(cx, |users, cx| users.get_user(*user_id, cx))
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
});
|
||||||
|
let pending_users = futures::future::try_join_all(pending_users).await.unwrap();
|
||||||
|
|
||||||
|
RoomParticipants {
|
||||||
|
remote: remote_users
|
||||||
|
.into_iter()
|
||||||
|
.map(|user| user.github_login.clone())
|
||||||
|
.collect(),
|
||||||
|
pending: pending_users
|
||||||
|
.into_iter()
|
||||||
|
.map(|user| user.github_login.clone())
|
||||||
|
.collect(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -528,6 +528,13 @@ impl Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(room) = removed_connection
|
||||||
|
.room_id
|
||||||
|
.and_then(|room_id| store.room(room_id))
|
||||||
|
{
|
||||||
|
self.room_updated(room);
|
||||||
|
}
|
||||||
|
|
||||||
removed_user_id = removed_connection.user_id;
|
removed_user_id = removed_connection.user_id;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@ pub type RoomId = u64;
|
||||||
#[derive(Default, Serialize)]
|
#[derive(Default, Serialize)]
|
||||||
pub struct Store {
|
pub struct Store {
|
||||||
connections: BTreeMap<ConnectionId, ConnectionState>,
|
connections: BTreeMap<ConnectionId, ConnectionState>,
|
||||||
connections_by_user_id: BTreeMap<UserId, UserConnectionState>,
|
connected_users: BTreeMap<UserId, ConnectedUser>,
|
||||||
next_room_id: RoomId,
|
next_room_id: RoomId,
|
||||||
rooms: BTreeMap<RoomId, proto::Room>,
|
rooms: BTreeMap<RoomId, proto::Room>,
|
||||||
projects: BTreeMap<ProjectId, Project>,
|
projects: BTreeMap<ProjectId, Project>,
|
||||||
|
@ -22,9 +22,9 @@ pub struct Store {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Serialize)]
|
#[derive(Default, Serialize)]
|
||||||
struct UserConnectionState {
|
struct ConnectedUser {
|
||||||
connection_ids: HashSet<ConnectionId>,
|
connection_ids: HashSet<ConnectionId>,
|
||||||
room: Option<RoomState>,
|
active_call: Option<CallState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
|
@ -37,9 +37,9 @@ struct ConnectionState {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
|
#[derive(Copy, Clone, Eq, PartialEq, Serialize)]
|
||||||
enum RoomState {
|
struct CallState {
|
||||||
Joined { room_id: RoomId },
|
room_id: RoomId,
|
||||||
Calling { room_id: RoomId },
|
joined: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
|
@ -89,6 +89,7 @@ pub struct RemovedConnectionState {
|
||||||
pub hosted_projects: HashMap<ProjectId, Project>,
|
pub hosted_projects: HashMap<ProjectId, Project>,
|
||||||
pub guest_project_ids: HashSet<ProjectId>,
|
pub guest_project_ids: HashSet<ProjectId>,
|
||||||
pub contact_ids: HashSet<UserId>,
|
pub contact_ids: HashSet<UserId>,
|
||||||
|
pub room_id: Option<RoomId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct LeftProject {
|
pub struct LeftProject {
|
||||||
|
@ -156,7 +157,7 @@ impl Store {
|
||||||
channels: Default::default(),
|
channels: Default::default(),
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
self.connections_by_user_id
|
self.connected_users
|
||||||
.entry(user_id)
|
.entry(user_id)
|
||||||
.or_default()
|
.or_default()
|
||||||
.connection_ids
|
.connection_ids
|
||||||
|
@ -196,10 +197,32 @@ impl Store {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let user_connection_state = self.connections_by_user_id.get_mut(&user_id).unwrap();
|
let connected_user = self.connected_users.get_mut(&user_id).unwrap();
|
||||||
user_connection_state.connection_ids.remove(&connection_id);
|
connected_user.connection_ids.remove(&connection_id);
|
||||||
if user_connection_state.connection_ids.is_empty() {
|
if let Some(active_call) = connected_user.active_call.as_ref() {
|
||||||
self.connections_by_user_id.remove(&user_id);
|
if let Some(room) = self.rooms.get_mut(&active_call.room_id) {
|
||||||
|
let prev_participant_count = room.participants.len();
|
||||||
|
room.participants
|
||||||
|
.retain(|participant| participant.peer_id != connection_id.0);
|
||||||
|
if prev_participant_count == room.participants.len() {
|
||||||
|
if connected_user.connection_ids.is_empty() {
|
||||||
|
room.pending_user_ids
|
||||||
|
.retain(|pending_user_id| *pending_user_id != user_id.to_proto());
|
||||||
|
result.room_id = Some(active_call.room_id);
|
||||||
|
connected_user.active_call = None;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
result.room_id = Some(active_call.room_id);
|
||||||
|
connected_user.active_call = None;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::error!("disconnected user claims to be in a room that does not exist");
|
||||||
|
connected_user.active_call = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if connected_user.connection_ids.is_empty() {
|
||||||
|
self.connected_users.remove(&user_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.connections.remove(&connection_id).unwrap();
|
self.connections.remove(&connection_id).unwrap();
|
||||||
|
@ -247,7 +270,7 @@ impl Store {
|
||||||
&self,
|
&self,
|
||||||
user_id: UserId,
|
user_id: UserId,
|
||||||
) -> impl Iterator<Item = ConnectionId> + '_ {
|
) -> impl Iterator<Item = ConnectionId> + '_ {
|
||||||
self.connections_by_user_id
|
self.connected_users
|
||||||
.get(&user_id)
|
.get(&user_id)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|state| &state.connection_ids)
|
.map(|state| &state.connection_ids)
|
||||||
|
@ -257,7 +280,7 @@ impl Store {
|
||||||
|
|
||||||
pub fn is_user_online(&self, user_id: UserId) -> bool {
|
pub fn is_user_online(&self, user_id: UserId) -> bool {
|
||||||
!self
|
!self
|
||||||
.connections_by_user_id
|
.connected_users
|
||||||
.get(&user_id)
|
.get(&user_id)
|
||||||
.unwrap_or(&Default::default())
|
.unwrap_or(&Default::default())
|
||||||
.connection_ids
|
.connection_ids
|
||||||
|
@ -308,7 +331,7 @@ impl Store {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn project_metadata_for_user(&self, user_id: UserId) -> Vec<proto::ProjectMetadata> {
|
pub fn project_metadata_for_user(&self, user_id: UserId) -> Vec<proto::ProjectMetadata> {
|
||||||
let user_connection_state = self.connections_by_user_id.get(&user_id);
|
let user_connection_state = self.connected_users.get(&user_id);
|
||||||
let project_ids = user_connection_state.iter().flat_map(|state| {
|
let project_ids = user_connection_state.iter().flat_map(|state| {
|
||||||
state
|
state
|
||||||
.connection_ids
|
.connection_ids
|
||||||
|
@ -347,13 +370,13 @@ impl Store {
|
||||||
.connections
|
.connections
|
||||||
.get_mut(&creator_connection_id)
|
.get_mut(&creator_connection_id)
|
||||||
.ok_or_else(|| anyhow!("no such connection"))?;
|
.ok_or_else(|| anyhow!("no such connection"))?;
|
||||||
let user_connection_state = self
|
let connected_user = self
|
||||||
.connections_by_user_id
|
.connected_users
|
||||||
.get_mut(&connection.user_id)
|
.get_mut(&connection.user_id)
|
||||||
.ok_or_else(|| anyhow!("no such connection"))?;
|
.ok_or_else(|| anyhow!("no such connection"))?;
|
||||||
anyhow::ensure!(
|
anyhow::ensure!(
|
||||||
user_connection_state.room.is_none(),
|
connected_user.active_call.is_none(),
|
||||||
"cannot participate in more than one room at once"
|
"can't create a room with an active call"
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut room = proto::Room::default();
|
let mut room = proto::Room::default();
|
||||||
|
@ -370,7 +393,10 @@ impl Store {
|
||||||
|
|
||||||
let room_id = post_inc(&mut self.next_room_id);
|
let room_id = post_inc(&mut self.next_room_id);
|
||||||
self.rooms.insert(room_id, room);
|
self.rooms.insert(room_id, room);
|
||||||
user_connection_state.room = Some(RoomState::Joined { room_id });
|
connected_user.active_call = Some(CallState {
|
||||||
|
room_id,
|
||||||
|
joined: true,
|
||||||
|
});
|
||||||
Ok(room_id)
|
Ok(room_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -386,15 +412,15 @@ impl Store {
|
||||||
let user_id = connection.user_id;
|
let user_id = connection.user_id;
|
||||||
let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
|
let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
|
||||||
|
|
||||||
let mut user_connection_state = self
|
let mut connected_user = self
|
||||||
.connections_by_user_id
|
.connected_users
|
||||||
.get_mut(&user_id)
|
.get_mut(&user_id)
|
||||||
.ok_or_else(|| anyhow!("no such connection"))?;
|
.ok_or_else(|| anyhow!("no such connection"))?;
|
||||||
anyhow::ensure!(
|
anyhow::ensure!(
|
||||||
user_connection_state
|
connected_user
|
||||||
.room
|
.active_call
|
||||||
.map_or(true, |room| room == RoomState::Calling { room_id }),
|
.map_or(false, |call| call.room_id == room_id && !call.joined),
|
||||||
"cannot participate in more than one room at once"
|
"not being called on this room"
|
||||||
);
|
);
|
||||||
|
|
||||||
let room = self
|
let room = self
|
||||||
|
@ -417,7 +443,10 @@ impl Store {
|
||||||
)),
|
)),
|
||||||
}),
|
}),
|
||||||
});
|
});
|
||||||
user_connection_state.room = Some(RoomState::Joined { room_id });
|
connected_user.active_call = Some(CallState {
|
||||||
|
room_id,
|
||||||
|
joined: true,
|
||||||
|
});
|
||||||
|
|
||||||
Ok((room, recipient_connection_ids))
|
Ok((room, recipient_connection_ids))
|
||||||
}
|
}
|
||||||
|
@ -433,14 +462,14 @@ impl Store {
|
||||||
.ok_or_else(|| anyhow!("no such connection"))?;
|
.ok_or_else(|| anyhow!("no such connection"))?;
|
||||||
let user_id = connection.user_id;
|
let user_id = connection.user_id;
|
||||||
|
|
||||||
let mut user_connection_state = self
|
let mut connected_user = self
|
||||||
.connections_by_user_id
|
.connected_users
|
||||||
.get_mut(&user_id)
|
.get_mut(&user_id)
|
||||||
.ok_or_else(|| anyhow!("no such connection"))?;
|
.ok_or_else(|| anyhow!("no such connection"))?;
|
||||||
anyhow::ensure!(
|
anyhow::ensure!(
|
||||||
user_connection_state
|
connected_user
|
||||||
.room
|
.active_call
|
||||||
.map_or(false, |room| room == RoomState::Joined { room_id }),
|
.map_or(false, |call| call.room_id == room_id && call.joined),
|
||||||
"cannot leave a room before joining it"
|
"cannot leave a room before joining it"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -450,26 +479,32 @@ impl Store {
|
||||||
.ok_or_else(|| anyhow!("no such room"))?;
|
.ok_or_else(|| anyhow!("no such room"))?;
|
||||||
room.participants
|
room.participants
|
||||||
.retain(|participant| participant.peer_id != connection_id.0);
|
.retain(|participant| participant.peer_id != connection_id.0);
|
||||||
user_connection_state.room = None;
|
connected_user.active_call = None;
|
||||||
|
|
||||||
Ok(room)
|
Ok(room)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> {
|
||||||
|
self.rooms.get(&room_id)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn call(
|
pub fn call(
|
||||||
&mut self,
|
&mut self,
|
||||||
room_id: RoomId,
|
room_id: RoomId,
|
||||||
from_connection_id: ConnectionId,
|
from_connection_id: ConnectionId,
|
||||||
to_user_id: UserId,
|
recipient_id: UserId,
|
||||||
) -> Result<(UserId, Vec<ConnectionId>, &proto::Room)> {
|
) -> Result<(UserId, Vec<ConnectionId>, &proto::Room)> {
|
||||||
let from_user_id = self.user_id_for_connection(from_connection_id)?;
|
let from_user_id = self.user_id_for_connection(from_connection_id)?;
|
||||||
|
|
||||||
let to_connection_ids = self.connection_ids_for_user(to_user_id).collect::<Vec<_>>();
|
let recipient_connection_ids = self
|
||||||
let mut to_user_connection_state = self
|
.connection_ids_for_user(recipient_id)
|
||||||
.connections_by_user_id
|
.collect::<Vec<_>>();
|
||||||
.get_mut(&to_user_id)
|
let mut recipient = self
|
||||||
|
.connected_users
|
||||||
|
.get_mut(&recipient_id)
|
||||||
.ok_or_else(|| anyhow!("no such connection"))?;
|
.ok_or_else(|| anyhow!("no such connection"))?;
|
||||||
anyhow::ensure!(
|
anyhow::ensure!(
|
||||||
to_user_connection_state.room.is_none(),
|
recipient.active_call.is_none(),
|
||||||
"recipient is already on another call"
|
"recipient is already on another call"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -486,22 +521,27 @@ impl Store {
|
||||||
anyhow::ensure!(
|
anyhow::ensure!(
|
||||||
room.pending_user_ids
|
room.pending_user_ids
|
||||||
.iter()
|
.iter()
|
||||||
.all(|user_id| UserId::from_proto(*user_id) != to_user_id),
|
.all(|user_id| UserId::from_proto(*user_id) != recipient_id),
|
||||||
"cannot call the same user more than once"
|
"cannot call the same user more than once"
|
||||||
);
|
);
|
||||||
room.pending_user_ids.push(to_user_id.to_proto());
|
room.pending_user_ids.push(recipient_id.to_proto());
|
||||||
to_user_connection_state.room = Some(RoomState::Calling { room_id });
|
recipient.active_call = Some(CallState {
|
||||||
|
room_id,
|
||||||
|
joined: false,
|
||||||
|
});
|
||||||
|
|
||||||
Ok((from_user_id, to_connection_ids, room))
|
Ok((from_user_id, recipient_connection_ids, room))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
|
pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
|
||||||
let mut to_user_connection_state = self
|
let mut recipient = self
|
||||||
.connections_by_user_id
|
.connected_users
|
||||||
.get_mut(&to_user_id)
|
.get_mut(&to_user_id)
|
||||||
.ok_or_else(|| anyhow!("no such connection"))?;
|
.ok_or_else(|| anyhow!("no such connection"))?;
|
||||||
anyhow::ensure!(to_user_connection_state.room == Some(RoomState::Calling { room_id }));
|
anyhow::ensure!(recipient
|
||||||
to_user_connection_state.room = None;
|
.active_call
|
||||||
|
.map_or(false, |call| call.room_id == room_id && !call.joined));
|
||||||
|
recipient.active_call = None;
|
||||||
let room = self
|
let room = self
|
||||||
.rooms
|
.rooms
|
||||||
.get_mut(&room_id)
|
.get_mut(&room_id)
|
||||||
|
@ -516,18 +556,17 @@ impl Store {
|
||||||
recipient_connection_id: ConnectionId,
|
recipient_connection_id: ConnectionId,
|
||||||
) -> Result<(&proto::Room, Vec<ConnectionId>)> {
|
) -> Result<(&proto::Room, Vec<ConnectionId>)> {
|
||||||
let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
|
let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
|
||||||
let mut to_user_connection_state = self
|
let recipient = self
|
||||||
.connections_by_user_id
|
.connected_users
|
||||||
.get_mut(&recipient_user_id)
|
.get_mut(&recipient_user_id)
|
||||||
.ok_or_else(|| anyhow!("no such connection"))?;
|
.ok_or_else(|| anyhow!("no such connection"))?;
|
||||||
if let Some(RoomState::Calling { room_id }) = to_user_connection_state.room {
|
if let Some(active_call) = recipient.active_call.take() {
|
||||||
to_user_connection_state.room = None;
|
|
||||||
let recipient_connection_ids = self
|
let recipient_connection_ids = self
|
||||||
.connection_ids_for_user(recipient_user_id)
|
.connection_ids_for_user(recipient_user_id)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let room = self
|
let room = self
|
||||||
.rooms
|
.rooms
|
||||||
.get_mut(&room_id)
|
.get_mut(&active_call.room_id)
|
||||||
.ok_or_else(|| anyhow!("no such room"))?;
|
.ok_or_else(|| anyhow!("no such room"))?;
|
||||||
room.pending_user_ids
|
room.pending_user_ids
|
||||||
.retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
|
.retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
|
||||||
|
@ -650,7 +689,7 @@ impl Store {
|
||||||
|
|
||||||
for requester_user_id in project.join_requests.keys() {
|
for requester_user_id in project.join_requests.keys() {
|
||||||
if let Some(requester_user_connection_state) =
|
if let Some(requester_user_connection_state) =
|
||||||
self.connections_by_user_id.get_mut(requester_user_id)
|
self.connected_users.get_mut(requester_user_id)
|
||||||
{
|
{
|
||||||
for requester_connection_id in
|
for requester_connection_id in
|
||||||
&requester_user_connection_state.connection_ids
|
&requester_user_connection_state.connection_ids
|
||||||
|
@ -1007,14 +1046,14 @@ impl Store {
|
||||||
assert!(channel.connection_ids.contains(connection_id));
|
assert!(channel.connection_ids.contains(connection_id));
|
||||||
}
|
}
|
||||||
assert!(self
|
assert!(self
|
||||||
.connections_by_user_id
|
.connected_users
|
||||||
.get(&connection.user_id)
|
.get(&connection.user_id)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.connection_ids
|
.connection_ids
|
||||||
.contains(connection_id));
|
.contains(connection_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (user_id, state) in &self.connections_by_user_id {
|
for (user_id, state) in &self.connected_users {
|
||||||
for connection_id in &state.connection_ids {
|
for connection_id in &state.connection_ids {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
self.connections.get(connection_id).unwrap().user_id,
|
self.connections.get(connection_id).unwrap().user_id,
|
||||||
|
|
|
@ -16,12 +16,14 @@ test-support = [
|
||||||
]
|
]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0.38"
|
|
||||||
client = { path = "../client" }
|
client = { path = "../client" }
|
||||||
collections = { path = "../collections" }
|
collections = { path = "../collections" }
|
||||||
gpui = { path = "../gpui" }
|
gpui = { path = "../gpui" }
|
||||||
project = { path = "../project" }
|
project = { path = "../project" }
|
||||||
|
|
||||||
|
anyhow = "1.0.38"
|
||||||
|
futures = "0.3"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
client = { path = "../client", features = ["test-support"] }
|
client = { path = "../client", features = ["test-support"] }
|
||||||
collections = { path = "../collections", features = ["test-support"] }
|
collections = { path = "../collections", features = ["test-support"] }
|
||||||
|
|
|
@ -3,6 +3,7 @@ mod participant;
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
use client::{call::Call, proto, Client, PeerId, TypedEnvelope};
|
use client::{call::Call, proto, Client, PeerId, TypedEnvelope};
|
||||||
use collections::HashMap;
|
use collections::HashMap;
|
||||||
|
use futures::StreamExt;
|
||||||
use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
|
use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
|
||||||
use participant::{LocalParticipant, ParticipantLocation, RemoteParticipant};
|
use participant::{LocalParticipant, ParticipantLocation, RemoteParticipant};
|
||||||
use project::Project;
|
use project::Project;
|
||||||
|
@ -12,13 +13,9 @@ pub enum Event {
|
||||||
PeerChangedActiveProject,
|
PeerChangedActiveProject,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum CallResponse {
|
|
||||||
Accepted,
|
|
||||||
Rejected,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Room {
|
pub struct Room {
|
||||||
id: u64,
|
id: u64,
|
||||||
|
status: RoomStatus,
|
||||||
local_participant: LocalParticipant,
|
local_participant: LocalParticipant,
|
||||||
remote_participants: HashMap<PeerId, RemoteParticipant>,
|
remote_participants: HashMap<PeerId, RemoteParticipant>,
|
||||||
pending_user_ids: Vec<u64>,
|
pending_user_ids: Vec<u64>,
|
||||||
|
@ -32,8 +29,24 @@ impl Entity for Room {
|
||||||
|
|
||||||
impl Room {
|
impl Room {
|
||||||
fn new(id: u64, client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
|
fn new(id: u64, client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
|
||||||
|
let mut client_status = client.status();
|
||||||
|
cx.spawn_weak(|this, mut cx| async move {
|
||||||
|
let is_connected = client_status
|
||||||
|
.next()
|
||||||
|
.await
|
||||||
|
.map_or(false, |s| s.is_connected());
|
||||||
|
// Even if we're initially connected, any future change of the status means we momentarily disconnected.
|
||||||
|
if !is_connected || client_status.next().await.is_some() {
|
||||||
|
if let Some(this) = this.upgrade(&cx) {
|
||||||
|
let _ = this.update(&mut cx, |this, cx| this.leave(cx));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.detach();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
id,
|
id,
|
||||||
|
status: RoomStatus::Online,
|
||||||
local_participant: LocalParticipant {
|
local_participant: LocalParticipant {
|
||||||
projects: Default::default(),
|
projects: Default::default(),
|
||||||
},
|
},
|
||||||
|
@ -69,6 +82,18 @@ impl Room {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn leave(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
|
||||||
|
if self.status.is_offline() {
|
||||||
|
return Err(anyhow!("room is offline"));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.status = RoomStatus::Offline;
|
||||||
|
self.remote_participants.clear();
|
||||||
|
self.client.send(proto::LeaveRoom { id: self.id })?;
|
||||||
|
cx.notify();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn remote_participants(&self) -> &HashMap<PeerId, RemoteParticipant> {
|
pub fn remote_participants(&self) -> &HashMap<PeerId, RemoteParticipant> {
|
||||||
&self.remote_participants
|
&self.remote_participants
|
||||||
}
|
}
|
||||||
|
@ -112,6 +137,10 @@ impl Room {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn call(&mut self, to_user_id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
|
pub fn call(&mut self, to_user_id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
|
||||||
|
if self.status.is_offline() {
|
||||||
|
return Task::ready(Err(anyhow!("room is offline")));
|
||||||
|
}
|
||||||
|
|
||||||
let client = self.client.clone();
|
let client = self.client.clone();
|
||||||
let room_id = self.id;
|
let room_id = self.id;
|
||||||
cx.foreground().spawn(async move {
|
cx.foreground().spawn(async move {
|
||||||
|
@ -125,32 +154,58 @@ impl Room {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn publish_project(&mut self, project: ModelHandle<Project>) -> Result<()> {
|
pub fn publish_project(&mut self, project: ModelHandle<Project>) -> Task<Result<()>> {
|
||||||
|
if self.status.is_offline() {
|
||||||
|
return Task::ready(Err(anyhow!("room is offline")));
|
||||||
|
}
|
||||||
|
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn unpublish_project(&mut self, project: ModelHandle<Project>) -> Result<()> {
|
pub fn unpublish_project(&mut self, project: ModelHandle<Project>) -> Task<Result<()>> {
|
||||||
|
if self.status.is_offline() {
|
||||||
|
return Task::ready(Err(anyhow!("room is offline")));
|
||||||
|
}
|
||||||
|
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn set_active_project(
|
pub fn set_active_project(
|
||||||
&mut self,
|
&mut self,
|
||||||
project: Option<&ModelHandle<Project>>,
|
project: Option<&ModelHandle<Project>>,
|
||||||
) -> Result<()> {
|
) -> Task<Result<()>> {
|
||||||
|
if self.status.is_offline() {
|
||||||
|
return Task::ready(Err(anyhow!("room is offline")));
|
||||||
|
}
|
||||||
|
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn mute(&mut self) -> Result<()> {
|
pub fn mute(&mut self) -> Task<Result<()>> {
|
||||||
|
if self.status.is_offline() {
|
||||||
|
return Task::ready(Err(anyhow!("room is offline")));
|
||||||
|
}
|
||||||
|
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn unmute(&mut self) -> Result<()> {
|
pub fn unmute(&mut self) -> Task<Result<()>> {
|
||||||
|
if self.status.is_offline() {
|
||||||
|
return Task::ready(Err(anyhow!("room is offline")));
|
||||||
|
}
|
||||||
|
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Room {
|
#[derive(Copy, Clone, PartialEq, Eq)]
|
||||||
fn drop(&mut self) {
|
pub enum RoomStatus {
|
||||||
let _ = self.client.send(proto::LeaveRoom { id: self.id });
|
Online,
|
||||||
|
Offline,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RoomStatus {
|
||||||
|
fn is_offline(&self) -> bool {
|
||||||
|
matches!(self, RoomStatus::Offline)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue