From 05ec6b89c20e47286625ec6109fc29d22e8dc310 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Sun, 30 Jul 2023 13:45:31 +0200 Subject: [PATCH] WIP --- crates/crdb/src/btree/map.rs | 53 +++-- crates/crdb/src/crdb.rs | 427 +++++++++++++---------------------- crates/crdb/src/history.rs | 407 +++++++++++++++++++++++++-------- 3 files changed, 496 insertions(+), 391 deletions(-) diff --git a/crates/crdb/src/btree/map.rs b/crates/crdb/src/btree/map.rs index 13e6d5bf86..6dac21d1a0 100644 --- a/crates/crdb/src/btree/map.rs +++ b/crates/crdb/src/btree/map.rs @@ -60,6 +60,16 @@ where Ok(Self(Sequence::load_root(id, kv).await?)) } + pub async fn load_all(id: SavedId, kv: &dyn KvStore) -> Result + where + K: Serialize + for<'de> Deserialize<'de>, + V: Serialize + for<'de> Deserialize<'de>, + { + let mut sequence = Sequence::load_root(id, kv).await?; + sequence.load(kv, &(), |_| true).await?; + Ok(Self(sequence)) + } + pub async fn load(&mut self, key: &K, kv: &dyn KvStore) -> Result> where K: Serialize + for<'de> Deserialize<'de>, @@ -77,6 +87,21 @@ where Ok(self.get(key)) } + pub async fn load_from( + &mut self, + start: &K, + kv: &dyn KvStore, + ) -> Result> + where + K: Serialize + for<'de> Deserialize<'de>, + V: Serialize + for<'de> Deserialize<'de>, + { + self.0 + .load(kv, &(), |probe| probe.start.0 >= *start) + .await?; + Ok(self.iter_from(start)) + } + pub async fn store(&mut self, key: K, value: V, kv: &dyn KvStore) -> Result<()> where K: Serialize + for<'de> Deserialize<'de>, @@ -164,33 +189,7 @@ where cursor.item().map(|item| (&item.key, &item.value)) } - pub fn range<'a, R>(&self, range: R) -> impl Iterator - where - K: 'a, - R: RangeBounds<&'a K>, - { - let mut cursor = self.0.cursor::>(); - match range.start_bound() { - Bound::Included(start) => { - let start = MapKeyRef(Some(*start)); - cursor.seek(&start, Bias::Left, &()); - } - Bound::Excluded(start) => { - let start = MapKeyRef(Some(*start)); - cursor.seek(&start, Bias::Right, &()); - } - Bound::Unbounded => cursor.next(&()), - } - cursor - .map(|entry| (&entry.key, &entry.value)) - .take_while(move |(key, _)| match range.end_bound() { - Bound::Included(end) => key <= end, - Bound::Excluded(end) => key < end, - Bound::Unbounded => true, - }) - } - - pub fn iter_from<'a>(&'a self, from: &'a K) -> impl Iterator + '_ { + pub fn iter_from<'a>(&self, from: &'a K) -> impl Iterator { let mut cursor = self.0.cursor::>(); let from_key = MapKeyRef(Some(from)); cursor.seek(&from_key, Bias::Left, &()); diff --git a/crates/crdb/src/crdb.rs b/crates/crdb/src/crdb.rs index fd5c1eceb3..7698cc79f8 100644 --- a/crates/crdb/src/crdb.rs +++ b/crates/crdb/src/crdb.rs @@ -14,6 +14,7 @@ use btree::{Bias, KvStore, SavedId}; use collections::{btree_map, BTreeMap, BTreeSet, Bound, HashMap, HashSet, VecDeque}; use dense_id::DenseId; use futures::{channel::mpsc, future::BoxFuture, FutureExt, StreamExt}; +use history::{History, SavedHistory}; use messages::{MessageEnvelope, Operation, RequestEnvelope}; use parking_lot::{Mutex, RwLock}; use rope::Rope; @@ -138,7 +139,7 @@ impl OperationId { pub fn new(replica_id: ReplicaId) -> Self { Self { replica_id, - operation_count: OperationCount::default(), + operation_count: OperationCount(1), } } @@ -381,25 +382,28 @@ impl Checkout { let response = client .request(messages::SyncRepo { id: self.repo.id, - max_operation_ids: self.repo.read(|repo| (&repo.max_operation_ids).into()), - }) - .await?; - - self.repo - .update_async(|repo| { - let kv = client.db.kv.clone(); - let operations = response.operations.clone(); - async move { - repo.apply_operations(operations, &*kv).await?; - Ok((None, ())) - } - .boxed() + max_operation_ids: self + .repo + .read(|repo| repo.history.max_operation_ids().into()), }) .await?; let operations = self .repo - .read(|snapshot| snapshot.operations_since(&(&response.max_operation_ids).into())); + .update_async(|repo| { + let kv = client.db.kv.clone(); + let response = response.clone(); + async move { + repo.apply_operations(response.operations, &*kv).await?; + let operations = repo + .history + .operations_since(&(&response.max_operation_ids).into(), &*kv) + .await?; + Ok((None, operations)) + } + .boxed() + }) + .await?; for chunk in operations.chunks(CHUNK_SIZE) { client @@ -737,12 +741,25 @@ impl Server { .repo(request.id) .ok_or_else(|| anyhow!("repo not found"))?; - repo.read(|snapshot| { - Ok(messages::SyncRepoResponse { - operations: snapshot.operations_since(&(&request.max_operation_ids).into()), - max_operation_ids: (&snapshot.max_operation_ids).into(), - }) + repo.update_async(|snapshot| { + let request = request.clone(); + let kv = self.db.kv.clone(); + async move { + let operations = snapshot + .history + .operations_since(&(&request.max_operation_ids).into(), &*kv) + .await?; + Ok(( + None, + messages::SyncRepoResponse { + operations, + max_operation_ids: snapshot.history.max_operation_ids().into(), + }, + )) + } + .boxed() }) + .await } async fn handle_publish_operations( @@ -899,7 +916,7 @@ impl Repo { .update(&self.id, |repo| { let (operation, result) = f(repo); if let Some(operation) = operation { - repo.save_local_operation(operation.clone()); + repo.history.insert_local(operation.clone()); if let Some(local_operation_created) = self.db.local_operation_created.as_ref() { local_operation_created(self.id, operation); @@ -1007,7 +1024,7 @@ impl Branch { let mut revision = repo .cached_revision(&head) .expect("head revision must exist"); - let operation_id = repo.last_operation_id.tick(); + let operation_id = repo.history.next_operation_id(); let (operation, result) = f(operation_id, head.clone(), &mut revision); repo.branches .update(&self.id, |branch| branch.head = operation_id.into()); @@ -1686,47 +1703,36 @@ impl<'a> RopeBuilder<'a> { #[derive(Clone, Debug)] pub struct RepoSnapshot { id: RepoId, - last_operation_id: OperationId, + history: History, branches: btree::Map, // TODO: Change String to Arc for branch_ids_by_name branch_ids_by_name: btree::Map, - operations: btree::Map, revisions: Arc>>, - max_operation_ids: btree::Map, - deferred_operations: btree::Sequence, } #[derive(Serialize, Deserialize)] struct SavedRepoSnapshot { - last_operation_id: OperationId, + history: SavedHistory, branches: btree::SavedId, branch_ids_by_name: btree::SavedId, - operations: btree::SavedId, - max_operation_ids: btree::SavedId, - deferred_operations: btree::SavedId, } impl RepoSnapshot { fn new(id: RepoId, replica_id: ReplicaId) -> Self { Self { id, - last_operation_id: OperationId::new(replica_id), + history: History::new(replica_id), branches: Default::default(), branch_ids_by_name: Default::default(), - operations: Default::default(), revisions: Default::default(), - max_operation_ids: Default::default(), - deferred_operations: Default::default(), } } fn ptr_eq(this: &Self, other: &Self) -> bool { - btree::Map::ptr_eq(&this.branches, &other.branches) + this.id == other.id && btree::Map::ptr_eq(&this.branches, &other.branches) && btree::Map::ptr_eq(&this.branch_ids_by_name, &other.branch_ids_by_name) - && btree::Map::ptr_eq(&this.operations, &other.operations) - && btree::Map::ptr_eq(&this.max_operation_ids, &other.max_operation_ids) - && this.last_operation_id == other.last_operation_id + && History::ptr_eq(&this.history, &other.history) } async fn load(id: RepoId, kv: &dyn KvStore) -> Result { @@ -1734,25 +1740,18 @@ impl RepoSnapshot { let saved_repo = serde_bare::from_slice::(&repo_bytes)?; Ok(Self { id, - last_operation_id: saved_repo.last_operation_id, + history: History::load(saved_repo.history, kv).await?, branches: btree::Map::load_root(saved_repo.branches, kv).await?, branch_ids_by_name: btree::Map::load_root(saved_repo.branch_ids_by_name, kv).await?, - operations: btree::Map::load_root(saved_repo.operations, kv).await?, revisions: Default::default(), - max_operation_ids: btree::Map::load_root(saved_repo.max_operation_ids, kv).await?, - deferred_operations: btree::Sequence::load_root(saved_repo.deferred_operations, kv) - .await?, }) } async fn save(&self, id: RepoId, kv: &dyn KvStore) -> Result<()> { let saved_repo = SavedRepoSnapshot { - last_operation_id: self.last_operation_id, + history: self.history.save(kv).await?, branches: self.branches.save(kv).await?, branch_ids_by_name: self.branch_ids_by_name.save(kv).await?, - operations: self.operations.save(kv).await?, - max_operation_ids: self.max_operation_ids.save(kv).await?, - deferred_operations: self.deferred_operations.save(kv).await?, }; let repo_bytes = serde_bare::to_vec(&saved_repo)?; kv.store(id.to_be_bytes(), "root".into(), repo_bytes) @@ -1762,7 +1761,7 @@ impl RepoSnapshot { fn create_empty_branch(&mut self, name: impl Into>) -> (Operation, OperationId) { let name = name.into(); - let branch_id = self.last_operation_id.tick(); + let branch_id = self.history.next_operation_id(); self.branches.insert( branch_id, BranchSnapshot { @@ -1781,42 +1780,6 @@ impl RepoSnapshot { ) } - fn operations_since(&self, version: &btree::Map) -> Vec { - let mut new_operations = Vec::new(); - for (replica_id, end_op_count) in self.max_operation_ids.iter() { - let end_op = OperationId { - replica_id: *replica_id, - operation_count: *end_op_count, - }; - if let Some(start_op_count) = version.get(&replica_id) { - let start_op = OperationId { - replica_id: *replica_id, - operation_count: *start_op_count, - }; - new_operations.extend( - self.operations - .range((Bound::Excluded(&start_op), Bound::Included(&end_op))) - .map(|(_, op)| op.clone()), - ); - } else { - let start_op = OperationId::new(*replica_id); - new_operations.extend( - self.operations - .range((Bound::Included(&start_op), Bound::Included(&end_op))) - .map(|(_, op)| op.clone()), - ); - } - } - new_operations - } - - fn save_local_operation(&mut self, operation: Operation) { - let replica_id = operation.id().replica_id; - let count = operation.id().operation_count; - self.max_operation_ids.insert(replica_id, count); - self.operations.insert(operation.id(), operation); - } - /// Apply the given operations and any deferred operations that are now applicable. async fn apply_operations( &mut self, @@ -1836,15 +1799,11 @@ impl RepoSnapshot { operation: Operation, kv: &dyn KvStore, ) -> Result> { - if self.operations.contains_key(&operation.id()) { + if self.history.has_applied(&operation, kv).await? { return Ok(Default::default()); } - if operation - .parent() - .iter() - .all(|parent| self.operations.contains_key(&parent)) - { + if self.history.can_apply(&operation, kv).await? { let operation_id = operation.id(); let mut new_head; match &operation { @@ -1864,7 +1823,7 @@ impl RepoSnapshot { | Operation::Edit(operations::Edit { branch_id, parent, .. }) => { - if let Some(branch) = self.branches.get(branch_id).cloned() { + if let Some(branch) = self.branches.load(branch_id, kv).await?.cloned() { new_head = branch.head; new_head.observe(operation_id, &parent); self.branches @@ -1880,8 +1839,7 @@ impl RepoSnapshot { } }; - self.save_remote_operation(operation, kv).await?; - let flushed_operations = self.flush_deferred_operations(operation_id, kv).await?; + let flushed_operations = self.history.insert(operation, kv).await?; // The following ensures that a revision for the branch head is always present. #[cfg(not(any(test, feature = "test-support")))] @@ -1891,73 +1849,11 @@ impl RepoSnapshot { Ok(flushed_operations) } else { - for parent in operation.parent().iter() { - self.deferred_operations.insert_or_replace( - DeferredOperation { - parent: *parent, - operation: operation.clone(), - }, - &(), - ); - } + self.history.defer(operation, kv).await?; Ok(Default::default()) } } - /// Remove any operations deferred on the given parent and add them to the - /// provided operation queue. This is called in `apply_operations`. - async fn flush_deferred_operations( - &mut self, - parent_id: OperationId, - kv: &dyn KvStore, - ) -> Result> { - self.deferred_operations - .load(kv, &(), |probe| { - let key_range = ( - Bound::Excluded(*probe.start), - Bound::Included(*probe.summary), - ); - key_range.contains(&parent_id) - }) - .await?; - let mut cursor = self.deferred_operations.cursor::(); - let mut remaining = cursor.slice(&parent_id, Bias::Left, &()); - let mut flushed = SmallVec::new(); - flushed.extend( - cursor - .slice(&parent_id, Bias::Right, &()) - .iter() - .map(|deferred| deferred.operation.clone()), - ); - remaining.append(cursor.suffix(&()), &()); - drop(cursor); - self.deferred_operations = remaining; - Ok(flushed) - } - - async fn save_remote_operation( - &mut self, - operation: Operation, - kv: &dyn KvStore, - ) -> Result<()> { - let replica_id = operation.id().replica_id; - let count = operation.id().operation_count; - if self.max_operation_ids.load(&replica_id, kv).await?.copied() < Some(count) { - self.max_operation_ids.insert(replica_id, count); - } - self.last_operation_id.observe(operation.id()); - self.operations.store(operation.id(), operation, kv).await?; - Ok(()) - } - - async fn operation( - &mut self, - operation_id: OperationId, - kv: &dyn KvStore, - ) -> Result> { - self.operations.load(&operation_id, kv).await - } - fn cached_revision(&self, revision_id: &RevisionId) -> Result { Ok(self .revisions @@ -1973,123 +1869,124 @@ impl RepoSnapshot { revision_id: &RevisionId, kv: &dyn KvStore, ) -> Result { - // First, check if we have a revision cached for this revision id. - // If not, we'll need to reconstruct it from a previous revision. - // We need to find a cached revision that is an ancestor of the given revision id. - // Once we find it, we must apply all ancestors of the given revision id that are not contained in the cached revision. - let mut revision = self.revisions.lock().get(revision_id).cloned(); - if revision.is_none() { - revision = Revision::load(self.id, revision_id, kv).await.ok(); - if let Some(revision) = revision.as_ref() { - self.revisions - .lock() - .entry(revision_id.clone()) - .or_insert_with(|| revision.clone()); - } - } + todo!() + // // First, check if we have a revision cached for this revision id. + // // If not, we'll need to reconstruct it from a previous revision. + // // We need to find a cached revision that is an ancestor of the given revision id. + // // Once we find it, we must apply all ancestors of the given revision id that are not contained in the cached revision. + // let mut revision = self.revisions.lock().get(revision_id).cloned(); + // if revision.is_none() { + // revision = Revision::load(self.id, revision_id, kv).await.ok(); + // if let Some(revision) = revision.as_ref() { + // self.revisions + // .lock() + // .entry(revision_id.clone()) + // .or_insert_with(|| revision.clone()); + // } + // } - if let Some(revision) = revision { - Ok(revision) - } else { - struct Search { - start: OperationId, - ancestor: RevisionId, - } + // if let Some(revision) = revision { + // Ok(revision) + // } else { + // struct Search { + // start: OperationId, + // ancestor: RevisionId, + // } - let mut ancestors = HashMap::>::default(); - let mut searches = VecDeque::new(); - let mut operations = BTreeSet::new(); - for operation_id in revision_id.iter() { - operations.insert((operation_id.operation_count, operation_id.replica_id)); - searches.push_back(Search { - start: *operation_id, - ancestor: self - .operation(*operation_id, kv) - .await? - .ok_or_else(|| anyhow!("operation {:?} not found", operation_id))? - .parent() - .clone(), - }); - } + // let mut ancestors = HashMap::>::default(); + // let mut searches = VecDeque::new(); + // let mut operations = BTreeSet::new(); + // for operation_id in revision_id.iter() { + // operations.insert((operation_id.operation_count, operation_id.replica_id)); + // searches.push_back(Search { + // start: *operation_id, + // ancestor: self + // .operation(*operation_id, kv) + // .await? + // .ok_or_else(|| anyhow!("operation {:?} not found", operation_id))? + // .parent() + // .clone(), + // }); + // } - let mut common_ancestor_revision = Revision::default(); - let mut missing_operations_start = (OperationCount::default(), ReplicaId::default()); - while let Some(search) = searches.pop_front() { - let reachable_from = ancestors.entry(search.ancestor.clone()).or_default(); - reachable_from.insert(search.start); + // let mut common_ancestor_revision = Revision::default(); + // let mut missing_operations_start = (OperationCount::default(), ReplicaId::default()); + // while let Some(search) = searches.pop_front() { + // let reachable_from = ancestors.entry(search.ancestor.clone()).or_default(); + // reachable_from.insert(search.start); - // If the current revision is reachable from every operation in the original - // revision id, it's a common ancestor. - if reachable_from.len() == revision_id.len() { - // We've found a common ancestor, now we load a revision for it. For it to - // be a common ancestor means that all its downstream operations must - // have causally happened after it. Therefore, we should be able to - // use the maximum lamport timestamp in the common ancestor's revision - // and select only those operations we've found in the backwards search - // which have a higher lamport timestamp. - let revision = self.load_revision(&search.ancestor, kv).await?; - common_ancestor_revision = revision.clone(); - if let Some(max_operation_count) = search - .ancestor - .iter() - .map(|operation_id| operation_id.operation_count) - .max() - { - missing_operations_start = ( - OperationCount(max_operation_count.0 + 1), - ReplicaId::default(), - ); - } + // // If the current revision is reachable from every operation in the original + // // revision id, it's a common ancestor. + // if reachable_from.len() == revision_id.len() { + // // We've found a common ancestor, now we load a revision for it. For it to + // // be a common ancestor means that all its downstream operations must + // // have causally happened after it. Therefore, we should be able to + // // use the maximum lamport timestamp in the common ancestor's revision + // // and select only those operations we've found in the backwards search + // // which have a higher lamport timestamp. + // let revision = self.load_revision(&search.ancestor, kv).await?; + // common_ancestor_revision = revision.clone(); + // if let Some(max_operation_count) = search + // .ancestor + // .iter() + // .map(|operation_id| operation_id.operation_count) + // .max() + // { + // missing_operations_start = ( + // OperationCount(max_operation_count.0 + 1), + // ReplicaId::default(), + // ); + // } - break; - } + // break; + // } - for operation_id in search.ancestor.iter() { - operations.insert((operation_id.operation_count, operation_id.replica_id)); - searches.push_back(Search { - start: search.start, - ancestor: self - .operation(*operation_id, kv) - .await? - .expect("operation must exist") - .parent() - .clone(), - }); - } - } + // for operation_id in search.ancestor.iter() { + // operations.insert((operation_id.operation_count, operation_id.replica_id)); + // searches.push_back(Search { + // start: search.start, + // ancestor: self + // .operation(*operation_id, kv) + // .await? + // .expect("operation must exist") + // .parent() + // .clone(), + // }); + // } + // } - // Apply all the missing operations to the found revision. - for (operation_count, replica_id) in operations.range(missing_operations_start..) { - let missing_operation_id = OperationId { - replica_id: *replica_id, - operation_count: *operation_count, - }; - match self - .operation(missing_operation_id, kv) - .await? - .expect("operation must exist") - .clone() - { - Operation::CreateDocument(op) => { - op.apply(&mut common_ancestor_revision); - } - Operation::Edit(op) => { - let parent_revision = self.load_revision(&op.parent, kv).await?; - op.apply(&parent_revision, &mut common_ancestor_revision)?; - } - Operation::CreateBranch(_) => { - // Creating a branch doesn't have an impact on the revision, so we - // can ignore it. - } - } - } + // // Apply all the missing operations to the found revision. + // for (operation_count, replica_id) in operations.range(missing_operations_start..) { + // let missing_operation_id = OperationId { + // replica_id: *replica_id, + // operation_count: *operation_count, + // }; + // match self + // .operation(missing_operation_id, kv) + // .await? + // .expect("operation must exist") + // .clone() + // { + // Operation::CreateDocument(op) => { + // op.apply(&mut common_ancestor_revision); + // } + // Operation::Edit(op) => { + // let parent_revision = self.load_revision(&op.parent, kv).await?; + // op.apply(&parent_revision, &mut common_ancestor_revision)?; + // } + // Operation::CreateBranch(_) => { + // // Creating a branch doesn't have an impact on the revision, so we + // // can ignore it. + // } + // } + // } - self.revisions - .lock() - .entry(revision_id.clone()) - .or_insert_with(|| common_ancestor_revision.clone()); - Ok(common_ancestor_revision) - } + // self.revisions + // .lock() + // .entry(revision_id.clone()) + // .or_insert_with(|| common_ancestor_revision.clone()); + // Ok(common_ancestor_revision) + // } } } diff --git a/crates/crdb/src/history.rs b/crates/crdb/src/history.rs index fdb84b00c0..5689e2a6a0 100644 --- a/crates/crdb/src/history.rs +++ b/crates/crdb/src/history.rs @@ -1,39 +1,204 @@ +use std::{cmp::Ordering, ops::RangeBounds}; + use crate::{ - btree::{self, KvStore}, + btree::{self, Bias, KvStore, SavedId}, messages::Operation, OperationCount, OperationId, ReplicaId, RevisionId, }; use anyhow::{anyhow, Result}; -use collections::{BTreeSet, HashMap, HashSet, VecDeque}; +use collections::{BTreeSet, Bound, HashMap, HashSet, VecDeque}; +use serde::{Deserialize, Serialize}; +use smallvec::SmallVec; -struct History { +#[derive(Serialize, Deserialize)] +pub struct SavedHistory { + operations: SavedId, + next_operation_id: OperationId, + max_operation_ids: SavedId, + deferred_operations: SavedId, +} + +#[derive(Clone, Debug)] +pub struct History { operations: btree::Map, next_operation_id: OperationId, + max_operation_ids: btree::Map, + deferred_operations: btree::Sequence, } impl History { - fn new(replica_id: ReplicaId) -> Self { + pub fn new(replica_id: ReplicaId) -> Self { Self { operations: Default::default(), next_operation_id: OperationId::new(replica_id), + max_operation_ids: Default::default(), + deferred_operations: Default::default(), } } - fn next_operation_id(&mut self) -> OperationId { + pub fn ptr_eq(&self, other: &Self) -> bool { + btree::Map::ptr_eq(&self.operations, &other.operations) + && btree::Map::ptr_eq(&self.max_operation_ids, &other.max_operation_ids) + && btree::Sequence::ptr_eq(&self.deferred_operations, &other.deferred_operations) + && self.next_operation_id == other.next_operation_id + } + + pub async fn load(saved_history: SavedHistory, kv: &dyn KvStore) -> Result { + Ok(Self { + operations: btree::Map::load_root(saved_history.operations, kv).await?, + next_operation_id: saved_history.next_operation_id, + max_operation_ids: btree::Map::load_all(saved_history.max_operation_ids, kv).await?, + deferred_operations: btree::Sequence::load_root(saved_history.deferred_operations, kv) + .await?, + }) + } + + pub async fn save(&self, kv: &dyn KvStore) -> Result { + Ok(SavedHistory { + operations: self.operations.save(kv).await?, + next_operation_id: self.next_operation_id, + max_operation_ids: self.max_operation_ids.save(kv).await?, + deferred_operations: self.deferred_operations.save(kv).await?, + }) + } + + pub fn next_operation_id(&mut self) -> OperationId { self.next_operation_id.tick() } - async fn insert(&mut self, operation: Operation, kv: &dyn KvStore) -> Result<()> { + pub fn max_operation_ids(&self) -> &btree::Map { + &self.max_operation_ids + } + + pub async fn insert( + &mut self, + operation: Operation, + kv: &dyn KvStore, + ) -> Result> { + let op_id = operation.id(); + self.next_operation_id.observe(op_id); + if self + .max_operation_ids + .load(&op_id.replica_id, kv) + .await? + .copied() + < Some(op_id.operation_count) + { + self.max_operation_ids + .insert(op_id.replica_id, op_id.operation_count); + } + self.operations.store(op_id, operation, kv).await?; + + self.deferred_operations + .load(kv, &(), |probe| { + let key_range = ( + Bound::Excluded(*probe.start), + Bound::Included(*probe.summary), + ); + key_range.contains(&op_id) + }) + .await?; + let mut cursor = self.deferred_operations.cursor::(); + let mut remaining = cursor.slice(&op_id, Bias::Left, &()); + let mut flushed = SmallVec::new(); + flushed.extend( + cursor + .slice(&op_id, Bias::Right, &()) + .iter() + .map(|deferred| deferred.operation.clone()), + ); + remaining.append(cursor.suffix(&()), &()); + drop(cursor); + self.deferred_operations = remaining; + Ok(flushed) + } + + pub fn insert_local(&mut self, operation: Operation) { + let id = operation.id(); self.next_operation_id.observe(operation.id()); - self.operations.store(operation.id(), operation, kv).await?; + self.max_operation_ids + .insert(id.replica_id, id.operation_count); + self.operations.insert(id, operation); + } + + pub async fn defer(&mut self, operation: Operation, kv: &dyn KvStore) -> Result<()> { + for parent in operation.parent().iter() { + self.deferred_operations + .load(kv, &(), |probe| { + let key_range = ( + Bound::Excluded(*probe.start), + Bound::Included(*probe.summary), + ); + key_range.contains(&operation.id()) + }) + .await?; + self.deferred_operations.insert_or_replace( + DeferredOperation { + parent: *parent, + operation: operation.clone(), + }, + &(), + ); + } Ok(()) } - async fn operation(&mut self, id: OperationId, kv: &dyn KvStore) -> Result> { + pub async fn can_apply(&mut self, operation: &Operation, kv: &dyn KvStore) -> Result { + for parent in operation.parent().iter() { + if self.operations.load(parent, kv).await?.is_none() { + return Ok(false); + } + } + Ok(true) + } + + pub async fn has_applied(&mut self, operation: &Operation, kv: &dyn KvStore) -> Result { + Ok(self.operations.load(&operation.id(), kv).await?.is_some()) + } + + pub async fn operation( + &mut self, + id: OperationId, + kv: &dyn KvStore, + ) -> Result> { self.operations.load(&id, kv).await } - async fn traverse(&mut self, revision_id: &RevisionId, kv: &dyn KvStore) -> Result { + pub async fn operations_since( + &mut self, + version: &btree::Map, + kv: &dyn KvStore, + ) -> Result> { + let mut new_operations = Vec::new(); + for (replica_id, end_op_count) in self.max_operation_ids.iter() { + let start_op = OperationId { + replica_id: *replica_id, + operation_count: version + .get(&replica_id) + .map(|count| OperationCount(count.0 + 1)) + .unwrap_or_default(), + }; + let end_op = OperationId { + replica_id: *replica_id, + operation_count: *end_op_count, + }; + + new_operations.extend( + self.operations + .load_from(&start_op, kv) + .await? + .take_while(|(op_id, _)| **op_id <= end_op) + .map(|(_, op)| op.clone()), + ); + } + Ok(new_operations) + } + + pub async fn traverse( + &mut self, + revision_id: &RevisionId, + kv: &dyn KvStore, + ) -> Result { let mut frontier = VecDeque::new(); let mut traversed = BTreeSet::new(); for operation_id in revision_id.iter() { @@ -59,8 +224,8 @@ impl History { } } -struct Traversal<'a> { - history: &'a mut History, +pub struct Traversal<'a> { + pub history: &'a mut History, frontier: VecDeque, traversed: BTreeSet<(OperationCount, ReplicaId)>, ancestors: HashMap>, @@ -68,7 +233,7 @@ struct Traversal<'a> { } impl Traversal<'_> { - async fn next(&mut self, kv: &dyn KvStore) -> Result> { + pub async fn next(&mut self, kv: &dyn KvStore) -> Result> { while let Some(frontier) = self.frontier.pop_front() { let reachable_from = self.ancestors.entry(frontier.revision.clone()).or_default(); reachable_from.insert(frontier.source); @@ -110,7 +275,7 @@ impl Traversal<'_> { }); } - return Ok(Some(TraversalResult { + return Ok(Some(TraversalAncestor { revision: frontier.revision, operations, })); @@ -142,9 +307,53 @@ struct Frontier { } #[derive(Eq, PartialEq, Debug)] -struct TraversalResult { - revision: RevisionId, - operations: BTreeSet, +pub struct TraversalAncestor { + pub revision: RevisionId, + pub operations: BTreeSet, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +struct DeferredOperation { + parent: OperationId, + operation: Operation, +} + +impl PartialEq for DeferredOperation { + fn eq(&self, other: &Self) -> bool { + self.parent == other.parent && self.operation.id() == other.operation.id() + } +} + +impl Eq for DeferredOperation {} + +impl PartialOrd for DeferredOperation { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for DeferredOperation { + fn cmp(&self, other: &Self) -> Ordering { + self.parent + .cmp(&other.parent) + .then_with(|| self.operation.id().cmp(&other.operation.id())) + } +} + +impl btree::Item for DeferredOperation { + type Summary = OperationId; + + fn summary(&self) -> Self::Summary { + self.parent + } +} + +impl btree::KeyedItem for DeferredOperation { + type Key = (OperationId, OperationId); + + fn key(&self) -> Self::Key { + (self.parent, self.operation.id()) + } } #[cfg(test)] @@ -156,104 +365,104 @@ mod tests { async fn test_traversal() { let kv = InMemoryKv::default(); let mut history = History::new(ReplicaId(0)); - let op0 = insert_operation(&[], &mut history, &kv).await; - let op1 = insert_operation(&[op0.id()], &mut history, &kv).await; - let op2 = insert_operation(&[op0.id()], &mut history, &kv).await; - let op3 = insert_operation(&[op1.id(), op2.id()], &mut history, &kv).await; - let op4 = insert_operation(&[op3.id()], &mut history, &kv).await; - let op5 = insert_operation(&[op3.id()], &mut history, &kv).await; - let op6 = insert_operation(&[op3.id(), op2.id()], &mut history, &kv).await; + let op1 = insert_operation(&[], &mut history, &kv).await; + let op2 = insert_operation(&[op1.id()], &mut history, &kv).await; + let op3 = insert_operation(&[op1.id()], &mut history, &kv).await; + let op4 = insert_operation(&[op2.id(), op3.id()], &mut history, &kv).await; + let op5 = insert_operation(&[op4.id()], &mut history, &kv).await; + let op6 = insert_operation(&[op4.id()], &mut history, &kv).await; + let op7 = insert_operation(&[op4.id(), op3.id()], &mut history, &kv).await; assert_eq!( - traversal(&[op3.id()], &mut history, &kv).await, + traversal(&[op4.id()], &mut history, &kv).await, &[ - TraversalResult { - revision: RevisionId::from([op1.id(), op2.id()].as_slice()), - operations: BTreeSet::from_iter([op3.id()]), + TraversalAncestor { + revision: RevisionId::from([op2.id(), op3.id()].as_slice()), + operations: BTreeSet::from_iter([op4.id()]), }, - TraversalResult { - revision: RevisionId::from([op0.id()].as_slice()), - operations: BTreeSet::from_iter([op1.id(), op2.id()]), + TraversalAncestor { + revision: RevisionId::from([op1.id()].as_slice()), + operations: BTreeSet::from_iter([op2.id(), op3.id()]), }, - TraversalResult { + TraversalAncestor { revision: RevisionId::from([].as_slice()), - operations: BTreeSet::from_iter([op0.id()]), - } - ] - ); - assert_eq!( - traversal(&[op5.id()], &mut history, &kv).await, - &[ - TraversalResult { - revision: RevisionId::from([op3.id()].as_slice()), - operations: BTreeSet::from_iter([op5.id()]), - }, - TraversalResult { - revision: RevisionId::from([op1.id(), op2.id()].as_slice()), - operations: BTreeSet::from_iter([op3.id()]), - }, - TraversalResult { - revision: RevisionId::from([op0.id()].as_slice()), - operations: BTreeSet::from_iter([op1.id(), op2.id()]), - }, - TraversalResult { - revision: RevisionId::from([].as_slice()), - operations: BTreeSet::from_iter([op0.id()]), - } - ] - ); - assert_eq!( - traversal(&[op4.id(), op5.id()], &mut history, &kv).await, - &[ - TraversalResult { - revision: RevisionId::from([op3.id()].as_slice()), - operations: BTreeSet::from_iter([op4.id(), op5.id()]), - }, - TraversalResult { - revision: RevisionId::from([op1.id(), op2.id()].as_slice()), - operations: BTreeSet::from_iter([op3.id()]), - }, - TraversalResult { - revision: RevisionId::from([op0.id()].as_slice()), - operations: BTreeSet::from_iter([op1.id(), op2.id()]), - }, - TraversalResult { - revision: RevisionId::from([].as_slice()), - operations: BTreeSet::from_iter([op0.id()]), - } - ] - ); - assert_eq!( - traversal(&[op3.id(), op4.id()], &mut history, &kv).await, - &[ - TraversalResult { - revision: RevisionId::from([op1.id(), op2.id()].as_slice()), - operations: BTreeSet::from_iter([op3.id(), op4.id()]), - }, - TraversalResult { - revision: RevisionId::from([op0.id()].as_slice()), - operations: BTreeSet::from_iter([op1.id(), op2.id()]), - }, - TraversalResult { - revision: RevisionId::from([].as_slice()), - operations: BTreeSet::from_iter([op0.id()]), + operations: BTreeSet::from_iter([op1.id()]), } ] ); assert_eq!( traversal(&[op6.id()], &mut history, &kv).await, &[ - TraversalResult { - revision: RevisionId::from([op3.id(), op2.id()].as_slice()), + TraversalAncestor { + revision: RevisionId::from([op4.id()].as_slice()), operations: BTreeSet::from_iter([op6.id()]), }, - TraversalResult { - revision: RevisionId::from([op0.id()].as_slice()), - operations: BTreeSet::from_iter([op1.id(), op2.id(), op3.id()]), + TraversalAncestor { + revision: RevisionId::from([op2.id(), op3.id()].as_slice()), + operations: BTreeSet::from_iter([op4.id()]), }, - TraversalResult { + TraversalAncestor { + revision: RevisionId::from([op1.id()].as_slice()), + operations: BTreeSet::from_iter([op2.id(), op3.id()]), + }, + TraversalAncestor { revision: RevisionId::from([].as_slice()), - operations: BTreeSet::from_iter([op0.id()]), + operations: BTreeSet::from_iter([op1.id()]), + } + ] + ); + assert_eq!( + traversal(&[op5.id(), op6.id()], &mut history, &kv).await, + &[ + TraversalAncestor { + revision: RevisionId::from([op4.id()].as_slice()), + operations: BTreeSet::from_iter([op5.id(), op6.id()]), + }, + TraversalAncestor { + revision: RevisionId::from([op2.id(), op3.id()].as_slice()), + operations: BTreeSet::from_iter([op4.id()]), + }, + TraversalAncestor { + revision: RevisionId::from([op1.id()].as_slice()), + operations: BTreeSet::from_iter([op2.id(), op3.id()]), + }, + TraversalAncestor { + revision: RevisionId::from([].as_slice()), + operations: BTreeSet::from_iter([op1.id()]), + } + ] + ); + assert_eq!( + traversal(&[op4.id(), op5.id()], &mut history, &kv).await, + &[ + TraversalAncestor { + revision: RevisionId::from([op2.id(), op3.id()].as_slice()), + operations: BTreeSet::from_iter([op4.id(), op5.id()]), + }, + TraversalAncestor { + revision: RevisionId::from([op1.id()].as_slice()), + operations: BTreeSet::from_iter([op2.id(), op3.id()]), + }, + TraversalAncestor { + revision: RevisionId::from([].as_slice()), + operations: BTreeSet::from_iter([op1.id()]), + } + ] + ); + assert_eq!( + traversal(&[op7.id()], &mut history, &kv).await, + &[ + TraversalAncestor { + revision: RevisionId::from([op4.id(), op3.id()].as_slice()), + operations: BTreeSet::from_iter([op7.id()]), + }, + TraversalAncestor { + revision: RevisionId::from([op1.id()].as_slice()), + operations: BTreeSet::from_iter([op2.id(), op3.id(), op4.id()]), + }, + TraversalAncestor { + revision: RevisionId::from([].as_slice()), + operations: BTreeSet::from_iter([op1.id()]), } ] ); @@ -277,7 +486,7 @@ mod tests { revision_id: &[OperationId], history: &mut History, kv: &dyn KvStore, - ) -> Vec { + ) -> Vec { let mut traversal = history.traverse(&revision_id.into(), kv).await.unwrap(); let mut results = Vec::new(); while let Some(result) = traversal.next(kv).await.unwrap() {