Send only missing operations from client to server

This commit is contained in:
Antonio Scandurra 2023-08-06 11:54:06 +02:00
parent 8d7b37b743
commit d5502090f8
3 changed files with 135 additions and 389 deletions

View file

@ -1,6 +1,5 @@
mod btree;
mod dense_id;
mod digest;
mod history;
mod messages;
mod operations;
@ -137,8 +136,8 @@ pub struct OperationCount(usize);
#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)]
pub struct OperationId {
pub replica_id: ReplicaId,
pub operation_count: OperationCount,
pub replica_id: ReplicaId,
}
impl Debug for OperationId {

View file

@ -1,129 +0,0 @@
use std::{cmp, ops::Range};
use crate::{
btree::{self, Bias},
messages::Operation,
};
use bromberg_sl2::HashMatrix;
/// Summary of a run of operations: how many were combined and their combined hash.
#[derive(Clone, Default, PartialEq, Eq)]
pub struct Digest {
/// Number of operations folded into this digest.
pub count: usize,
/// Combined hash of those operations; digests are combined by multiplying their hashes.
pub hash: HashMatrix,
}
impl std::fmt::Debug for Digest {
    /// Formats the digest as a `Digest` struct with its `count` and the hex
    /// rendering of its `hash`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let hash_hex = self.hash.to_hex();
        let mut out = f.debug_struct("Digest");
        out.field("count", &self.count);
        out.field("hash", &hash_hex);
        out.finish()
    }
}
impl Digest {
    /// Creates a digest covering `count` operations whose combined hash is `hash`.
    ///
    /// # Panics
    ///
    /// Panics if `count` is zero.
    pub fn new(count: usize, hash: HashMatrix) -> Self {
        assert!(count > 0, "a digest must cover at least one operation");
        Self { count, hash }
    }
}
impl From<&'_ Operation> for Digest {
fn from(op: &'_ Operation) -> Self {
Self::new(1, op.id().digest())
}
}
/// Allows `Digest` values to be stored as items of a `btree::Sequence`.
impl btree::Item for Digest {
// A digest is its own summary.
type Summary = Digest;
/// Returns a copy of this digest as its summary.
fn summary(&self) -> Self::Summary {
self.clone()
}
}
impl btree::Summary for Digest {
type Context = ();
fn add_summary(&mut self, summary: &Self, _: &()) {
self.count += summary.count;
self.hash = self.hash * summary.hash;
}
}
/// Dimension that measures a sequence of digests by the running number of operations.
impl btree::Dimension<'_, Digest> for usize {
/// Adds the operation count of `summary` to the running total.
fn add_summary(&mut self, summary: &'_ Digest, _: &()) {
*self += summary.count;
}
}
/// Dimension that accumulates the combined hash of a sequence of digests.
impl btree::Dimension<'_, Digest> for HashMatrix {
/// Multiplies the running hash by the hash of `summary`, keeping the running hash on the left.
fn add_summary(&mut self, summary: &'_ Digest, _: &()) {
*self = *self * summary.hash;
}
}
/// An ordered collection of digests, addressable by operation index.
pub struct DigestSequence {
// Backing sequence; its summary carries the total operation count.
digests: btree::Sequence<Digest>,
}
impl DigestSequence {
    /// Creates an empty sequence of digests.
    pub fn new() -> Self {
        Self {
            digests: Default::default(),
        }
    }

    /// Returns all stored digests as a vector.
    pub fn items(&self) -> Vec<Digest> {
        self.digests.items(&())
    }

    /// Returns the total number of operations covered by the stored digests.
    pub fn operation_count(&self) -> usize {
        self.digests.summary().count
    }

