Antonio Scandurra 2023-07-30 13:45:31 +02:00
parent 93701f9c5d
commit 05ec6b89c2
3 changed files with 496 additions and 391 deletions


@@ -60,6 +60,16 @@ where
Ok(Self(Sequence::load_root(id, kv).await?))
}
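/// Load the map's root node and then every node beneath it, leaving the
/// entire map resident in memory.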
pub async fn load_all(id: SavedId, kv: &dyn KvStore) -> Result<Self>
where
K: Serialize + for<'de> Deserialize<'de>,
V: Serialize + for<'de> Deserialize<'de>,
{
let mut sequence = Sequence::load_root(id, kv).await?;
sequence.load(kv, &(), |_| true).await?;
Ok(Self(sequence))
}
pub async fn load(&mut self, key: &K, kv: &dyn KvStore) -> Result<Option<&V>>
where
K: Serialize + for<'de> Deserialize<'de>,
@@ -77,6 +87,21 @@ where
Ok(self.get(key))
}
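/// Load the entries needed to iterate from `start` onward, then return an
/// iterator over `(key, value)` pairs beginning at `start`.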
pub async fn load_from(
&mut self,
start: &K,
kv: &dyn KvStore,
) -> Result<impl Iterator<Item = (&K, &V)>>
where
K: Serialize + for<'de> Deserialize<'de>,
V: Serialize + for<'de> Deserialize<'de>,
{
self.0
.load(kv, &(), |probe| probe.start.0 >= *start)
.await?;
Ok(self.iter_from(start))
}
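// A minimal usage sketch (names hypothetical, not part of this commit):
// resuming iteration at a key without pulling the whole map into memory.
//
//     let mut map: btree::Map<Key, Value> =
//         btree::Map::load_root(saved_id, kv).await?;
//     for (key, value) in map.load_from(&start_key, kv).await? {
//         // only subtrees covering keys at or after `start_key` were fetched
//     }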
pub async fn store(&mut self, key: K, value: V, kv: &dyn KvStore) -> Result<()>
where
K: Serialize + for<'de> Deserialize<'de>,
@@ -164,33 +189,7 @@ where
cursor.item().map(|item| (&item.key, &item.value))
}
pub fn range<'a, R>(&self, range: R) -> impl Iterator<Item = (&K, &V)>
where
K: 'a,
R: RangeBounds<&'a K>,
{
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
match range.start_bound() {
Bound::Included(start) => {
let start = MapKeyRef(Some(*start));
cursor.seek(&start, Bias::Left, &());
}
Bound::Excluded(start) => {
let start = MapKeyRef(Some(*start));
cursor.seek(&start, Bias::Right, &());
}
Bound::Unbounded => cursor.next(&()),
}
cursor
.map(|entry| (&entry.key, &entry.value))
.take_while(move |(key, _)| match range.end_bound() {
Bound::Included(end) => key <= end,
Bound::Excluded(end) => key < end,
Bound::Unbounded => true,
})
}
pub fn iter_from<'a>(&'a self, from: &'a K) -> impl Iterator<Item = (&K, &V)> + '_ {
pub fn iter_from<'a>(&self, from: &'a K) -> impl Iterator<Item = (&K, &V)> {
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
let from_key = MapKeyRef(Some(from));
cursor.seek(&from_key, Bias::Left, &());


@@ -14,6 +14,7 @@ use btree::{Bias, KvStore, SavedId};
use collections::{btree_map, BTreeMap, BTreeSet, Bound, HashMap, HashSet, VecDeque};
use dense_id::DenseId;
use futures::{channel::mpsc, future::BoxFuture, FutureExt, StreamExt};
use history::{History, SavedHistory};
use messages::{MessageEnvelope, Operation, RequestEnvelope};
use parking_lot::{Mutex, RwLock};
use rope::Rope;
@@ -138,7 +139,7 @@ impl OperationId {
pub fn new(replica_id: ReplicaId) -> Self {
Self {
replica_id,
operation_count: OperationCount::default(),
operation_count: OperationCount(1),
}
}
@@ -381,25 +382,28 @@ impl<E: Executor, N: ClientNetwork> Checkout<E, N> {
let response = client
.request(messages::SyncRepo {
id: self.repo.id,
max_operation_ids: self.repo.read(|repo| (&repo.max_operation_ids).into()),
})
.await?;
self.repo
.update_async(|repo| {
let kv = client.db.kv.clone();
let operations = response.operations.clone();
async move {
repo.apply_operations(operations, &*kv).await?;
Ok((None, ()))
}
.boxed()
max_operation_ids: self
.repo
.read(|repo| repo.history.max_operation_ids().into()),
})
.await?;
let operations = self
.repo
.read(|snapshot| snapshot.operations_since(&(&response.max_operation_ids).into()));
.update_async(|repo| {
let kv = client.db.kv.clone();
let response = response.clone();
async move {
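// Fold in what the server sent first; the operations collected below
// are the local ones the server has not seen, published back in chunks.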
repo.apply_operations(response.operations, &*kv).await?;
let operations = repo
.history
.operations_since(&(&response.max_operation_ids).into(), &*kv)
.await?;
Ok((None, operations))
}
.boxed()
})
.await?;
for chunk in operations.chunks(CHUNK_SIZE) {
client
@@ -737,12 +741,25 @@ impl<N: ServerNetwork> Server<N> {
.repo(request.id)
.ok_or_else(|| anyhow!("repo not found"))?;
repo.read(|snapshot| {
Ok(messages::SyncRepoResponse {
operations: snapshot.operations_since(&(&request.max_operation_ids).into()),
max_operation_ids: (&snapshot.max_operation_ids).into(),
})
repo.update_async(|snapshot| {
let request = request.clone();
let kv = self.db.kv.clone();
async move {
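// Reply with every operation the client is missing, plus our own
// version vector so the client can push back what we are missing in turn.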
let operations = snapshot
.history
.operations_since(&(&request.max_operation_ids).into(), &*kv)
.await?;
Ok((
None,
messages::SyncRepoResponse {
operations,
max_operation_ids: snapshot.history.max_operation_ids().into(),
},
))
}
.boxed()
})
.await
}
async fn handle_publish_operations(
@@ -899,7 +916,7 @@ impl Repo {
.update(&self.id, |repo| {
let (operation, result) = f(repo);
if let Some(operation) = operation {
repo.save_local_operation(operation.clone());
repo.history.insert_local(operation.clone());
if let Some(local_operation_created) = self.db.local_operation_created.as_ref()
{
local_operation_created(self.id, operation);
@@ -1007,7 +1024,7 @@ impl Branch {
let mut revision = repo
.cached_revision(&head)
.expect("head revision must exist");
let operation_id = repo.last_operation_id.tick();
let operation_id = repo.history.next_operation_id();
let (operation, result) = f(operation_id, head.clone(), &mut revision);
repo.branches
.update(&self.id, |branch| branch.head = operation_id.into());
@@ -1686,47 +1703,36 @@ impl<'a> RopeBuilder<'a> {
#[derive(Clone, Debug)]
pub struct RepoSnapshot {
id: RepoId,
last_operation_id: OperationId,
history: History,
branches: btree::Map<OperationId, BranchSnapshot>,
// TODO: Change String to Arc<str> for branch_ids_by_name
branch_ids_by_name: btree::Map<String, OperationId>,
operations: btree::Map<OperationId, Operation>,
revisions: Arc<Mutex<HashMap<RevisionId, Revision>>>,
max_operation_ids: btree::Map<ReplicaId, OperationCount>,
deferred_operations: btree::Sequence<DeferredOperation>,
}
#[derive(Serialize, Deserialize)]
struct SavedRepoSnapshot {
last_operation_id: OperationId,
history: SavedHistory,
branches: btree::SavedId,
branch_ids_by_name: btree::SavedId,
operations: btree::SavedId,
max_operation_ids: btree::SavedId,
deferred_operations: btree::SavedId,
}
impl RepoSnapshot {
fn new(id: RepoId, replica_id: ReplicaId) -> Self {
Self {
id,
last_operation_id: OperationId::new(replica_id),
history: History::new(replica_id),
branches: Default::default(),
branch_ids_by_name: Default::default(),
operations: Default::default(),
revisions: Default::default(),
max_operation_ids: Default::default(),
deferred_operations: Default::default(),
}
}
fn ptr_eq(this: &Self, other: &Self) -> bool {
btree::Map::ptr_eq(&this.branches, &other.branches)
this.id == other.id
&& btree::Map::ptr_eq(&this.branches, &other.branches)
&& btree::Map::ptr_eq(&this.branch_ids_by_name, &other.branch_ids_by_name)
&& btree::Map::ptr_eq(&this.operations, &other.operations)
&& btree::Map::ptr_eq(&this.max_operation_ids, &other.max_operation_ids)
&& this.last_operation_id == other.last_operation_id
&& History::ptr_eq(&this.history, &other.history)
}
async fn load(id: RepoId, kv: &dyn KvStore) -> Result<Self> {
@@ -1734,25 +1740,18 @@ impl RepoSnapshot {
let saved_repo = serde_bare::from_slice::<SavedRepoSnapshot>(&repo_bytes)?;
Ok(Self {
id,
last_operation_id: saved_repo.last_operation_id,
history: History::load(saved_repo.history, kv).await?,
branches: btree::Map::load_root(saved_repo.branches, kv).await?,
branch_ids_by_name: btree::Map::load_root(saved_repo.branch_ids_by_name, kv).await?,
operations: btree::Map::load_root(saved_repo.operations, kv).await?,
revisions: Default::default(),
max_operation_ids: btree::Map::load_root(saved_repo.max_operation_ids, kv).await?,
deferred_operations: btree::Sequence::load_root(saved_repo.deferred_operations, kv)
.await?,
})
}
async fn save(&self, id: RepoId, kv: &dyn KvStore) -> Result<()> {
let saved_repo = SavedRepoSnapshot {
last_operation_id: self.last_operation_id,
history: self.history.save(kv).await?,
branches: self.branches.save(kv).await?,
branch_ids_by_name: self.branch_ids_by_name.save(kv).await?,
operations: self.operations.save(kv).await?,
max_operation_ids: self.max_operation_ids.save(kv).await?,
deferred_operations: self.deferred_operations.save(kv).await?,
};
let repo_bytes = serde_bare::to_vec(&saved_repo)?;
kv.store(id.to_be_bytes(), "root".into(), repo_bytes)
@@ -1762,7 +1761,7 @@ impl RepoSnapshot {
fn create_empty_branch(&mut self, name: impl Into<Arc<str>>) -> (Operation, OperationId) {
let name = name.into();
let branch_id = self.last_operation_id.tick();
let branch_id = self.history.next_operation_id();
self.branches.insert(
branch_id,
BranchSnapshot {
@@ -1781,42 +1780,6 @@ impl RepoSnapshot {
)
}
fn operations_since(&self, version: &btree::Map<ReplicaId, OperationCount>) -> Vec<Operation> {
let mut new_operations = Vec::new();
for (replica_id, end_op_count) in self.max_operation_ids.iter() {
let end_op = OperationId {
replica_id: *replica_id,
operation_count: *end_op_count,
};
if let Some(start_op_count) = version.get(&replica_id) {
let start_op = OperationId {
replica_id: *replica_id,
operation_count: *start_op_count,
};
new_operations.extend(
self.operations
.range((Bound::Excluded(&start_op), Bound::Included(&end_op)))
.map(|(_, op)| op.clone()),
);
} else {
let start_op = OperationId::new(*replica_id);
new_operations.extend(
self.operations
.range((Bound::Included(&start_op), Bound::Included(&end_op)))
.map(|(_, op)| op.clone()),
);
}
}
new_operations
}
fn save_local_operation(&mut self, operation: Operation) {
let replica_id = operation.id().replica_id;
let count = operation.id().operation_count;
self.max_operation_ids.insert(replica_id, count);
self.operations.insert(operation.id(), operation);
}
/// Apply the given operations and any deferred operations that are now applicable.
async fn apply_operations(
&mut self,
@@ -1836,15 +1799,11 @@ impl RepoSnapshot {
operation: Operation,
kv: &dyn KvStore,
) -> Result<SmallVec<[Operation; 1]>> {
if self.operations.contains_key(&operation.id()) {
if self.history.has_applied(&operation, kv).await? {
return Ok(Default::default());
}
if operation
.parent()
.iter()
.all(|parent| self.operations.contains_key(&parent))
{
if self.history.can_apply(&operation, kv).await? {
let operation_id = operation.id();
let mut new_head;
match &operation {
@ -1864,7 +1823,7 @@ impl RepoSnapshot {
| Operation::Edit(operations::Edit {
branch_id, parent, ..
}) => {
if let Some(branch) = self.branches.get(branch_id).cloned() {
if let Some(branch) = self.branches.load(branch_id, kv).await?.cloned() {
new_head = branch.head;
new_head.observe(operation_id, &parent);
self.branches
@ -1880,8 +1839,7 @@ impl RepoSnapshot {
}
};
self.save_remote_operation(operation, kv).await?;
let flushed_operations = self.flush_deferred_operations(operation_id, kv).await?;
let flushed_operations = self.history.insert(operation, kv).await?;
// The following ensures that a revision for the branch head is always present.
#[cfg(not(any(test, feature = "test-support")))]
@@ -1891,73 +1849,11 @@ impl RepoSnapshot {
Ok(flushed_operations)
} else {
for parent in operation.parent().iter() {
self.deferred_operations.insert_or_replace(
DeferredOperation {
parent: *parent,
operation: operation.clone(),
},
&(),
);
}
self.history.defer(operation, kv).await?;
Ok(Default::default())
}
}
/// Remove any operations deferred on the given parent and add them to the
/// provided operation queue. This is called in `apply_operations`.
async fn flush_deferred_operations(
&mut self,
parent_id: OperationId,
kv: &dyn KvStore,
) -> Result<SmallVec<[Operation; 1]>> {
self.deferred_operations
.load(kv, &(), |probe| {
let key_range = (
Bound::Excluded(*probe.start),
Bound::Included(*probe.summary),
);
key_range.contains(&parent_id)
})
.await?;
let mut cursor = self.deferred_operations.cursor::<OperationId>();
let mut remaining = cursor.slice(&parent_id, Bias::Left, &());
let mut flushed = SmallVec::new();
flushed.extend(
cursor
.slice(&parent_id, Bias::Right, &())
.iter()
.map(|deferred| deferred.operation.clone()),
);
remaining.append(cursor.suffix(&()), &());
drop(cursor);
self.deferred_operations = remaining;
Ok(flushed)
}
async fn save_remote_operation(
&mut self,
operation: Operation,
kv: &dyn KvStore,
) -> Result<()> {
let replica_id = operation.id().replica_id;
let count = operation.id().operation_count;
if self.max_operation_ids.load(&replica_id, kv).await?.copied() < Some(count) {
self.max_operation_ids.insert(replica_id, count);
}
self.last_operation_id.observe(operation.id());
self.operations.store(operation.id(), operation, kv).await?;
Ok(())
}
async fn operation(
&mut self,
operation_id: OperationId,
kv: &dyn KvStore,
) -> Result<Option<&Operation>> {
self.operations.load(&operation_id, kv).await
}
fn cached_revision(&self, revision_id: &RevisionId) -> Result<Revision> {
Ok(self
.revisions
@@ -1973,123 +1869,124 @@ impl RepoSnapshot {
revision_id: &RevisionId,
kv: &dyn KvStore,
) -> Result<Revision> {
// First, check if we have a revision cached for this revision id.
// If not, we'll need to reconstruct it from a previous revision.
// We need to find a cached revision that is an ancestor of the given revision id.
// Once we find it, we must apply all ancestors of the given revision id that are not contained in the cached revision.
let mut revision = self.revisions.lock().get(revision_id).cloned();
if revision.is_none() {
revision = Revision::load(self.id, revision_id, kv).await.ok();
if let Some(revision) = revision.as_ref() {
self.revisions
.lock()
.entry(revision_id.clone())
.or_insert_with(|| revision.clone());
}
}
todo!()
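// Presumably to be rebuilt on top of `History::traverse` (see history.rs);
// the previous inline reconstruction is kept commented out for reference.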
// // First, check if we have a revision cached for this revision id.
// // If not, we'll need to reconstruct it from a previous revision.
// // We need to find a cached revision that is an ancestor of the given revision id.
// // Once we find it, we must apply all ancestors of the given revision id that are not contained in the cached revision.
// let mut revision = self.revisions.lock().get(revision_id).cloned();
// if revision.is_none() {
// revision = Revision::load(self.id, revision_id, kv).await.ok();
// if let Some(revision) = revision.as_ref() {
// self.revisions
// .lock()
// .entry(revision_id.clone())
// .or_insert_with(|| revision.clone());
// }
// }
if let Some(revision) = revision {
Ok(revision)
} else {
struct Search {
start: OperationId,
ancestor: RevisionId,
}
// if let Some(revision) = revision {
// Ok(revision)
// } else {
// struct Search {
// start: OperationId,
// ancestor: RevisionId,
// }
let mut ancestors = HashMap::<RevisionId, HashSet<OperationId>>::default();
let mut searches = VecDeque::new();
let mut operations = BTreeSet::new();
for operation_id in revision_id.iter() {
operations.insert((operation_id.operation_count, operation_id.replica_id));
searches.push_back(Search {
start: *operation_id,
ancestor: self
.operation(*operation_id, kv)
.await?
.ok_or_else(|| anyhow!("operation {:?} not found", operation_id))?
.parent()
.clone(),
});
}
// let mut ancestors = HashMap::<RevisionId, HashSet<OperationId>>::default();
// let mut searches = VecDeque::new();
// let mut operations = BTreeSet::new();
// for operation_id in revision_id.iter() {
// operations.insert((operation_id.operation_count, operation_id.replica_id));
// searches.push_back(Search {
// start: *operation_id,
// ancestor: self
// .operation(*operation_id, kv)
// .await?
// .ok_or_else(|| anyhow!("operation {:?} not found", operation_id))?
// .parent()
// .clone(),
// });
// }
let mut common_ancestor_revision = Revision::default();
let mut missing_operations_start = (OperationCount::default(), ReplicaId::default());
while let Some(search) = searches.pop_front() {
let reachable_from = ancestors.entry(search.ancestor.clone()).or_default();
reachable_from.insert(search.start);
// let mut common_ancestor_revision = Revision::default();
// let mut missing_operations_start = (OperationCount::default(), ReplicaId::default());
// while let Some(search) = searches.pop_front() {
// let reachable_from = ancestors.entry(search.ancestor.clone()).or_default();
// reachable_from.insert(search.start);
// If the current revision is reachable from every operation in the original
// revision id, it's a common ancestor.
if reachable_from.len() == revision_id.len() {
// We've found a common ancestor, now we load a revision for it. For it to
// be a common ancestor means that all its downstream operations must
// have causally happened after it. Therefore, we should be able to
// use the maximum lamport timestamp in the common ancestor's revision
// and select only those operations we've found in the backwards search
// which have a higher lamport timestamp.
let revision = self.load_revision(&search.ancestor, kv).await?;
common_ancestor_revision = revision.clone();
if let Some(max_operation_count) = search
.ancestor
.iter()
.map(|operation_id| operation_id.operation_count)
.max()
{
missing_operations_start = (
OperationCount(max_operation_count.0 + 1),
ReplicaId::default(),
);
}
// // If the current revision is reachable from every operation in the original
// // revision id, it's a common ancestor.
// if reachable_from.len() == revision_id.len() {
// // We've found a common ancestor, now we load a revision for it. For it to
// // be a common ancestor means that all its downstream operations must
// // have causally happened after it. Therefore, we should be able to
// // use the maximum lamport timestamp in the common ancestor's revision
// // and select only those operations we've found in the backwards search
// // which have a higher lamport timestamp.
// let revision = self.load_revision(&search.ancestor, kv).await?;
// common_ancestor_revision = revision.clone();
// if let Some(max_operation_count) = search
// .ancestor
// .iter()
// .map(|operation_id| operation_id.operation_count)
// .max()
// {
// missing_operations_start = (
// OperationCount(max_operation_count.0 + 1),
// ReplicaId::default(),
// );
// }
break;
}
// break;
// }
for operation_id in search.ancestor.iter() {
operations.insert((operation_id.operation_count, operation_id.replica_id));
searches.push_back(Search {
start: search.start,
ancestor: self
.operation(*operation_id, kv)
.await?
.expect("operation must exist")
.parent()
.clone(),
});
}
}
// for operation_id in search.ancestor.iter() {
// operations.insert((operation_id.operation_count, operation_id.replica_id));
// searches.push_back(Search {
// start: search.start,
// ancestor: self
// .operation(*operation_id, kv)
// .await?
// .expect("operation must exist")
// .parent()
// .clone(),
// });
// }
// }
// Apply all the missing operations to the found revision.
for (operation_count, replica_id) in operations.range(missing_operations_start..) {
let missing_operation_id = OperationId {
replica_id: *replica_id,
operation_count: *operation_count,
};
match self
.operation(missing_operation_id, kv)
.await?
.expect("operation must exist")
.clone()
{
Operation::CreateDocument(op) => {
op.apply(&mut common_ancestor_revision);
}
Operation::Edit(op) => {
let parent_revision = self.load_revision(&op.parent, kv).await?;
op.apply(&parent_revision, &mut common_ancestor_revision)?;
}
Operation::CreateBranch(_) => {
// Creating a branch doesn't have an impact on the revision, so we
// can ignore it.
}
}
}
// // Apply all the missing operations to the found revision.
// for (operation_count, replica_id) in operations.range(missing_operations_start..) {
// let missing_operation_id = OperationId {
// replica_id: *replica_id,
// operation_count: *operation_count,
// };
// match self
// .operation(missing_operation_id, kv)
// .await?
// .expect("operation must exist")
// .clone()
// {
// Operation::CreateDocument(op) => {
// op.apply(&mut common_ancestor_revision);
// }
// Operation::Edit(op) => {
// let parent_revision = self.load_revision(&op.parent, kv).await?;
// op.apply(&parent_revision, &mut common_ancestor_revision)?;
// }
// Operation::CreateBranch(_) => {
// // Creating a branch doesn't have an impact on the revision, so we
// // can ignore it.
// }
// }
// }
self.revisions
.lock()
.entry(revision_id.clone())
.or_insert_with(|| common_ancestor_revision.clone());
Ok(common_ancestor_revision)
}
// self.revisions
// .lock()
// .entry(revision_id.clone())
// .or_insert_with(|| common_ancestor_revision.clone());
// Ok(common_ancestor_revision)
// }
}
}


@@ -1,39 +1,204 @@
use std::{cmp::Ordering, ops::RangeBounds};
use crate::{
btree::{self, KvStore},
btree::{self, Bias, KvStore, SavedId},
messages::Operation,
OperationCount, OperationId, ReplicaId, RevisionId,
};
use anyhow::{anyhow, Result};
use collections::{BTreeSet, HashMap, HashSet, VecDeque};
use collections::{BTreeSet, Bound, HashMap, HashSet, VecDeque};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
struct History {
#[derive(Serialize, Deserialize)]
pub struct SavedHistory {
operations: SavedId,
next_operation_id: OperationId,
max_operation_ids: SavedId,
deferred_operations: SavedId,
}
#[derive(Clone, Debug)]
pub struct History {
operations: btree::Map<OperationId, Operation>,
next_operation_id: OperationId,
max_operation_ids: btree::Map<ReplicaId, OperationCount>,
deferred_operations: btree::Sequence<DeferredOperation>,
}
impl History {
fn new(replica_id: ReplicaId) -> Self {
pub fn new(replica_id: ReplicaId) -> Self {
Self {
operations: Default::default(),
next_operation_id: OperationId::new(replica_id),
max_operation_ids: Default::default(),
deferred_operations: Default::default(),
}
}
fn next_operation_id(&mut self) -> OperationId {
pub fn ptr_eq(&self, other: &Self) -> bool {
btree::Map::ptr_eq(&self.operations, &other.operations)
&& btree::Map::ptr_eq(&self.max_operation_ids, &other.max_operation_ids)
&& btree::Sequence::ptr_eq(&self.deferred_operations, &other.deferred_operations)
&& self.next_operation_id == other.next_operation_id
}
pub async fn load(saved_history: SavedHistory, kv: &dyn KvStore) -> Result<Self> {
Ok(Self {
operations: btree::Map::load_root(saved_history.operations, kv).await?,
next_operation_id: saved_history.next_operation_id,
max_operation_ids: btree::Map::load_all(saved_history.max_operation_ids, kv).await?,
deferred_operations: btree::Sequence::load_root(saved_history.deferred_operations, kv)
.await?,
})
}
pub async fn save(&self, kv: &dyn KvStore) -> Result<SavedHistory> {
Ok(SavedHistory {
operations: self.operations.save(kv).await?,
next_operation_id: self.next_operation_id,
max_operation_ids: self.max_operation_ids.save(kv).await?,
deferred_operations: self.deferred_operations.save(kv).await?,
})
}
pub fn next_operation_id(&mut self) -> OperationId {
self.next_operation_id.tick()
}
async fn insert(&mut self, operation: Operation, kv: &dyn KvStore) -> Result<()> {
pub fn max_operation_ids(&self) -> &btree::Map<ReplicaId, OperationCount> {
&self.max_operation_ids
}
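/// Record an operation received from another replica: advance the local
/// clock past its id, bump the replica's entry in the version vector if
/// needed, persist it, and return any operations that were deferred on it.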
pub async fn insert(
&mut self,
operation: Operation,
kv: &dyn KvStore,
) -> Result<SmallVec<[Operation; 1]>> {
let op_id = operation.id();
self.next_operation_id.observe(op_id);
if self
.max_operation_ids
.load(&op_id.replica_id, kv)
.await?
.copied()
< Some(op_id.operation_count)
{
self.max_operation_ids
.insert(op_id.replica_id, op_id.operation_count);
}
self.operations.store(op_id, operation, kv).await?;
self.deferred_operations
.load(kv, &(), |probe| {
let key_range = (
Bound::Excluded(*probe.start),
Bound::Included(*probe.summary),
);
key_range.contains(&op_id)
})
.await?;
let mut cursor = self.deferred_operations.cursor::<OperationId>();
let mut remaining = cursor.slice(&op_id, Bias::Left, &());
let mut flushed = SmallVec::new();
flushed.extend(
cursor
.slice(&op_id, Bias::Right, &())
.iter()
.map(|deferred| deferred.operation.clone()),
);
remaining.append(cursor.suffix(&()), &());
drop(cursor);
self.deferred_operations = remaining;
Ok(flushed)
}
pub fn insert_local(&mut self, operation: Operation) {
let id = operation.id();
self.next_operation_id.observe(operation.id());
self.operations.store(operation.id(), operation, kv).await?;
self.max_operation_ids
.insert(id.replica_id, id.operation_count);
self.operations.insert(id, operation);
}
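/// Park an operation whose parents have not all been applied, indexing it
/// once per parent so `insert` can flush it as soon as a parent arrives.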
pub async fn defer(&mut self, operation: Operation, kv: &dyn KvStore) -> Result<()> {
for parent in operation.parent().iter() {
self.deferred_operations
.load(kv, &(), |probe| {
let key_range = (
Bound::Excluded(*probe.start),
Bound::Included(*probe.summary),
);
key_range.contains(&operation.id())
})
.await?;
self.deferred_operations.insert_or_replace(
DeferredOperation {
parent: *parent,
operation: operation.clone(),
},
&(),
);
}
Ok(())
}
async fn operation(&mut self, id: OperationId, kv: &dyn KvStore) -> Result<Option<&Operation>> {
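/// An operation can be applied only once every one of its parents is
/// already present in this history.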
pub async fn can_apply(&mut self, operation: &Operation, kv: &dyn KvStore) -> Result<bool> {
for parent in operation.parent().iter() {
if self.operations.load(parent, kv).await?.is_none() {
return Ok(false);
}
}
Ok(true)
}
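/// Whether the operation has already been recorded in this history.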
pub async fn has_applied(&mut self, operation: &Operation, kv: &dyn KvStore) -> Result<bool> {
Ok(self.operations.load(&operation.id(), kv).await?.is_some())
}
pub async fn operation(
&mut self,
id: OperationId,
kv: &dyn KvStore,
) -> Result<Option<&Operation>> {
self.operations.load(&id, kv).await
}
async fn traverse(&mut self, revision_id: &RevisionId, kv: &dyn KvStore) -> Result<Traversal> {
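/// Collect every operation unknown to a replica whose state is described
/// by `version`, a map from replica id to the highest observed count.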
pub async fn operations_since(
&mut self,
version: &btree::Map<ReplicaId, OperationCount>,
kv: &dyn KvStore,
) -> Result<Vec<Operation>> {
let mut new_operations = Vec::new();
for (replica_id, end_op_count) in self.max_operation_ids.iter() {
let start_op = OperationId {
replica_id: *replica_id,
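// A replica absent from `version` has sent nothing; defaulting to
// count 0 is a safe inclusive lower bound, since counts start at 1
// (see `OperationId::new`).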
operation_count: version
.get(&replica_id)
.map(|count| OperationCount(count.0 + 1))
.unwrap_or_default(),
};
let end_op = OperationId {
replica_id: *replica_id,
operation_count: *end_op_count,
};
new_operations.extend(
self.operations
.load_from(&start_op, kv)
.await?
.take_while(|(op_id, _)| **op_id <= end_op)
.map(|(_, op)| op.clone()),
);
}
Ok(new_operations)
}
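/// Walk the history backwards from `revision_id`, yielding each ancestor
/// revision together with the operations that must be applied on top of it
/// to reach `revision_id`.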
pub async fn traverse(
&mut self,
revision_id: &RevisionId,
kv: &dyn KvStore,
) -> Result<Traversal> {
let mut frontier = VecDeque::new();
let mut traversed = BTreeSet::new();
for operation_id in revision_id.iter() {
@@ -59,8 +224,8 @@ impl History {
}
}
struct Traversal<'a> {
history: &'a mut History,
pub struct Traversal<'a> {
pub history: &'a mut History,
frontier: VecDeque<Frontier>,
traversed: BTreeSet<(OperationCount, ReplicaId)>,
ancestors: HashMap<RevisionId, HashSet<OperationId>>,
@@ -68,7 +233,7 @@ struct Traversal<'a> {
}
impl Traversal<'_> {
async fn next(&mut self, kv: &dyn KvStore) -> Result<Option<TraversalResult>> {
pub async fn next(&mut self, kv: &dyn KvStore) -> Result<Option<TraversalAncestor>> {
while let Some(frontier) = self.frontier.pop_front() {
let reachable_from = self.ancestors.entry(frontier.revision.clone()).or_default();
reachable_from.insert(frontier.source);
@@ -110,7 +275,7 @@ impl Traversal<'_> {
});
}
return Ok(Some(TraversalResult {
return Ok(Some(TraversalAncestor {
revision: frontier.revision,
operations,
}));
@@ -142,9 +307,53 @@ struct Frontier {
}
#[derive(Eq, PartialEq, Debug)]
struct TraversalResult {
revision: RevisionId,
operations: BTreeSet<OperationId>,
pub struct TraversalAncestor {
pub revision: RevisionId,
pub operations: BTreeSet<OperationId>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeferredOperation {
parent: OperationId,
operation: Operation,
}
impl PartialEq for DeferredOperation {
fn eq(&self, other: &Self) -> bool {
self.parent == other.parent && self.operation.id() == other.operation.id()
}
}
impl Eq for DeferredOperation {}
impl PartialOrd for DeferredOperation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for DeferredOperation {
fn cmp(&self, other: &Self) -> Ordering {
self.parent
.cmp(&other.parent)
.then_with(|| self.operation.id().cmp(&other.operation.id()))
}
}
impl btree::Item for DeferredOperation {
type Summary = OperationId;
fn summary(&self) -> Self::Summary {
self.parent
}
}
impl btree::KeyedItem for DeferredOperation {
type Key = (OperationId, OperationId);
fn key(&self) -> Self::Key {
(self.parent, self.operation.id())
}
}
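// `DeferredOperation` summarizes to its parent id, so operations parked
// under the same parent sit adjacent in the sequence and `History::insert`
// can flush them with two cursor slices around that id. A minimal sketch of
// the flow, assuming a hypothetical `new_operation` helper (not part of
// this commit):
//
//     let kv = InMemoryKv::default();
//     let mut history = History::new(ReplicaId(0));
//     let parent = new_operation(&[]);
//     let child = new_operation(&[parent.id()]);
//     assert!(!history.can_apply(&child, &kv).await?);
//     history.defer(child.clone(), &kv).await?; // parked under `parent`
//     let flushed = history.insert(parent, &kv).await?;
//     assert_eq!(flushed.len(), 1); // `child` can now be applied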
#[cfg(test)]
@@ -156,104 +365,104 @@ mod tests {
async fn test_traversal() {
let kv = InMemoryKv::default();
let mut history = History::new(ReplicaId(0));
let op0 = insert_operation(&[], &mut history, &kv).await;
let op1 = insert_operation(&[op0.id()], &mut history, &kv).await;
let op2 = insert_operation(&[op0.id()], &mut history, &kv).await;
let op3 = insert_operation(&[op1.id(), op2.id()], &mut history, &kv).await;
let op4 = insert_operation(&[op3.id()], &mut history, &kv).await;
let op5 = insert_operation(&[op3.id()], &mut history, &kv).await;
let op6 = insert_operation(&[op3.id(), op2.id()], &mut history, &kv).await;
let op1 = insert_operation(&[], &mut history, &kv).await;
let op2 = insert_operation(&[op1.id()], &mut history, &kv).await;
let op3 = insert_operation(&[op1.id()], &mut history, &kv).await;
let op4 = insert_operation(&[op2.id(), op3.id()], &mut history, &kv).await;
let op5 = insert_operation(&[op4.id()], &mut history, &kv).await;
let op6 = insert_operation(&[op4.id()], &mut history, &kv).await;
let op7 = insert_operation(&[op4.id(), op3.id()], &mut history, &kv).await;
assert_eq!(
traversal(&[op3.id()], &mut history, &kv).await,
traversal(&[op4.id()], &mut history, &kv).await,
&[
TraversalResult {
revision: RevisionId::from([op1.id(), op2.id()].as_slice()),
operations: BTreeSet::from_iter([op3.id()]),
TraversalAncestor {
revision: RevisionId::from([op2.id(), op3.id()].as_slice()),
operations: BTreeSet::from_iter([op4.id()]),
},
TraversalResult {
revision: RevisionId::from([op0.id()].as_slice()),
operations: BTreeSet::from_iter([op1.id(), op2.id()]),
TraversalAncestor {
revision: RevisionId::from([op1.id()].as_slice()),
operations: BTreeSet::from_iter([op2.id(), op3.id()]),
},
TraversalResult {
TraversalAncestor {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op0.id()]),
}
]
);
assert_eq!(
traversal(&[op5.id()], &mut history, &kv).await,
&[
TraversalResult {
revision: RevisionId::from([op3.id()].as_slice()),
operations: BTreeSet::from_iter([op5.id()]),
},
TraversalResult {
revision: RevisionId::from([op1.id(), op2.id()].as_slice()),
operations: BTreeSet::from_iter([op3.id()]),
},
TraversalResult {
revision: RevisionId::from([op0.id()].as_slice()),
operations: BTreeSet::from_iter([op1.id(), op2.id()]),
},
TraversalResult {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op0.id()]),
}
]
);
assert_eq!(
traversal(&[op4.id(), op5.id()], &mut history, &kv).await,
&[
TraversalResult {
revision: RevisionId::from([op3.id()].as_slice()),
operations: BTreeSet::from_iter([op4.id(), op5.id()]),
},
TraversalResult {
revision: RevisionId::from([op1.id(), op2.id()].as_slice()),
operations: BTreeSet::from_iter([op3.id()]),
},
TraversalResult {
revision: RevisionId::from([op0.id()].as_slice()),
operations: BTreeSet::from_iter([op1.id(), op2.id()]),
},
TraversalResult {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op0.id()]),
}
]
);
assert_eq!(
traversal(&[op3.id(), op4.id()], &mut history, &kv).await,
&[
TraversalResult {
revision: RevisionId::from([op1.id(), op2.id()].as_slice()),
operations: BTreeSet::from_iter([op3.id(), op4.id()]),
},
TraversalResult {
revision: RevisionId::from([op0.id()].as_slice()),
operations: BTreeSet::from_iter([op1.id(), op2.id()]),
},
TraversalResult {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op0.id()]),
operations: BTreeSet::from_iter([op1.id()]),
}
]
);
assert_eq!(
traversal(&[op6.id()], &mut history, &kv).await,
&[
TraversalResult {
revision: RevisionId::from([op3.id(), op2.id()].as_slice()),
TraversalAncestor {
revision: RevisionId::from([op4.id()].as_slice()),
operations: BTreeSet::from_iter([op6.id()]),
},
TraversalResult {
revision: RevisionId::from([op0.id()].as_slice()),
operations: BTreeSet::from_iter([op1.id(), op2.id(), op3.id()]),
TraversalAncestor {
revision: RevisionId::from([op2.id(), op3.id()].as_slice()),
operations: BTreeSet::from_iter([op4.id()]),
},
TraversalResult {
TraversalAncestor {
revision: RevisionId::from([op1.id()].as_slice()),
operations: BTreeSet::from_iter([op2.id(), op3.id()]),
},
TraversalAncestor {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op0.id()]),
operations: BTreeSet::from_iter([op1.id()]),
}
]
);
assert_eq!(
traversal(&[op5.id(), op6.id()], &mut history, &kv).await,
&[
TraversalAncestor {
revision: RevisionId::from([op4.id()].as_slice()),
operations: BTreeSet::from_iter([op5.id(), op6.id()]),
},
TraversalAncestor {
revision: RevisionId::from([op2.id(), op3.id()].as_slice()),
operations: BTreeSet::from_iter([op4.id()]),
},
TraversalAncestor {
revision: RevisionId::from([op1.id()].as_slice()),
operations: BTreeSet::from_iter([op2.id(), op3.id()]),
},
TraversalAncestor {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op1.id()]),
}
]
);
assert_eq!(
traversal(&[op4.id(), op5.id()], &mut history, &kv).await,
&[
TraversalAncestor {
revision: RevisionId::from([op2.id(), op3.id()].as_slice()),
operations: BTreeSet::from_iter([op4.id(), op5.id()]),
},
TraversalAncestor {
revision: RevisionId::from([op1.id()].as_slice()),
operations: BTreeSet::from_iter([op2.id(), op3.id()]),
},
TraversalAncestor {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op1.id()]),
}
]
);
assert_eq!(
traversal(&[op7.id()], &mut history, &kv).await,
&[
TraversalAncestor {
revision: RevisionId::from([op4.id(), op3.id()].as_slice()),
operations: BTreeSet::from_iter([op7.id()]),
},
TraversalAncestor {
revision: RevisionId::from([op1.id()].as_slice()),
operations: BTreeSet::from_iter([op2.id(), op3.id(), op4.id()]),
},
TraversalAncestor {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op1.id()]),
}
]
);
@@ -277,7 +486,7 @@ mod tests {
revision_id: &[OperationId],
history: &mut History,
kv: &dyn KvStore,
) -> Vec<TraversalResult> {
) -> Vec<TraversalAncestor> {
let mut traversal = history.traverse(&revision_id.into(), kv).await.unwrap();
let mut results = Vec::new();
while let Some(result) = traversal.next(kv).await.unwrap() {