Implement ExcerptList::subscribe

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>
This commit is contained in:
Nathan Sobo 2021-12-06 13:37:17 -07:00
parent a02a29944c
commit 4578938ea1
5 changed files with 119 additions and 54 deletions

View file

@ -7,6 +7,7 @@ mod point_utf16;
pub mod random_char_iter;
pub mod rope;
mod selection;
pub mod subscription;
#[cfg(test)]
mod tests;
@ -15,7 +16,6 @@ use anyhow::{anyhow, Result};
use clock::ReplicaId;
use collections::{HashMap, HashSet};
use operation_queue::OperationQueue;
use parking_lot::Mutex;
pub use patch::Patch;
pub use point::*;
pub use point_utf16::*;
@ -29,9 +29,10 @@ use std::{
iter::Iterator,
ops::{self, Deref, Range, Sub},
str,
sync::{Arc, Weak},
sync::Arc,
time::{Duration, Instant},
};
use subscription::{Subscription, Topic};
pub use sum_tree::Bias;
use sum_tree::{FilterCursor, SumTree};
@ -46,7 +47,7 @@ pub struct Buffer {
remote_id: u64,
local_clock: clock::Local,
lamport_clock: clock::Lamport,
subscriptions: Vec<Weak<Mutex<Vec<Patch<usize>>>>>,
subscriptions: Topic,
}
#[derive(Clone, Debug)]
@ -343,20 +344,6 @@ impl<D1, D2> Edit<(D1, D2)> {
}
}
#[derive(Clone, Default)]
pub struct Subscription(Arc<Mutex<Vec<Patch<usize>>>>);
impl Subscription {
pub fn consume(&self) -> Patch<usize> {
let mut patches = self.0.lock();
let mut changes = Patch::default();
for patch in patches.drain(..) {
changes = changes.compose(&patch);
}
changes
}
}
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct InsertionTimestamp {
pub replica_id: ReplicaId,
@ -699,7 +686,7 @@ impl Buffer {
self.snapshot.fragments = new_fragments;
self.snapshot.visible_text = visible_text;
self.snapshot.deleted_text = deleted_text;
self.update_subscriptions(edits);
self.subscriptions.publish_mut(&edits);
edit_op.new_text = new_text;
edit_op
}
@ -955,7 +942,7 @@ impl Buffer {
self.snapshot.deleted_text = deleted_text;
self.local_clock.observe(timestamp.local());
self.lamport_clock.observe(timestamp.lamport());
self.update_subscriptions(edits);
self.subscriptions.publish_mut(&edits);
}
fn apply_undo(&mut self, undo: &UndoOperation) -> Result<()> {
@ -1045,7 +1032,7 @@ impl Buffer {
self.snapshot.fragments = new_fragments;
self.snapshot.visible_text = visible_text;
self.snapshot.deleted_text = deleted_text;
self.update_subscriptions(edits);
self.subscriptions.publish_mut(&edits);
Ok(())
}
@ -1203,20 +1190,7 @@ impl Buffer {
}
pub fn subscribe(&mut self) -> Subscription {
let subscription = Subscription(Default::default());
self.subscriptions.push(Arc::downgrade(&subscription.0));
subscription
}
fn update_subscriptions(&mut self, edits: Patch<usize>) {
self.subscriptions.retain(|subscription| {
if let Some(subscription) = subscription.upgrade() {
subscription.lock().push(edits.clone());
true
} else {
false
}
});
self.subscriptions.subscribe()
}
pub fn selection_set(&self, set_id: SelectionSetId) -> Result<&SelectionSet> {