Redesign history traversal

This commit is contained in:
Antonio Scandurra 2023-07-31 11:39:41 +02:00
parent 05ec6b89c2
commit ce17cd83cf
2 changed files with 369 additions and 137 deletions

View file

@ -14,7 +14,7 @@ use btree::{Bias, KvStore, SavedId};
use collections::{btree_map, BTreeMap, BTreeSet, Bound, HashMap, HashSet, VecDeque};
use dense_id::DenseId;
use futures::{channel::mpsc, future::BoxFuture, FutureExt, StreamExt};
use history::{History, SavedHistory};
use history::{History, SavedHistory, TraversalPath};
use messages::{MessageEnvelope, Operation, RequestEnvelope};
use parking_lot::{Mutex, RwLock};
use rope::Rope;
@ -1855,12 +1855,16 @@ impl RepoSnapshot {
}
fn cached_revision(&self, revision_id: &RevisionId) -> Result<Revision> {
Ok(self
.revisions
.lock()
.get(revision_id)
.ok_or_else(|| anyhow!("cached revision not found"))?
.clone())
if revision_id.len() == 0 {
return Ok(Revision::default());
} else {
Ok(self
.revisions
.lock()
.get(revision_id)
.ok_or_else(|| anyhow!("cached revision not found"))?
.clone())
}
}
#[async_recursion]
@ -1869,7 +1873,43 @@ impl RepoSnapshot {
revision_id: &RevisionId,
kv: &dyn KvStore,
) -> Result<Revision> {
todo!()
if let Some(revision) = self.cached_revision(revision_id).ok() {
Ok(revision)
} else {
struct StackEntry {
revision_id: RevisionId,
ancestor: TraversalPath,
}
let mut traversal = self.history.traverse(revision_id, kv).await?;
let mut stack = vec![StackEntry {
revision_id: revision_id.clone(),
ancestor: traversal.next(kv).await?.unwrap(),
}];
// while let Some(entry) = stack.last() {
// let revision = self.revisions.lock().get(&entry.revision_id).cloned();
// if revision.is_some() {
// stack.pop();
// } else {
// let ancestor_revision =
// self.revisions.lock().get(&entry.ancestor.ancestor).cloned();
// if let Some(mut ancestor_revision) = ancestor_revision {
// let mut operations_stack = vec![];
// for child in entry.ancestor.traversed[&entry.ancestor.ancestor] {
// operations_stack.push((child, ancestor_revision.clone()));
// }
// while let Some((revision_id, parent_revision)) = operations_stack.pop() {}
// } else {
// stack.push(StackEntry {
// revision_id: entry.ancestor.ancestor.clone(),
// ancestor: traversal.next(&*kv).await?.unwrap(),
// });
// }
// }
// }
Ok(self.cached_revision(revision_id).unwrap())
}
// // First, check if we have a revision cached for this revision id.
// // If not, we'll need to reconstruct it from a previous revision.
// // We need to find a cached revision that is an ancestor of the given revision id.

View file

@ -1,4 +1,4 @@
use std::{cmp::Ordering, ops::RangeBounds};
use std::{cmp::Ordering, iter, mem, ops::RangeBounds};
use crate::{
btree::{self, Bias, KvStore, SavedId},
@ -200,17 +200,21 @@ impl History {
kv: &dyn KvStore,
) -> Result<Traversal> {
let mut frontier = VecDeque::new();
let mut traversed = BTreeSet::new();
let mut traversed = HashMap::default();
for operation_id in revision_id.iter() {
traversed.insert((operation_id.operation_count, operation_id.replica_id));
let parent_revision = self
.operation(*operation_id, kv)
.await?
.ok_or_else(|| anyhow!("operation {:?} not found", operation_id))?
.parent()
.clone();
traversed
.entry(parent_revision.clone())
.or_insert(HashSet::default())
.insert((revision_id.clone(), *operation_id));
frontier.push_back(Frontier {
source: *operation_id,
revision: self
.operation(*operation_id, kv)
.await?
.ok_or_else(|| anyhow!("operation {:?} not found", operation_id))?
.parent()
.clone(),
revision: parent_revision,
});
}
@ -224,74 +228,69 @@ impl History {
}
}
struct Frontier {
source: OperationId,
revision: RevisionId,
}
pub struct Traversal<'a> {
pub history: &'a mut History,
frontier: VecDeque<Frontier>,
traversed: BTreeSet<(OperationCount, ReplicaId)>,
traversed: HashMap<RevisionId, HashSet<(RevisionId, OperationId)>>,
ancestors: HashMap<RevisionId, HashSet<OperationId>>,
reachable_len: usize,
}
impl Traversal<'_> {
pub async fn next(&mut self, kv: &dyn KvStore) -> Result<Option<TraversalAncestor>> {
pub async fn next(&mut self, kv: &dyn KvStore) -> Result<Option<TraversalPath>> {
while let Some(frontier) = self.frontier.pop_front() {
let reachable_from = self.ancestors.entry(frontier.revision.clone()).or_default();
reachable_from.insert(frontier.source);
if reachable_from.len() == self.reachable_len {
let missing_operations_start = if let Some(max_op) = frontier
.revision
.iter()
.max_by_key(|op_id| op_id.operation_count)
{
OperationCount(max_op.operation_count.0 + 1)
} else {
OperationCount(0)
};
let operations = self
.traversed
.range(&(missing_operations_start, ReplicaId::default())..)
.map(|(operation_count, replica_id)| OperationId {
replica_id: *replica_id,
operation_count: *operation_count,
})
.collect();
if reachable_from.len() == self.reachable_len {
self.reachable_len = frontier.revision.len();
self.frontier.clear();
self.ancestors.clear();
self.traversed.clear();
let traversed = mem::take(&mut self.traversed);
for operation_id in frontier.revision.iter() {
let parent_revision = self
.history
.operation(*operation_id, kv)
.await?
.expect("operation must exist")
.parent()
.clone();
self.traversed
.insert((operation_id.operation_count, operation_id.replica_id));
.entry(parent_revision.clone())
.or_default()
.insert((frontier.revision.clone(), *operation_id));
self.frontier.push_back(Frontier {
source: *operation_id,
revision: self
.history
.operation(*operation_id, kv)
.await?
.expect("operation must exist")
.parent()
.clone(),
revision: parent_revision,
});
}
return Ok(Some(TraversalAncestor {
revision: frontier.revision,
operations,
return Ok(Some(TraversalPath {
start: frontier.revision,
traversed,
}));
} else {
for operation_id in frontier.revision.iter() {
let parent_revision = self
.history
.operation(*operation_id, kv)
.await?
.expect("operation must exist")
.parent()
.clone();
self.traversed
.insert((operation_id.operation_count, operation_id.replica_id));
.entry(parent_revision.clone())
.or_default()
.insert((frontier.revision.clone(), *operation_id));
self.frontier.push_back(Frontier {
source: frontier.source,
revision: self
.history
.operation(*operation_id, kv)
.await?
.expect("operation must exist")
.parent()
.clone(),
revision: parent_revision,
});
}
}
@ -301,15 +300,50 @@ impl Traversal<'_> {
}
}
struct Frontier {
source: OperationId,
revision: RevisionId,
#[derive(Eq, PartialEq, Debug)]
pub struct TraversalPath {
pub start: RevisionId,
pub traversed: HashMap<RevisionId, HashSet<(RevisionId, OperationId)>>,
}
#[derive(Eq, PartialEq, Debug)]
pub struct TraversalAncestor {
pub revision: RevisionId,
pub operations: BTreeSet<OperationId>,
impl TraversalPath {
fn replay(mut self) -> impl Iterator<Item = TraversalPathOperation> {
let mut stack = VecDeque::new();
if let Some(children) = self.traversed.remove(&self.start) {
for (child_revision_id, operation_id) in children {
stack.push_back(TraversalPathOperation {
parent_revision_id: self.start.clone(),
target_revision_id: child_revision_id,
operation_id,
});
}
}
iter::from_fn(move || {
while let Some(entry) = stack.pop_front() {
if let Some(children) = self.traversed.remove(&entry.target_revision_id) {
for (child_revision, operation_id) in children {
stack.push_back(TraversalPathOperation {
parent_revision_id: entry.target_revision_id.clone(),
target_revision_id: child_revision,
operation_id,
});
}
}
return Some(entry);
}
None
})
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
struct TraversalPathOperation {
parent_revision_id: RevisionId,
target_revision_id: RevisionId,
operation_id: OperationId,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -371,99 +405,257 @@ mod tests {
let op4 = insert_operation(&[op2.id(), op3.id()], &mut history, &kv).await;
let op5 = insert_operation(&[op4.id()], &mut history, &kv).await;
let op6 = insert_operation(&[op4.id()], &mut history, &kv).await;
let op7 = insert_operation(&[op4.id(), op3.id()], &mut history, &kv).await;
let op7 = insert_operation(&[op2.id()], &mut history, &kv).await;
let op8 = insert_operation(&[op5.id()], &mut history, &kv).await;
let op9 = insert_operation(&[op5.id()], &mut history, &kv).await;
let op10 = insert_operation(&[op8.id()], &mut history, &kv).await;
let op11 = insert_operation(&[op9.id(), op10.id()], &mut history, &kv).await;
assert_eq!(
traversal(&[op4.id()], &mut history, &kv).await,
&[
TraversalAncestor {
revision: RevisionId::from([op2.id(), op3.id()].as_slice()),
operations: BTreeSet::from_iter([op4.id()]),
},
TraversalAncestor {
revision: RevisionId::from([op1.id()].as_slice()),
operations: BTreeSet::from_iter([op2.id(), op3.id()]),
},
TraversalAncestor {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op1.id()]),
}
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
TraversalPathOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
TraversalPathOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
traversal(&[op6.id()], &mut history, &kv).await,
&[
TraversalAncestor {
revision: RevisionId::from([op4.id()].as_slice()),
operations: BTreeSet::from_iter([op6.id()]),
},
TraversalAncestor {
revision: RevisionId::from([op2.id(), op3.id()].as_slice()),
operations: BTreeSet::from_iter([op4.id()]),
},
TraversalAncestor {
revision: RevisionId::from([op1.id()].as_slice()),
operations: BTreeSet::from_iter([op2.id(), op3.id()]),
},
TraversalAncestor {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op1.id()]),
}
(
RevisionId::from([op4.id()].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op6.id()].as_slice()),
operation_id: op6.id(),
}]
),
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
TraversalPathOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
TraversalPathOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
traversal(&[op5.id(), op6.id()], &mut history, &kv).await,
&[
TraversalAncestor {
revision: RevisionId::from([op4.id()].as_slice()),
operations: BTreeSet::from_iter([op5.id(), op6.id()]),
},
TraversalAncestor {
revision: RevisionId::from([op2.id(), op3.id()].as_slice()),
operations: BTreeSet::from_iter([op4.id()]),
},
TraversalAncestor {
revision: RevisionId::from([op1.id()].as_slice()),
operations: BTreeSet::from_iter([op2.id(), op3.id()]),
},
TraversalAncestor {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op1.id()]),
}
(
RevisionId::from([op4.id()].as_slice()),
vec![
TraversalPathOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op5.id(), op6.id()].as_slice()),
operation_id: op6.id(),
},
TraversalPathOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op5.id(), op6.id()].as_slice()),
operation_id: op5.id(),
}
]
),
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
TraversalPathOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
TraversalPathOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
traversal(&[op4.id(), op5.id()], &mut history, &kv).await,
traversal(&[op4.id(), op7.id()], &mut history, &kv).await,
&[
TraversalAncestor {
revision: RevisionId::from([op2.id(), op3.id()].as_slice()),
operations: BTreeSet::from_iter([op4.id(), op5.id()]),
},
TraversalAncestor {
revision: RevisionId::from([op1.id()].as_slice()),
operations: BTreeSet::from_iter([op2.id(), op3.id()]),
},
TraversalAncestor {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op1.id()]),
}
(
RevisionId::from([op1.id()].as_slice()),
vec![
TraversalPathOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
TraversalPathOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id()].as_slice()),
operation_id: op2.id(),
},
TraversalPathOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
},
TraversalPathOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id(), op7.id()].as_slice()),
operation_id: op4.id(),
},
TraversalPathOperation {
parent_revision_id: RevisionId::from([op2.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id(), op7.id()].as_slice()),
operation_id: op7.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
traversal(&[op7.id()], &mut history, &kv).await,
traversal(&[op11.id()], &mut history, &kv).await,
&[
TraversalAncestor {
revision: RevisionId::from([op4.id(), op3.id()].as_slice()),
operations: BTreeSet::from_iter([op7.id()]),
},
TraversalAncestor {
revision: RevisionId::from([op1.id()].as_slice()),
operations: BTreeSet::from_iter([op2.id(), op3.id(), op4.id()]),
},
TraversalAncestor {
revision: RevisionId::from([].as_slice()),
operations: BTreeSet::from_iter([op1.id()]),
}
(
RevisionId::from([op9.id(), op10.id()].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()),
target_revision_id: RevisionId::from([op11.id()].as_slice()),
operation_id: op11.id(),
}]
),
(
RevisionId::from([op5.id()].as_slice()),
vec![
TraversalPathOperation {
parent_revision_id: RevisionId::from([op5.id()].as_slice()),
target_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()),
operation_id: op9.id(),
},
TraversalPathOperation {
parent_revision_id: RevisionId::from([op5.id()].as_slice()),
target_revision_id: RevisionId::from([op8.id()].as_slice()),
operation_id: op8.id(),
},
TraversalPathOperation {
parent_revision_id: RevisionId::from([op8.id()].as_slice()),
target_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()),
operation_id: op10.id(),
}
]
),
(
RevisionId::from([op4.id()].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op5.id()].as_slice()),
operation_id: op5.id(),
}]
),
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
TraversalPathOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
TraversalPathOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![TraversalPathOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
//
]
);
}
@ -486,11 +678,11 @@ mod tests {
revision_id: &[OperationId],
history: &mut History,
kv: &dyn KvStore,
) -> Vec<TraversalAncestor> {
) -> Vec<(RevisionId, Vec<TraversalPathOperation>)> {
let mut traversal = history.traverse(&revision_id.into(), kv).await.unwrap();
let mut results = Vec::new();
while let Some(result) = traversal.next(kv).await.unwrap() {
results.push(result);
results.push((result.start.clone(), result.replay().collect()));
}
results
}