Display invite response buttons inline in notification panel

This commit is contained in:
Max Brunsfeld 2023-10-17 09:12:55 -07:00
parent c66385f0f9
commit f225039d36
19 changed files with 421 additions and 169 deletions

View file

@ -322,12 +322,13 @@ CREATE UNIQUE INDEX "index_notification_kinds_on_name" ON "notification_kinds" (
CREATE TABLE "notifications" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
"created_at" TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
"recipient_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
"actor_id" INTEGER REFERENCES users (id) ON DELETE CASCADE,
"kind" INTEGER NOT NULL REFERENCES notification_kinds (id),
"content" TEXT
"content" TEXT,
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
"response" BOOLEAN
);
CREATE INDEX "index_notifications_on_recipient_id_is_read" ON "notifications" ("recipient_id", "is_read");

View file

@ -7,12 +7,13 @@ CREATE UNIQUE INDEX "index_notification_kinds_on_name" ON "notification_kinds" (
CREATE TABLE notifications (
"id" SERIAL PRIMARY KEY,
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
"created_at" TIMESTAMP NOT NULL DEFAULT now(),
"recipient_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
"actor_id" INTEGER REFERENCES users (id) ON DELETE CASCADE,
"kind" INTEGER NOT NULL REFERENCES notification_kinds (id),
"content" TEXT
"content" TEXT,
"is_read" BOOLEAN NOT NULL DEFAULT FALSE,
"response" BOOLEAN
);
CREATE INDEX "index_notifications_on_recipient_id" ON "notifications" ("recipient_id");

View file

@ -384,6 +384,8 @@ impl Contact {
}
}
pub type NotificationBatch = Vec<(UserId, proto::Notification)>;
#[derive(Clone, Debug, PartialEq, Eq, FromQueryResult, Serialize, Deserialize)]
pub struct Invite {
pub email_address: String,

View file

@ -161,7 +161,7 @@ impl Database {
invitee_id: UserId,
inviter_id: UserId,
is_admin: bool,
) -> Result<Option<proto::Notification>> {
) -> Result<NotificationBatch> {
self.transaction(move |tx| async move {
self.check_user_is_channel_admin(channel_id, inviter_id, &*tx)
.await?;
@ -176,16 +176,18 @@ impl Database {
.insert(&*tx)
.await?;
self.create_notification(
invitee_id,
rpc::Notification::ChannelInvitation {
actor_id: inviter_id.to_proto(),
channel_id: channel_id.to_proto(),
},
true,
&*tx,
)
.await
Ok(self
.create_notification(
invitee_id,
rpc::Notification::ChannelInvitation {
channel_id: channel_id.to_proto(),
},
true,
&*tx,
)
.await?
.into_iter()
.collect())
})
.await
}
@ -228,7 +230,7 @@ impl Database {
channel_id: ChannelId,
user_id: UserId,
accept: bool,
) -> Result<()> {
) -> Result<NotificationBatch> {
self.transaction(move |tx| async move {
let rows_affected = if accept {
channel_member::Entity::update_many()
@ -246,21 +248,34 @@ impl Database {
.await?
.rows_affected
} else {
channel_member::ActiveModel {
channel_id: ActiveValue::Unchanged(channel_id),
user_id: ActiveValue::Unchanged(user_id),
..Default::default()
}
.delete(&*tx)
.await?
.rows_affected
channel_member::Entity::delete_many()
.filter(
channel_member::Column::ChannelId
.eq(channel_id)
.and(channel_member::Column::UserId.eq(user_id))
.and(channel_member::Column::Accepted.eq(false)),
)
.exec(&*tx)
.await?
.rows_affected
};
if rows_affected == 0 {
Err(anyhow!("no such invitation"))?;
}
Ok(())
Ok(self
.respond_to_notification(
user_id,
&rpc::Notification::ChannelInvitation {
channel_id: channel_id.to_proto(),
},
accept,
&*tx,
)
.await?
.into_iter()
.collect())
})
.await
}

View file

@ -123,7 +123,7 @@ impl Database {
&self,
sender_id: UserId,
receiver_id: UserId,
) -> Result<Option<proto::Notification>> {
) -> Result<NotificationBatch> {
self.transaction(|tx| async move {
let (id_a, id_b, a_to_b) = if sender_id < receiver_id {
(sender_id, receiver_id, true)
@ -164,15 +164,18 @@ impl Database {
Err(anyhow!("contact already requested"))?;
}
self.create_notification(
receiver_id,
rpc::Notification::ContactRequest {
actor_id: sender_id.to_proto(),
},
true,
&*tx,
)
.await
Ok(self
.create_notification(
receiver_id,
rpc::Notification::ContactRequest {
actor_id: sender_id.to_proto(),
},
true,
&*tx,
)
.await?
.into_iter()
.collect())
})
.await
}
@ -274,7 +277,7 @@ impl Database {
responder_id: UserId,
requester_id: UserId,
accept: bool,
) -> Result<Option<proto::Notification>> {
) -> Result<NotificationBatch> {
self.transaction(|tx| async move {
let (id_a, id_b, a_to_b) = if responder_id < requester_id {
(responder_id, requester_id, false)
@ -316,15 +319,34 @@ impl Database {
Err(anyhow!("no such contact request"))?
}
self.create_notification(
requester_id,
rpc::Notification::ContactRequestAccepted {
actor_id: responder_id.to_proto(),
},
true,
&*tx,
)
.await
let mut notifications = Vec::new();
notifications.extend(
self.respond_to_notification(
responder_id,
&rpc::Notification::ContactRequest {
actor_id: requester_id.to_proto(),
},
accept,
&*tx,
)
.await?,
);
if accept {
notifications.extend(
self.create_notification(
requester_id,
rpc::Notification::ContactRequestAccepted {
actor_id: responder_id.to_proto(),
},
true,
&*tx,
)
.await?,
);
}
Ok(notifications)
})
.await
}

View file

@ -52,7 +52,7 @@ impl Database {
while let Some(row) = rows.next().await {
let row = row?;
let kind = row.kind;
if let Some(proto) = self.model_to_proto(row) {
if let Some(proto) = model_to_proto(self, row) {
result.push(proto);
} else {
log::warn!("unknown notification kind {:?}", kind);
@ -70,7 +70,7 @@ impl Database {
notification: Notification,
avoid_duplicates: bool,
tx: &DatabaseTransaction,
) -> Result<Option<proto::Notification>> {
) -> Result<Option<(UserId, proto::Notification)>> {
if avoid_duplicates {
if self
.find_notification(recipient_id, &notification, tx)
@ -94,20 +94,25 @@ impl Database {
content: ActiveValue::Set(notification_proto.content.clone()),
actor_id: ActiveValue::Set(actor_id),
is_read: ActiveValue::NotSet,
response: ActiveValue::NotSet,
created_at: ActiveValue::NotSet,
id: ActiveValue::NotSet,
}
.save(&*tx)
.await?;
Ok(Some(proto::Notification {
id: model.id.as_ref().to_proto(),
kind: notification_proto.kind,
timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
is_read: false,
content: notification_proto.content,
actor_id: notification_proto.actor_id,
}))
Ok(Some((
recipient_id,
proto::Notification {
id: model.id.as_ref().to_proto(),
kind: notification_proto.kind,
timestamp: model.created_at.as_ref().assume_utc().unix_timestamp() as u64,
is_read: false,
response: None,
content: notification_proto.content,
actor_id: notification_proto.actor_id,
},
)))
}
pub async fn remove_notification(
@ -125,6 +130,32 @@ impl Database {
Ok(id)
}
pub async fn respond_to_notification(
&self,
recipient_id: UserId,
notification: &Notification,
response: bool,
tx: &DatabaseTransaction,
) -> Result<Option<(UserId, proto::Notification)>> {
if let Some(id) = self
.find_notification(recipient_id, notification, tx)
.await?
{
let row = notification::Entity::update(notification::ActiveModel {
id: ActiveValue::Unchanged(id),
recipient_id: ActiveValue::Unchanged(recipient_id),
response: ActiveValue::Set(Some(response)),
is_read: ActiveValue::Set(true),
..Default::default()
})
.exec(tx)
.await?;
Ok(model_to_proto(self, row).map(|notification| (recipient_id, notification)))
} else {
Ok(None)
}
}
pub async fn find_notification(
&self,
recipient_id: UserId,
@ -142,7 +173,11 @@ impl Database {
.add(notification::Column::RecipientId.eq(recipient_id))
.add(notification::Column::IsRead.eq(false))
.add(notification::Column::Kind.eq(kind))
.add(notification::Column::ActorId.eq(proto.actor_id)),
.add(if proto.actor_id.is_some() {
notification::Column::ActorId.eq(proto.actor_id)
} else {
notification::Column::ActorId.is_null()
}),
)
.stream(&*tx)
.await?;
@ -152,7 +187,7 @@ impl Database {
while let Some(row) = rows.next().await {
let row = row?;
let id = row.id;
if let Some(proto) = self.model_to_proto(row) {
if let Some(proto) = model_to_proto(self, row) {
if let Some(existing) = Notification::from_proto(&proto) {
if existing == *notification {
return Ok(Some(id));
@ -163,16 +198,17 @@ impl Database {
Ok(None)
}
fn model_to_proto(&self, row: notification::Model) -> Option<proto::Notification> {
let kind = self.notification_kinds_by_id.get(&row.kind)?;
Some(proto::Notification {
id: row.id.to_proto(),
kind: kind.to_string(),
timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
is_read: row.is_read,
content: row.content,
actor_id: row.actor_id.map(|id| id.to_proto()),
})
}
}
fn model_to_proto(this: &Database, row: notification::Model) -> Option<proto::Notification> {
let kind = this.notification_kinds_by_id.get(&row.kind)?;
Some(proto::Notification {
id: row.id.to_proto(),
kind: kind.to_string(),
timestamp: row.created_at.assume_utc().unix_timestamp() as u64,
is_read: row.is_read,
response: row.response,
content: row.content,
actor_id: row.actor_id.map(|id| id.to_proto()),
})
}

View file

@ -7,12 +7,13 @@ use time::PrimitiveDateTime;
pub struct Model {
#[sea_orm(primary_key)]
pub id: NotificationId,
pub is_read: bool,
pub created_at: PrimitiveDateTime,
pub recipient_id: UserId,
pub actor_id: Option<UserId>,
pub kind: NotificationKindId,
pub content: String,
pub is_read: bool,
pub response: Option<bool>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View file

@ -2067,7 +2067,7 @@ async fn request_contact(
return Err(anyhow!("cannot add yourself as a contact"))?;
}
let notification = session
let notifications = session
.db()
.await
.send_contact_request(requester_id, responder_id)
@ -2091,22 +2091,13 @@ async fn request_contact(
.push(proto::IncomingContactRequest {
requester_id: requester_id.to_proto(),
});
for connection_id in session
.connection_pool()
.await
.user_connection_ids(responder_id)
{
let connection_pool = session.connection_pool().await;
for connection_id in connection_pool.user_connection_ids(responder_id) {
session.peer.send(connection_id, update.clone())?;
if let Some(notification) = &notification {
session.peer.send(
connection_id,
proto::NewNotification {
notification: Some(notification.clone()),
},
)?;
}
}
send_notifications(&*connection_pool, &session.peer, notifications);
response.send(proto::Ack {})?;
Ok(())
}
@ -2125,7 +2116,7 @@ async fn respond_to_contact_request(
} else {
let accept = request.response == proto::ContactRequestResponse::Accept as i32;
let notification = db
let notifications = db
.respond_to_contact_request(responder_id, requester_id, accept)
.await?;
let requester_busy = db.is_user_busy(requester_id).await?;
@ -2156,17 +2147,12 @@ async fn respond_to_contact_request(
update
.remove_outgoing_requests
.push(responder_id.to_proto());
for connection_id in pool.user_connection_ids(requester_id) {
session.peer.send(connection_id, update.clone())?;
if let Some(notification) = &notification {
session.peer.send(
connection_id,
proto::NewNotification {
notification: Some(notification.clone()),
},
)?;
}
}
send_notifications(&*pool, &session.peer, notifications);
}
response.send(proto::Ack {})?;
@ -2310,7 +2296,7 @@ async fn invite_channel_member(
let db = session.db().await;
let channel_id = ChannelId::from_proto(request.channel_id);
let invitee_id = UserId::from_proto(request.user_id);
let notification = db
let notifications = db
.invite_channel_member(channel_id, invitee_id, session.user_id, request.admin)
.await?;
@ -2325,22 +2311,13 @@ async fn invite_channel_member(
name: channel.name,
});
for connection_id in session
.connection_pool()
.await
.user_connection_ids(invitee_id)
{
let pool = session.connection_pool().await;
for connection_id in pool.user_connection_ids(invitee_id) {
session.peer.send(connection_id, update.clone())?;
if let Some(notification) = &notification {
session.peer.send(
connection_id,
proto::NewNotification {
notification: Some(notification.clone()),
},
)?;
}
}
send_notifications(&*pool, &session.peer, notifications);
response.send(proto::Ack {})?;
Ok(())
}
@ -2588,7 +2565,8 @@ async fn respond_to_channel_invite(
) -> Result<()> {
let db = session.db().await;
let channel_id = ChannelId::from_proto(request.channel_id);
db.respond_to_channel_invite(channel_id, session.user_id, request.accept)
let notifications = db
.respond_to_channel_invite(channel_id, session.user_id, request.accept)
.await?;
let mut update = proto::UpdateChannels::default();
@ -2636,6 +2614,11 @@ async fn respond_to_channel_invite(
);
}
session.peer.send(session.connection_id, update)?;
send_notifications(
&*session.connection_pool().await,
&session.peer,
notifications,
);
response.send(proto::Ack {})?;
Ok(())
@ -2853,6 +2836,29 @@ fn channel_buffer_updated<T: EnvelopedMessage>(
});
}
fn send_notifications(
connection_pool: &ConnectionPool,
peer: &Peer,
notifications: db::NotificationBatch,
) {
for (user_id, notification) in notifications {
for connection_id in connection_pool.user_connection_ids(user_id) {
if let Err(error) = peer.send(
connection_id,
proto::NewNotification {
notification: Some(notification.clone()),
},
) {
tracing::error!(
"failed to send notification to {:?} {}",
connection_id,
error
);
}
}
}
}
async fn send_channel_message(
request: proto::SendChannelMessage,
response: Response<proto::SendChannelMessage>,

View file

@ -117,8 +117,8 @@ async fn test_core_channels(
// Client B accepts the invitation.
client_b
.channel_store()
.update(cx_b, |channels, _| {
channels.respond_to_channel_invite(channel_a_id, true)
.update(cx_b, |channels, cx| {
channels.respond_to_channel_invite(channel_a_id, true, cx)
})
.await
.unwrap();
@ -856,8 +856,8 @@ async fn test_lost_channel_creation(
// Client B accepts the invite
client_b
.channel_store()
.update(cx_b, |channel_store, _| {
channel_store.respond_to_channel_invite(channel_id, true)
.update(cx_b, |channel_store, cx| {
channel_store.respond_to_channel_invite(channel_id, true, cx)
})
.await
.unwrap();

View file

@ -339,8 +339,8 @@ impl TestServer {
member_cx
.read(ChannelStore::global)
.update(*member_cx, |channels, _| {
channels.respond_to_channel_invite(channel_id, true)
.update(*member_cx, |channels, cx| {
channels.respond_to_channel_invite(channel_id, true, cx)
})
.await
.unwrap();
@ -626,8 +626,8 @@ impl TestClient {
other_cx
.read(ChannelStore::global)
.update(other_cx, |channel_store, _| {
channel_store.respond_to_channel_invite(channel, true)
.update(other_cx, |channel_store, cx| {
channel_store.respond_to_channel_invite(channel, true, cx)
})
.await
.unwrap();