Start work on notification panel
This commit is contained in:
parent
50cf25ae97
commit
d1756b621f
24 changed files with 1021 additions and 241 deletions
|
@ -327,7 +327,8 @@ CREATE TABLE "notifications" (
|
|||
"kind" INTEGER NOT NULL REFERENCES notification_kinds (id),
|
||||
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
"entity_id_1" INTEGER,
|
||||
"entity_id_2" INTEGER
|
||||
"entity_id_2" INTEGER,
|
||||
"entity_id_3" INTEGER
|
||||
);
|
||||
|
||||
CREATE INDEX "index_notifications_on_recipient_id" ON "notifications" ("recipient_id");
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
CREATE TABLE "notification_kinds" (
|
||||
"id" INTEGER PRIMARY KEY NOT NULL,
|
||||
"name" VARCHAR NOT NULL,
|
||||
"name" VARCHAR NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX "index_notification_kinds_on_name" ON "notification_kinds" ("name");
|
||||
|
@ -8,11 +8,12 @@ CREATE UNIQUE INDEX "index_notification_kinds_on_name" ON "notification_kinds" (
|
|||
CREATE TABLE notifications (
|
||||
"id" SERIAL PRIMARY KEY,
|
||||
"created_at" TIMESTAMP NOT NULL DEFAULT now(),
|
||||
"recipent_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
|
||||
"recipient_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
|
||||
"kind" INTEGER NOT NULL REFERENCES notification_kinds (id),
|
||||
"is_read" BOOLEAN NOT NULL DEFAULT FALSE
|
||||
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
"entity_id_1" INTEGER,
|
||||
"entity_id_2" INTEGER
|
||||
"entity_id_2" INTEGER,
|
||||
"entity_id_3" INTEGER
|
||||
);
|
||||
|
||||
CREATE INDEX "index_notifications_on_recipient_id" ON "notifications" ("recipient_id");
|
||||
|
|
|
@ -124,7 +124,11 @@ impl Database {
|
|||
.await
|
||||
}
|
||||
|
||||
pub async fn send_contact_request(&self, sender_id: UserId, receiver_id: UserId) -> Result<()> {
|
||||
pub async fn send_contact_request(
|
||||
&self,
|
||||
sender_id: UserId,
|
||||
receiver_id: UserId,
|
||||
) -> Result<proto::Notification> {
|
||||
self.transaction(|tx| async move {
|
||||
let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
|
||||
(sender_id, receiver_id, true)
|
||||
|
@ -162,7 +166,14 @@ impl Database {
|
|||
.await?;
|
||||
|
||||
if rows_affected == 1 {
|
||||
Ok(())
|
||||
self.create_notification(
|
||||
receiver_id,
|
||||
rpc::Notification::ContactRequest {
|
||||
requester_id: sender_id.to_proto(),
|
||||
},
|
||||
&*tx,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
Err(anyhow!("contact already requested"))?
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use super::*;
|
||||
use rpc::{Notification, NotificationEntityKind, NotificationKind};
|
||||
use rpc::{Notification, NotificationKind};
|
||||
|
||||
impl Database {
|
||||
pub async fn ensure_notification_kinds(&self) -> Result<()> {
|
||||
|
@ -25,49 +25,16 @@ impl Database {
|
|||
) -> Result<proto::AddNotifications> {
|
||||
self.transaction(|tx| async move {
|
||||
let mut result = proto::AddNotifications::default();
|
||||
|
||||
let mut rows = notification::Entity::find()
|
||||
.filter(notification::Column::RecipientId.eq(recipient_id))
|
||||
.order_by_desc(notification::Column::Id)
|
||||
.limit(limit as u64)
|
||||
.stream(&*tx)
|
||||
.await?;
|
||||
|
||||
let mut user_ids = Vec::new();
|
||||
let mut channel_ids = Vec::new();
|
||||
let mut message_ids = Vec::new();
|
||||
while let Some(row) = rows.next().await {
|
||||
let row = row?;
|
||||
|
||||
let Some(kind) = NotificationKind::from_i32(row.kind) else {
|
||||
continue;
|
||||
};
|
||||
let Some(notification) = Notification::from_parts(
|
||||
kind,
|
||||
[
|
||||
row.entity_id_1.map(|id| id as u64),
|
||||
row.entity_id_2.map(|id| id as u64),
|
||||
row.entity_id_3.map(|id| id as u64),
|
||||
],
|
||||
) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// Gather the ids of all associated entities.
|
||||
let (_, associated_entities) = notification.to_parts();
|
||||
for entity in associated_entities {
|
||||
let Some((id, kind)) = entity else {
|
||||
break;
|
||||
};
|
||||
match kind {
|
||||
NotificationEntityKind::User => &mut user_ids,
|
||||
NotificationEntityKind::Channel => &mut channel_ids,
|
||||
NotificationEntityKind::ChannelMessage => &mut message_ids,
|
||||
}
|
||||
.push(id);
|
||||
}
|
||||
|
||||
result.notifications.push(proto::Notification {
|
||||
id: row.id.to_proto(),
|
||||
kind: row.kind as u32,
|
||||
timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
|
||||
is_read: row.is_read,
|
||||
|
@ -76,43 +43,7 @@ impl Database {
|
|||
entity_id_3: row.entity_id_3.map(|id| id as u64),
|
||||
});
|
||||
}
|
||||
|
||||
let users = user::Entity::find()
|
||||
.filter(user::Column::Id.is_in(user_ids))
|
||||
.all(&*tx)
|
||||
.await?;
|
||||
let channels = channel::Entity::find()
|
||||
.filter(user::Column::Id.is_in(channel_ids))
|
||||
.all(&*tx)
|
||||
.await?;
|
||||
let messages = channel_message::Entity::find()
|
||||
.filter(user::Column::Id.is_in(message_ids))
|
||||
.all(&*tx)
|
||||
.await?;
|
||||
|
||||
for user in users {
|
||||
result.users.push(proto::User {
|
||||
id: user.id.to_proto(),
|
||||
github_login: user.github_login,
|
||||
avatar_url: String::new(),
|
||||
});
|
||||
}
|
||||
for channel in channels {
|
||||
result.channels.push(proto::Channel {
|
||||
id: channel.id.to_proto(),
|
||||
name: channel.name,
|
||||
});
|
||||
}
|
||||
for message in messages {
|
||||
result.messages.push(proto::ChannelMessage {
|
||||
id: message.id.to_proto(),
|
||||
body: message.body,
|
||||
timestamp: message.sent_at.assume_utc().unix_timestamp() as u64,
|
||||
sender_id: message.sender_id.to_proto(),
|
||||
nonce: None,
|
||||
});
|
||||
}
|
||||
|
||||
result.notifications.reverse();
|
||||
Ok(result)
|
||||
})
|
||||
.await
|
||||
|
@ -123,18 +54,27 @@ impl Database {
|
|||
recipient_id: UserId,
|
||||
notification: Notification,
|
||||
tx: &DatabaseTransaction,
|
||||
) -> Result<()> {
|
||||
) -> Result<proto::Notification> {
|
||||
let (kind, associated_entities) = notification.to_parts();
|
||||
notification::ActiveModel {
|
||||
let model = notification::ActiveModel {
|
||||
recipient_id: ActiveValue::Set(recipient_id),
|
||||
kind: ActiveValue::Set(kind as i32),
|
||||
entity_id_1: ActiveValue::Set(associated_entities[0].map(|(id, _)| id as i32)),
|
||||
entity_id_2: ActiveValue::Set(associated_entities[1].map(|(id, _)| id as i32)),
|
||||
entity_id_3: ActiveValue::Set(associated_entities[2].map(|(id, _)| id as i32)),
|
||||
entity_id_1: ActiveValue::Set(associated_entities[0].map(|id| id as i32)),
|
||||
entity_id_2: ActiveValue::Set(associated_entities[1].map(|id| id as i32)),
|
||||
entity_id_3: ActiveValue::Set(associated_entities[2].map(|id| id as i32)),
|
||||
..Default::default()
|
||||
}
|
||||
.save(&*tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
|
||||
Ok(proto::Notification {
|
||||
id: model.id.as_ref().to_proto(),
|
||||
kind: *model.kind.as_ref() as u32,
|
||||
timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
|
||||
is_read: false,
|
||||
entity_id_1: model.entity_id_1.as_ref().map(|id| id as u64),
|
||||
entity_id_2: model.entity_id_2.as_ref().map(|id| id as u64),
|
||||
entity_id_3: model.entity_id_3.as_ref().map(|id| id as u64),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,6 +70,7 @@ pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
|
|||
|
||||
const MESSAGE_COUNT_PER_PAGE: usize = 100;
|
||||
const MAX_MESSAGE_LEN: usize = 1024;
|
||||
const INITIAL_NOTIFICATION_COUNT: usize = 30;
|
||||
|
||||
lazy_static! {
|
||||
static ref METRIC_CONNECTIONS: IntGauge =
|
||||
|
@ -290,6 +291,8 @@ impl Server {
|
|||
let pool = self.connection_pool.clone();
|
||||
let live_kit_client = self.app_state.live_kit_client.clone();
|
||||
|
||||
self.app_state.db.ensure_notification_kinds().await?;
|
||||
|
||||
let span = info_span!("start server");
|
||||
self.executor.spawn_detached(
|
||||
async move {
|
||||
|
@ -578,15 +581,17 @@ impl Server {
|
|||
this.app_state.db.set_user_connected_once(user_id, true).await?;
|
||||
}
|
||||
|
||||
let (contacts, channels_for_user, channel_invites) = future::try_join3(
|
||||
let (contacts, channels_for_user, channel_invites, notifications) = future::try_join4(
|
||||
this.app_state.db.get_contacts(user_id),
|
||||
this.app_state.db.get_channels_for_user(user_id),
|
||||
this.app_state.db.get_channel_invites_for_user(user_id)
|
||||
this.app_state.db.get_channel_invites_for_user(user_id),
|
||||
this.app_state.db.get_notifications(user_id, INITIAL_NOTIFICATION_COUNT)
|
||||
).await?;
|
||||
|
||||
{
|
||||
let mut pool = this.connection_pool.lock();
|
||||
pool.add_connection(connection_id, user_id, user.admin);
|
||||
this.peer.send(connection_id, notifications)?;
|
||||
this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
|
||||
this.peer.send(connection_id, build_initial_channels_update(
|
||||
channels_for_user,
|
||||
|
@ -2064,7 +2069,7 @@ async fn request_contact(
|
|||
return Err(anyhow!("cannot add yourself as a contact"))?;
|
||||
}
|
||||
|
||||
session
|
||||
let notification = session
|
||||
.db()
|
||||
.await
|
||||
.send_contact_request(requester_id, responder_id)
|
||||
|
@ -2095,6 +2100,12 @@ async fn request_contact(
|
|||
.user_connection_ids(responder_id)
|
||||
{
|
||||
session.peer.send(connection_id, update.clone())?;
|
||||
session.peer.send(
|
||||
connection_id,
|
||||
proto::AddNotifications {
|
||||
notifications: vec![notification.clone()],
|
||||
},
|
||||
)?;
|
||||
}
|
||||
|
||||
response.send(proto::Ack {})?;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue