Add ability to join a room from a channel ID

co-authored-by: max <max@zed.dev>
This commit is contained in:
Mikayla Maki 2023-07-31 15:27:10 -07:00
parent 4b94bfa045
commit 92fa879b0c
No known key found for this signature in database
16 changed files with 485 additions and 105 deletions

View file

@ -1337,32 +1337,65 @@ impl Database {
&self,
room_id: RoomId,
user_id: UserId,
channel_id: Option<ChannelId>,
connection: ConnectionId,
) -> Result<RoomGuard<proto::Room>> {
self.room_transaction(room_id, |tx| async move {
let result = room_participant::Entity::update_many()
.filter(
Condition::all()
.add(room_participant::Column::RoomId.eq(room_id))
.add(room_participant::Column::UserId.eq(user_id))
.add(room_participant::Column::AnsweringConnectionId.is_null()),
)
.set(room_participant::ActiveModel {
if let Some(channel_id) = channel_id {
channel_member::Entity::find()
.filter(
channel_member::Column::ChannelId
.eq(channel_id)
.and(channel_member::Column::UserId.eq(user_id))
.and(channel_member::Column::Accepted.eq(true)),
)
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such channel membership"))?;
room_participant::ActiveModel {
room_id: ActiveValue::set(room_id),
user_id: ActiveValue::set(user_id),
answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
answering_connection_server_id: ActiveValue::set(Some(ServerId(
connection.owner_id as i32,
))),
answering_connection_lost: ActiveValue::set(false),
// Redundant for the channel join use case, used for channel and call invitations
calling_user_id: ActiveValue::set(user_id),
calling_connection_id: ActiveValue::set(connection.id as i32),
calling_connection_server_id: ActiveValue::set(Some(ServerId(
connection.owner_id as i32,
))),
..Default::default()
})
.exec(&*tx)
}
.insert(&*tx)
.await?;
if result.rows_affected == 0 {
Err(anyhow!("room does not exist or was already joined"))?
} else {
let room = self.get_room(room_id, &tx).await?;
Ok(room)
let result = room_participant::Entity::update_many()
.filter(
Condition::all()
.add(room_participant::Column::RoomId.eq(room_id))
.add(room_participant::Column::UserId.eq(user_id))
.add(room_participant::Column::AnsweringConnectionId.is_null()),
)
.set(room_participant::ActiveModel {
answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
answering_connection_server_id: ActiveValue::set(Some(ServerId(
connection.owner_id as i32,
))),
answering_connection_lost: ActiveValue::set(false),
..Default::default()
})
.exec(&*tx)
.await?;
if result.rows_affected == 0 {
Err(anyhow!("room does not exist or was already joined"))?;
}
}
let room = self.get_room(room_id, &tx).await?;
Ok(room)
})
.await
}
@ -3071,6 +3104,14 @@ impl Database {
.insert(&*tx)
.await?;
room::ActiveModel {
channel_id: ActiveValue::Set(Some(channel.id)),
live_kit_room: ActiveValue::Set(format!("channel-{}", channel.id)),
..Default::default()
}
.insert(&*tx)
.await?;
Ok(channel.id)
})
.await
@ -3163,6 +3204,7 @@ impl Database {
self.transaction(|tx| async move {
let tx = tx;
// Breadth first list of all edges in this user's channels
let sql = r#"
WITH RECURSIVE channel_tree(child_id, parent_id, depth) AS (
SELECT channel_id as child_id, CAST(NULL as INTEGER) as parent_id, 0
@ -3173,23 +3215,52 @@ impl Database {
FROM channel_parents, channel_tree
WHERE channel_parents.parent_id = channel_tree.child_id
)
SELECT channel_tree.child_id as id, channels.name, channel_tree.parent_id
SELECT channel_tree.child_id, channel_tree.parent_id
FROM channel_tree
JOIN channels ON channels.id = channel_tree.child_id
ORDER BY channel_tree.depth;
ORDER BY child_id, parent_id IS NOT NULL
"#;
#[derive(FromQueryResult, Debug, PartialEq)]
pub struct ChannelParent {
pub child_id: ChannelId,
pub parent_id: Option<ChannelId>,
}
let stmt = Statement::from_sql_and_values(
self.pool.get_database_backend(),
sql,
vec![user_id.into()],
);
Ok(channel_parent::Entity::find()
let mut parents_by_child_id = HashMap::default();
let mut parents = channel_parent::Entity::find()
.from_raw_sql(stmt)
.into_model::<Channel>()
.all(&*tx)
.await?)
.into_model::<ChannelParent>()
.stream(&*tx).await?;
while let Some(parent) = parents.next().await {
let parent = parent?;
parents_by_child_id.insert(parent.child_id, parent.parent_id);
}
drop(parents);
let mut channels = Vec::with_capacity(parents_by_child_id.len());
let mut rows = channel::Entity::find()
.filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
.stream(&*tx).await?;
while let Some(row) = rows.next().await {
let row = row?;
channels.push(Channel {
id: row.id,
name: row.name,
parent_id: parents_by_child_id.get(&row.id).copied().flatten(),
});
}
drop(rows);
Ok(channels)
})
.await
}
@ -3210,6 +3281,22 @@ impl Database {
.await
}
pub async fn get_channel_room(&self, channel_id: ChannelId) -> Result<RoomId> {
self.transaction(|tx| async move {
let tx = tx;
let room = channel::Model {
id: channel_id,
..Default::default()
}
.find_related(room::Entity)
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("invalid channel"))?;
Ok(room.id)
})
.await
}
async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
where
F: Send + Fn(TransactionHandle) -> Fut,

View file

@ -1,4 +1,4 @@
use super::{ChannelId, RoomId};
use super::ChannelId;
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, Default, PartialEq, Eq, DeriveEntityModel)]
@ -7,7 +7,6 @@ pub struct Model {
#[sea_orm(primary_key)]
pub id: ChannelId,
pub name: String,
pub room_id: Option<RoomId>,
}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -1,4 +1,4 @@
use super::RoomId;
use super::{ChannelId, RoomId};
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
@ -7,6 +7,7 @@ pub struct Model {
#[sea_orm(primary_key)]
pub id: RoomId,
pub live_kit_room: String,
pub channel_id: Option<ChannelId>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@ -17,6 +18,12 @@ pub enum Relation {
Project,
#[sea_orm(has_many = "super::follower::Entity")]
Follower,
#[sea_orm(
belongs_to = "super::channel::Entity",
from = "Column::ChannelId",
to = "super::channel::Column::Id"
)]
Channel,
}
impl Related<super::room_participant::Entity> for Entity {
@ -39,7 +46,7 @@ impl Related<super::follower::Entity> for Entity {
impl Related<super::channel::Entity> for Entity {
fn to() -> RelationDef {
Relation::Follower.def()
Relation::Channel.def()
}
}

View file

@ -494,9 +494,14 @@ test_both_dbs!(
)
.await
.unwrap();
db.join_room(room_id, user2.user_id, ConnectionId { owner_id, id: 1 })
.await
.unwrap();
db.join_room(
room_id,
user2.user_id,
None,
ConnectionId { owner_id, id: 1 },
)
.await
.unwrap();
assert_eq!(db.project_count_excluding_admins().await.unwrap(), 0);
db.share_project(room_id, ConnectionId { owner_id, id: 1 }, &[])
@ -920,11 +925,6 @@ test_both_dbs!(test_channels_postgres, test_channels_sqlite, db, {
name: "zed".to_string(),
parent_id: None,
},
Channel {
id: rust_id,
name: "rust".to_string(),
parent_id: None,
},
Channel {
id: crdb_id,
name: "crdb".to_string(),
@ -940,6 +940,11 @@ test_both_dbs!(test_channels_postgres, test_channels_sqlite, db, {
name: "replace".to_string(),
parent_id: Some(zed_id),
},
Channel {
id: rust_id,
name: "rust".to_string(),
parent_id: None,
},
Channel {
id: cargo_id,
name: "cargo".to_string(),
@ -949,6 +954,69 @@ test_both_dbs!(test_channels_postgres, test_channels_sqlite, db, {
);
});
test_both_dbs!(
test_joining_channels_postgres,
test_joining_channels_sqlite,
db,
{
let owner_id = db.create_server("test").await.unwrap().0 as u32;
let user_1 = db
.create_user(
"user1@example.com",
false,
NewUserParams {
github_login: "user1".into(),
github_user_id: 5,
invite_count: 0,
},
)
.await
.unwrap()
.user_id;
let user_2 = db
.create_user(
"user2@example.com",
false,
NewUserParams {
github_login: "user2".into(),
github_user_id: 6,
invite_count: 0,
},
)
.await
.unwrap()
.user_id;
let channel_1 = db.create_root_channel("channel_1", user_1).await.unwrap();
let room_1 = db.get_channel_room(channel_1).await.unwrap();
// can join a room with membership to its channel
let room = db
.join_room(
room_1,
user_1,
Some(channel_1),
ConnectionId { owner_id, id: 1 },
)
.await
.unwrap();
assert_eq!(room.participants.len(), 1);
drop(room);
// cannot join a room without membership to its channel
assert!(db
.join_room(
room_1,
user_2,
Some(channel_1),
ConnectionId { owner_id, id: 1 }
)
.await
.is_err());
}
);
#[gpui::test]
async fn test_multiple_signup_overwrite() {
let test_db = TestDb::postgres(build_background_executor());

View file

@ -34,7 +34,10 @@ use futures::{
use lazy_static::lazy_static;
use prometheus::{register_int_gauge, IntGauge};
use rpc::{
proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
proto::{
self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, LiveKitConnectionInfo,
RequestMessage,
},
Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
};
use serde::{Serialize, Serializer};
@ -183,7 +186,7 @@ impl Server {
server
.add_request_handler(ping)
.add_request_handler(create_room)
.add_request_handler(create_room_request)
.add_request_handler(join_room)
.add_request_handler(rejoin_room)
.add_request_handler(leave_room)
@ -243,6 +246,7 @@ impl Server {
.add_request_handler(invite_channel_member)
.add_request_handler(remove_channel_member)
.add_request_handler(respond_to_channel_invite)
.add_request_handler(join_channel)
.add_request_handler(follow)
.add_message_handler(unfollow)
.add_message_handler(update_followers)
@ -855,48 +859,17 @@ async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session
Ok(())
}
async fn create_room(
async fn create_room_request(
_request: proto::CreateRoom,
response: Response<proto::CreateRoom>,
session: Session,
) -> Result<()> {
let live_kit_room = nanoid::nanoid!(30);
let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() {
if let Some(_) = live_kit
.create_room(live_kit_room.clone())
.await
.trace_err()
{
if let Some(token) = live_kit
.room_token(&live_kit_room, &session.user_id.to_string())
.trace_err()
{
Some(proto::LiveKitConnectionInfo {
server_url: live_kit.url().into(),
token,
})
} else {
None
}
} else {
None
}
} else {
None
};
let (room, live_kit_connection_info) = create_room(&session).await?;
{
let room = session
.db()
.await
.create_room(session.user_id, session.connection_id, &live_kit_room)
.await?;
response.send(proto::CreateRoomResponse {
room: Some(room.clone()),
live_kit_connection_info,
})?;
}
response.send(proto::CreateRoomResponse {
room: Some(room.clone()),
live_kit_connection_info,
})?;
update_user_contacts(session.user_id, &session).await?;
Ok(())
@ -912,7 +885,7 @@ async fn join_room(
let room = session
.db()
.await
.join_room(room_id, session.user_id, session.connection_id)
.join_room(room_id, session.user_id, None, session.connection_id)
.await?;
room_updated(&room, &session.peer);
room.clone()
@ -2182,6 +2155,32 @@ async fn respond_to_channel_invite(
Ok(())
}
async fn join_channel(
request: proto::JoinChannel,
response: Response<proto::JoinChannel>,
session: Session,
) -> Result<()> {
let db = session.db().await;
let channel_id = ChannelId::from_proto(request.channel_id);
todo!();
// db.check_channel_membership(session.user_id, channel_id)
// .await?;
let (room, live_kit_connection_info) = create_room(&session).await?;
// db.set_channel_room(channel_id, room.id).await?;
response.send(proto::CreateRoomResponse {
room: Some(room.clone()),
live_kit_connection_info,
})?;
update_user_contacts(session.user_id, &session).await?;
Ok(())
}
async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
let project_id = ProjectId::from_proto(request.project_id);
let project_connection_ids = session
@ -2436,6 +2435,42 @@ fn project_left(project: &db::LeftProject, session: &Session) {
}
}
async fn create_room(session: &Session) -> Result<(proto::Room, Option<LiveKitConnectionInfo>)> {
let live_kit_room = nanoid::nanoid!(30);
let live_kit_connection_info = {
let live_kit_room = live_kit_room.clone();
let live_kit = session.live_kit_client.as_ref();
util::async_iife!({
let live_kit = live_kit?;
live_kit
.create_room(live_kit_room.clone())
.await
.trace_err()?;
let token = live_kit
.room_token(&live_kit_room, &session.user_id.to_string())
.trace_err()?;
Some(proto::LiveKitConnectionInfo {
server_url: live_kit.url().into(),
token,
})
})
}
.await;
let room = session
.db()
.await
.create_room(session.user_id, session.connection_id, &live_kit_room)
.await?;
Ok((room, live_kit_connection_info))
}
pub trait ResultExt {
type Ok;

View file

@ -5,7 +5,7 @@ use crate::{
AppState,
};
use anyhow::anyhow;
use call::ActiveCall;
use call::{ActiveCall, Room};
use client::{
self, proto::PeerId, ChannelStore, Client, Connection, Credentials, EstablishConnectionError,
UserStore,
@ -269,6 +269,44 @@ impl TestServer {
}
}
async fn make_channel(
&self,
channel: &str,
admin: (&TestClient, &mut TestAppContext),
members: &mut [(&TestClient, &mut TestAppContext)],
) -> u64 {
let (admin_client, admin_cx) = admin;
let channel_id = admin_client
.channel_store
.update(admin_cx, |channel_store, _| {
channel_store.create_channel(channel, None)
})
.await
.unwrap();
for (member_client, member_cx) in members {
admin_client
.channel_store
.update(admin_cx, |channel_store, _| {
channel_store.invite_member(channel_id, member_client.user_id().unwrap(), false)
})
.await
.unwrap();
admin_cx.foreground().run_until_parked();
member_client
.channel_store
.update(*member_cx, |channels, _| {
channels.respond_to_channel_invite(channel_id, true)
})
.await
.unwrap();
}
channel_id
}
async fn create_room(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) {
self.make_contacts(clients).await;
@ -516,3 +554,27 @@ impl Drop for TestClient {
self.client.teardown();
}
}
#[derive(Debug, Eq, PartialEq)]
struct RoomParticipants {
remote: Vec<String>,
pending: Vec<String>,
}
fn room_participants(room: &ModelHandle<Room>, cx: &mut TestAppContext) -> RoomParticipants {
room.read_with(cx, |room, _| {
let mut remote = room
.remote_participants()
.iter()
.map(|(_, participant)| participant.user.github_login.clone())
.collect::<Vec<_>>();
let mut pending = room
.pending_participants()
.iter()
.map(|user| user.github_login.clone())
.collect::<Vec<_>>();
remote.sort();
pending.sort();
RoomParticipants { remote, pending }
})
}

View file

@ -1,7 +1,10 @@
use call::ActiveCall;
use client::Channel;
use gpui::{executor::Deterministic, TestAppContext};
use std::sync::Arc;
use crate::tests::{room_participants, RoomParticipants};
use super::TestServer;
#[gpui::test]
@ -82,6 +85,58 @@ async fn test_basic_channels(
});
}
#[gpui::test]
async fn test_channel_room(
deterministic: Arc<Deterministic>,
cx_a: &mut TestAppContext,
cx_b: &mut TestAppContext,
) {
deterministic.forbid_parking();
let mut server = TestServer::start(&deterministic).await;
let client_a = server.create_client(cx_a, "user_a").await;
let client_b = server.create_client(cx_b, "user_b").await;
let zed_id = server
.make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
.await;
let active_call_a = cx_a.read(ActiveCall::global);
let active_call_b = cx_b.read(ActiveCall::global);
active_call_a
.update(cx_a, |active_call, cx| active_call.join_channel(zed_id, cx))
.await
.unwrap();
deterministic.run_until_parked();
let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
assert_eq!(
room_participants(&room_a, cx_a),
RoomParticipants {
remote: vec!["user_a".to_string()],
pending: vec![]
}
);
active_call_b
.update(cx_b, |active_call, cx| active_call.join_channel(zed_id, cx))
.await
.unwrap();
deterministic.run_until_parked();
let active_call_b = cx_b.read(ActiveCall::global);
let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
assert_eq!(
room_participants(&room_b, cx_b),
RoomParticipants {
remote: vec!["user_a".to_string(), "user_b".to_string()],
pending: vec![]
}
);
}
// TODO:
// Invariants to test:
// 1. Dag structure is maintained for all operations (can't make a cycle)

View file

@ -1,6 +1,6 @@
use crate::{
rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
tests::{TestClient, TestServer},
tests::{room_participants, RoomParticipants, TestClient, TestServer},
};
use call::{room, ActiveCall, ParticipantLocation, Room};
use client::{User, RECEIVE_TIMEOUT};
@ -8319,30 +8319,6 @@ async fn test_inlay_hint_refresh_is_forwarded(
});
}
#[derive(Debug, Eq, PartialEq)]
struct RoomParticipants {
remote: Vec<String>,
pending: Vec<String>,
}
fn room_participants(room: &ModelHandle<Room>, cx: &mut TestAppContext) -> RoomParticipants {
room.read_with(cx, |room, _| {
let mut remote = room
.remote_participants()
.iter()
.map(|(_, participant)| participant.user.github_login.clone())
.collect::<Vec<_>>();
let mut pending = room
.pending_participants()
.iter()
.map(|user| user.github_login.clone())
.collect::<Vec<_>>();
remote.sort();
pending.sort();
RoomParticipants { remote, pending }
})
}
fn extract_hint_labels(editor: &Editor) -> Vec<String> {
let mut labels = Vec::new();
for (_, excerpt_hints) in &editor.inlay_hint_cache().hints {