This commit is contained in:
Antonio Scandurra 2023-08-05 15:33:57 +02:00
parent 92d748134e
commit 2fe736fc6b
3 changed files with 106 additions and 81 deletions

View file

@ -148,6 +148,11 @@ impl Debug for OperationId {
}
impl OperationId {
const MAX: Self = Self {
replica_id: ReplicaId(u32::MAX),
operation_count: OperationCount(usize::MAX),
};
pub fn new(replica_id: ReplicaId) -> Self {
Self {
replica_id,

View file

@ -82,12 +82,11 @@ impl DigestSequence {
self.digests.summary().count
}
pub fn digest(&self, mut range: Range<usize>) -> (Range<OperationId>, Digest) {
pub fn digest(&self, mut range: Range<usize>) -> Digest {
range.start = cmp::min(range.start, self.digests.summary().count);
range.end = cmp::min(range.end, self.digests.summary().count);
let mut cursor = self.digests.cursor::<(usize, Digest)>();
cursor.seek(&range.start, Bias::Right, &());
let start_op_id = cursor.start().1.max_op_id;
assert_eq!(
cursor.start().0,
range.start,
@ -99,16 +98,12 @@ impl DigestSequence {
hash = hash * digest.hash;
cursor.next(&());
}
let end_op_id = cursor.start().1.max_op_id;
(
start_op_id..end_op_id,
Digest {
count: cursor.start().0 - range.start,
hash,
max_op_id: end_op_id,
},
)
Digest {
count: cursor.start().0 - range.start,
hash,
max_op_id: cursor.start().1.max_op_id,
}
}
pub fn splice(&mut self, mut range: Range<usize>, digests: impl IntoIterator<Item = Digest>) {

View file

@ -234,7 +234,7 @@ fn sync(
continue;
}
let (op_range, server_digest) = server_digests.digest(sync_range.clone());
let server_digest = server_digests.digest(sync_range.clone());
sync_range.end = cmp::max(sync_range.start + server_digest.count, sync_range.end);
let mut server_range = server_end..server_end + sync_range.len();
@ -244,30 +244,23 @@ fn sync(
synced_end = sync_range.end;
server_end += server_digest.count;
} else {
let client_start_op = operations_for_range(client, sync_range.start..)
.next()
.map(|op| op.id())
.unwrap();
let client_end_op = operations_for_range(client, sync_range.start + 1..)
.next()
.map(|op| op.id())
.unwrap();
let recurse = client_start_op < op_range.end && client_end_op > op_range.start;
let next_client_op_id = {
let mut cursor = client.cursor::<(usize, Digest)>();
cursor.seek(&sync_range.start, Bias::Right, &());
cursor.item().map_or(OperationId::MAX, |op| op.id())
};
let recurse = next_client_op_id <= server_digest.max_op_id;
while let Some(next_sync_range) = stack.last_mut() {
let max_end = cmp::max(sync_range.end, next_sync_range.end);
let mut merged_sync_range = sync_range.start..max_end;
let (merged_op_range, merged_digest) =
server_digests.digest(merged_sync_range.clone());
merged_sync_range.end = cmp::max(
merged_sync_range.start + merged_digest.count,
merged_sync_range.end,
);
let intersects =
client_start_op < merged_op_range.end && client_end_op > merged_op_range.start;
if intersects {
let merged_sync_range =
sync_range.start..cmp::max(sync_range.end, next_sync_range.end);
let merged_digest = server_digests.digest(merged_sync_range.clone());
if next_client_op_id <= merged_digest.max_op_id {
break;
} else {
sync_range.end = merged_sync_range.end;
sync_range.end = cmp::max(
merged_sync_range.start + merged_digest.count,
merged_sync_range.end,
);
server_range.end = server_end + sync_range.len();
stack.pop();
}
@ -291,7 +284,7 @@ fn sync(
);
stats.roundtrips += 1;
let server_operations = request_operations(server, server_range.clone());
debug_assert!(server_operations.len() > 0);
// debug_assert!(server_operations.len() > 0);
server_digests.splice(
sync_range.clone(),
server_operations.iter().map(|op| op.into()),
@ -417,19 +410,19 @@ mod tests {
use super::*;
use crate::{operations, OperationCount, ReplicaId};
use rand::prelude::*;
use std::env;
use std::{env, mem};
#[test]
fn test_sync() {
// assert_sync(1..=15, (1..=5).chain(7..=15));
// assert_sync(1..=10, 5..=10);
// assert_sync(1..=10, 4..=10);
// assert_sync(1..=10, 1..=5);
// assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10]);
// assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]);
// assert_sync(1..=10, 5..=14);
// assert_sync(1..=80, (1..=70).chain(90..=100));
// assert_sync(1..=1910, (1..=1900).chain(1910..=2000));
assert_sync(1..=15, (1..=5).chain(7..=15));
assert_sync(1..=10, 5..=10);
assert_sync(1..=10, 4..=10);
assert_sync(1..=10, 1..=5);
assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10]);
assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]);
assert_sync(1..=10, 5..=14);
assert_sync(1..=80, (1..=70).chain(90..=100));
assert_sync(1..=1910, (1..=1900).chain(1910..=2000));
assert_sync(
(1..=1500).chain(4000..=10000),
(1..=1000).chain(4000..=11000),
@ -437,22 +430,27 @@ mod tests {
}
#[gpui::test(iterations = 100)]
fn test_random(mut rng: StdRng) {
fn test_performance(mut rng: StdRng) {
let max_operations = env::var("OPERATIONS")
.map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
.unwrap_or(10);
let max_digest_count = 4096;
let min_operations = 4096;
let max_digest_count = env::var("MAX_DIGEST_COUNT")
.map(|i| i.parse().expect("invalid `MAX_DIGEST_COUNT` variable"))
.unwrap_or(1024);
let min_operations = env::var("MIN_OPERATIONS")
.map(|i| i.parse().expect("invalid `MIN_OPERATIONS` variable"))
.unwrap_or(4096);
let mut connected = true;
let mut client_ops = btree::Sequence::<Operation>::new();
let mut server_ops = btree::Sequence::<Operation>::new();
let mut client_edits = Vec::new();
let mut server_edits = Vec::new();
let mut ideal_server_ops = 0;
let mut ideal_client_ops = 0;
let mut next_reconnection = None;
for ix in 1..=max_operations {
if connected && rng.gen_bool(0.0005) {
dbg!(ix);
connected = false;
let mut factor = 0.0005;
@ -463,21 +461,15 @@ mod tests {
let remaining_operations = max_operations - ix;
let disconnection_period = (remaining_operations as f64 * factor) as usize;
next_reconnection = Some(ix + disconnection_period);
dbg!(disconnection_period);
log::info!("disconnecting for {} operations", disconnection_period);
}
if next_reconnection == Some(ix) {
connected = true;
next_reconnection = None;
log::debug!("===============");
// log::debug!(
// "client ops: {:?}",
// client_ops.iter().map(|op| op.id()).collect::<Vec<_>>()
// );
// log::debug!(
// "server ops: {:?}",
// server_ops.iter().map(|op| op.id()).collect::<Vec<_>>()
// );
log::info!("reconnecting");
client_ops.edit(mem::take(&mut client_edits), &());
server_ops.edit(mem::take(&mut server_edits), &());
let stats = sync(
&mut client_ops,
@ -485,15 +477,15 @@ mod tests {
max_digest_count,
min_operations,
);
log::debug!("roundtrips: {}", stats.roundtrips);
log::debug!(
log::info!("roundtrips: {}", stats.roundtrips);
log::info!(
"ideal server ops: {}, actual server ops: {}, abs error: {}, pct error: {:.3}%",
ideal_server_ops,
stats.server_operations,
stats.server_operations - ideal_server_ops,
((stats.server_operations as f64 / ideal_server_ops as f64) - 1.) * 100.
);
log::debug!(
log::info!(
"ideal client ops: {}, actual client ops: {}, abs error: {}, pct error: {:.3}%",
ideal_client_ops,
stats.client_operations,
@ -511,41 +503,35 @@ mod tests {
if connected {
let replica_id = ReplicaId(rng.gen_range(0..=1));
client_ops.insert_or_replace(build_operation2(replica_id, ix), &());
server_ops.insert_or_replace(build_operation2(replica_id, ix), &());
client_edits.push(btree::Edit::Insert(op_for_replica(replica_id, ix)));
server_edits.push(btree::Edit::Insert(op_for_replica(replica_id, ix)));
} else if rng.gen_bool(0.95) {
ideal_server_ops += 1;
server_ops.insert_or_replace(build_operation2(ReplicaId(0), ix), &());
server_edits.push(btree::Edit::Insert(op_for_replica(ReplicaId(0), ix)));
} else {
ideal_client_ops += 1;
client_ops.insert_or_replace(build_operation2(ReplicaId(1), ix), &());
client_edits.push(btree::Edit::Insert(op_for_replica(ReplicaId(1), ix)));
}
}
log::debug!("============");
// log::debug!(
// "client ops: {:?}",
// client_ops.iter().map(|op| op.id()).collect::<Vec<_>>()
// );
// log::debug!(
// "server ops: {:?}",
// server_ops.iter().map(|op| op.id()).collect::<Vec<_>>()
// );
log::info!("quiescing");
client_ops.edit(mem::take(&mut client_edits), &());
server_ops.edit(mem::take(&mut server_edits), &());
let stats = sync(
&mut client_ops,
&mut server_ops,
max_digest_count,
min_operations,
);
log::debug!("roundtrips: {}", stats.roundtrips);
log::debug!(
log::info!("roundtrips: {}", stats.roundtrips);
log::info!(
"ideal server ops: {}, actual server ops: {}, abs error: {}, pct error: {:.3}%",
ideal_server_ops,
stats.server_operations,
stats.server_operations - ideal_server_ops,
((stats.server_operations as f64 / ideal_server_ops as f64) - 1.) * 100.
);
log::debug!(
log::info!(
"ideal client ops: {}, actual client ops: {}, abs error: {}, pct error: {:.3}%",
ideal_client_ops,
stats.client_operations,
@ -558,6 +544,45 @@ mod tests {
);
}
#[gpui::test(iterations = 100)]
fn test_random(mut rng: StdRng) {
let max_operations = env::var("OPERATIONS")
.map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
.unwrap_or(10);
let mut client_ops = btree::Sequence::<Operation>::new();
let mut server_ops = btree::Sequence::<Operation>::new();
for ix in 1..=max_operations {
let replica_id = ReplicaId(rng.gen_range(0..=1));
if rng.gen() {
client_ops.insert_or_replace(op_for_replica(replica_id, ix), &());
}
if rng.gen() {
server_ops.insert_or_replace(op_for_replica(replica_id, ix), &());
}
}
let max_digest_count = rng.gen_range(2..=32);
let min_operations = rng.gen_range(1..100);
log::info!(
"syncing, max digest count: {}, min operations: {}",
max_digest_count,
min_operations
);
sync(
&mut client_ops,
&mut server_ops,
max_digest_count,
min_operations,
);
assert_eq!(
client_ops.iter().map(|op| op.id()).collect::<Vec<_>>(),
server_ops.iter().map(|op| op.id()).collect::<Vec<_>>()
);
}
fn assert_sync(
client_ops: impl IntoIterator<Item = usize>,
server_ops: impl IntoIterator<Item = usize>,
@ -571,9 +596,9 @@ mod tests {
.map(build_operation)
.collect::<Vec<_>>();
for max_digests in 256..=256 {
for min_operations in 256..=256 {
log::debug!(
for max_digests in [2, 3, 4, 7, 8, 16, 32] {
for min_operations in [1, 2, 3, 4, 7, 8, 16, 32] {
log::info!(
"max digests: {}, min operations: {}",
max_digests,
min_operations
@ -611,7 +636,7 @@ mod tests {
})
}
fn build_operation2(replica_id: ReplicaId, id: usize) -> Operation {
fn op_for_replica(replica_id: ReplicaId, id: usize) -> Operation {
Operation::CreateBranch(operations::CreateBranch {
id: OperationId {
replica_id,