Remove local timestamps from CRDT operations

Use lamport timestamps for everything.
This commit is contained in:
Max Brunsfeld 2023-08-31 15:52:16 -07:00
parent 00aae5abee
commit 03f0365d4d
10 changed files with 186 additions and 314 deletions

View file

@ -46,18 +46,16 @@ lazy_static! {
static ref LINE_SEPARATORS_REGEX: Regex = Regex::new("\r\n|\r|\u{2028}|\u{2029}").unwrap();
}
pub type TransactionId = clock::Local;
pub type TransactionId = clock::Lamport;
pub struct Buffer {
snapshot: BufferSnapshot,
history: History,
deferred_ops: OperationQueue<Operation>,
deferred_replicas: HashSet<ReplicaId>,
replica_id: ReplicaId,
local_clock: clock::Local,
pub lamport_clock: clock::Lamport,
subscriptions: Topic,
edit_id_resolvers: HashMap<clock::Local, Vec<oneshot::Sender<()>>>,
edit_id_resolvers: HashMap<clock::Lamport, Vec<oneshot::Sender<()>>>,
wait_for_version_txs: Vec<(clock::Global, oneshot::Sender<()>)>,
}
@ -85,7 +83,7 @@ pub struct HistoryEntry {
#[derive(Clone, Debug)]
pub struct Transaction {
pub id: TransactionId,
pub edit_ids: Vec<clock::Local>,
pub edit_ids: Vec<clock::Lamport>,
pub start: clock::Global,
}
@ -97,8 +95,8 @@ impl HistoryEntry {
struct History {
base_text: Rope,
operations: TreeMap<clock::Local, Operation>,
insertion_slices: HashMap<clock::Local, Vec<InsertionSlice>>,
operations: TreeMap<clock::Lamport, Operation>,
insertion_slices: HashMap<clock::Lamport, Vec<InsertionSlice>>,
undo_stack: Vec<HistoryEntry>,
redo_stack: Vec<HistoryEntry>,
transaction_depth: usize,
@ -107,7 +105,7 @@ struct History {
#[derive(Clone, Debug)]
struct InsertionSlice {
insertion_id: clock::Local,
insertion_id: clock::Lamport,
range: Range<usize>,
}
@ -129,18 +127,18 @@ impl History {
}
fn push(&mut self, op: Operation) {
self.operations.insert(op.local_timestamp(), op);
self.operations.insert(op.timestamp(), op);
}
fn start_transaction(
&mut self,
start: clock::Global,
now: Instant,
local_clock: &mut clock::Local,
clock: &mut clock::Lamport,
) -> Option<TransactionId> {
self.transaction_depth += 1;
if self.transaction_depth == 1 {
let id = local_clock.tick();
let id = clock.tick();
self.undo_stack.push(HistoryEntry {
transaction: Transaction {
id,
@ -251,7 +249,7 @@ impl History {
self.redo_stack.clear();
}
fn push_undo(&mut self, op_id: clock::Local) {
fn push_undo(&mut self, op_id: clock::Lamport) {
assert_ne!(self.transaction_depth, 0);
if let Some(Operation::Edit(_)) = self.operations.get(&op_id) {
let last_transaction = self.undo_stack.last_mut().unwrap();
@ -412,37 +410,14 @@ impl<D1, D2> Edit<(D1, D2)> {
}
}
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, PartialOrd, Ord)]
pub struct InsertionTimestamp {
pub replica_id: ReplicaId,
pub local: clock::Seq,
pub lamport: clock::Seq,
}
impl InsertionTimestamp {
pub fn local(&self) -> clock::Local {
clock::Local {
replica_id: self.replica_id,
value: self.local,
}
}
pub fn lamport(&self) -> clock::Lamport {
clock::Lamport {
replica_id: self.replica_id,
value: self.lamport,
}
}
}
#[derive(Eq, PartialEq, Clone, Debug)]
pub struct Fragment {
pub id: Locator,
pub insertion_timestamp: InsertionTimestamp,
pub timestamp: clock::Lamport,
pub insertion_offset: usize,
pub len: usize,
pub visible: bool,
pub deletions: HashSet<clock::Local>,
pub deletions: HashSet<clock::Lamport>,
pub max_undos: clock::Global,
}
@ -470,29 +445,26 @@ impl<'a> sum_tree::Dimension<'a, FragmentSummary> for FragmentTextSummary {
#[derive(Eq, PartialEq, Clone, Debug)]
struct InsertionFragment {
timestamp: clock::Local,
timestamp: clock::Lamport,
split_offset: usize,
fragment_id: Locator,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct InsertionFragmentKey {
timestamp: clock::Local,
timestamp: clock::Lamport,
split_offset: usize,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Operation {
Edit(EditOperation),
Undo {
undo: UndoOperation,
lamport_timestamp: clock::Lamport,
},
Undo(UndoOperation),
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct EditOperation {
pub timestamp: InsertionTimestamp,
pub timestamp: clock::Lamport,
pub version: clock::Global,
pub ranges: Vec<Range<FullOffset>>,
pub new_text: Vec<Arc<str>>,
@ -500,9 +472,9 @@ pub struct EditOperation {
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UndoOperation {
pub id: clock::Local,
pub counts: HashMap<clock::Local, u32>,
pub timestamp: clock::Lamport,
pub version: clock::Global,
pub counts: HashMap<clock::Lamport, u32>,
}
impl Buffer {
@ -514,24 +486,21 @@ impl Buffer {
let mut fragments = SumTree::new();
let mut insertions = SumTree::new();
let mut local_clock = clock::Local::new(replica_id);
let mut lamport_clock = clock::Lamport::new(replica_id);
let mut version = clock::Global::new();
let visible_text = history.base_text.clone();
if !visible_text.is_empty() {
let insertion_timestamp = InsertionTimestamp {
let insertion_timestamp = clock::Lamport {
replica_id: 0,
local: 1,
lamport: 1,
value: 1,
};
local_clock.observe(insertion_timestamp.local());
lamport_clock.observe(insertion_timestamp.lamport());
version.observe(insertion_timestamp.local());
lamport_clock.observe(insertion_timestamp);
version.observe(insertion_timestamp);
let fragment_id = Locator::between(&Locator::min(), &Locator::max());
let fragment = Fragment {
id: fragment_id,
insertion_timestamp,
timestamp: insertion_timestamp,
insertion_offset: 0,
len: visible_text.len(),
visible: true,
@ -557,8 +526,6 @@ impl Buffer {
history,
deferred_ops: OperationQueue::new(),
deferred_replicas: HashSet::default(),
replica_id,
local_clock,
lamport_clock,
subscriptions: Default::default(),
edit_id_resolvers: Default::default(),
@ -575,7 +542,7 @@ impl Buffer {
}
pub fn replica_id(&self) -> ReplicaId {
self.local_clock.replica_id
self.lamport_clock.replica_id
}
pub fn remote_id(&self) -> u64 {
@ -602,16 +569,12 @@ impl Buffer {
.map(|(range, new_text)| (range, new_text.into()));
self.start_transaction();
let timestamp = InsertionTimestamp {
replica_id: self.replica_id,
local: self.local_clock.tick().value,
lamport: self.lamport_clock.tick().value,
};
let timestamp = self.lamport_clock.tick();
let operation = Operation::Edit(self.apply_local_edit(edits, timestamp));
self.history.push(operation.clone());
self.history.push_undo(operation.local_timestamp());
self.snapshot.version.observe(operation.local_timestamp());
self.history.push_undo(operation.timestamp());
self.snapshot.version.observe(operation.timestamp());
self.end_transaction();
operation
}
@ -619,7 +582,7 @@ impl Buffer {
fn apply_local_edit<S: ToOffset, T: Into<Arc<str>>>(
&mut self,
edits: impl ExactSizeIterator<Item = (Range<S>, T)>,
timestamp: InsertionTimestamp,
timestamp: clock::Lamport,
) -> EditOperation {
let mut edits_patch = Patch::default();
let mut edit_op = EditOperation {
@ -696,7 +659,7 @@ impl Buffer {
.item()
.map_or(&Locator::max(), |old_fragment| &old_fragment.id),
),
insertion_timestamp: timestamp,
timestamp,
insertion_offset,
len: new_text.len(),
deletions: Default::default(),
@ -726,7 +689,7 @@ impl Buffer {
intersection.insertion_offset += fragment_start - old_fragments.start().visible;
intersection.id =
Locator::between(&new_fragments.summary().max_id, &intersection.id);
intersection.deletions.insert(timestamp.local());
intersection.deletions.insert(timestamp);
intersection.visible = false;
}
if intersection.len > 0 {
@ -781,7 +744,7 @@ impl Buffer {
self.subscriptions.publish_mut(&edits_patch);
self.history
.insertion_slices
.insert(timestamp.local(), insertion_slices);
.insert(timestamp, insertion_slices);
edit_op
}
@ -808,28 +771,23 @@ impl Buffer {
fn apply_op(&mut self, op: Operation) -> Result<()> {
match op {
Operation::Edit(edit) => {
if !self.version.observed(edit.timestamp.local()) {
if !self.version.observed(edit.timestamp) {
self.apply_remote_edit(
&edit.version,
&edit.ranges,
&edit.new_text,
edit.timestamp,
);
self.snapshot.version.observe(edit.timestamp.local());
self.local_clock.observe(edit.timestamp.local());
self.lamport_clock.observe(edit.timestamp.lamport());
self.resolve_edit(edit.timestamp.local());
self.snapshot.version.observe(edit.timestamp);
self.lamport_clock.observe(edit.timestamp);
self.resolve_edit(edit.timestamp);
}
}
Operation::Undo {
undo,
lamport_timestamp,
} => {
if !self.version.observed(undo.id) {
Operation::Undo(undo) => {
if !self.version.observed(undo.timestamp) {
self.apply_undo(&undo)?;
self.snapshot.version.observe(undo.id);
self.local_clock.observe(undo.id);
self.lamport_clock.observe(lamport_timestamp);
self.snapshot.version.observe(undo.timestamp);
self.lamport_clock.observe(undo.timestamp);
}
}
}
@ -849,7 +807,7 @@ impl Buffer {
version: &clock::Global,
ranges: &[Range<FullOffset>],
new_text: &[Arc<str>],
timestamp: InsertionTimestamp,
timestamp: clock::Lamport,
) {
if ranges.is_empty() {
return;
@ -916,9 +874,7 @@ impl Buffer {
// Skip over insertions that are concurrent to this edit, but have a lower lamport
// timestamp.
while let Some(fragment) = old_fragments.item() {
if fragment_start == range.start
&& fragment.insertion_timestamp.lamport() > timestamp.lamport()
{
if fragment_start == range.start && fragment.timestamp > timestamp {
new_ropes.push_fragment(fragment, fragment.visible);
new_fragments.push(fragment.clone(), &None);
old_fragments.next(&cx);
@ -955,7 +911,7 @@ impl Buffer {
.item()
.map_or(&Locator::max(), |old_fragment| &old_fragment.id),
),
insertion_timestamp: timestamp,
timestamp,
insertion_offset,
len: new_text.len(),
deletions: Default::default(),
@ -986,7 +942,7 @@ impl Buffer {
fragment_start - old_fragments.start().0.full_offset();
intersection.id =
Locator::between(&new_fragments.summary().max_id, &intersection.id);
intersection.deletions.insert(timestamp.local());
intersection.deletions.insert(timestamp);
intersection.visible = false;
insertion_slices.push(intersection.insertion_slice());
}
@ -1038,13 +994,13 @@ impl Buffer {
self.snapshot.insertions.edit(new_insertions, &());
self.history
.insertion_slices
.insert(timestamp.local(), insertion_slices);
.insert(timestamp, insertion_slices);
self.subscriptions.publish_mut(&edits_patch)
}
fn fragment_ids_for_edits<'a>(
&'a self,
edit_ids: impl Iterator<Item = &'a clock::Local>,
edit_ids: impl Iterator<Item = &'a clock::Lamport>,
) -> Vec<&'a Locator> {
// Get all of the insertion slices changed by the given edits.
let mut insertion_slices = Vec::new();
@ -1105,7 +1061,7 @@ impl Buffer {
let fragment_was_visible = fragment.visible;
fragment.visible = fragment.is_visible(&self.undo_map);
fragment.max_undos.observe(undo.id);
fragment.max_undos.observe(undo.timestamp);
let old_start = old_fragments.start().1;
let new_start = new_fragments.summary().text.visible;
@ -1159,10 +1115,10 @@ impl Buffer {
if self.deferred_replicas.contains(&op.replica_id()) {
false
} else {
match op {
Operation::Edit(edit) => self.version.observed_all(&edit.version),
Operation::Undo { undo, .. } => self.version.observed_all(&undo.version),
}
self.version.observed_all(match op {
Operation::Edit(edit) => &edit.version,
Operation::Undo(undo) => &undo.version,
})
}
}
@ -1180,7 +1136,7 @@ impl Buffer {
pub fn start_transaction_at(&mut self, now: Instant) -> Option<TransactionId> {
self.history
.start_transaction(self.version.clone(), now, &mut self.local_clock)
.start_transaction(self.version.clone(), now, &mut self.lamport_clock)
}
pub fn end_transaction(&mut self) -> Option<(TransactionId, clock::Global)> {
@ -1209,7 +1165,7 @@ impl Buffer {
&self.history.base_text
}
pub fn operations(&self) -> &TreeMap<clock::Local, Operation> {
pub fn operations(&self) -> &TreeMap<clock::Lamport, Operation> {
&self.history.operations
}
@ -1289,16 +1245,13 @@ impl Buffer {
}
let undo = UndoOperation {
id: self.local_clock.tick(),
timestamp: self.lamport_clock.tick(),
version: self.version(),
counts,
};
self.apply_undo(&undo)?;
let operation = Operation::Undo {
undo,
lamport_timestamp: self.lamport_clock.tick(),
};
self.snapshot.version.observe(operation.local_timestamp());
self.snapshot.version.observe(undo.timestamp);
let operation = Operation::Undo(undo);
self.history.push(operation.clone());
Ok(operation)
}
@ -1363,7 +1316,7 @@ impl Buffer {
pub fn wait_for_edits(
&mut self,
edit_ids: impl IntoIterator<Item = clock::Local>,
edit_ids: impl IntoIterator<Item = clock::Lamport>,
) -> impl 'static + Future<Output = Result<()>> {
let mut futures = Vec::new();
for edit_id in edit_ids {
@ -1435,7 +1388,7 @@ impl Buffer {
self.wait_for_version_txs.clear();
}
fn resolve_edit(&mut self, edit_id: clock::Local) {
fn resolve_edit(&mut self, edit_id: clock::Lamport) {
for mut tx in self
.edit_id_resolvers
.remove(&edit_id)
@ -1513,7 +1466,7 @@ impl Buffer {
.insertions
.get(
&InsertionFragmentKey {
timestamp: fragment.insertion_timestamp.local(),
timestamp: fragment.timestamp,
split_offset: fragment.insertion_offset,
},
&(),
@ -1996,7 +1949,7 @@ impl BufferSnapshot {
let fragment = fragment_cursor.item().unwrap();
let overshoot = offset - *fragment_cursor.start();
Anchor {
timestamp: fragment.insertion_timestamp.local(),
timestamp: fragment.timestamp,
offset: fragment.insertion_offset + overshoot,
bias,
buffer_id: Some(self.remote_id),
@ -2188,15 +2141,14 @@ impl<'a, D: TextDimension + Ord, F: FnMut(&FragmentSummary) -> bool> Iterator fo
break;
}
let timestamp = fragment.insertion_timestamp.local();
let start_anchor = Anchor {
timestamp,
timestamp: fragment.timestamp,
offset: fragment.insertion_offset,
bias: Bias::Right,
buffer_id: Some(self.buffer_id),
};
let end_anchor = Anchor {
timestamp,
timestamp: fragment.timestamp,
offset: fragment.insertion_offset + fragment.len,
bias: Bias::Left,
buffer_id: Some(self.buffer_id),
@ -2269,19 +2221,17 @@ impl<'a, D: TextDimension + Ord, F: FnMut(&FragmentSummary) -> bool> Iterator fo
impl Fragment {
fn insertion_slice(&self) -> InsertionSlice {
InsertionSlice {
insertion_id: self.insertion_timestamp.local(),
insertion_id: self.timestamp,
range: self.insertion_offset..self.insertion_offset + self.len,
}
}
fn is_visible(&self, undos: &UndoMap) -> bool {
!undos.is_undone(self.insertion_timestamp.local())
&& self.deletions.iter().all(|d| undos.is_undone(*d))
!undos.is_undone(self.timestamp) && self.deletions.iter().all(|d| undos.is_undone(*d))
}
fn was_visible(&self, version: &clock::Global, undos: &UndoMap) -> bool {
(version.observed(self.insertion_timestamp.local())
&& !undos.was_undone(self.insertion_timestamp.local(), version))
(version.observed(self.timestamp) && !undos.was_undone(self.timestamp, version))
&& self
.deletions
.iter()
@ -2294,14 +2244,14 @@ impl sum_tree::Item for Fragment {
fn summary(&self) -> Self::Summary {
let mut max_version = clock::Global::new();
max_version.observe(self.insertion_timestamp.local());
max_version.observe(self.timestamp);
for deletion in &self.deletions {
max_version.observe(*deletion);
}
max_version.join(&self.max_undos);
let mut min_insertion_version = clock::Global::new();
min_insertion_version.observe(self.insertion_timestamp.local());
min_insertion_version.observe(self.timestamp);
let max_insertion_version = min_insertion_version.clone();
if self.visible {
FragmentSummary {
@ -2378,7 +2328,7 @@ impl sum_tree::KeyedItem for InsertionFragment {
impl InsertionFragment {
fn new(fragment: &Fragment) -> Self {
Self {
timestamp: fragment.insertion_timestamp.local(),
timestamp: fragment.timestamp,
split_offset: fragment.insertion_offset,
fragment_id: fragment.id.clone(),
}
@ -2501,10 +2451,10 @@ impl Operation {
operation_queue::Operation::lamport_timestamp(self).replica_id
}
pub fn local_timestamp(&self) -> clock::Local {
pub fn timestamp(&self) -> clock::Lamport {
match self {
Operation::Edit(edit) => edit.timestamp.local(),
Operation::Undo { undo, .. } => undo.id,
Operation::Edit(edit) => edit.timestamp,
Operation::Undo(undo) => undo.timestamp,
}
}
@ -2523,10 +2473,8 @@ impl Operation {
impl operation_queue::Operation for Operation {
fn lamport_timestamp(&self) -> clock::Lamport {
match self {
Operation::Edit(edit) => edit.timestamp.lamport(),
Operation::Undo {
lamport_timestamp, ..
} => *lamport_timestamp,
Operation::Edit(edit) => edit.timestamp,
Operation::Undo(undo) => undo.timestamp,
}
}
}