From d1a44b889edd96fd61e4ba1ca712c80f50d45ee9 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 30 Nov 2022 17:36:25 +0100 Subject: [PATCH] Implement contacts using sea-orm Co-Authored-By: Nathan Sobo --- crates/collab/src/db2.rs | 298 +++++++++++++++++++- crates/collab/src/db2/contact.rs | 58 ++++ crates/collab/src/db2/room_participant.rs | 12 + crates/collab/src/db2/tests.rs | 314 +++++++++++----------- crates/collab/src/db2/user.rs | 8 + 5 files changed, 531 insertions(+), 159 deletions(-) create mode 100644 crates/collab/src/db2/contact.rs diff --git a/crates/collab/src/db2.rs b/crates/collab/src/db2.rs index 5c5157d2aa..35a45acedf 100644 --- a/crates/collab/src/db2.rs +++ b/crates/collab/src/db2.rs @@ -1,4 +1,5 @@ mod access_token; +mod contact; mod project; mod project_collaborator; mod room; @@ -18,8 +19,11 @@ use sea_orm::{ entity::prelude::*, ConnectOptions, DatabaseConnection, DatabaseTransaction, DbErr, TransactionTrait, }; -use sea_orm::{ActiveValue, ConnectionTrait, IntoActiveModel, QueryOrder, QuerySelect}; -use sea_query::{OnConflict, Query}; +use sea_orm::{ + ActiveValue, ConnectionTrait, FromQueryResult, IntoActiveModel, JoinType, QueryOrder, + QuerySelect, +}; +use sea_query::{Alias, Expr, OnConflict, Query}; use serde::{Deserialize, Serialize}; use sqlx::migrate::{Migrate, Migration, MigrationSource}; use sqlx::Connection; @@ -29,6 +33,7 @@ use std::time::Duration; use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc}; use tokio::sync::{Mutex, OwnedMutexGuard}; +pub use contact::Contact; pub use user::Model as User; pub struct Database { @@ -95,6 +100,8 @@ impl Database { Ok(new_migrations) } + // users + pub async fn create_user( &self, email_address: &str, @@ -197,6 +204,292 @@ impl Database { .await } + // contacts + + pub async fn get_contacts(&self, user_id: UserId) -> Result> { + #[derive(Debug, FromQueryResult)] + struct ContactWithUserBusyStatuses { + user_id_a: UserId, + user_id_b: UserId, + a_to_b: bool, + accepted: bool, + should_notify: bool, + user_a_busy: bool, + user_b_busy: bool, + } + + self.transact(|tx| async move { + let user_a_participant = Alias::new("user_a_participant"); + let user_b_participant = Alias::new("user_b_participant"); + let mut db_contacts = contact::Entity::find() + .column_as( + Expr::tbl(user_a_participant.clone(), room_participant::Column::Id) + .is_not_null(), + "user_a_busy", + ) + .column_as( + Expr::tbl(user_b_participant.clone(), room_participant::Column::Id) + .is_not_null(), + "user_b_busy", + ) + .filter( + contact::Column::UserIdA + .eq(user_id) + .or(contact::Column::UserIdB.eq(user_id)), + ) + .join_as( + JoinType::LeftJoin, + contact::Relation::UserARoomParticipant.def(), + user_a_participant, + ) + .join_as( + JoinType::LeftJoin, + contact::Relation::UserBRoomParticipant.def(), + user_b_participant, + ) + .into_model::() + .stream(&tx) + .await?; + + let mut contacts = Vec::new(); + while let Some(db_contact) = db_contacts.next().await { + let db_contact = db_contact?; + if db_contact.user_id_a == user_id { + if db_contact.accepted { + contacts.push(Contact::Accepted { + user_id: db_contact.user_id_b, + should_notify: db_contact.should_notify && db_contact.a_to_b, + busy: db_contact.user_b_busy, + }); + } else if db_contact.a_to_b { + contacts.push(Contact::Outgoing { + user_id: db_contact.user_id_b, + }) + } else { + contacts.push(Contact::Incoming { + user_id: db_contact.user_id_b, + should_notify: db_contact.should_notify, + }); + } + } else if db_contact.accepted { + contacts.push(Contact::Accepted { + user_id: db_contact.user_id_a, + should_notify: db_contact.should_notify && !db_contact.a_to_b, + busy: db_contact.user_a_busy, + }); + } else if db_contact.a_to_b { + contacts.push(Contact::Incoming { + user_id: db_contact.user_id_a, + should_notify: db_contact.should_notify, + }); + } else { + contacts.push(Contact::Outgoing { + user_id: db_contact.user_id_a, + }); + } + } + + contacts.sort_unstable_by_key(|contact| contact.user_id()); + + Ok(contacts) + }) + .await + } + + pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result { + self.transact(|tx| async move { + let (id_a, id_b) = if user_id_1 < user_id_2 { + (user_id_1, user_id_2) + } else { + (user_id_2, user_id_1) + }; + + Ok(contact::Entity::find() + .filter( + contact::Column::UserIdA + .eq(id_a) + .and(contact::Column::UserIdB.eq(id_b)) + .and(contact::Column::Accepted.eq(true)), + ) + .one(&tx) + .await? + .is_some()) + }) + .await + } + + pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> { + self.transact(|mut tx| async move { + let (id_a, id_b, a_to_b) = if sender_id < receiver_id { + (sender_id, receiver_id, true) + } else { + (receiver_id, sender_id, false) + }; + + let rows_affected = contact::Entity::insert(contact::ActiveModel { + user_id_a: ActiveValue::set(id_a), + user_id_b: ActiveValue::set(id_b), + a_to_b: ActiveValue::set(a_to_b), + accepted: ActiveValue::set(false), + should_notify: ActiveValue::set(true), + ..Default::default() + }) + .on_conflict( + OnConflict::columns([contact::Column::UserIdA, contact::Column::UserIdB]) + .values([ + (contact::Column::Accepted, true.into()), + (contact::Column::ShouldNotify, false.into()), + ]) + .action_and_where( + contact::Column::Accepted.eq(false).and( + contact::Column::AToB + .eq(a_to_b) + .and(contact::Column::UserIdA.eq(id_b)) + .or(contact::Column::AToB + .ne(a_to_b) + .and(contact::Column::UserIdA.eq(id_a))), + ), + ) + .to_owned(), + ) + .exec_without_returning(&tx) + .await?; + + if rows_affected == 1 { + tx.commit().await?; + Ok(()) + } else { + Err(anyhow!("contact already requested"))? + } + }) + .await + } + + pub async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> { + self.transact(|mut tx| async move { + // let (id_a, id_b) = if responder_id < requester_id { + // (responder_id, requester_id) + // } else { + // (requester_id, responder_id) + // }; + // let query = " + // DELETE FROM contacts + // WHERE user_id_a = $1 AND user_id_b = $2; + // "; + // let result = sqlx::query(query) + // .bind(id_a.0) + // .bind(id_b.0) + // .execute(&mut tx) + // .await?; + + // if result.rows_affected() == 1 { + // tx.commit().await?; + // Ok(()) + // } else { + // Err(anyhow!("no such contact"))? + // } + todo!() + }) + .await + } + + pub async fn dismiss_contact_notification( + &self, + user_id: UserId, + contact_user_id: UserId, + ) -> Result<()> { + self.transact(|tx| async move { + let (id_a, id_b, a_to_b) = if user_id < contact_user_id { + (user_id, contact_user_id, true) + } else { + (contact_user_id, user_id, false) + }; + + let result = contact::Entity::update_many() + .set(contact::ActiveModel { + should_notify: ActiveValue::set(false), + ..Default::default() + }) + .filter( + contact::Column::UserIdA + .eq(id_a) + .and(contact::Column::UserIdB.eq(id_b)) + .and( + contact::Column::AToB + .eq(a_to_b) + .and(contact::Column::Accepted.eq(true)) + .or(contact::Column::AToB + .ne(a_to_b) + .and(contact::Column::Accepted.eq(false))), + ), + ) + .exec(&tx) + .await?; + if result.rows_affected == 0 { + Err(anyhow!("no such contact request"))? + } else { + tx.commit().await?; + Ok(()) + } + }) + .await + } + + pub async fn respond_to_contact_request( + &self, + responder_id: UserId, + requester_id: UserId, + accept: bool, + ) -> Result<()> { + self.transact(|tx| async move { + let (id_a, id_b, a_to_b) = if responder_id < requester_id { + (responder_id, requester_id, false) + } else { + (requester_id, responder_id, true) + }; + let rows_affected = if accept { + let result = contact::Entity::update_many() + .set(contact::ActiveModel { + accepted: ActiveValue::set(true), + should_notify: ActiveValue::set(true), + ..Default::default() + }) + .filter( + contact::Column::UserIdA + .eq(id_a) + .and(contact::Column::UserIdB.eq(id_b)) + .and(contact::Column::AToB.eq(a_to_b)), + ) + .exec(&tx) + .await?; + result.rows_affected + } else { + let result = contact::Entity::delete_many() + .filter( + contact::Column::UserIdA + .eq(id_a) + .and(contact::Column::UserIdB.eq(id_b)) + .and(contact::Column::AToB.eq(a_to_b)) + .and(contact::Column::Accepted.eq(false)), + ) + .exec(&tx) + .await?; + + result.rows_affected + }; + + if rows_affected == 1 { + tx.commit().await?; + Ok(()) + } else { + Err(anyhow!("no such contact request"))? + } + }) + .await + } + + // projects + pub async fn share_project( &self, room_id: RoomId, @@ -632,6 +925,7 @@ macro_rules! id_type { } id_type!(AccessTokenId); +id_type!(ContactId); id_type!(UserId); id_type!(RoomId); id_type!(RoomParticipantId); diff --git a/crates/collab/src/db2/contact.rs b/crates/collab/src/db2/contact.rs new file mode 100644 index 0000000000..c39d6643b3 --- /dev/null +++ b/crates/collab/src/db2/contact.rs @@ -0,0 +1,58 @@ +use super::{ContactId, UserId}; +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, Default, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "contacts")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: ContactId, + pub user_id_a: UserId, + pub user_id_b: UserId, + pub a_to_b: bool, + pub should_notify: bool, + pub accepted: bool, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::room_participant::Entity", + from = "Column::UserIdA", + to = "super::room_participant::Column::UserId" + )] + UserARoomParticipant, + #[sea_orm( + belongs_to = "super::room_participant::Entity", + from = "Column::UserIdB", + to = "super::room_participant::Column::UserId" + )] + UserBRoomParticipant, +} + +impl ActiveModelBehavior for ActiveModel {} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Contact { + Accepted { + user_id: UserId, + should_notify: bool, + busy: bool, + }, + Outgoing { + user_id: UserId, + }, + Incoming { + user_id: UserId, + should_notify: bool, + }, +} + +impl Contact { + pub fn user_id(&self) -> UserId { + match self { + Contact::Accepted { user_id, .. } => *user_id, + Contact::Outgoing { user_id } => *user_id, + Contact::Incoming { user_id, .. } => *user_id, + } + } +} diff --git a/crates/collab/src/db2/room_participant.rs b/crates/collab/src/db2/room_participant.rs index 4fabfc3068..c7c804581b 100644 --- a/crates/collab/src/db2/room_participant.rs +++ b/crates/collab/src/db2/room_participant.rs @@ -18,6 +18,12 @@ pub struct Model { #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { + #[sea_orm( + belongs_to = "super::user::Entity", + from = "Column::UserId", + to = "super::user::Column::Id" + )] + User, #[sea_orm( belongs_to = "super::room::Entity", from = "Column::RoomId", @@ -26,6 +32,12 @@ pub enum Relation { Room, } +impl Related for Entity { + fn to() -> RelationDef { + Relation::User.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::Room.def() diff --git a/crates/collab/src/db2/tests.rs b/crates/collab/src/db2/tests.rs index c66e2fa406..1aeb802025 100644 --- a/crates/collab/src/db2/tests.rs +++ b/crates/collab/src/db2/tests.rs @@ -192,174 +192,174 @@ test_both_dbs!( } ); -// test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, { -// let mut user_ids = Vec::new(); -// for i in 0..3 { -// user_ids.push( -// db.create_user( -// &format!("user{i}@example.com"), -// false, -// NewUserParams { -// github_login: format!("user{i}"), -// github_user_id: i, -// invite_count: 0, -// }, -// ) -// .await -// .unwrap() -// .user_id, -// ); -// } +test_both_dbs!(test_add_contacts_postgres, test_add_contacts_sqlite, db, { + let mut user_ids = Vec::new(); + for i in 0..3 { + user_ids.push( + db.create_user( + &format!("user{i}@example.com"), + false, + NewUserParams { + github_login: format!("user{i}"), + github_user_id: i, + invite_count: 0, + }, + ) + .await + .unwrap() + .user_id, + ); + } -// let user_1 = user_ids[0]; -// let user_2 = user_ids[1]; -// let user_3 = user_ids[2]; + let user_1 = user_ids[0]; + let user_2 = user_ids[1]; + let user_3 = user_ids[2]; -// // User starts with no contacts -// assert_eq!(db.get_contacts(user_1).await.unwrap(), &[]); + // User starts with no contacts + assert_eq!(db.get_contacts(user_1).await.unwrap(), &[]); -// // User requests a contact. Both users see the pending request. -// db.send_contact_request(user_1, user_2).await.unwrap(); -// assert!(!db.has_contact(user_1, user_2).await.unwrap()); -// assert!(!db.has_contact(user_2, user_1).await.unwrap()); -// assert_eq!( -// db.get_contacts(user_1).await.unwrap(), -// &[Contact::Outgoing { user_id: user_2 }], -// ); -// assert_eq!( -// db.get_contacts(user_2).await.unwrap(), -// &[Contact::Incoming { -// user_id: user_1, -// should_notify: true -// }] -// ); + // User requests a contact. Both users see the pending request. + db.send_contact_request(user_1, user_2).await.unwrap(); + assert!(!db.has_contact(user_1, user_2).await.unwrap()); + assert!(!db.has_contact(user_2, user_1).await.unwrap()); + assert_eq!( + db.get_contacts(user_1).await.unwrap(), + &[Contact::Outgoing { user_id: user_2 }], + ); + assert_eq!( + db.get_contacts(user_2).await.unwrap(), + &[Contact::Incoming { + user_id: user_1, + should_notify: true + }] + ); -// // User 2 dismisses the contact request notification without accepting or rejecting. -// // We shouldn't notify them again. -// db.dismiss_contact_notification(user_1, user_2) -// .await -// .unwrap_err(); -// db.dismiss_contact_notification(user_2, user_1) -// .await -// .unwrap(); -// assert_eq!( -// db.get_contacts(user_2).await.unwrap(), -// &[Contact::Incoming { -// user_id: user_1, -// should_notify: false -// }] -// ); + // User 2 dismisses the contact request notification without accepting or rejecting. + // We shouldn't notify them again. + db.dismiss_contact_notification(user_1, user_2) + .await + .unwrap_err(); + db.dismiss_contact_notification(user_2, user_1) + .await + .unwrap(); + assert_eq!( + db.get_contacts(user_2).await.unwrap(), + &[Contact::Incoming { + user_id: user_1, + should_notify: false + }] + ); -// // User can't accept their own contact request -// db.respond_to_contact_request(user_1, user_2, true) -// .await -// .unwrap_err(); + // User can't accept their own contact request + db.respond_to_contact_request(user_1, user_2, true) + .await + .unwrap_err(); -// // User accepts a contact request. Both users see the contact. -// db.respond_to_contact_request(user_2, user_1, true) -// .await -// .unwrap(); -// assert_eq!( -// db.get_contacts(user_1).await.unwrap(), -// &[Contact::Accepted { -// user_id: user_2, -// should_notify: true, -// busy: false, -// }], -// ); -// assert!(db.has_contact(user_1, user_2).await.unwrap()); -// assert!(db.has_contact(user_2, user_1).await.unwrap()); -// assert_eq!( -// db.get_contacts(user_2).await.unwrap(), -// &[Contact::Accepted { -// user_id: user_1, -// should_notify: false, -// busy: false, -// }] -// ); + // User accepts a contact request. Both users see the contact. + db.respond_to_contact_request(user_2, user_1, true) + .await + .unwrap(); + assert_eq!( + db.get_contacts(user_1).await.unwrap(), + &[Contact::Accepted { + user_id: user_2, + should_notify: true, + busy: false, + }], + ); + assert!(db.has_contact(user_1, user_2).await.unwrap()); + assert!(db.has_contact(user_2, user_1).await.unwrap()); + assert_eq!( + db.get_contacts(user_2).await.unwrap(), + &[Contact::Accepted { + user_id: user_1, + should_notify: false, + busy: false, + }] + ); -// // Users cannot re-request existing contacts. -// db.send_contact_request(user_1, user_2).await.unwrap_err(); -// db.send_contact_request(user_2, user_1).await.unwrap_err(); + // Users cannot re-request existing contacts. + db.send_contact_request(user_1, user_2).await.unwrap_err(); + db.send_contact_request(user_2, user_1).await.unwrap_err(); -// // Users can't dismiss notifications of them accepting other users' requests. -// db.dismiss_contact_notification(user_2, user_1) -// .await -// .unwrap_err(); -// assert_eq!( -// db.get_contacts(user_1).await.unwrap(), -// &[Contact::Accepted { -// user_id: user_2, -// should_notify: true, -// busy: false, -// }] -// ); + // Users can't dismiss notifications of them accepting other users' requests. + db.dismiss_contact_notification(user_2, user_1) + .await + .unwrap_err(); + assert_eq!( + db.get_contacts(user_1).await.unwrap(), + &[Contact::Accepted { + user_id: user_2, + should_notify: true, + busy: false, + }] + ); -// // Users can dismiss notifications of other users accepting their requests. -// db.dismiss_contact_notification(user_1, user_2) -// .await -// .unwrap(); -// assert_eq!( -// db.get_contacts(user_1).await.unwrap(), -// &[Contact::Accepted { -// user_id: user_2, -// should_notify: false, -// busy: false, -// }] -// ); + // Users can dismiss notifications of other users accepting their requests. + db.dismiss_contact_notification(user_1, user_2) + .await + .unwrap(); + assert_eq!( + db.get_contacts(user_1).await.unwrap(), + &[Contact::Accepted { + user_id: user_2, + should_notify: false, + busy: false, + }] + ); -// // Users send each other concurrent contact requests and -// // see that they are immediately accepted. -// db.send_contact_request(user_1, user_3).await.unwrap(); -// db.send_contact_request(user_3, user_1).await.unwrap(); -// assert_eq!( -// db.get_contacts(user_1).await.unwrap(), -// &[ -// Contact::Accepted { -// user_id: user_2, -// should_notify: false, -// busy: false, -// }, -// Contact::Accepted { -// user_id: user_3, -// should_notify: false, -// busy: false, -// } -// ] -// ); -// assert_eq!( -// db.get_contacts(user_3).await.unwrap(), -// &[Contact::Accepted { -// user_id: user_1, -// should_notify: false, -// busy: false, -// }], -// ); + // Users send each other concurrent contact requests and + // see that they are immediately accepted. + db.send_contact_request(user_1, user_3).await.unwrap(); + db.send_contact_request(user_3, user_1).await.unwrap(); + assert_eq!( + db.get_contacts(user_1).await.unwrap(), + &[ + Contact::Accepted { + user_id: user_2, + should_notify: false, + busy: false, + }, + Contact::Accepted { + user_id: user_3, + should_notify: false, + busy: false, + } + ] + ); + assert_eq!( + db.get_contacts(user_3).await.unwrap(), + &[Contact::Accepted { + user_id: user_1, + should_notify: false, + busy: false, + }], + ); -// // User declines a contact request. Both users see that it is gone. -// db.send_contact_request(user_2, user_3).await.unwrap(); -// db.respond_to_contact_request(user_3, user_2, false) -// .await -// .unwrap(); -// assert!(!db.has_contact(user_2, user_3).await.unwrap()); -// assert!(!db.has_contact(user_3, user_2).await.unwrap()); -// assert_eq!( -// db.get_contacts(user_2).await.unwrap(), -// &[Contact::Accepted { -// user_id: user_1, -// should_notify: false, -// busy: false, -// }] -// ); -// assert_eq!( -// db.get_contacts(user_3).await.unwrap(), -// &[Contact::Accepted { -// user_id: user_1, -// should_notify: false, -// busy: false, -// }], -// ); -// }); + // User declines a contact request. Both users see that it is gone. + db.send_contact_request(user_2, user_3).await.unwrap(); + db.respond_to_contact_request(user_3, user_2, false) + .await + .unwrap(); + assert!(!db.has_contact(user_2, user_3).await.unwrap()); + assert!(!db.has_contact(user_3, user_2).await.unwrap()); + assert_eq!( + db.get_contacts(user_2).await.unwrap(), + &[Contact::Accepted { + user_id: user_1, + should_notify: false, + busy: false, + }] + ); + assert_eq!( + db.get_contacts(user_3).await.unwrap(), + &[Contact::Accepted { + user_id: user_1, + should_notify: false, + busy: false, + }], + ); +}); test_both_dbs!(test_metrics_id_postgres, test_metrics_id_sqlite, db, { let NewUserResult { diff --git a/crates/collab/src/db2/user.rs b/crates/collab/src/db2/user.rs index 5e8a484571..f6bac9dc77 100644 --- a/crates/collab/src/db2/user.rs +++ b/crates/collab/src/db2/user.rs @@ -20,6 +20,8 @@ pub struct Model { pub enum Relation { #[sea_orm(has_many = "super::access_token::Entity")] AccessToken, + #[sea_orm(has_one = "super::room_participant::Entity")] + RoomParticipant, } impl Related for Entity { @@ -28,4 +30,10 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::RoomParticipant.def() + } +} + impl ActiveModelBehavior for ActiveModel {}