Persist chat mentions

This commit is contained in:
Max Brunsfeld 2023-10-18 16:56:03 -07:00
parent 821419ee5b
commit d05404a4df
22 changed files with 402 additions and 112 deletions

View file

@ -103,6 +103,7 @@ impl Database {
sender_id: row.sender_id.to_proto(),
body: row.body,
timestamp: row.sent_at.assume_utc().unix_timestamp() as u64,
mentions: vec![],
nonce: Some(proto::Nonce {
upper_half: nonce.0,
lower_half: nonce.1,
@ -111,6 +112,38 @@ impl Database {
}
drop(rows);
messages.reverse();
let mut mentions = channel_message_mention::Entity::find()
.filter(
channel_message_mention::Column::MessageId.is_in(messages.iter().map(|m| m.id)),
)
.order_by_asc(channel_message_mention::Column::MessageId)
.order_by_asc(channel_message_mention::Column::StartOffset)
.stream(&*tx)
.await?;
let mut message_ix = 0;
while let Some(mention) = mentions.next().await {
let mention = mention?;
let message_id = mention.message_id.to_proto();
while let Some(message) = messages.get_mut(message_ix) {
if message.id < message_id {
message_ix += 1;
} else {
if message.id == message_id {
message.mentions.push(proto::ChatMention {
range: Some(proto::Range {
start: mention.start_offset as u64,
end: mention.end_offset as u64,
}),
user_id: mention.user_id.to_proto(),
});
}
break;
}
}
}
Ok(messages)
})
.await
@ -121,6 +154,7 @@ impl Database {
channel_id: ChannelId,
user_id: UserId,
body: &str,
mentions: &[proto::ChatMention],
timestamp: OffsetDateTime,
nonce: u128,
) -> Result<(MessageId, Vec<ConnectionId>, Vec<UserId>)> {
@ -150,7 +184,7 @@ impl Database {
let timestamp = timestamp.to_offset(time::UtcOffset::UTC);
let timestamp = time::PrimitiveDateTime::new(timestamp.date(), timestamp.time());
let message = channel_message::Entity::insert(channel_message::ActiveModel {
let message_id = channel_message::Entity::insert(channel_message::ActiveModel {
channel_id: ActiveValue::Set(channel_id),
sender_id: ActiveValue::Set(user_id),
body: ActiveValue::Set(body.to_string()),
@ -164,7 +198,31 @@ impl Database {
.to_owned(),
)
.exec(&*tx)
.await?;
.await?
.last_insert_id;
let models = mentions
.iter()
.filter_map(|mention| {
let range = mention.range.as_ref()?;
if !body.is_char_boundary(range.start as usize)
|| !body.is_char_boundary(range.end as usize)
{
return None;
}
Some(channel_message_mention::ActiveModel {
message_id: ActiveValue::Set(message_id),
start_offset: ActiveValue::Set(range.start as i32),
end_offset: ActiveValue::Set(range.end as i32),
user_id: ActiveValue::Set(UserId::from_proto(mention.user_id)),
})
})
.collect::<Vec<_>>();
if !models.is_empty() {
channel_message_mention::Entity::insert_many(models)
.exec(&*tx)
.await?;
}
#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
enum QueryConnectionId {
@ -172,22 +230,13 @@ impl Database {
}
// Observe this message for the sender
self.observe_channel_message_internal(
channel_id,
user_id,
message.last_insert_id,
&*tx,
)
.await?;
self.observe_channel_message_internal(channel_id, user_id, message_id, &*tx)
.await?;
let mut channel_members = self.get_channel_members_internal(channel_id, &*tx).await?;
channel_members.retain(|member| !participant_user_ids.contains(member));
Ok((
message.last_insert_id,
participant_connection_ids,
channel_members,
))
Ok((message_id, participant_connection_ids, channel_members))
})
.await
}