diff --git a/crates/crdb/src/crdb.rs b/crates/crdb/src/crdb.rs index 7698cc79f8..4d4ba67be3 100644 --- a/crates/crdb/src/crdb.rs +++ b/crates/crdb/src/crdb.rs @@ -14,7 +14,7 @@ use btree::{Bias, KvStore, SavedId}; use collections::{btree_map, BTreeMap, BTreeSet, Bound, HashMap, HashSet, VecDeque}; use dense_id::DenseId; use futures::{channel::mpsc, future::BoxFuture, FutureExt, StreamExt}; -use history::{History, SavedHistory}; +use history::{History, SavedHistory, TraversalPath}; use messages::{MessageEnvelope, Operation, RequestEnvelope}; use parking_lot::{Mutex, RwLock}; use rope::Rope; @@ -1855,12 +1855,16 @@ impl RepoSnapshot { } fn cached_revision(&self, revision_id: &RevisionId) -> Result { - Ok(self - .revisions - .lock() - .get(revision_id) - .ok_or_else(|| anyhow!("cached revision not found"))? - .clone()) + if revision_id.len() == 0 { + return Ok(Revision::default()); + } else { + Ok(self + .revisions + .lock() + .get(revision_id) + .ok_or_else(|| anyhow!("cached revision not found"))? + .clone()) + } } #[async_recursion] @@ -1869,7 +1873,43 @@ impl RepoSnapshot { revision_id: &RevisionId, kv: &dyn KvStore, ) -> Result { - todo!() + if let Some(revision) = self.cached_revision(revision_id).ok() { + Ok(revision) + } else { + struct StackEntry { + revision_id: RevisionId, + ancestor: TraversalPath, + } + let mut traversal = self.history.traverse(revision_id, kv).await?; + let mut stack = vec![StackEntry { + revision_id: revision_id.clone(), + ancestor: traversal.next(kv).await?.unwrap(), + }]; + + // while let Some(entry) = stack.last() { + // let revision = self.revisions.lock().get(&entry.revision_id).cloned(); + // if revision.is_some() { + // stack.pop(); + // } else { + // let ancestor_revision = + // self.revisions.lock().get(&entry.ancestor.ancestor).cloned(); + // if let Some(mut ancestor_revision) = ancestor_revision { + // let mut operations_stack = vec![]; + // for child in entry.ancestor.traversed[&entry.ancestor.ancestor] { + // operations_stack.push((child, ancestor_revision.clone())); + // } + + // while let Some((revision_id, parent_revision)) = operations_stack.pop() {} + // } else { + // stack.push(StackEntry { + // revision_id: entry.ancestor.ancestor.clone(), + // ancestor: traversal.next(&*kv).await?.unwrap(), + // }); + // } + // } + // } + Ok(self.cached_revision(revision_id).unwrap()) + } // // First, check if we have a revision cached for this revision id. // // If not, we'll need to reconstruct it from a previous revision. // // We need to find a cached revision that is an ancestor of the given revision id. diff --git a/crates/crdb/src/history.rs b/crates/crdb/src/history.rs index 5689e2a6a0..3ca20f015a 100644 --- a/crates/crdb/src/history.rs +++ b/crates/crdb/src/history.rs @@ -1,4 +1,4 @@ -use std::{cmp::Ordering, ops::RangeBounds}; +use std::{cmp::Ordering, iter, mem, ops::RangeBounds}; use crate::{ btree::{self, Bias, KvStore, SavedId}, @@ -200,17 +200,21 @@ impl History { kv: &dyn KvStore, ) -> Result { let mut frontier = VecDeque::new(); - let mut traversed = BTreeSet::new(); + let mut traversed = HashMap::default(); for operation_id in revision_id.iter() { - traversed.insert((operation_id.operation_count, operation_id.replica_id)); + let parent_revision = self + .operation(*operation_id, kv) + .await? + .ok_or_else(|| anyhow!("operation {:?} not found", operation_id))? + .parent() + .clone(); + traversed + .entry(parent_revision.clone()) + .or_insert(HashSet::default()) + .insert((revision_id.clone(), *operation_id)); frontier.push_back(Frontier { source: *operation_id, - revision: self - .operation(*operation_id, kv) - .await? - .ok_or_else(|| anyhow!("operation {:?} not found", operation_id))? - .parent() - .clone(), + revision: parent_revision, }); } @@ -224,74 +228,69 @@ impl History { } } +struct Frontier { + source: OperationId, + revision: RevisionId, +} + pub struct Traversal<'a> { pub history: &'a mut History, frontier: VecDeque, - traversed: BTreeSet<(OperationCount, ReplicaId)>, + traversed: HashMap>, ancestors: HashMap>, reachable_len: usize, } impl Traversal<'_> { - pub async fn next(&mut self, kv: &dyn KvStore) -> Result> { + pub async fn next(&mut self, kv: &dyn KvStore) -> Result> { while let Some(frontier) = self.frontier.pop_front() { let reachable_from = self.ancestors.entry(frontier.revision.clone()).or_default(); reachable_from.insert(frontier.source); - if reachable_from.len() == self.reachable_len { - let missing_operations_start = if let Some(max_op) = frontier - .revision - .iter() - .max_by_key(|op_id| op_id.operation_count) - { - OperationCount(max_op.operation_count.0 + 1) - } else { - OperationCount(0) - }; - let operations = self - .traversed - .range(&(missing_operations_start, ReplicaId::default())..) - .map(|(operation_count, replica_id)| OperationId { - replica_id: *replica_id, - operation_count: *operation_count, - }) - .collect(); + if reachable_from.len() == self.reachable_len { self.reachable_len = frontier.revision.len(); self.frontier.clear(); self.ancestors.clear(); - self.traversed.clear(); + let traversed = mem::take(&mut self.traversed); for operation_id in frontier.revision.iter() { + let parent_revision = self + .history + .operation(*operation_id, kv) + .await? + .expect("operation must exist") + .parent() + .clone(); self.traversed - .insert((operation_id.operation_count, operation_id.replica_id)); + .entry(parent_revision.clone()) + .or_default() + .insert((frontier.revision.clone(), *operation_id)); self.frontier.push_back(Frontier { source: *operation_id, - revision: self - .history - .operation(*operation_id, kv) - .await? - .expect("operation must exist") - .parent() - .clone(), + revision: parent_revision, }); } - return Ok(Some(TraversalAncestor { - revision: frontier.revision, - operations, + return Ok(Some(TraversalPath { + start: frontier.revision, + traversed, })); } else { for operation_id in frontier.revision.iter() { + let parent_revision = self + .history + .operation(*operation_id, kv) + .await? + .expect("operation must exist") + .parent() + .clone(); self.traversed - .insert((operation_id.operation_count, operation_id.replica_id)); + .entry(parent_revision.clone()) + .or_default() + .insert((frontier.revision.clone(), *operation_id)); + self.frontier.push_back(Frontier { source: frontier.source, - revision: self - .history - .operation(*operation_id, kv) - .await? - .expect("operation must exist") - .parent() - .clone(), + revision: parent_revision, }); } } @@ -301,15 +300,50 @@ impl Traversal<'_> { } } -struct Frontier { - source: OperationId, - revision: RevisionId, +#[derive(Eq, PartialEq, Debug)] +pub struct TraversalPath { + pub start: RevisionId, + pub traversed: HashMap>, } -#[derive(Eq, PartialEq, Debug)] -pub struct TraversalAncestor { - pub revision: RevisionId, - pub operations: BTreeSet, +impl TraversalPath { + fn replay(mut self) -> impl Iterator { + let mut stack = VecDeque::new(); + if let Some(children) = self.traversed.remove(&self.start) { + for (child_revision_id, operation_id) in children { + stack.push_back(TraversalPathOperation { + parent_revision_id: self.start.clone(), + target_revision_id: child_revision_id, + operation_id, + }); + } + } + + iter::from_fn(move || { + while let Some(entry) = stack.pop_front() { + if let Some(children) = self.traversed.remove(&entry.target_revision_id) { + for (child_revision, operation_id) in children { + stack.push_back(TraversalPathOperation { + parent_revision_id: entry.target_revision_id.clone(), + target_revision_id: child_revision, + operation_id, + }); + } + } + + return Some(entry); + } + + None + }) + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +struct TraversalPathOperation { + parent_revision_id: RevisionId, + target_revision_id: RevisionId, + operation_id: OperationId, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -371,99 +405,257 @@ mod tests { let op4 = insert_operation(&[op2.id(), op3.id()], &mut history, &kv).await; let op5 = insert_operation(&[op4.id()], &mut history, &kv).await; let op6 = insert_operation(&[op4.id()], &mut history, &kv).await; - let op7 = insert_operation(&[op4.id(), op3.id()], &mut history, &kv).await; + let op7 = insert_operation(&[op2.id()], &mut history, &kv).await; + let op8 = insert_operation(&[op5.id()], &mut history, &kv).await; + let op9 = insert_operation(&[op5.id()], &mut history, &kv).await; + let op10 = insert_operation(&[op8.id()], &mut history, &kv).await; + let op11 = insert_operation(&[op9.id(), op10.id()], &mut history, &kv).await; assert_eq!( traversal(&[op4.id()], &mut history, &kv).await, &[ - TraversalAncestor { - revision: RevisionId::from([op2.id(), op3.id()].as_slice()), - operations: BTreeSet::from_iter([op4.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([op1.id()].as_slice()), - operations: BTreeSet::from_iter([op2.id(), op3.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([].as_slice()), - operations: BTreeSet::from_iter([op1.id()]), - } + ( + RevisionId::from([op2.id(), op3.id()].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + target_revision_id: RevisionId::from([op4.id()].as_slice()), + operation_id: op4.id(), + }] + ), + ( + RevisionId::from([op1.id()].as_slice()), + vec![ + TraversalPathOperation { + parent_revision_id: RevisionId::from([op1.id()].as_slice()), + target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + operation_id: op2.id(), + }, + TraversalPathOperation { + parent_revision_id: RevisionId::from([op1.id()].as_slice()), + target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + operation_id: op3.id(), + } + ] + ), + ( + RevisionId::from([].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([].as_slice()), + target_revision_id: RevisionId::from([op1.id()].as_slice()), + operation_id: op1.id(), + }] + ), ] ); assert_eq!( traversal(&[op6.id()], &mut history, &kv).await, &[ - TraversalAncestor { - revision: RevisionId::from([op4.id()].as_slice()), - operations: BTreeSet::from_iter([op6.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([op2.id(), op3.id()].as_slice()), - operations: BTreeSet::from_iter([op4.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([op1.id()].as_slice()), - operations: BTreeSet::from_iter([op2.id(), op3.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([].as_slice()), - operations: BTreeSet::from_iter([op1.id()]), - } + ( + RevisionId::from([op4.id()].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([op4.id()].as_slice()), + target_revision_id: RevisionId::from([op6.id()].as_slice()), + operation_id: op6.id(), + }] + ), + ( + RevisionId::from([op2.id(), op3.id()].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + target_revision_id: RevisionId::from([op4.id()].as_slice()), + operation_id: op4.id(), + }] + ), + ( + RevisionId::from([op1.id()].as_slice()), + vec![ + TraversalPathOperation { + parent_revision_id: RevisionId::from([op1.id()].as_slice()), + target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + operation_id: op2.id(), + }, + TraversalPathOperation { + parent_revision_id: RevisionId::from([op1.id()].as_slice()), + target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + operation_id: op3.id(), + } + ] + ), + ( + RevisionId::from([].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([].as_slice()), + target_revision_id: RevisionId::from([op1.id()].as_slice()), + operation_id: op1.id(), + }] + ), ] ); assert_eq!( traversal(&[op5.id(), op6.id()], &mut history, &kv).await, &[ - TraversalAncestor { - revision: RevisionId::from([op4.id()].as_slice()), - operations: BTreeSet::from_iter([op5.id(), op6.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([op2.id(), op3.id()].as_slice()), - operations: BTreeSet::from_iter([op4.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([op1.id()].as_slice()), - operations: BTreeSet::from_iter([op2.id(), op3.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([].as_slice()), - operations: BTreeSet::from_iter([op1.id()]), - } + ( + RevisionId::from([op4.id()].as_slice()), + vec![ + TraversalPathOperation { + parent_revision_id: RevisionId::from([op4.id()].as_slice()), + target_revision_id: RevisionId::from([op5.id(), op6.id()].as_slice()), + operation_id: op6.id(), + }, + TraversalPathOperation { + parent_revision_id: RevisionId::from([op4.id()].as_slice()), + target_revision_id: RevisionId::from([op5.id(), op6.id()].as_slice()), + operation_id: op5.id(), + } + ] + ), + ( + RevisionId::from([op2.id(), op3.id()].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + target_revision_id: RevisionId::from([op4.id()].as_slice()), + operation_id: op4.id(), + }] + ), + ( + RevisionId::from([op1.id()].as_slice()), + vec![ + TraversalPathOperation { + parent_revision_id: RevisionId::from([op1.id()].as_slice()), + target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + operation_id: op2.id(), + }, + TraversalPathOperation { + parent_revision_id: RevisionId::from([op1.id()].as_slice()), + target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + operation_id: op3.id(), + } + ] + ), + ( + RevisionId::from([].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([].as_slice()), + target_revision_id: RevisionId::from([op1.id()].as_slice()), + operation_id: op1.id(), + }] + ), ] ); assert_eq!( - traversal(&[op4.id(), op5.id()], &mut history, &kv).await, + traversal(&[op4.id(), op7.id()], &mut history, &kv).await, &[ - TraversalAncestor { - revision: RevisionId::from([op2.id(), op3.id()].as_slice()), - operations: BTreeSet::from_iter([op4.id(), op5.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([op1.id()].as_slice()), - operations: BTreeSet::from_iter([op2.id(), op3.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([].as_slice()), - operations: BTreeSet::from_iter([op1.id()]), - } + ( + RevisionId::from([op1.id()].as_slice()), + vec![ + TraversalPathOperation { + parent_revision_id: RevisionId::from([op1.id()].as_slice()), + target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + operation_id: op2.id(), + }, + TraversalPathOperation { + parent_revision_id: RevisionId::from([op1.id()].as_slice()), + target_revision_id: RevisionId::from([op2.id()].as_slice()), + operation_id: op2.id(), + }, + TraversalPathOperation { + parent_revision_id: RevisionId::from([op1.id()].as_slice()), + target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + operation_id: op3.id(), + }, + TraversalPathOperation { + parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + target_revision_id: RevisionId::from([op4.id(), op7.id()].as_slice()), + operation_id: op4.id(), + }, + TraversalPathOperation { + parent_revision_id: RevisionId::from([op2.id()].as_slice()), + target_revision_id: RevisionId::from([op4.id(), op7.id()].as_slice()), + operation_id: op7.id(), + } + ] + ), + ( + RevisionId::from([].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([].as_slice()), + target_revision_id: RevisionId::from([op1.id()].as_slice()), + operation_id: op1.id(), + }] + ), ] ); assert_eq!( - traversal(&[op7.id()], &mut history, &kv).await, + traversal(&[op11.id()], &mut history, &kv).await, &[ - TraversalAncestor { - revision: RevisionId::from([op4.id(), op3.id()].as_slice()), - operations: BTreeSet::from_iter([op7.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([op1.id()].as_slice()), - operations: BTreeSet::from_iter([op2.id(), op3.id(), op4.id()]), - }, - TraversalAncestor { - revision: RevisionId::from([].as_slice()), - operations: BTreeSet::from_iter([op1.id()]), - } + ( + RevisionId::from([op9.id(), op10.id()].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()), + target_revision_id: RevisionId::from([op11.id()].as_slice()), + operation_id: op11.id(), + }] + ), + ( + RevisionId::from([op5.id()].as_slice()), + vec![ + TraversalPathOperation { + parent_revision_id: RevisionId::from([op5.id()].as_slice()), + target_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()), + operation_id: op9.id(), + }, + TraversalPathOperation { + parent_revision_id: RevisionId::from([op5.id()].as_slice()), + target_revision_id: RevisionId::from([op8.id()].as_slice()), + operation_id: op8.id(), + }, + TraversalPathOperation { + parent_revision_id: RevisionId::from([op8.id()].as_slice()), + target_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()), + operation_id: op10.id(), + } + ] + ), + ( + RevisionId::from([op4.id()].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([op4.id()].as_slice()), + target_revision_id: RevisionId::from([op5.id()].as_slice()), + operation_id: op5.id(), + }] + ), + ( + RevisionId::from([op2.id(), op3.id()].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + target_revision_id: RevisionId::from([op4.id()].as_slice()), + operation_id: op4.id(), + }] + ), + ( + RevisionId::from([op1.id()].as_slice()), + vec![ + TraversalPathOperation { + parent_revision_id: RevisionId::from([op1.id()].as_slice()), + target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + operation_id: op2.id(), + }, + TraversalPathOperation { + parent_revision_id: RevisionId::from([op1.id()].as_slice()), + target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()), + operation_id: op3.id(), + } + ] + ), + ( + RevisionId::from([].as_slice()), + vec![TraversalPathOperation { + parent_revision_id: RevisionId::from([].as_slice()), + target_revision_id: RevisionId::from([op1.id()].as_slice()), + operation_id: op1.id(), + }] + ), + // ] ); } @@ -486,11 +678,11 @@ mod tests { revision_id: &[OperationId], history: &mut History, kv: &dyn KvStore, - ) -> Vec { + ) -> Vec<(RevisionId, Vec)> { let mut traversal = history.traverse(&revision_id.into(), kv).await.unwrap(); let mut results = Vec::new(); while let Some(result) = traversal.next(kv).await.unwrap() { - results.push(result); + results.push((result.start.clone(), result.replay().collect())); } results }