    /// Returns the combined digest of the operations in `range`.
    ///
    /// Both ends of `range` are clamped to the total operation count. If
    /// `range.end` falls strictly inside a stored digest, that whole digest is
    /// included, so the returned `count` may exceed the requested length.
    ///
    /// # Panics
    ///
    /// Panics if `range.start` does not fall on the boundary of a stored digest.
    pub fn digest(&self, mut range: Range<usize>) -> Digest {
        let operation_count = self.digests.summary().count;
        range.start = cmp::min(range.start, operation_count);
        range.end = cmp::min(range.end, operation_count);

        let mut cursor = self.digests.cursor::<usize>();
        cursor.seek(&range.start, Bias::Right, &());
        assert_eq!(
            *cursor.start(),
            range.start,
            "start is not at the start of a digest range"
        );

        let mut hash: HashMatrix = cursor.summary(&range.end, Bias::Right, &());
        // The cursor stopped before `range.end`, so the end lies inside the
        // current digest: include it whole and step past it.
        if range.end > *cursor.start() {
            let digest = cursor.item().unwrap();
            hash = hash * digest.hash;
            cursor.next(&());
        }

        Digest {
            count: cursor.start() - range.start,
            hash,
        }
    }

    /// Replaces the digests covering `range` with `digests`.
    ///
    /// `range.end` is clamped to the total operation count.
    ///
    /// # Panics
    ///
    /// Panics if `range.start` is past the total operation count, or if either
    /// end of `range` does not fall on the boundary of a stored digest.
    pub fn splice(&mut self, mut range: Range<usize>, digests: impl IntoIterator<Item = Digest>) {
        let max_index = self.digests.summary().count;
        if range.start > max_index {
            panic!("range out of bounds");
        }
        range.end = cmp::min(range.end, max_index);

        let mut cursor = self.digests.cursor::<usize>();
        // Keep everything before the replaced range.
        let mut new_digests = cursor.slice(&range.start, Bias::Right, &());
        assert_eq!(
            *cursor.start(),
            range.start,
            "start is not at the start of a digest range"
        );

        // Skip over the replaced digests.
        cursor.seek(&range.end, Bias::Right, &());
        assert_eq!(
            *cursor.start(),
            range.end,
            "end is not at the start of a digest range"
        );

        new_digests.extend(digests, &());
        new_digests.append(cursor.suffix(&()), &());
        drop(cursor);
        self.digests = new_digests;
    }
}

View file

@ -1,15 +1,80 @@
use crate::{
btree::{self, Bias},
digest::{Digest, DigestSequence},
messages::{Operation, PublishOperations},
OperationId,
};
use bromberg_sl2::HashMatrix;
use std::{
cmp::{self, Ordering},
cmp::Ordering,
iter,
ops::{Range, RangeBounds},
};
/// Summary of a run of operations: how many were combined and their combined hash.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Digest {
// Number of operations folded into this digest.
count: usize,
// Product of the hashes of those operations.
hash: HashMatrix,
}
impl btree::Item for Operation {
    type Summary = OperationSummary;

    /// Summarizes a single operation: its id becomes `max_id`, and its digest
    /// has a count of one and the hash of the id's big-endian bytes.
    fn summary(&self) -> Self::Summary {
        let id = self.id();
        let hash = bromberg_sl2::hash_strict(&id.to_be_bytes());
        OperationSummary {
            max_id: id,
            digest: Digest { count: 1, hash },
        }
    }
}
/// Keys operations by their id.
impl btree::KeyedItem for Operation {
type Key = OperationId;
/// Returns the operation's id as its key.
fn key(&self) -> Self::Key {
self.id()
}
}
/// Summary of a run of operations stored in a `btree::Sequence`.
#[derive(Clone, Debug, Default)]
pub struct OperationSummary {
// Id of the last operation summarized; replaced by each summary that is added.
max_id: OperationId,
// Count and combined hash of the summarized operations.
digest: Digest,
}
impl btree::Summary for OperationSummary {
type Context = ();
fn add_summary(&mut self, summary: &Self, _: &()) {
debug_assert!(self.max_id < summary.max_id);
self.max_id = summary.max_id;
self.digest.count += summary.digest.count;
self.digest.hash = self.digest.hash * summary.digest.hash;
}
}
/// Dimension that tracks the largest operation id seen so far.
impl btree::Dimension<'_, OperationSummary> for OperationId {
/// Replaces the running id with `summary.max_id`.
fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
// Debug builds check that ids only ever increase.
debug_assert!(*self < summary.max_id);
*self = summary.max_id;
}
}
/// Dimension that measures a sequence of operations by the running operation count.
impl btree::Dimension<'_, OperationSummary> for usize {
/// Adds the number of operations in `summary` to the running total.
fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
*self += summary.digest.count;
}
}
impl btree::Dimension<'_, OperationSummary> for Digest {
    /// Accumulates the digest of `summary`: adds its count and multiplies its
    /// hash onto the right of the running hash.
    fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
        let added = &summary.digest;
        self.count += added.count;
        self.hash = self.hash * added.hash;
    }
}
/// Request passed to `sync_server` by `sync_client`.
struct SyncRequest {
// Digests the client computed over prefixes (`0..n`) of its operations.
digests: Vec<Digest>,
}
@ -63,287 +128,98 @@ fn publish_operations(
fn sync_client(
client_operations: &mut btree::Sequence<Operation>,
server_operations: &mut btree::Sequence<Operation>,
min_digest_delta: usize,
min_shared_prefix_end: usize,
max_digest_count: usize,
) -> SyncStats {
let mut client_operation_count = client_operations.summary().digest.count;
let mut digests = Vec::new();
let mut n = client_operation_count;
let mut digest_end_ix = client_operations.summary().digest.count;
// We will multiply by some factor less than 1 to produce digests
// over ever smaller digest ranges.
// op_count * factor^max_digest_count = min_digest_size
// factor^max_digest_count = min_digest_size/op_count
// max_digest_count * log(factor) = log(min_digest_size/op_count)
// log(factor) = log(min_digest_size/op_count)/max_digest_count
// factor = base^(log(min_digest_size/op_count)/max_digest_count)
// over ever smaller digest ranges. The following formula ensures that
// we will produce `max_digest_count` digests, and that the last digest
// will go from `0` to `min_shared_prefix_end`.
// op_count * factor^max_digest_count = min_shared_prefix_end
// factor^max_digest_count = min_shared_prefix_end/op_count
// max_digest_count * log_2(factor) = log_2(min_shared_prefix_end/op_count)
// log_2(factor) = log_2(min_shared_prefix_end/op_count)/max_digest_count
// factor = 2^(log_2(min_shared_prefix_end/op_count)/max_digest_count)
let factor = 2f64.powf(
(min_digest_delta as f64 / client_operation_count as f64).log2() / max_digest_count as f64,
(min_shared_prefix_end as f64 / digest_end_ix as f64).log2() / max_digest_count as f64,
);
for _ in 0..max_digest_count {
if n <= min_digest_delta {
if digest_end_ix <= min_shared_prefix_end {
break;
}
digests.push(digest_for_range(client_operations, 0..n));
n = (n as f64 * factor).ceil() as usize; // 🪬
digests.push(digest_for_range(client_operations, 0..digest_end_ix));
digest_end_ix = (digest_end_ix as f64 * factor).ceil() as usize; // 🪬
}
let response = sync_server(server_operations, SyncRequest { digests });
let client_suffix = operations_for_range(client_operations, response.shared_prefix_end..)
.cloned()
.collect::<Vec<_>>();
let sync_stats = SyncStats {
server_operations: response.operations.len(),
client_operations: client_suffix.len(),
let server_response = sync_server(server_operations, SyncRequest { digests });
let new_ops_from_client = {
let mut new_ops_from_client = Vec::new();
let mut client_cursor = client_operations.cursor::<usize>();
let mut new_client_operations =
client_cursor.slice(&server_response.shared_prefix_end, Bias::Right, &());
let mut server_operations = server_response.operations.iter().peekable();
let mut new_ops_from_server = Vec::new();
while let Some(server_op) = server_operations.peek() {
match client_cursor.item() {
Some(client_operation) => {
let comparison = server_op.id().cmp(&client_operation.id());
match comparison {
Ordering::Less => {
new_ops_from_server.push(server_operations.next().unwrap().clone());
}
_ => {
new_client_operations.extend(new_ops_from_server.drain(..), &());
new_client_operations.push(client_operation.clone(), &());
client_cursor.next(&());
if comparison == Ordering::Equal {
server_operations.next();
} else {
new_ops_from_client.push(client_operation.clone());
}
}
}
}
None => {
new_ops_from_server.push(server_operations.next().unwrap().clone());
}
}
}
new_client_operations.extend(new_ops_from_server, &());
let client_suffix = client_cursor.suffix(&());
new_client_operations.append(client_suffix.clone(), &());
drop(client_cursor);
*client_operations = new_client_operations;
new_ops_from_client.extend(client_suffix.iter().cloned());
new_ops_from_client
};
let sync_stats = SyncStats {
server_operations: server_response.operations.len(),
client_operations: new_ops_from_client.len(),
};
client_operations.edit(
response
.operations
.into_iter()
.map(btree::Edit::Insert)
.collect(),
&(),
);
publish_operations(
server_operations,
PublishOperations {
repo_id: Default::default(),
operations: client_suffix,
operations: new_ops_from_client,
},
);
sync_stats
}
/// Allows operations to be stored as items of a `btree::Sequence`.
impl btree::Item for Operation {
type Summary = OperationSummary;
/// Summarizes a single operation: its id becomes `max_id` and its digest is built via `Digest::from`.
fn summary(&self) -> Self::Summary {
OperationSummary {
max_id: self.id(),
digest: Digest::from(self),
}
}
}
/// Keys operations by their id.
impl btree::KeyedItem for Operation {
type Key = OperationId;
/// Returns the operation's id as its key.
fn key(&self) -> Self::Key {
self.id()
}
}
/// Summary of a run of operations stored in a `btree::Sequence`.
#[derive(Clone, Debug, Default)]
pub struct OperationSummary {
// Id of the last operation summarized; replaced by each summary that is added.
max_id: OperationId,
// Count and combined hash of the summarized operations.
digest: Digest,
}
/// Combines summaries of adjacent runs of operations.
impl btree::Summary for OperationSummary {
type Context = ();
/// Appends `summary` to `self`, taking over its `max_id` and folding in its digest.
fn add_summary(&mut self, summary: &Self, _: &()) {
// Debug builds check that ids only ever increase.
debug_assert!(self.max_id < summary.max_id);
self.max_id = summary.max_id;
Digest::add_summary(&mut self.digest, &summary.digest, &());
}
}
/// Dimension that tracks the largest operation id seen so far.
impl btree::Dimension<'_, OperationSummary> for OperationId {
/// Replaces the running id with `summary.max_id`.
fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
// Debug builds check that ids only ever increase.
debug_assert!(*self < summary.max_id);
*self = summary.max_id;
}
}
/// Dimension that measures a sequence of operations by the running operation count.
impl btree::Dimension<'_, OperationSummary> for usize {
/// Adds the number of operations in `summary` to the running total.
fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
*self += summary.digest.count;
}
}
/// Dimension that accumulates the digest of the operations seen so far.
impl btree::Dimension<'_, OperationSummary> for Digest {
/// Folds the digest carried by `summary` into the running digest.
fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
Digest::add_summary(self, &summary.digest, &());
}
}
/// Computes digests over consecutive subranges of `root_range`.
///
/// `root_range` is first clamped to the number of operations in `operations`,
/// then split by `subdivide_range(root_range, count, min_operations)`; one
/// digest is produced per subrange, in order.
fn request_digests(
    operations: &btree::Sequence<Operation>,
    mut root_range: Range<usize>,
    count: usize,
    min_operations: usize,
) -> Vec<Digest> {
    let total = operations.summary().digest.count;
    root_range.start = root_range.start.min(total);
    root_range.end = root_range.end.min(total);

    let mut digests = Vec::new();
    for range in subdivide_range(root_range, count, min_operations) {
        digests.push(digest_for_range(operations, range));
    }
    digests
}
/// Splits `root_range` into consecutive, non-overlapping subranges.
///
/// Every subrange is `max(min_operations, ceil(root_range.len() / count))`
/// long, except the last one, which is truncated to end at `root_range.end`.
/// An empty `root_range` yields no subranges.
///
/// A `count` of zero is treated as one instead of dividing by zero, and all
/// arithmetic is overflow-safe, so very large `count` or `min_operations`
/// values simply produce a single subrange.
fn subdivide_range(
    root_range: Range<usize>,
    count: usize,
    min_operations: usize,
) -> impl Iterator<Item = Range<usize>> {
    let count = count.max(1);
    let len = root_range.len();
    // Ceiling division written so that it cannot overflow, unlike `len + count - 1`.
    let even_len = len / count + usize::from(len % count != 0);
    let subrange_len = min_operations.max(even_len);

    let mut subrange_start = root_range.start;
    iter::from_fn(move || {
        if subrange_start >= root_range.end {
            return None;
        }

        // Saturate so a huge `subrange_len` clamps to the end of the range
        // rather than wrapping around.
        let subrange_end = subrange_start
            .saturating_add(subrange_len)
            .min(root_range.end);
        let subrange = subrange_start..subrange_end;
        subrange_start = subrange_end;
        Some(subrange)
    })
}
/// Reconciles the `client` and `server` operation sequences in place.
///
/// Digests of ranges are compared first; ranges whose digests match are
/// skipped, larger mismatching ranges are subdivided into at most
/// `max_digests` pieces, and ranges of at most `min_operations` operations are
/// resolved by exchanging the operations themselves. Operations the client
/// lacks are inserted into `client` as they are discovered; operations the
/// server lacks are collected and inserted into `server` at the end. The
/// number of requests made against `server` is logged as `roundtrips`.
fn sync(
client: &mut btree::Sequence<Operation>,
server: &mut btree::Sequence<Operation>,
max_digests: usize,
min_operations: usize,
) {
// Local view of the server's digests, refined as finer digests or operations are fetched.
let mut server_digests = DigestSequence::new();
// Counts the initial digest request plus every later `request_digests` / `request_operations` call.
let mut roundtrips = 1;
let digests = request_digests(server, 0..usize::MAX, max_digests, min_operations);
server_digests.splice(0..0, digests.iter().cloned());
let server_operation_count = server_digests.operation_count();
let max_sync_range = 0..(client.summary().digest.count + server_operation_count);
// Ranges still to be examined; reversed so that `pop` yields them in ascending order.
let mut stack =
subdivide_range(max_sync_range, max_digests, min_operations).collect::<Vec<_>>();
stack.reverse();
// Client operations the server lacks; applied to `server` in a single edit at the end.
let mut missed_server_ops = Vec::new();
// NOTE(review): `server_end` appears to track progress through the server's operations and
// `synced_end` progress through the range being synced — confirm.
let mut server_end = 0;
let mut synced_end = 0;
while let Some(mut sync_range) = stack.pop() {
sync_range.start = cmp::max(sync_range.start, synced_end);
if sync_range.start >= client.summary().digest.count || server_end >= server_operation_count
{
// We've exhausted all operations from either the client or the server, so we
// can fast track to publishing anything the server hasn't seen and requesting
// anything the client hasn't seen.
break;
} else if sync_range.end < synced_end {
// This range has already been synced, so we can skip it.
continue;
}
let server_digest = server_digests.digest(sync_range.clone());
// Widen the range if the server digest covers more operations than were requested.
sync_range.end = cmp::max(sync_range.start + server_digest.count, sync_range.end);
let server_range = server_end..server_end + sync_range.len();
let client_digest = digest_for_range(client, sync_range.clone());
if client_digest == server_digest {
// Both sides agree on this range; advance past it without exchanging anything.
log::debug!("skipping {:?}", sync_range);
synced_end = sync_range.end;
server_end += server_digest.count;
} else if sync_range.len() > min_operations {
// Mismatch on a large range: fetch finer-grained server digests and revisit the pieces.
log::debug!("descending into {:?}", sync_range);
roundtrips += 1;
let digests =
request_digests(server, server_range.clone(), max_digests, min_operations);
server_digests.splice(sync_range.clone(), digests.iter().cloned());
let old_stack_len = stack.len();
stack.extend(subdivide_range(sync_range, max_digests, min_operations));
// Reverse only the newly pushed ranges so they are popped in ascending order.
stack[old_stack_len..].reverse();
} else {
// Mismatch on a small range: fetch the server's operations and merge them by id.
log::debug!("exchanging operations for {:?}", sync_range);
roundtrips += 1;
let server_operations = request_operations(server, server_range.clone());
debug_assert!(server_operations.len() > 0);
server_digests.splice(
sync_range.clone(),
server_operations.iter().map(|op| op.into()),
);
// Server operations the client lacks within this range.
let mut missed_client_ops = Vec::new();
let mut server_operations = server_operations.into_iter().peekable();
let mut client_operations = operations_for_range(client, sync_range.clone()).peekable();
for _ in sync_range {
match (client_operations.peek(), server_operations.peek()) {
(Some(client_operation), Some(server_operation)) => {
match client_operation.id().cmp(&server_operation.id()) {
// Only the client has this operation.
Ordering::Less => {
let client_operation = client_operations.next().unwrap();
missed_server_ops
.push(btree::Edit::Insert(client_operation.clone()));
server_digests
.splice(synced_end..synced_end, [client_operation.into()]);
}
// Both sides have this operation.
Ordering::Equal => {
client_operations.next().unwrap();
server_operations.next().unwrap();
server_end += 1;
}
// Only the server has this operation.
Ordering::Greater => {
let server_operation = server_operations.next().unwrap();
missed_client_ops.push(btree::Edit::Insert(server_operation));
server_end += 1;
}
}
}
(None, Some(_)) => {
let server_operation = server_operations.next().unwrap();
missed_client_ops.push(btree::Edit::Insert(server_operation));
server_end += 1;
}
(Some(_), None) => {
let client_operation = client_operations.next().unwrap();
missed_server_ops.push(btree::Edit::Insert(client_operation.clone()));
server_digests.splice(synced_end..synced_end, [client_operation.into()]);
}
(None, None) => break,
}
synced_end += 1;
}
// Release the borrow of `client` before editing it.
drop(client_operations);
client.edit(missed_client_ops, &());
}
}
// Fetch and publish the remaining suffixes.
if synced_end < client.summary().digest.count || server_end < server_operation_count {
log::debug!("exchanging operations for {:?}..", synced_end);
roundtrips += 1;
let remaining_client_ops = operations_for_range(client, synced_end..);
missed_server_ops.extend(remaining_client_ops.cloned().map(btree::Edit::Insert));
let remaining_server_ops = request_operations(server, server_end..);
client.edit(
remaining_server_ops
.into_iter()
.map(btree::Edit::Insert)
.collect(),
&(),
);
}
server.edit(missed_server_ops, &());
log::debug!("roundtrips: {}", roundtrips);
}
/// Returns the digest of the operations whose indices fall in `range`.
///
/// Seeks a count-based cursor to `range.start` and summarizes from there up to
/// `range.end`.
fn digest_for_range(operations: &btree::Sequence<Operation>, range: Range<usize>) -> Digest {
    let Range { start, end } = range;
    let mut cursor = operations.cursor::<usize>();
    cursor.seek(&start, Bias::Right, &());
    cursor.summary(&end, Bias::Right, &())
}
/// Returns owned copies of the operations yielded by
/// `operations_for_range(operations, range)`.
fn request_operations<T: RangeBounds<usize>>(
    operations: &btree::Sequence<Operation>,
    range: T,
) -> Vec<Operation> {
    let mut requested = Vec::new();
    requested.extend(operations_for_range(operations, range).cloned());
    requested
}
fn operations_for_range<T: RangeBounds<usize>>(
operations: &btree::Sequence<Operation>,
range: T,
@ -394,8 +270,8 @@ mod tests {
let max_operations = env::var("OPERATIONS")
.map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
.unwrap_or(10);
let min_digest_delta = 32;
let max_digest_count = 2048;
let min_shared_prefix_end = 1024;
let max_digest_count = 1024;
let mut connected = true;
let mut client_ops = btree::Sequence::new();
@ -427,7 +303,7 @@ mod tests {
let stats = sync_client(
&mut client_ops,
&mut server_ops,
min_digest_delta,
min_shared_prefix_end,
max_digest_count,
);
log::debug!(
@ -469,7 +345,7 @@ mod tests {
let stats = sync_client(
&mut client_ops,
&mut server_ops,
min_digest_delta,
min_shared_prefix_end,
max_digest_count,
);
log::debug!(