Channel notifications from the server works

This commit is contained in:
Mikayla 2023-09-30 22:30:36 -07:00
parent 1469c02998
commit 9ba975d6ad
No known key found for this signature in database
16 changed files with 266 additions and 107 deletions

View file

@ -80,20 +80,20 @@ impl Database {
// Save the last observed operation
if let Some(max_operation) = max_operation {
observed_note_edits::Entity::insert(observed_note_edits::ActiveModel {
observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
user_id: ActiveValue::Set(user_id),
channel_id: ActiveValue::Set(channel_id),
buffer_id: ActiveValue::Set(buffer.id),
epoch: ActiveValue::Set(max_operation.0),
lamport_timestamp: ActiveValue::Set(max_operation.1),
})
.on_conflict(
OnConflict::columns([
observed_note_edits::Column::UserId,
observed_note_edits::Column::ChannelId,
observed_buffer_edits::Column::UserId,
observed_buffer_edits::Column::BufferId,
])
.update_columns([
observed_note_edits::Column::Epoch,
observed_note_edits::Column::LamportTimestamp,
observed_buffer_edits::Column::Epoch,
observed_buffer_edits::Column::LamportTimestamp,
])
.to_owned(),
)
@ -110,20 +110,20 @@ impl Database {
.map(|model| (model.epoch, model.lamport_timestamp));
if let Some(buffer_max) = buffer_max {
observed_note_edits::Entity::insert(observed_note_edits::ActiveModel {
observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
user_id: ActiveValue::Set(user_id),
channel_id: ActiveValue::Set(channel_id),
buffer_id: ActiveValue::Set(buffer.id),
epoch: ActiveValue::Set(buffer_max.0),
lamport_timestamp: ActiveValue::Set(buffer_max.1),
})
.on_conflict(
OnConflict::columns([
observed_note_edits::Column::UserId,
observed_note_edits::Column::ChannelId,
observed_buffer_edits::Column::UserId,
observed_buffer_edits::Column::BufferId,
])
.update_columns([
observed_note_edits::Column::Epoch,
observed_note_edits::Column::LamportTimestamp,
observed_buffer_edits::Column::Epoch,
observed_buffer_edits::Column::LamportTimestamp,
])
.to_owned(),
)
@ -463,7 +463,7 @@ impl Database {
channel_id: ChannelId,
user: UserId,
operations: &[proto::Operation],
) -> Result<Vec<ConnectionId>> {
) -> Result<(Vec<ConnectionId>, Vec<UserId>)> {
self.transaction(move |tx| async move {
self.check_user_is_channel_member(channel_id, user, &*tx)
.await?;
@ -483,10 +483,23 @@ impl Database {
.filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
.collect::<Vec<_>>();
let mut channel_members;
if !operations.is_empty() {
// get current channel participants and save the max operation above
self.save_max_operation_for_collaborators(operations.as_slice(), channel_id, &*tx)
self.save_max_operation_for_collaborators(
operations.as_slice(),
channel_id,
buffer.id,
&*tx,
)
.await?;
channel_members = self.get_channel_members_internal(channel_id, &*tx).await?;
let collaborators = self
.get_channel_buffer_collaborators_internal(channel_id, &*tx)
.await?;
channel_members.retain(|member| !collaborators.contains(member));
buffer_operation::Entity::insert_many(operations)
.on_conflict(
@ -501,6 +514,8 @@ impl Database {
)
.exec(&*tx)
.await?;
} else {
channel_members = Vec::new();
}
let mut connections = Vec::new();
@ -519,7 +534,7 @@ impl Database {
});
}
Ok(connections)
Ok((connections, channel_members))
})
.await
}
@ -528,6 +543,7 @@ impl Database {
&self,
operations: &[buffer_operation::ActiveModel],
channel_id: ChannelId,
buffer_id: BufferId,
tx: &DatabaseTransaction,
) -> Result<()> {
let max_operation = operations
@ -553,22 +569,22 @@ impl Database {
.get_channel_buffer_collaborators_internal(channel_id, tx)
.await?;
observed_note_edits::Entity::insert_many(users.iter().map(|id| {
observed_note_edits::ActiveModel {
observed_buffer_edits::Entity::insert_many(users.iter().map(|id| {
observed_buffer_edits::ActiveModel {
user_id: ActiveValue::Set(*id),
channel_id: ActiveValue::Set(channel_id),
buffer_id: ActiveValue::Set(buffer_id),
epoch: max_operation.0.clone(),
lamport_timestamp: ActiveValue::Set(*max_operation.1.as_ref()),
}
}))
.on_conflict(
OnConflict::columns([
observed_note_edits::Column::UserId,
observed_note_edits::Column::ChannelId,
observed_buffer_edits::Column::UserId,
observed_buffer_edits::Column::BufferId,
])
.update_columns([
observed_note_edits::Column::Epoch,
observed_note_edits::Column::LamportTimestamp,
observed_buffer_edits::Column::Epoch,
observed_buffer_edits::Column::LamportTimestamp,
])
.to_owned(),
)
@ -699,54 +715,75 @@ impl Database {
Ok(())
}
pub async fn has_buffer_changed(&self, user_id: UserId, channel_id: ChannelId) -> Result<bool> {
self.transaction(|tx| async move {
let user_max = observed_note_edits::Entity::find()
.filter(observed_note_edits::Column::UserId.eq(user_id))
.filter(observed_note_edits::Column::ChannelId.eq(channel_id))
.one(&*tx)
.await?
.map(|model| (model.epoch, model.lamport_timestamp));
#[cfg(test)]
pub async fn test_has_note_changed(
&self,
user_id: UserId,
channel_id: ChannelId,
) -> Result<bool> {
self.transaction(|tx| async move { self.has_note_changed(user_id, channel_id, &*tx).await })
.await
}
let channel_buffer = channel::Model {
id: channel_id,
..Default::default()
}
.find_related(buffer::Entity)
pub async fn has_note_changed(
&self,
user_id: UserId,
channel_id: ChannelId,
tx: &DatabaseTransaction,
) -> Result<bool> {
let Some(buffer_id) = channel::Model {
id: channel_id,
..Default::default()
}
.find_related(buffer::Entity)
.one(&*tx)
.await?
.map(|buffer| buffer.id) else {
return Ok(false);
};
let user_max = observed_buffer_edits::Entity::find()
.filter(observed_buffer_edits::Column::UserId.eq(user_id))
.filter(observed_buffer_edits::Column::BufferId.eq(buffer_id))
.one(&*tx)
.await?;
.await?
.map(|model| (model.epoch, model.lamport_timestamp));
let Some(channel_buffer) = channel_buffer else {
return Ok(false);
};
let channel_buffer = channel::Model {
id: channel_id,
..Default::default()
}
.find_related(buffer::Entity)
.one(&*tx)
.await?;
let mut channel_max = buffer_operation::Entity::find()
let Some(channel_buffer) = channel_buffer else {
return Ok(false);
};
let mut channel_max = buffer_operation::Entity::find()
.filter(buffer_operation::Column::BufferId.eq(channel_buffer.id))
.filter(buffer_operation::Column::Epoch.eq(channel_buffer.epoch))
.order_by(buffer_operation::Column::Epoch, Desc)
.order_by(buffer_operation::Column::LamportTimestamp, Desc)
.one(&*tx)
.await?
.map(|model| (model.epoch, model.lamport_timestamp));
// If there are no edits in this epoch
if channel_max.is_none() {
// check if this user observed the last edit of the previous epoch
channel_max = buffer_operation::Entity::find()
.filter(buffer_operation::Column::BufferId.eq(channel_buffer.id))
.filter(buffer_operation::Column::Epoch.eq(channel_buffer.epoch))
.filter(buffer_operation::Column::Epoch.eq(channel_buffer.epoch.saturating_sub(1)))
.order_by(buffer_operation::Column::Epoch, Desc)
.order_by(buffer_operation::Column::LamportTimestamp, Desc)
.one(&*tx)
.await?
.map(|model| (model.epoch, model.lamport_timestamp));
}
// If there are no edits in this epoch
if channel_max.is_none() {
// check if this user observed the last edit of the previous epoch
channel_max = buffer_operation::Entity::find()
.filter(buffer_operation::Column::BufferId.eq(channel_buffer.id))
.filter(
buffer_operation::Column::Epoch.eq(channel_buffer.epoch.saturating_sub(1)),
)
.order_by(buffer_operation::Column::Epoch, Desc)
.order_by(buffer_operation::Column::LamportTimestamp, Desc)
.one(&*tx)
.await?
.map(|model| (model.epoch, model.lamport_timestamp));
}
Ok(user_max != channel_max)
})
.await
Ok(user_max != channel_max)
}
}

View file

@ -391,7 +391,8 @@ impl Database {
.all(&*tx)
.await?;
self.get_user_channels(channel_memberships, &tx).await
self.get_user_channels(user_id, channel_memberships, &tx)
.await
})
.await
}
@ -414,13 +415,15 @@ impl Database {
.all(&*tx)
.await?;
self.get_user_channels(channel_membership, &tx).await
self.get_user_channels(user_id, channel_membership, &tx)
.await
})
.await
}
pub async fn get_user_channels(
&self,
user_id: UserId,
channel_memberships: Vec<channel_member::Model>,
tx: &DatabaseTransaction,
) -> Result<ChannelsForUser> {
@ -460,10 +463,18 @@ impl Database {
}
}
let mut channels_with_changed_notes = HashSet::default();
for channel in graph.channels.iter() {
if self.has_note_changed(user_id, channel.id, tx).await? {
channels_with_changed_notes.insert(channel.id);
}
}
Ok(ChannelsForUser {
channels: graph,
channel_participants,
channels_with_admin_privileges,
channels_with_changed_notes,
})
}