Generalize notifications' actor id to entity id

This way, we can retrieve channel invite notifications when
responding to the invites.
This commit is contained in:
Max Brunsfeld 2023-10-17 10:34:50 -07:00
parent f225039d36
commit f2d36a47ae
13 changed files with 115 additions and 98 deletions

View file

@ -324,8 +324,8 @@ CREATE TABLE "notifications" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"created_at" TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
"recipient_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
"actor_id" INTEGER REFERENCES users (id) ON DELETE CASCADE,
"kind" INTEGER NOT NULL REFERENCES notification_kinds (id),
"entity_id" INTEGER,
"content" TEXT,
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
"response" BOOLEAN

View file

@ -9,8 +9,8 @@ CREATE TABLE notifications (
"id" SERIAL PRIMARY KEY,
"created_at" TIMESTAMP NOT NULL DEFAULT now(),
"recipient_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
"actor_id" INTEGER REFERENCES users (id) ON DELETE CASCADE,
"kind" INTEGER NOT NULL REFERENCES notification_kinds (id),
"entity_id" INTEGER,
"content" TEXT,
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
"response" BOOLEAN

View file

@ -125,7 +125,7 @@ impl Database {
}
pub async fn initialize_static_data(&mut self) -> Result<()> {
self.initialize_notification_enum().await?;
self.initialize_notification_kinds().await?;
Ok(())
}

View file

@ -166,6 +166,11 @@ impl Database {
self.check_user_is_channel_admin(channel_id, inviter_id, &*tx)
.await?;
let channel = channel::Entity::find_by_id(channel_id)
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such channel"))?;
channel_member::ActiveModel {
channel_id: ActiveValue::Set(channel_id),
user_id: ActiveValue::Set(invitee_id),
@ -181,6 +186,7 @@ impl Database {
invitee_id,
rpc::Notification::ChannelInvitation {
channel_id: channel_id.to_proto(),
channel_name: channel.name,
},
true,
&*tx,
@ -269,6 +275,7 @@ impl Database {
user_id,
&rpc::Notification::ChannelInvitation {
channel_id: channel_id.to_proto(),
channel_name: Default::default(),
},
accept,
&*tx,

View file

@ -168,7 +168,7 @@ impl Database {
.create_notification(
receiver_id,
rpc::Notification::ContactRequest {
actor_id: sender_id.to_proto(),
sender_id: sender_id.to_proto(),
},
true,
&*tx,
@ -219,7 +219,7 @@ impl Database {
.remove_notification(
responder_id,
rpc::Notification::ContactRequest {
actor_id: requester_id.to_proto(),
sender_id: requester_id.to_proto(),
},
&*tx,
)
@ -324,7 +324,7 @@ impl Database {
self.respond_to_notification(
responder_id,
&rpc::Notification::ContactRequest {
actor_id: requester_id.to_proto(),
sender_id: requester_id.to_proto(),
},
accept,
&*tx,
@ -337,7 +337,7 @@ impl Database {
self.create_notification(
requester_id,
rpc::Notification::ContactRequestAccepted {
actor_id: responder_id.to_proto(),
responder_id: responder_id.to_proto(),
},
true,
&*tx,

View file

@ -2,7 +2,7 @@ use super::*;
use rpc::Notification;
impl Database {
pub async fn initialize_notification_enum(&mut self) -> Result<()> {
pub async fn initialize_notification_kinds(&mut self) -> Result<()> {
notification_kind::Entity::insert_many(Notification::all_variant_names().iter().map(
|kind| notification_kind::ActiveModel {
name: ActiveValue::Set(kind.to_string()),
@ -64,6 +64,9 @@ impl Database {
.await
}
/// Create a notification. If `avoid_duplicates` is set to true, then avoid
/// creating a new notification if the given recipient already has an
/// unread notification with the given kind and entity id.
pub async fn create_notification(
&self,
recipient_id: UserId,
@ -81,22 +84,14 @@ impl Database {
}
}
let notification_proto = notification.to_proto();
let kind = *self
.notification_kinds_by_name
.get(&notification_proto.kind)
.ok_or_else(|| anyhow!("invalid notification kind {:?}", notification_proto.kind))?;
let actor_id = notification_proto.actor_id.map(|id| UserId::from_proto(id));
let proto = notification.to_proto();
let kind = notification_kind_from_proto(self, &proto)?;
let model = notification::ActiveModel {
recipient_id: ActiveValue::Set(recipient_id),
kind: ActiveValue::Set(kind),
content: ActiveValue::Set(notification_proto.content.clone()),
actor_id: ActiveValue::Set(actor_id),
is_read: ActiveValue::NotSet,
response: ActiveValue::NotSet,
created_at: ActiveValue::NotSet,
id: ActiveValue::NotSet,
entity_id: ActiveValue::Set(proto.entity_id.map(|id| id as i32)),
content: ActiveValue::Set(proto.content.clone()),
..Default::default()
}
.save(&*tx)
.await?;
@ -105,16 +100,18 @@ impl Database {
recipient_id,
proto::Notification {
id: model.id.as_ref().to_proto(),
kind: notification_proto.kind,
kind: proto.kind,
timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
is_read: false,
response: None,
content: notification_proto.content,
actor_id: notification_proto.actor_id,
content: proto.content,
entity_id: proto.entity_id,
},
)))
}
/// Remove an unread notification with the given recipient, kind and
/// entity id.
pub async fn remove_notification(
&self,
recipient_id: UserId,
@ -130,6 +127,8 @@ impl Database {
Ok(id)
}
/// Populate the response for the notification with the given kind and
/// entity id.
pub async fn respond_to_notification(
&self,
recipient_id: UserId,
@ -156,47 +155,38 @@ impl Database {
}
}
pub async fn find_notification(
/// Find an unread notification by its recipient, kind and entity id.
async fn find_notification(
&self,
recipient_id: UserId,
notification: &Notification,
tx: &DatabaseTransaction,
) -> Result<Option<NotificationId>> {
let proto = notification.to_proto();
let kind = *self
.notification_kinds_by_name
.get(&proto.kind)
.ok_or_else(|| anyhow!("invalid notification kind {:?}", proto.kind))?;
let mut rows = notification::Entity::find()
let kind = notification_kind_from_proto(self, &proto)?;
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryIds {
Id,
}
Ok(notification::Entity::find()
.select_only()
.column(notification::Column::Id)
.filter(
Condition::all()
.add(notification::Column::RecipientId.eq(recipient_id))
.add(notification::Column::IsRead.eq(false))
.add(notification::Column::Kind.eq(kind))
.add(if proto.actor_id.is_some() {
notification::Column::ActorId.eq(proto.actor_id)
.add(if proto.entity_id.is_some() {
notification::Column::EntityId.eq(proto.entity_id)
} else {
notification::Column::ActorId.is_null()
notification::Column::EntityId.is_null()
}),
)
.stream(&*tx)
.await?;
// Don't rely on the JSON serialization being identical, in case the
// notification type is changed in backward-compatible ways.
while let Some(row) = rows.next().await {
let row = row?;
let id = row.id;
if let Some(proto) = model_to_proto(self, row) {
if let Some(existing) = Notification::from_proto(&proto) {
if existing == *notification {
return Ok(Some(id));
}
}
}
}
Ok(None)
.into_values::<_, QueryIds>()
.one(&*tx)
.await?)
}
}
@ -209,6 +199,17 @@ fn model_to_proto(this: &Database, row: notification::Model) -> Option<proto::No
is_read: row.is_read,
response: row.response,
content: row.content,
actor_id: row.actor_id.map(|id| id.to_proto()),
entity_id: row.entity_id.map(|id| id as u64),
})
}
fn notification_kind_from_proto(
this: &Database,
proto: &proto::Notification,
) -> Result<NotificationKindId> {
Ok(this
.notification_kinds_by_name
.get(&proto.kind)
.copied()
.ok_or_else(|| anyhow!("invalid notification kind {:?}", proto.kind))?)
}

View file

@ -9,8 +9,8 @@ pub struct Model {
pub id: NotificationId,
pub created_at: PrimitiveDateTime,
pub recipient_id: UserId,
pub actor_id: Option<UserId>,
pub kind: NotificationKindId,
pub entity_id: Option<i32>,
pub content: String,
pub is_read: bool,
pub response: Option<bool>,

View file

@ -45,7 +45,7 @@ impl TestDb {
))
.await
.unwrap();
db.initialize_notification_enum().await.unwrap();
db.initialize_notification_kinds().await.unwrap();
db
});
@ -85,7 +85,7 @@ impl TestDb {
.unwrap();
let migrations_path = concat!(env!("CARGO_MANIFEST_DIR"), "/migrations");
db.migrate(Path::new(migrations_path), false).await.unwrap();
db.initialize_notification_enum().await.unwrap();
db.initialize_notification_kinds().await.unwrap();
db
});

View file

@ -120,7 +120,7 @@ impl AppState {
let mut db_options = db::ConnectOptions::new(config.database_url.clone());
db_options.max_connections(config.database_max_connections);
let mut db = Database::new(db_options, Executor::Production).await?;
db.initialize_notification_enum().await?;
db.initialize_notification_kinds().await?;
let live_kit_client = if let Some(((server, key), secret)) = config
.live_kit_server