Avoid N+1 query for channels with notes changes
Also, start work on new timing for recording observed notes edits. Co-authored-by: Mikayla <mikayla@zed.dev>
This commit is contained in:
parent
84c4db13fb
commit
d9d997b218
7 changed files with 381 additions and 97 deletions
|
@ -272,3 +272,193 @@ async fn test_channel_buffers_diffs(db: &Database) {
|
|||
assert!(!db.test_has_note_changed(a_id, zed_id).await.unwrap());
|
||||
assert!(!db.test_has_note_changed(b_id, zed_id).await.unwrap());
|
||||
}
|
||||
|
||||
test_both_dbs!(
|
||||
test_channel_buffers_last_operations,
|
||||
test_channel_buffers_last_operations_postgres,
|
||||
test_channel_buffers_last_operations_sqlite
|
||||
);
|
||||
|
||||
async fn test_channel_buffers_last_operations(db: &Database) {
|
||||
let user_id = db
|
||||
.create_user(
|
||||
"user_a@example.com",
|
||||
false,
|
||||
NewUserParams {
|
||||
github_login: "user_a".into(),
|
||||
github_user_id: 101,
|
||||
invite_count: 0,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.user_id;
|
||||
let owner_id = db.create_server("production").await.unwrap().0 as u32;
|
||||
let connection_id = ConnectionId {
|
||||
owner_id,
|
||||
id: user_id.0 as u32,
|
||||
};
|
||||
|
||||
let mut buffers = Vec::new();
|
||||
let mut text_buffers = Vec::new();
|
||||
for i in 0..3 {
|
||||
let channel = db
|
||||
.create_root_channel(&format!("channel-{i}"), &format!("room-{i}"), user_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
db.join_channel_buffer(channel, user_id, connection_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
buffers.push(
|
||||
db.transaction(|tx| async move { db.get_channel_buffer(channel, &*tx).await })
|
||||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
text_buffers.push(Buffer::new(0, 0, "".to_string()));
|
||||
}
|
||||
|
||||
let operations = db
|
||||
.transaction(|tx| {
|
||||
let buffers = &buffers;
|
||||
async move {
|
||||
db.get_last_operations_for_buffers([buffers[0].id, buffers[2].id], &*tx)
|
||||
.await
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(operations.is_empty());
|
||||
|
||||
update_buffer(
|
||||
buffers[0].channel_id,
|
||||
user_id,
|
||||
db,
|
||||
vec![
|
||||
text_buffers[0].edit([(0..0, "a")]),
|
||||
text_buffers[0].edit([(0..0, "b")]),
|
||||
text_buffers[0].edit([(0..0, "c")]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
update_buffer(
|
||||
buffers[1].channel_id,
|
||||
user_id,
|
||||
db,
|
||||
vec![
|
||||
text_buffers[1].edit([(0..0, "d")]),
|
||||
text_buffers[1].edit([(1..1, "e")]),
|
||||
text_buffers[1].edit([(2..2, "f")]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
// cause buffer 1's epoch to increment.
|
||||
db.leave_channel_buffer(buffers[1].channel_id, connection_id)
|
||||
.await
|
||||
.unwrap();
|
||||
db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
|
||||
.await
|
||||
.unwrap();
|
||||
text_buffers[1] = Buffer::new(1, 0, "def".to_string());
|
||||
update_buffer(
|
||||
buffers[1].channel_id,
|
||||
user_id,
|
||||
db,
|
||||
vec![
|
||||
text_buffers[1].edit([(0..0, "g")]),
|
||||
text_buffers[1].edit([(0..0, "h")]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
update_buffer(
|
||||
buffers[2].channel_id,
|
||||
user_id,
|
||||
db,
|
||||
vec![text_buffers[2].edit([(0..0, "i")])],
|
||||
)
|
||||
.await;
|
||||
|
||||
let operations = db
|
||||
.transaction(|tx| {
|
||||
let buffers = &buffers;
|
||||
async move {
|
||||
db.get_last_operations_for_buffers([buffers[1].id, buffers[2].id], &*tx)
|
||||
.await
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert_operations(
|
||||
&operations,
|
||||
&[
|
||||
(buffers[1].id, 1, &text_buffers[1]),
|
||||
(buffers[2].id, 0, &text_buffers[2]),
|
||||
],
|
||||
);
|
||||
|
||||
let operations = db
|
||||
.transaction(|tx| {
|
||||
let buffers = &buffers;
|
||||
async move {
|
||||
db.get_last_operations_for_buffers([buffers[0].id, buffers[1].id], &*tx)
|
||||
.await
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert_operations(
|
||||
&operations,
|
||||
&[
|
||||
(buffers[0].id, 0, &text_buffers[0]),
|
||||
(buffers[1].id, 1, &text_buffers[1]),
|
||||
],
|
||||
);
|
||||
|
||||
async fn update_buffer(
|
||||
channel_id: ChannelId,
|
||||
user_id: UserId,
|
||||
db: &Database,
|
||||
operations: Vec<text::Operation>,
|
||||
) {
|
||||
let operations = operations
|
||||
.into_iter()
|
||||
.map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
|
||||
.collect::<Vec<_>>();
|
||||
db.update_channel_buffer(channel_id, user_id, &operations)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn assert_operations(
|
||||
operations: &[buffer_operation::Model],
|
||||
expected: &[(BufferId, i32, &text::Buffer)],
|
||||
) {
|
||||
let actual = operations
|
||||
.iter()
|
||||
.map(|op| buffer_operation::Model {
|
||||
buffer_id: op.buffer_id,
|
||||
epoch: op.epoch,
|
||||
lamport_timestamp: op.lamport_timestamp,
|
||||
replica_id: op.replica_id,
|
||||
value: vec![],
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let expected = expected
|
||||
.iter()
|
||||
.map(|(buffer_id, epoch, buffer)| buffer_operation::Model {
|
||||
buffer_id: *buffer_id,
|
||||
epoch: *epoch,
|
||||
lamport_timestamp: buffer.lamport_clock.value as i32 - 1,
|
||||
replica_id: buffer.replica_id() as i32,
|
||||
value: vec![],
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(actual, expected, "unexpected operations")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue