Simplify handling of syncing versions

Currently whenever a channel changes we send a huge amount of data to
each member. This is the first step in reducing that

Co-Authored-By: Max <max@zed.dev>
Co-Authored-By: bennetbo <bennetbo@gmx.de>
This commit is contained in:
Conrad Irwin 2024-01-25 15:39:55 -07:00
parent b72c037199
commit 716221cd38
13 changed files with 209 additions and 404 deletions

View file

@ -748,18 +748,11 @@ impl Database {
.await
}
pub async fn unseen_channel_buffer_changes(
pub async fn latest_channel_buffer_changes(
&self,
user_id: UserId,
channel_ids: &[ChannelId],
tx: &DatabaseTransaction,
) -> Result<Vec<proto::UnseenChannelBufferChange>> {
#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
enum QueryIds {
ChannelId,
Id,
}
) -> Result<Vec<proto::ChannelBufferVersion>> {
let mut channel_ids_by_buffer_id = HashMap::default();
let mut rows = buffer::Entity::find()
.filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied()))
@ -771,51 +764,23 @@ impl Database {
}
drop(rows);
let mut observed_edits_by_buffer_id = HashMap::default();
let mut rows = observed_buffer_edits::Entity::find()
.filter(observed_buffer_edits::Column::UserId.eq(user_id))
.filter(
observed_buffer_edits::Column::BufferId
.is_in(channel_ids_by_buffer_id.keys().copied()),
)
.stream(&*tx)
.await?;
while let Some(row) = rows.next().await {
let row = row?;
observed_edits_by_buffer_id.insert(row.buffer_id, row);
}
drop(rows);
let latest_operations = self
.get_latest_operations_for_buffers(channel_ids_by_buffer_id.keys().copied(), &*tx)
.await?;
let mut changes = Vec::default();
for latest in latest_operations {
if let Some(observed) = observed_edits_by_buffer_id.get(&latest.buffer_id) {
if (
observed.epoch,
observed.lamport_timestamp,
observed.replica_id,
) >= (latest.epoch, latest.lamport_timestamp, latest.replica_id)
{
continue;
}
}
if let Some(channel_id) = channel_ids_by_buffer_id.get(&latest.buffer_id) {
changes.push(proto::UnseenChannelBufferChange {
channel_id: channel_id.to_proto(),
epoch: latest.epoch as u64,
Ok(latest_operations
.iter()
.flat_map(|op| {
Some(proto::ChannelBufferVersion {
channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(),
epoch: op.epoch as u64,
version: vec![proto::VectorClockEntry {
replica_id: latest.replica_id as u32,
timestamp: latest.lamport_timestamp as u32,
replica_id: op.replica_id as u32,
timestamp: op.lamport_timestamp as u32,
}],
});
}
}
Ok(changes)
})
})
.collect())
}
/// Returns the latest operations for the buffers with the specified IDs.

View file

@ -687,8 +687,8 @@ impl Database {
}
let channel_ids = channels.iter().map(|c| c.id).collect::<Vec<_>>();
let channel_buffer_changes = self
.unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
let latest_buffer_versions = self
.latest_channel_buffer_changes(&channel_ids, &*tx)
.await?;
let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
@ -696,7 +696,7 @@ impl Database {
Ok(ChannelsForUser {
channels,
channel_participants,
latest_buffer_versions: channel_buffer_changes,
latest_buffer_versions,
latest_channel_messages: latest_messages,
})
}

View file

@ -398,6 +398,10 @@ impl Database {
write!(&mut values, "({})", id).unwrap();
}
if values.is_empty() {
return Ok(Vec::default());
}
let sql = format!(
r#"
SELECT

View file

@ -330,8 +330,7 @@ async fn test_channel_buffers_last_operations(db: &Database) {
.transaction(|tx| {
let buffers = &buffers;
async move {
db.unseen_channel_buffer_changes(
observer_id,
db.latest_channel_buffer_changes(
&[
buffers[0].channel_id,
buffers[1].channel_id,
@ -348,12 +347,12 @@ async fn test_channel_buffers_last_operations(db: &Database) {
pretty_assertions::assert_eq!(
buffer_changes,
[
rpc::proto::UnseenChannelBufferChange {
rpc::proto::ChannelBufferVersion {
channel_id: buffers[0].channel_id.to_proto(),
epoch: 0,
version: serialize_version(&text_buffers[0].version()),
},
rpc::proto::UnseenChannelBufferChange {
rpc::proto::ChannelBufferVersion {
channel_id: buffers[1].channel_id.to_proto(),
epoch: 1,
version: serialize_version(&text_buffers[1].version())
@ -362,99 +361,7 @@ async fn test_channel_buffers_last_operations(db: &Database) {
== buffer_changes[1].version.first().unwrap().replica_id)
.collect::<Vec<_>>(),
},
rpc::proto::UnseenChannelBufferChange {
channel_id: buffers[2].channel_id.to_proto(),
epoch: 0,
version: serialize_version(&text_buffers[2].version()),
},
]
);
db.observe_buffer_version(
buffers[1].id,
observer_id,
1,
serialize_version(&text_buffers[1].version()).as_slice(),
)
.await
.unwrap();
let buffer_changes = db
.transaction(|tx| {
let buffers = &buffers;
async move {
db.unseen_channel_buffer_changes(
observer_id,
&[
buffers[0].channel_id,
buffers[1].channel_id,
buffers[2].channel_id,
],
&*tx,
)
.await
}
})
.await
.unwrap();
assert_eq!(
buffer_changes,
[
rpc::proto::UnseenChannelBufferChange {
channel_id: buffers[0].channel_id.to_proto(),
epoch: 0,
version: serialize_version(&text_buffers[0].version()),
},
rpc::proto::UnseenChannelBufferChange {
channel_id: buffers[2].channel_id.to_proto(),
epoch: 0,
version: serialize_version(&text_buffers[2].version()),
},
]
);
// Observe an earlier version of the buffer.
db.observe_buffer_version(
buffers[1].id,
observer_id,
1,
&[rpc::proto::VectorClockEntry {
replica_id: 0,
timestamp: 0,
}],
)
.await
.unwrap();
let buffer_changes = db
.transaction(|tx| {
let buffers = &buffers;
async move {
db.unseen_channel_buffer_changes(
observer_id,
&[
buffers[0].channel_id,
buffers[1].channel_id,
buffers[2].channel_id,
],
&*tx,
)
.await
}
})
.await
.unwrap();
assert_eq!(
buffer_changes,
[
rpc::proto::UnseenChannelBufferChange {
channel_id: buffers[0].channel_id.to_proto(),
epoch: 0,
version: serialize_version(&text_buffers[0].version()),
},
rpc::proto::UnseenChannelBufferChange {
rpc::proto::ChannelBufferVersion {
channel_id: buffers[2].channel_id.to_proto(),
epoch: 0,
version: serialize_version(&text_buffers[2].version()),

View file

@ -235,11 +235,10 @@ async fn test_unseen_channel_messages(db: &Arc<Database>) {
.await
.unwrap();
let second_message = db
let _ = db
.create_channel_message(channel_1, user, "1_2", &[], OffsetDateTime::now_utc(), 2)
.await
.unwrap()
.message_id;
.unwrap();
let third_message = db
.create_channel_message(channel_1, user, "1_3", &[], OffsetDateTime::now_utc(), 3)
@ -258,97 +257,27 @@ async fn test_unseen_channel_messages(db: &Arc<Database>) {
.message_id;
// Check that observer has new messages
let unseen_messages = db
let latest_messages = db
.transaction(|tx| async move {
db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx)
db.latest_channel_messages(&[channel_1, channel_2], &*tx)
.await
})
.await
.unwrap();
assert_eq!(
unseen_messages,
latest_messages,
[
rpc::proto::UnseenChannelMessage {
rpc::proto::ChannelMessageId {
channel_id: channel_1.to_proto(),
message_id: third_message.to_proto(),
},
rpc::proto::UnseenChannelMessage {
rpc::proto::ChannelMessageId {
channel_id: channel_2.to_proto(),
message_id: fourth_message.to_proto(),
},
]
);
// Observe the second message
db.observe_channel_message(channel_1, observer, second_message)
.await
.unwrap();
// Make sure the observer still has a new message
let unseen_messages = db
.transaction(|tx| async move {
db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx)
.await
})
.await
.unwrap();
assert_eq!(
unseen_messages,
[
rpc::proto::UnseenChannelMessage {
channel_id: channel_1.to_proto(),
message_id: third_message.to_proto(),
},
rpc::proto::UnseenChannelMessage {
channel_id: channel_2.to_proto(),
message_id: fourth_message.to_proto(),
},
]
);
// Observe the third message,
db.observe_channel_message(channel_1, observer, third_message)
.await
.unwrap();
// Make sure the observer does not have a new method
let unseen_messages = db
.transaction(|tx| async move {
db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx)
.await
})
.await
.unwrap();
assert_eq!(
unseen_messages,
[rpc::proto::UnseenChannelMessage {
channel_id: channel_2.to_proto(),
message_id: fourth_message.to_proto(),
}]
);
// Observe the second message again, should not regress our observed state
db.observe_channel_message(channel_1, observer, second_message)
.await
.unwrap();
// Make sure the observer does not have a new message
let unseen_messages = db
.transaction(|tx| async move {
db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx)
.await
})
.await
.unwrap();
assert_eq!(
unseen_messages,
[rpc::proto::UnseenChannelMessage {
channel_id: channel_2.to_proto(),
message_id: fourth_message.to_proto(),
}]
);
}
test_both_dbs!(

View file

@ -2842,27 +2842,25 @@ async fn update_channel_buffer(
let pool = &*session.connection_pool().await;
todo!()
// broadcast(
// None,
// non_collaborators
// .iter()
// .flat_map(|user_id| pool.user_connection_ids(*user_id)),
// |peer_id| {
// session.peer.send(
// peer_id.into(),
// proto::UpdateChannels {
// unseen_channel_buffer_changes: vec![proto::UnseenChannelBufferChange {
// channel_id: channel_id.to_proto(),
// epoch: epoch as u64,
// version: version.clone(),
// }],
// ..Default::default()
// },
// )
// },
// );
broadcast(
None,
non_collaborators
.iter()
.flat_map(|user_id| pool.user_connection_ids(*user_id)),
|peer_id| {
session.peer.send(
peer_id.into(),
proto::UpdateChannels {
latest_channel_buffer_versions: vec![proto::ChannelBufferVersion {
channel_id: channel_id.to_proto(),
epoch: epoch as u64,
version: version.clone(),
}],
..Default::default()
},
)
},
);
Ok(())
}
@ -3039,7 +3037,7 @@ async fn send_channel_message(
session.peer.send(
peer_id.into(),
proto::UpdateChannels {
unseen_channel_messages: vec![proto::UnseenChannelMessage {
latest_channel_message_ids: vec![proto::ChannelMessageId {
channel_id: channel_id.to_proto(),
message_id: message_id.to_proto(),
}],

View file

@ -637,7 +637,6 @@ async fn test_channel_buffer_changes(
.channel_store()
.read(cx)
.has_channel_buffer_changed(channel_id)
.unwrap()
});
assert!(has_buffer_changed);
@ -655,7 +654,6 @@ async fn test_channel_buffer_changes(
.channel_store()
.read(cx)
.has_channel_buffer_changed(channel_id)
.unwrap()
});
assert!(!has_buffer_changed);
@ -672,7 +670,6 @@ async fn test_channel_buffer_changes(
.channel_store()
.read(cx)
.has_channel_buffer_changed(channel_id)
.unwrap()
});
assert!(!has_buffer_changed);
@ -687,7 +684,6 @@ async fn test_channel_buffer_changes(
.channel_store()
.read(cx)
.has_channel_buffer_changed(channel_id)
.unwrap()
});
assert!(!has_buffer_changed);
@ -714,7 +710,6 @@ async fn test_channel_buffer_changes(
.channel_store()
.read(cx)
.has_channel_buffer_changed(channel_id)
.unwrap()
});
assert!(has_buffer_changed);
}

View file

@ -313,7 +313,6 @@ async fn test_channel_message_changes(
.channel_store()
.read(cx)
.has_new_messages(channel_id)
.unwrap()
});
assert!(b_has_messages);
@ -341,7 +340,6 @@ async fn test_channel_message_changes(
.channel_store()
.read(cx)
.has_new_messages(channel_id)
.unwrap()
});
assert!(!b_has_messages);
@ -359,7 +357,6 @@ async fn test_channel_message_changes(
.channel_store()
.read(cx)
.has_new_messages(channel_id)
.unwrap()
});
assert!(!b_has_messages);
@ -382,7 +379,6 @@ async fn test_channel_message_changes(
.channel_store()
.read(cx)
.has_new_messages(channel_id)
.unwrap()
});
assert!(b_has_messages);
@ -402,7 +398,6 @@ async fn test_channel_message_changes(
.channel_store()
.read(cx)
.has_new_messages(channel_id)
.unwrap()
});
assert!(b_has_messages);