diff --git a/crates/crdb/src/crdb.rs b/crates/crdb/src/crdb.rs index 4b3510627e..f321210add 100644 --- a/crates/crdb/src/crdb.rs +++ b/crates/crdb/src/crdb.rs @@ -148,6 +148,11 @@ impl Debug for OperationId { } impl OperationId { + const MAX: Self = Self { + replica_id: ReplicaId(u32::MAX), + operation_count: OperationCount(usize::MAX), + }; + pub fn new(replica_id: ReplicaId) -> Self { Self { replica_id, diff --git a/crates/crdb/src/digest.rs b/crates/crdb/src/digest.rs index 15dea4f42f..a5dac72424 100644 --- a/crates/crdb/src/digest.rs +++ b/crates/crdb/src/digest.rs @@ -82,12 +82,11 @@ impl DigestSequence { self.digests.summary().count } - pub fn digest(&self, mut range: Range) -> (Range, Digest) { + pub fn digest(&self, mut range: Range) -> Digest { range.start = cmp::min(range.start, self.digests.summary().count); range.end = cmp::min(range.end, self.digests.summary().count); let mut cursor = self.digests.cursor::<(usize, Digest)>(); cursor.seek(&range.start, Bias::Right, &()); - let start_op_id = cursor.start().1.max_op_id; assert_eq!( cursor.start().0, range.start, @@ -99,16 +98,12 @@ impl DigestSequence { hash = hash * digest.hash; cursor.next(&()); } - let end_op_id = cursor.start().1.max_op_id; - ( - start_op_id..end_op_id, - Digest { - count: cursor.start().0 - range.start, - hash, - max_op_id: end_op_id, - }, - ) + Digest { + count: cursor.start().0 - range.start, + hash, + max_op_id: cursor.start().1.max_op_id, + } } pub fn splice(&mut self, mut range: Range, digests: impl IntoIterator) { diff --git a/crates/crdb/src/sync.rs b/crates/crdb/src/sync.rs index 95036f35a2..e084041562 100644 --- a/crates/crdb/src/sync.rs +++ b/crates/crdb/src/sync.rs @@ -234,7 +234,7 @@ fn sync( continue; } - let (op_range, server_digest) = server_digests.digest(sync_range.clone()); + let server_digest = server_digests.digest(sync_range.clone()); sync_range.end = cmp::max(sync_range.start + server_digest.count, sync_range.end); let mut server_range = server_end..server_end + sync_range.len(); @@ -244,30 +244,23 @@ fn sync( synced_end = sync_range.end; server_end += server_digest.count; } else { - let client_start_op = operations_for_range(client, sync_range.start..) - .next() - .map(|op| op.id()) - .unwrap(); - let client_end_op = operations_for_range(client, sync_range.start + 1..) - .next() - .map(|op| op.id()) - .unwrap(); - let recurse = client_start_op < op_range.end && client_end_op > op_range.start; + let next_client_op_id = { + let mut cursor = client.cursor::<(usize, Digest)>(); + cursor.seek(&sync_range.start, Bias::Right, &()); + cursor.item().map_or(OperationId::MAX, |op| op.id()) + }; + let recurse = next_client_op_id <= server_digest.max_op_id; while let Some(next_sync_range) = stack.last_mut() { - let max_end = cmp::max(sync_range.end, next_sync_range.end); - let mut merged_sync_range = sync_range.start..max_end; - let (merged_op_range, merged_digest) = - server_digests.digest(merged_sync_range.clone()); - merged_sync_range.end = cmp::max( - merged_sync_range.start + merged_digest.count, - merged_sync_range.end, - ); - let intersects = - client_start_op < merged_op_range.end && client_end_op > merged_op_range.start; - if intersects { + let merged_sync_range = + sync_range.start..cmp::max(sync_range.end, next_sync_range.end); + let merged_digest = server_digests.digest(merged_sync_range.clone()); + if next_client_op_id <= merged_digest.max_op_id { break; } else { - sync_range.end = merged_sync_range.end; + sync_range.end = cmp::max( + merged_sync_range.start + merged_digest.count, + merged_sync_range.end, + ); server_range.end = server_end + sync_range.len(); stack.pop(); } @@ -291,7 +284,7 @@ fn sync( ); stats.roundtrips += 1; let server_operations = request_operations(server, server_range.clone()); - debug_assert!(server_operations.len() > 0); + // debug_assert!(server_operations.len() > 0); server_digests.splice( sync_range.clone(), server_operations.iter().map(|op| op.into()), @@ -417,19 +410,19 @@ mod tests { use super::*; use crate::{operations, OperationCount, ReplicaId}; use rand::prelude::*; - use std::env; + use std::{env, mem}; #[test] fn test_sync() { - // assert_sync(1..=15, (1..=5).chain(7..=15)); - // assert_sync(1..=10, 5..=10); - // assert_sync(1..=10, 4..=10); - // assert_sync(1..=10, 1..=5); - // assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10]); - // assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]); - // assert_sync(1..=10, 5..=14); - // assert_sync(1..=80, (1..=70).chain(90..=100)); - // assert_sync(1..=1910, (1..=1900).chain(1910..=2000)); + assert_sync(1..=15, (1..=5).chain(7..=15)); + assert_sync(1..=10, 5..=10); + assert_sync(1..=10, 4..=10); + assert_sync(1..=10, 1..=5); + assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10]); + assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]); + assert_sync(1..=10, 5..=14); + assert_sync(1..=80, (1..=70).chain(90..=100)); + assert_sync(1..=1910, (1..=1900).chain(1910..=2000)); assert_sync( (1..=1500).chain(4000..=10000), (1..=1000).chain(4000..=11000), @@ -437,22 +430,27 @@ mod tests { } #[gpui::test(iterations = 100)] - fn test_random(mut rng: StdRng) { + fn test_performance(mut rng: StdRng) { let max_operations = env::var("OPERATIONS") .map(|i| i.parse().expect("invalid `OPERATIONS` variable")) .unwrap_or(10); - let max_digest_count = 4096; - let min_operations = 4096; + let max_digest_count = env::var("MAX_DIGEST_COUNT") + .map(|i| i.parse().expect("invalid `MAX_DIGEST_COUNT` variable")) + .unwrap_or(1024); + let min_operations = env::var("MIN_OPERATIONS") + .map(|i| i.parse().expect("invalid `MIN_OPERATIONS` variable")) + .unwrap_or(4096); let mut connected = true; let mut client_ops = btree::Sequence::::new(); let mut server_ops = btree::Sequence::::new(); + let mut client_edits = Vec::new(); + let mut server_edits = Vec::new(); let mut ideal_server_ops = 0; let mut ideal_client_ops = 0; let mut next_reconnection = None; for ix in 1..=max_operations { if connected && rng.gen_bool(0.0005) { - dbg!(ix); connected = false; let mut factor = 0.0005; @@ -463,21 +461,15 @@ mod tests { let remaining_operations = max_operations - ix; let disconnection_period = (remaining_operations as f64 * factor) as usize; next_reconnection = Some(ix + disconnection_period); - dbg!(disconnection_period); + log::info!("disconnecting for {} operations", disconnection_period); } if next_reconnection == Some(ix) { connected = true; next_reconnection = None; - log::debug!("==============="); - // log::debug!( - // "client ops: {:?}", - // client_ops.iter().map(|op| op.id()).collect::>() - // ); - // log::debug!( - // "server ops: {:?}", - // server_ops.iter().map(|op| op.id()).collect::>() - // ); + log::info!("reconnecting"); + client_ops.edit(mem::take(&mut client_edits), &()); + server_ops.edit(mem::take(&mut server_edits), &()); let stats = sync( &mut client_ops, @@ -485,15 +477,15 @@ mod tests { max_digest_count, min_operations, ); - log::debug!("roundtrips: {}", stats.roundtrips); - log::debug!( + log::info!("roundtrips: {}", stats.roundtrips); + log::info!( "ideal server ops: {}, actual server ops: {}, abs error: {}, pct error: {:.3}%", ideal_server_ops, stats.server_operations, stats.server_operations - ideal_server_ops, ((stats.server_operations as f64 / ideal_server_ops as f64) - 1.) * 100. ); - log::debug!( + log::info!( "ideal client ops: {}, actual client ops: {}, abs error: {}, pct error: {:.3}%", ideal_client_ops, stats.client_operations, @@ -511,41 +503,35 @@ mod tests { if connected { let replica_id = ReplicaId(rng.gen_range(0..=1)); - client_ops.insert_or_replace(build_operation2(replica_id, ix), &()); - server_ops.insert_or_replace(build_operation2(replica_id, ix), &()); + client_edits.push(btree::Edit::Insert(op_for_replica(replica_id, ix))); + server_edits.push(btree::Edit::Insert(op_for_replica(replica_id, ix))); } else if rng.gen_bool(0.95) { ideal_server_ops += 1; - server_ops.insert_or_replace(build_operation2(ReplicaId(0), ix), &()); + server_edits.push(btree::Edit::Insert(op_for_replica(ReplicaId(0), ix))); } else { ideal_client_ops += 1; - client_ops.insert_or_replace(build_operation2(ReplicaId(1), ix), &()); + client_edits.push(btree::Edit::Insert(op_for_replica(ReplicaId(1), ix))); } } - log::debug!("============"); - // log::debug!( - // "client ops: {:?}", - // client_ops.iter().map(|op| op.id()).collect::>() - // ); - // log::debug!( - // "server ops: {:?}", - // server_ops.iter().map(|op| op.id()).collect::>() - // ); + log::info!("quiescing"); + client_ops.edit(mem::take(&mut client_edits), &()); + server_ops.edit(mem::take(&mut server_edits), &()); let stats = sync( &mut client_ops, &mut server_ops, max_digest_count, min_operations, ); - log::debug!("roundtrips: {}", stats.roundtrips); - log::debug!( + log::info!("roundtrips: {}", stats.roundtrips); + log::info!( "ideal server ops: {}, actual server ops: {}, abs error: {}, pct error: {:.3}%", ideal_server_ops, stats.server_operations, stats.server_operations - ideal_server_ops, ((stats.server_operations as f64 / ideal_server_ops as f64) - 1.) * 100. ); - log::debug!( + log::info!( "ideal client ops: {}, actual client ops: {}, abs error: {}, pct error: {:.3}%", ideal_client_ops, stats.client_operations, @@ -558,6 +544,45 @@ mod tests { ); } + #[gpui::test(iterations = 100)] + fn test_random(mut rng: StdRng) { + let max_operations = env::var("OPERATIONS") + .map(|i| i.parse().expect("invalid `OPERATIONS` variable")) + .unwrap_or(10); + + let mut client_ops = btree::Sequence::::new(); + let mut server_ops = btree::Sequence::::new(); + for ix in 1..=max_operations { + let replica_id = ReplicaId(rng.gen_range(0..=1)); + + if rng.gen() { + client_ops.insert_or_replace(op_for_replica(replica_id, ix), &()); + } + + if rng.gen() { + server_ops.insert_or_replace(op_for_replica(replica_id, ix), &()); + } + } + + let max_digest_count = rng.gen_range(2..=32); + let min_operations = rng.gen_range(1..100); + log::info!( + "syncing, max digest count: {}, min operations: {}", + max_digest_count, + min_operations + ); + sync( + &mut client_ops, + &mut server_ops, + max_digest_count, + min_operations, + ); + assert_eq!( + client_ops.iter().map(|op| op.id()).collect::>(), + server_ops.iter().map(|op| op.id()).collect::>() + ); + } + fn assert_sync( client_ops: impl IntoIterator, server_ops: impl IntoIterator, @@ -571,9 +596,9 @@ mod tests { .map(build_operation) .collect::>(); - for max_digests in 256..=256 { - for min_operations in 256..=256 { - log::debug!( + for max_digests in [2, 3, 4, 7, 8, 16, 32] { + for min_operations in [1, 2, 3, 4, 7, 8, 16, 32] { + log::info!( "max digests: {}, min operations: {}", max_digests, min_operations @@ -611,7 +636,7 @@ mod tests { }) } - fn build_operation2(replica_id: ReplicaId, id: usize) -> Operation { + fn op_for_replica(replica_id: ReplicaId, id: usize) -> Operation { Operation::CreateBranch(operations::CreateBranch { id: OperationId { replica_id,