Set up basic RPC for managing channels
Co-authored-by: Mikayla <mikayla@zed.dev>
This commit is contained in:
parent
758e1f6e57
commit
4b94bfa045
12 changed files with 541 additions and 150 deletions
|
@ -3032,11 +3032,16 @@ impl Database {
|
|||
|
||||
// channels
|
||||
|
||||
pub async fn create_root_channel(&self, name: &str) -> Result<ChannelId> {
|
||||
self.create_channel(name, None).await
|
||||
pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
|
||||
self.create_channel(name, None, creator_id).await
|
||||
}
|
||||
|
||||
pub async fn create_channel(&self, name: &str, parent: Option<ChannelId>) -> Result<ChannelId> {
|
||||
pub async fn create_channel(
|
||||
&self,
|
||||
name: &str,
|
||||
parent: Option<ChannelId>,
|
||||
creator_id: UserId,
|
||||
) -> Result<ChannelId> {
|
||||
self.transaction(move |tx| async move {
|
||||
let tx = tx;
|
||||
|
||||
|
@ -3056,19 +3061,50 @@ impl Database {
|
|||
.await?;
|
||||
}
|
||||
|
||||
channel_member::ActiveModel {
|
||||
channel_id: ActiveValue::Set(channel.id),
|
||||
user_id: ActiveValue::Set(creator_id),
|
||||
accepted: ActiveValue::Set(true),
|
||||
admin: ActiveValue::Set(true),
|
||||
..Default::default()
|
||||
}
|
||||
.insert(&*tx)
|
||||
.await?;
|
||||
|
||||
Ok(channel.id)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
// Property: Members are only
|
||||
pub async fn add_channel_member(&self, channel_id: ChannelId, user_id: UserId) -> Result<()> {
|
||||
pub async fn invite_channel_member(
|
||||
&self,
|
||||
channel_id: ChannelId,
|
||||
invitee_id: UserId,
|
||||
inviter_id: UserId,
|
||||
is_admin: bool,
|
||||
) -> Result<()> {
|
||||
self.transaction(move |tx| async move {
|
||||
let tx = tx;
|
||||
|
||||
// Check if inviter is a member
|
||||
channel_member::Entity::find()
|
||||
.filter(
|
||||
channel_member::Column::ChannelId
|
||||
.eq(channel_id)
|
||||
.and(channel_member::Column::UserId.eq(inviter_id))
|
||||
.and(channel_member::Column::Admin.eq(true)),
|
||||
)
|
||||
.one(&*tx)
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
anyhow!("Inviter does not have permissions to invite the invitee")
|
||||
})?;
|
||||
|
||||
let channel_membership = channel_member::ActiveModel {
|
||||
channel_id: ActiveValue::Set(channel_id),
|
||||
user_id: ActiveValue::Set(user_id),
|
||||
user_id: ActiveValue::Set(invitee_id),
|
||||
accepted: ActiveValue::Set(false),
|
||||
admin: ActiveValue::Set(is_admin),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
@ -3079,6 +3115,50 @@ impl Database {
|
|||
.await
|
||||
}
|
||||
|
||||
pub async fn respond_to_channel_invite(
|
||||
&self,
|
||||
channel_id: ChannelId,
|
||||
user_id: UserId,
|
||||
accept: bool,
|
||||
) -> Result<()> {
|
||||
self.transaction(move |tx| async move {
|
||||
let tx = tx;
|
||||
|
||||
let rows_affected = if accept {
|
||||
channel_member::Entity::update_many()
|
||||
.set(channel_member::ActiveModel {
|
||||
accepted: ActiveValue::Set(accept),
|
||||
..Default::default()
|
||||
})
|
||||
.filter(
|
||||
channel_member::Column::ChannelId
|
||||
.eq(channel_id)
|
||||
.and(channel_member::Column::UserId.eq(user_id))
|
||||
.and(channel_member::Column::Accepted.eq(false)),
|
||||
)
|
||||
.exec(&*tx)
|
||||
.await?
|
||||
.rows_affected
|
||||
} else {
|
||||
channel_member::ActiveModel {
|
||||
channel_id: ActiveValue::Unchanged(channel_id),
|
||||
user_id: ActiveValue::Unchanged(user_id),
|
||||
..Default::default()
|
||||
}
|
||||
.delete(&*tx)
|
||||
.await?
|
||||
.rows_affected
|
||||
};
|
||||
|
||||
if rows_affected == 0 {
|
||||
Err(anyhow!("no such invitation"))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_channels(&self, user_id: UserId) -> Result<Vec<Channel>> {
|
||||
self.transaction(|tx| async move {
|
||||
let tx = tx;
|
||||
|
@ -3087,7 +3167,7 @@ impl Database {
|
|||
WITH RECURSIVE channel_tree(child_id, parent_id, depth) AS (
|
||||
SELECT channel_id as child_id, CAST(NULL as INTEGER) as parent_id, 0
|
||||
FROM channel_members
|
||||
WHERE user_id = $1
|
||||
WHERE user_id = $1 AND accepted
|
||||
UNION
|
||||
SELECT channel_parents.child_id, channel_parents.parent_id, channel_tree.depth + 1
|
||||
FROM channel_parents, channel_tree
|
||||
|
@ -3114,6 +3194,22 @@ impl Database {
|
|||
.await
|
||||
}
|
||||
|
||||
pub async fn get_channel(&self, channel_id: ChannelId) -> Result<Channel> {
|
||||
self.transaction(|tx| async move {
|
||||
let tx = tx;
|
||||
let channel = channel::Entity::find_by_id(channel_id)
|
||||
.one(&*tx)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow!("no such channel"))?;
|
||||
Ok(Channel {
|
||||
id: channel.id,
|
||||
name: channel.name,
|
||||
parent_id: None,
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
|
||||
where
|
||||
F: Send + Fn(TransactionHandle) -> Fut,
|
||||
|
|
|
@ -10,6 +10,8 @@ pub struct Model {
|
|||
pub id: ChannelMemberId,
|
||||
pub channel_id: ChannelId,
|
||||
pub user_id: UserId,
|
||||
pub accepted: bool,
|
||||
pub admin: bool,
|
||||
}
|
||||
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
|
|
|
@ -894,18 +894,21 @@ test_both_dbs!(test_channels_postgres, test_channels_sqlite, db, {
|
|||
.unwrap()
|
||||
.user_id;
|
||||
|
||||
let zed_id = db.create_root_channel("zed").await.unwrap();
|
||||
let crdb_id = db.create_channel("crdb", Some(zed_id)).await.unwrap();
|
||||
let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
|
||||
let crdb_id = db.create_channel("crdb", Some(zed_id), a_id).await.unwrap();
|
||||
let livestreaming_id = db
|
||||
.create_channel("livestreaming", Some(zed_id))
|
||||
.create_channel("livestreaming", Some(zed_id), a_id)
|
||||
.await
|
||||
.unwrap();
|
||||
let replace_id = db
|
||||
.create_channel("replace", Some(zed_id), a_id)
|
||||
.await
|
||||
.unwrap();
|
||||
let rust_id = db.create_root_channel("rust", a_id).await.unwrap();
|
||||
let cargo_id = db
|
||||
.create_channel("cargo", Some(rust_id), a_id)
|
||||
.await
|
||||
.unwrap();
|
||||
let replace_id = db.create_channel("replace", Some(zed_id)).await.unwrap();
|
||||
let rust_id = db.create_root_channel("rust").await.unwrap();
|
||||
let cargo_id = db.create_channel("cargo", Some(rust_id)).await.unwrap();
|
||||
|
||||
db.add_channel_member(zed_id, a_id).await.unwrap();
|
||||
db.add_channel_member(rust_id, a_id).await.unwrap();
|
||||
|
||||
let channels = db.get_channels(a_id).await.unwrap();
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ mod connection_pool;
|
|||
|
||||
use crate::{
|
||||
auth,
|
||||
db::{self, Database, ProjectId, RoomId, ServerId, User, UserId},
|
||||
db::{self, ChannelId, Database, ProjectId, RoomId, ServerId, User, UserId},
|
||||
executor::Executor,
|
||||
AppState, Result,
|
||||
};
|
||||
|
@ -239,6 +239,10 @@ impl Server {
|
|||
.add_request_handler(request_contact)
|
||||
.add_request_handler(remove_contact)
|
||||
.add_request_handler(respond_to_contact_request)
|
||||
.add_request_handler(create_channel)
|
||||
.add_request_handler(invite_channel_member)
|
||||
.add_request_handler(remove_channel_member)
|
||||
.add_request_handler(respond_to_channel_invite)
|
||||
.add_request_handler(follow)
|
||||
.add_message_handler(unfollow)
|
||||
.add_message_handler(update_followers)
|
||||
|
@ -2084,6 +2088,100 @@ async fn remove_contact(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_channel(
|
||||
request: proto::CreateChannel,
|
||||
response: Response<proto::CreateChannel>,
|
||||
session: Session,
|
||||
) -> Result<()> {
|
||||
let db = session.db().await;
|
||||
let id = db
|
||||
.create_channel(
|
||||
&request.name,
|
||||
request.parent_id.map(|id| ChannelId::from_proto(id)),
|
||||
session.user_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut update = proto::UpdateChannels::default();
|
||||
update.channels.push(proto::Channel {
|
||||
id: id.to_proto(),
|
||||
name: request.name,
|
||||
parent_id: request.parent_id,
|
||||
});
|
||||
session.peer.send(session.connection_id, update)?;
|
||||
response.send(proto::CreateChannelResponse {
|
||||
channel_id: id.to_proto(),
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn invite_channel_member(
|
||||
request: proto::InviteChannelMember,
|
||||
response: Response<proto::InviteChannelMember>,
|
||||
session: Session,
|
||||
) -> Result<()> {
|
||||
let db = session.db().await;
|
||||
let channel_id = ChannelId::from_proto(request.channel_id);
|
||||
let channel = db.get_channel(channel_id).await?;
|
||||
let invitee_id = UserId::from_proto(request.user_id);
|
||||
db.invite_channel_member(channel_id, invitee_id, session.user_id, false)
|
||||
.await?;
|
||||
|
||||
let mut update = proto::UpdateChannels::default();
|
||||
update.channel_invitations.push(proto::Channel {
|
||||
id: channel.id.to_proto(),
|
||||
name: channel.name,
|
||||
parent_id: None,
|
||||
});
|
||||
for connection_id in session
|
||||
.connection_pool()
|
||||
.await
|
||||
.user_connection_ids(invitee_id)
|
||||
{
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
}
|
||||
|
||||
response.send(proto::Ack {})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn remove_channel_member(
|
||||
request: proto::RemoveChannelMember,
|
||||
response: Response<proto::RemoveChannelMember>,
|
||||
session: Session,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn respond_to_channel_invite(
|
||||
request: proto::RespondToChannelInvite,
|
||||
response: Response<proto::RespondToChannelInvite>,
|
||||
session: Session,
|
||||
) -> Result<()> {
|
||||
let db = session.db().await;
|
||||
let channel_id = ChannelId::from_proto(request.channel_id);
|
||||
let channel = db.get_channel(channel_id).await?;
|
||||
db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
|
||||
.await?;
|
||||
|
||||
let mut update = proto::UpdateChannels::default();
|
||||
update
|
||||
.remove_channel_invitations
|
||||
.push(channel_id.to_proto());
|
||||
if request.accept {
|
||||
update.channels.push(proto::Channel {
|
||||
id: channel.id.to_proto(),
|
||||
name: channel.name,
|
||||
parent_id: None,
|
||||
});
|
||||
}
|
||||
session.peer.send(session.connection_id, update)?;
|
||||
response.send(proto::Ack {})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
|
||||
let project_id = ProjectId::from_proto(request.project_id);
|
||||
let project_connection_ids = session
|
||||
|
|
|
@ -7,7 +7,8 @@ use crate::{
|
|||
use anyhow::anyhow;
|
||||
use call::ActiveCall;
|
||||
use client::{
|
||||
self, proto::PeerId, Client, Connection, Credentials, EstablishConnectionError, UserStore,
|
||||
self, proto::PeerId, ChannelStore, Client, Connection, Credentials, EstablishConnectionError,
|
||||
UserStore,
|
||||
};
|
||||
use collections::{HashMap, HashSet};
|
||||
use fs::FakeFs;
|
||||
|
@ -33,9 +34,9 @@ use std::{
|
|||
use util::http::FakeHttpClient;
|
||||
use workspace::Workspace;
|
||||
|
||||
mod channel_tests;
|
||||
mod integration_tests;
|
||||
mod randomized_integration_tests;
|
||||
mod channel_tests;
|
||||
|
||||
struct TestServer {
|
||||
app_state: Arc<AppState>,
|
||||
|
@ -187,6 +188,8 @@ impl TestServer {
|
|||
|
||||
let fs = FakeFs::new(cx.background());
|
||||
let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
|
||||
let channel_store =
|
||||
cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
|
||||
let app_state = Arc::new(workspace::AppState {
|
||||
client: client.clone(),
|
||||
user_store: user_store.clone(),
|
||||
|
@ -218,6 +221,7 @@ impl TestServer {
|
|||
username: name.to_string(),
|
||||
state: Default::default(),
|
||||
user_store,
|
||||
channel_store,
|
||||
fs,
|
||||
language_registry: Arc::new(LanguageRegistry::test()),
|
||||
};
|
||||
|
@ -320,6 +324,7 @@ struct TestClient {
|
|||
username: String,
|
||||
state: RefCell<TestClientState>,
|
||||
pub user_store: ModelHandle<UserStore>,
|
||||
pub channel_store: ModelHandle<ChannelStore>,
|
||||
language_registry: Arc<LanguageRegistry>,
|
||||
fs: Arc<FakeFs>,
|
||||
}
|
||||
|
|
|
@ -1,85 +1,108 @@
|
|||
use client::Channel;
|
||||
use gpui::{executor::Deterministic, TestAppContext};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::db::Channel;
|
||||
|
||||
use super::TestServer;
|
||||
|
||||
#[gpui::test]
|
||||
async fn test_basic_channels(deterministic: Arc<Deterministic>, cx: &mut TestAppContext) {
|
||||
async fn test_basic_channels(
|
||||
deterministic: Arc<Deterministic>,
|
||||
cx_a: &mut TestAppContext,
|
||||
cx_b: &mut TestAppContext,
|
||||
) {
|
||||
deterministic.forbid_parking();
|
||||
let mut server = TestServer::start(&deterministic).await;
|
||||
let client_a = server.create_client(cx, "user_a").await;
|
||||
let a_id = crate::db::UserId(client_a.user_id().unwrap() as i32);
|
||||
let db = server._test_db.db();
|
||||
let client_a = server.create_client(cx_a, "user_a").await;
|
||||
let client_b = server.create_client(cx_b, "user_b").await;
|
||||
|
||||
let zed_id = db.create_root_channel("zed").await.unwrap();
|
||||
let crdb_id = db.create_channel("crdb", Some(zed_id)).await.unwrap();
|
||||
let livestreaming_id = db
|
||||
.create_channel("livestreaming", Some(zed_id))
|
||||
let channel_a_id = client_a
|
||||
.channel_store
|
||||
.update(cx_a, |channel_store, _| {
|
||||
channel_store.create_channel("channel-a", None)
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let replace_id = db.create_channel("replace", Some(zed_id)).await.unwrap();
|
||||
let rust_id = db.create_root_channel("rust").await.unwrap();
|
||||
let cargo_id = db.create_channel("cargo", Some(rust_id)).await.unwrap();
|
||||
|
||||
db.add_channel_member(zed_id, a_id).await.unwrap();
|
||||
db.add_channel_member(rust_id, a_id).await.unwrap();
|
||||
|
||||
let channels = db.get_channels(a_id).await.unwrap();
|
||||
assert_eq!(
|
||||
channels,
|
||||
vec![
|
||||
Channel {
|
||||
id: zed_id,
|
||||
name: "zed".to_string(),
|
||||
client_a.channel_store.read_with(cx_a, |channels, _| {
|
||||
assert_eq!(
|
||||
channels.channels(),
|
||||
&[Channel {
|
||||
id: channel_a_id,
|
||||
name: "channel-a".to_string(),
|
||||
parent_id: None,
|
||||
},
|
||||
Channel {
|
||||
id: rust_id,
|
||||
name: "rust".to_string(),
|
||||
parent_id: None,
|
||||
},
|
||||
Channel {
|
||||
id: crdb_id,
|
||||
name: "crdb".to_string(),
|
||||
parent_id: Some(zed_id),
|
||||
},
|
||||
Channel {
|
||||
id: livestreaming_id,
|
||||
name: "livestreaming".to_string(),
|
||||
parent_id: Some(zed_id),
|
||||
},
|
||||
Channel {
|
||||
id: replace_id,
|
||||
name: "replace".to_string(),
|
||||
parent_id: Some(zed_id),
|
||||
},
|
||||
Channel {
|
||||
id: cargo_id,
|
||||
name: "cargo".to_string(),
|
||||
parent_id: Some(rust_id),
|
||||
}
|
||||
]
|
||||
);
|
||||
}
|
||||
}]
|
||||
)
|
||||
});
|
||||
|
||||
#[gpui::test]
|
||||
async fn test_block_cycle_creation(deterministic: Arc<Deterministic>, cx: &mut TestAppContext) {
|
||||
deterministic.forbid_parking();
|
||||
let mut server = TestServer::start(&deterministic).await;
|
||||
let client_a = server.create_client(cx, "user_a").await;
|
||||
let a_id = crate::db::UserId(client_a.user_id().unwrap() as i32);
|
||||
let db = server._test_db.db();
|
||||
client_b
|
||||
.channel_store
|
||||
.read_with(cx_b, |channels, _| assert_eq!(channels.channels(), &[]));
|
||||
|
||||
let zed_id = db.create_root_channel("zed").await.unwrap();
|
||||
let first_id = db.create_channel("first", Some(zed_id)).await.unwrap();
|
||||
let second_id = db
|
||||
.create_channel("second_id", Some(first_id))
|
||||
// Invite client B to channel A as client A.
|
||||
client_a
|
||||
.channel_store
|
||||
.update(cx_a, |channel_store, _| {
|
||||
channel_store.invite_member(channel_a_id, client_b.user_id().unwrap(), false)
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Wait for client b to see the invitation
|
||||
deterministic.run_until_parked();
|
||||
|
||||
client_b.channel_store.read_with(cx_b, |channels, _| {
|
||||
assert_eq!(
|
||||
channels.channel_invitations(),
|
||||
&[Channel {
|
||||
id: channel_a_id,
|
||||
name: "channel-a".to_string(),
|
||||
parent_id: None,
|
||||
}]
|
||||
)
|
||||
});
|
||||
|
||||
// Client B now sees that they are in channel A.
|
||||
client_b
|
||||
.channel_store
|
||||
.update(cx_b, |channels, _| {
|
||||
channels.respond_to_channel_invite(channel_a_id, true)
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
client_b.channel_store.read_with(cx_b, |channels, _| {
|
||||
assert_eq!(channels.channel_invitations(), &[]);
|
||||
assert_eq!(
|
||||
channels.channels(),
|
||||
&[Channel {
|
||||
id: channel_a_id,
|
||||
name: "channel-a".to_string(),
|
||||
parent_id: None,
|
||||
}]
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
// TODO:
|
||||
// Invariants to test:
|
||||
// 1. Dag structure is maintained for all operations (can't make a cycle)
|
||||
// 2. Can't be a member of a super channel, and accept a membership of a sub channel (by definition, a noop)
|
||||
|
||||
// #[gpui::test]
|
||||
// async fn test_block_cycle_creation(deterministic: Arc<Deterministic>, cx: &mut TestAppContext) {
|
||||
// // deterministic.forbid_parking();
|
||||
// // let mut server = TestServer::start(&deterministic).await;
|
||||
// // let client_a = server.create_client(cx, "user_a").await;
|
||||
// // let a_id = crate::db::UserId(client_a.user_id().unwrap() as i32);
|
||||
// // let db = server._test_db.db();
|
||||
|
||||
// // let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
|
||||
// // let first_id = db.create_channel("first", Some(zed_id)).await.unwrap();
|
||||
// // let second_id = db
|
||||
// // .create_channel("second_id", Some(first_id))
|
||||
// // .await
|
||||
// // .unwrap();
|
||||
// }
|
||||
|
||||
/*
|
||||
Linear things:
|
||||
- A way of expressing progress to the team
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue