From 50f507e38e16ac65a8833d8c42becd99514d0cdc Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 1 Aug 2023 15:07:32 +0200 Subject: [PATCH] Maintain ref counts for document handles --- crates/crdb/src/btree/map.rs | 20 +- crates/crdb/src/crdb.rs | 324 +++++++++++++++++++----------- crates/crdb/src/messages.rs | 10 +- crates/crdb/src/operations.rs | 16 +- crates/crdb/src/revision_cache.rs | 69 ------- crates/crdb/src/rope.rs | 4 + 6 files changed, 247 insertions(+), 196 deletions(-) delete mode 100644 crates/crdb/src/revision_cache.rs diff --git a/crates/crdb/src/btree/map.rs b/crates/crdb/src/btree/map.rs index 61523c1e66..fa0f18354d 100644 --- a/crates/crdb/src/btree/map.rs +++ b/crates/crdb/src/btree/map.rs @@ -6,11 +6,11 @@ use serde::{Deserialize, Serialize}; use std::{ cmp::Ordering, collections::BTreeMap, - fmt::Debug, + fmt::{self, Debug}, ops::{Bound, RangeBounds}, }; -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, PartialEq, Eq)] pub struct Map(Sequence>) where K: Clone + Debug + Ord, @@ -41,7 +41,7 @@ impl Default for MapKeyRef<'_, K> { } #[derive(Clone)] -pub struct TreeSet(Map) +pub struct Set(Map) where K: Clone + Debug + Ord; @@ -294,6 +294,16 @@ where } } +impl Debug for Map +where + K: Clone + Debug + Ord, + V: Clone + Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_map().entries(self.iter()).finish() + } +} + #[derive(Debug)] struct MapSeekTargetAdaptor<'a, T>(&'a T); @@ -382,7 +392,7 @@ where } } -impl Default for TreeSet +impl Default for Set where K: Clone + Debug + Ord, { @@ -391,7 +401,7 @@ where } } -impl TreeSet +impl Set where K: Clone + Debug + Ord, { diff --git a/crates/crdb/src/crdb.rs b/crates/crdb/src/crdb.rs index 62a8f4f85c..8ae4c9b0ae 100644 --- a/crates/crdb/src/crdb.rs +++ b/crates/crdb/src/crdb.rs @@ -3,7 +3,6 @@ mod dense_id; mod history; mod messages; mod operations; -mod revision_cache; mod rope; mod sync; #[cfg(test)] @@ -17,7 +16,6 @@ use futures::{channel::mpsc, future::BoxFuture, FutureExt, StreamExt}; use history::{History, SavedHistory}; use messages::{MessageEnvelope, Operation, RequestEnvelope}; use parking_lot::{Mutex, RwLock}; -use revision_cache::RevisionCache; use rope::Rope; use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; @@ -182,6 +180,9 @@ impl btree::Summary for OperationId { } } +pub type BranchId = OperationId; +pub type DocumentId = OperationId; + #[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)] pub struct RoomName(Arc); @@ -388,17 +389,12 @@ impl Checkout { self.repo.apply_operations(response.operations).await?; let operations = self .repo - .update_async(|repo| { - let max_operation_ids = response.max_operation_ids.clone(); - let kv = self.repo.db.kv.clone(); - async move { - let operations = repo - .history - .operations_since(&(&max_operation_ids).into(), &*kv) - .await?; - Ok((None, operations)) - } - .boxed() + .update_atomic(|mut repo| async { + let operations = repo + .history + .operations_since(&(&response.max_operation_ids).into(), &*self.repo.db.kv) + .await?; + Ok((repo, operations)) }) .await?; @@ -473,8 +469,7 @@ impl Client { pub fn create_repo(&self) -> Repo { let id = RepoId::new(); - let revision_cache = RevisionCache::new(id, &*self.executor, self.db.kv.clone()); - let snapshot = RepoSnapshot::new(id, ReplicaId(0), revision_cache); + let snapshot = RepoSnapshot::new(id, ReplicaId(0)); let repo = Repo { id, db: self.db.clone(), @@ -496,11 +491,10 @@ impl Client { id: repo_id, db: this.db.clone(), }; - let revision_cache = RevisionCache::new(repo_id, &*this.executor, this.db.kv.clone()); - this.db.repos.write().insert( - repo_id, - RepoSnapshot::new(repo_id, response.replica_id, revision_cache), - ); + this.db + .repos + .write() + .insert(repo_id, RepoSnapshot::new(repo_id, response.replica_id)); let mut room = this.network.room(response.credentials); room.connect().await?; @@ -543,8 +537,7 @@ impl Client { return Ok(repo); } - let revision_cache = RevisionCache::new(id, &*this.executor, this.db.kv.clone()); - let repo = RepoSnapshot::load(id, revision_cache, &*this.db.kv).await?; + let repo = RepoSnapshot::load(id, &*this.db.kv).await?; this.db.repos.write().entry(id).or_insert(repo); Ok(Repo { @@ -687,10 +680,9 @@ impl Server { .network .grant_room_access(&room_name, user.login.as_ref()); - let revision_cache = RevisionCache::new(request.id, &*self.executor, self.db.kv.clone()); self.db.repos.write().insert( request.id, - RepoSnapshot::new(request.id, ReplicaId(u32::MAX), revision_cache), + RepoSnapshot::new(request.id, ReplicaId(u32::MAX)), ); self.next_replica_ids_by_repo_id .lock() @@ -740,23 +732,16 @@ impl Server { .repo(request.id) .ok_or_else(|| anyhow!("repo not found"))?; - repo.update_async(|snapshot| { - let request = request.clone(); - let kv = self.db.kv.clone(); - async move { - let operations = snapshot - .history - .operations_since(&(&request.max_operation_ids).into(), &*kv) - .await?; - Ok(( - None, - messages::SyncRepoResponse { - operations, - max_operation_ids: snapshot.history.max_operation_ids().into(), - }, - )) - } - .boxed() + repo.update_atomic(|mut repo| async { + let operations = repo + .history + .operations_since(&(&request.max_operation_ids).into(), &*self.db.kv) + .await?; + let response = messages::SyncRepoResponse { + operations, + max_operation_ids: repo.history.max_operation_ids().into(), + }; + Ok((repo, response)) }) .await } @@ -836,26 +821,17 @@ impl Repo { async move { let branch_id = this - .update_async(|repo| { - let this = this.clone(); - let name = name.clone(); - async move { - let branch_id = *repo - .branch_ids_by_name - .load(&name, &*this.db.kv) - .await? - .ok_or_else(|| anyhow!("branch not found"))?; - let head = repo - .branches - .load(&branch_id, &*this.db.kv) - .await? - .ok_or_else(|| anyhow!("branch not found"))? - .head - .clone(); - repo.build_revision(&head, &*this.db.kv).await?; - Ok((None, branch_id)) - } - .boxed() + .update_atomic(|mut repo| async { + let branch_id = *repo + .branch_ids_by_name + .load(&name, &*this.db.kv) + .await? + .ok_or_else(|| anyhow!("branch not found"))?; + repo.branches + .load(&branch_id, &*this.db.kv) + .await? + .ok_or_else(|| anyhow!("branch not found"))?; + Ok((repo, branch_id)) }) .await?; @@ -908,18 +884,19 @@ impl Repo { result } - async fn update_async(&self, mut f: F) -> Result + async fn update_atomic(&self, mut f: F) -> Result where - F: FnMut(&mut RepoSnapshot) -> BoxFuture<'_, Result<(Option, T)>>, + F: FnMut(RepoSnapshot) -> Fut, + Fut: Future>, { loop { let prev_snapshot = self.read(|repo| repo.clone()); - let mut new_snapshot = prev_snapshot.clone(); - let (operation, value) = f(&mut new_snapshot).await?; + let new_snapshot = prev_snapshot.clone(); + let (new_snapshot, value) = f(new_snapshot).await?; let updated = self.update(|latest_snapshot| { if RepoSnapshot::ptr_eq(&prev_snapshot, &latest_snapshot) { *latest_snapshot = new_snapshot; - (operation, true) + (None, true) } else { (None, false) } @@ -935,14 +912,11 @@ impl Repo { let mut operations = operations.into(); while let Some(operation) = operations.pop_front() { let flushed_operations = self - .update_async(|repo| { - let operation = operation.clone(); - let kv = self.db.kv.clone(); - async move { - let flushed_operations = repo.apply_operation(operation, &*kv).await?; - Ok((None, flushed_operations)) - } - .boxed() + .update_atomic(|mut repo| async { + let flushed_operations = repo + .apply_operation(operation.clone(), &*self.db.kv) + .await?; + Ok((repo, flushed_operations)) }) .await?; operations.extend(flushed_operations); @@ -953,7 +927,7 @@ impl Repo { #[derive(Clone)] struct Branch { - id: OperationId, + id: BranchId, repo: Repo, } @@ -980,17 +954,49 @@ impl Branch { }) } - pub fn load_document(&self, id: OperationId) -> Result { - self.read(|revision| { - revision - .document_metadata - .get(&id) - .ok_or_else(|| anyhow!("document not found"))?; - Ok(Document { - branch: self.clone(), - id, + pub async fn load_document(&self, document_id: DocumentId) -> Result { + self.repo + .update_atomic(|mut repo| async { + let document_key = (self.id, document_id); + if let Some(prev_count) = repo.document_ref_counts.get(&document_key).copied() { + repo.document_ref_counts + .insert(document_key, prev_count + 1); + } else { + let head = repo + .branches + .get(&self.id) + .expect("branch must exist") + .head + .clone(); + repo.document_ref_counts.insert(document_key, 1); + let mut revision = repo.load_revision(&head, &*self.repo.db.kv).await?; + revision + .load_documents([document_id], &*self.repo.db.kv) + .await?; + repo.revisions.insert(head, revision); + } + + Ok(( + repo, + Document { + branch: self.clone(), + id: document_id, + }, + )) }) - }) + .await?; + + todo!() + // self.read(|revision| { + // revision + // .document_metadata + // .get(&id) + // .ok_or_else(|| anyhow!("document not found"))?; + // Ok(Document { + // branch: self.clone(), + // id, + // }) + // }) } pub fn documents(&self) -> Vec { @@ -1017,12 +1023,16 @@ impl Branch { .expect("branch must exist") .head .clone(); - let mut revision = repo.revisions.get(&head).expect("head revision must exist"); + let mut new_revision = repo + .revisions + .get(&head) + .expect("head revision must exist") + .clone(); let operation_id = repo.history.next_operation_id(); - let (operation, result) = f(operation_id, head.clone(), &mut revision); + let (operation, result) = f(operation_id, head.clone(), &mut new_revision); repo.branches .update(&self.id, |branch| branch.head = operation_id.into()); - repo.revisions.save(&operation_id.into(), revision); + repo.revisions.insert(operation_id.into(), new_revision); (Some(operation), result) }) } @@ -1057,7 +1067,7 @@ struct DocumentMetadata { #[derive(Clone, Debug, Serialize, Deserialize)] struct DocumentFragment { - document_id: OperationId, + document_id: DocumentId, location: DenseId, insertion_id: OperationId, insertion_subrange: Range, @@ -1136,7 +1146,7 @@ impl btree::Item for DocumentFragment { pub struct DocumentFragmentSummary { visible_len: usize, hidden_len: usize, - max_document_id: OperationId, + max_document_id: DocumentId, max_location: DenseId, } @@ -1273,7 +1283,7 @@ impl<'a> btree::SeekTarget<'a, InsertionFragmentSummary, InsertionFragmentSummar struct Document { branch: Branch, - id: OperationId, + id: DocumentId, } impl Document { @@ -1537,11 +1547,30 @@ impl Document { } } +impl Drop for Document { + fn drop(&mut self) { + self.branch.repo.update(|repo| { + let document_key = (self.branch.id, self.id); + let ref_count = repo + .document_ref_counts + .update(&document_key, |ref_count| { + *ref_count -= 1; + *ref_count + }) + .expect("document must exist"); + if ref_count == 0 { + repo.document_ref_counts.remove(&document_key); + } + (None, ()) + }); + } +} + #[derive(Clone, Debug, Default)] pub struct LocalEditDimension { visible_len: usize, hidden_len: usize, - max_document_id: OperationId, + max_document_id: DocumentId, } impl<'a> btree::Dimension<'a, DocumentFragmentSummary> for LocalEditDimension { @@ -1698,7 +1727,8 @@ pub struct RepoSnapshot { history: History, branches: btree::Map, branch_ids_by_name: btree::Map, OperationId>, - revisions: RevisionCache, + revisions: btree::Map, + document_ref_counts: btree::Map<(BranchId, DocumentId), usize>, } #[derive(Serialize, Deserialize)] @@ -1709,13 +1739,17 @@ struct SavedRepoSnapshot { } impl RepoSnapshot { - fn new(id: RepoId, replica_id: ReplicaId, revisions: RevisionCache) -> Self { + fn new(id: RepoId, replica_id: ReplicaId) -> Self { Self { id, history: History::new(replica_id), branches: Default::default(), branch_ids_by_name: Default::default(), - revisions, + revisions: btree::Map::from_ordered_entries([( + RevisionId::default(), + Revision::default(), + )]), + document_ref_counts: Default::default(), } } @@ -1724,9 +1758,11 @@ impl RepoSnapshot { && btree::Map::ptr_eq(&this.branches, &other.branches) && btree::Map::ptr_eq(&this.branch_ids_by_name, &other.branch_ids_by_name) && History::ptr_eq(&this.history, &other.history) + && btree::Map::ptr_eq(&this.revisions, &other.revisions) + && btree::Map::ptr_eq(&this.document_ref_counts, &other.document_ref_counts) } - async fn load(id: RepoId, revisions: RevisionCache, kv: &dyn KvStore) -> Result { + async fn load(id: RepoId, kv: &dyn KvStore) -> Result { let repo_bytes = kv.load(id.to_be_bytes(), "root".into()).await?; let saved_repo = serde_bare::from_slice::(&repo_bytes)?; Ok(Self { @@ -1734,7 +1770,11 @@ impl RepoSnapshot { history: History::load(saved_repo.history, kv).await?, branches: btree::Map::load_root(saved_repo.branches, kv).await?, branch_ids_by_name: btree::Map::load_root(saved_repo.branch_ids_by_name, kv).await?, - revisions, + revisions: btree::Map::from_ordered_entries([( + RevisionId::default(), + Revision::default(), + )]), + document_ref_counts: Default::default(), }) } @@ -1761,7 +1801,7 @@ impl RepoSnapshot { }, ); self.branch_ids_by_name.insert(name.clone(), branch_id); - self.revisions.save(&branch_id.into(), Default::default()); + self.revisions.insert(branch_id.into(), Default::default()); ( Operation::CreateBranch(operations::CreateBranch { @@ -1785,6 +1825,7 @@ impl RepoSnapshot { return Ok(Default::default()); } + let branch_id = operation.branch_id(); let mut new_head; match &operation { Operation::CreateBranch(op) => { @@ -1826,26 +1867,62 @@ impl RepoSnapshot { let flushed_operations = self.history.insert(operation, kv).await?; // The following ensures that a revision for the branch head is always present. - #[cfg(not(any(test, feature = "test-support")))] - self.build_revision(&new_head, kv).await?; - #[cfg(any(test, feature = "test-support"))] - self.build_revision(&new_head, kv).await.unwrap(); + let mut revision = if cfg!(any(test, feature = "test-support")) { + self.load_revision(&new_head, kv).await.unwrap() + } else { + self.load_revision(&new_head, kv).await? + }; + revision + .load_documents( + self.document_ref_counts + .iter() + .filter_map(|((doc_branch_id, doc_id), _)| { + if *doc_branch_id == branch_id { + Some(*doc_id) + } else { + None + } + }), + kv, + ) + .await?; + self.revisions.insert(new_head, revision); Ok(flushed_operations) } - async fn build_revision( + async fn cached_revision( + repo_id: RepoId, + revision_id: &RevisionId, + revisions: &mut btree::Map, + kv: &dyn KvStore, + ) -> Option { + if let Some(revision) = revisions.get(revision_id) { + Some(revision.clone()) + } else if let Ok(revision) = Revision::load(repo_id, revision_id, kv).await { + revisions.insert(revision_id.clone(), revision.clone()); + Some(revision) + } else { + None + } + } + + async fn load_revision( &mut self, revision_id: &RevisionId, kv: &dyn KvStore, ) -> Result { - if let Some(revision) = self.revisions.load(revision_id, kv).await { + if let Some(revision) = + Self::cached_revision(self.id, revision_id, &mut self.revisions, kv).await + { Ok(revision) } else { let mut new_revisions = HashMap::default(); let mut rewind = self.history.rewind(revision_id, kv).await?; while let Some(ancestor_id) = rewind.next(kv).await? { - if let Some(ancestor_revision) = self.revisions.load(&ancestor_id, kv).await { + if let Some(ancestor_revision) = + Self::cached_revision(self.id, &ancestor_id, &mut self.revisions, kv).await + { new_revisions.insert(ancestor_id, ancestor_revision); for replay_op in rewind.replay() { let parent_revision = new_revisions[&replay_op.parent_revision_id].clone(); @@ -1873,11 +1950,16 @@ impl RepoSnapshot { } } - for (revision_id, revision) in new_revisions.drain() { - self.revisions.save(&revision_id, revision); + for (new_revision_id, revision) in new_revisions.drain() { + if self.revisions.contains_key(&new_revision_id) { + continue; + } + + revision.save(self.id, &new_revision_id, kv).await?; + self.revisions.insert(new_revision_id, revision); } - Ok(self.revisions.get(revision_id).unwrap()) + Ok(self.revisions.get(revision_id).unwrap().clone()) } } } @@ -1951,6 +2033,14 @@ pub struct SavedRevision { } impl Revision { + fn ptr_eq(this: &Self, other: &Self) -> bool { + btree::Map::ptr_eq(&this.document_metadata, &other.document_metadata) + && btree::Sequence::ptr_eq(&this.document_fragments, &other.document_fragments) + && btree::Sequence::ptr_eq(&this.insertion_fragments, &other.insertion_fragments) + && Rope::ptr_eq(&this.visible_text, &other.visible_text) + && Rope::ptr_eq(&this.hidden_text, &other.hidden_text) + } + async fn exists(repo_id: RepoId, id: &RevisionId, kv: &dyn KvStore) -> bool { kv.load(repo_id.to_be_bytes(), id.db_key()).await.is_ok() } @@ -1969,6 +2059,14 @@ impl Revision { }) } + async fn load_documents( + &mut self, + documents: impl IntoIterator, + kv: &dyn KvStore, + ) -> Result<()> { + todo!() + } + async fn save(&self, repo_id: RepoId, id: &RevisionId, kv: &dyn KvStore) -> Result<()> { let saved_revision = SavedRevision { document_metadata: self.document_metadata.save(kv).await?, @@ -2189,8 +2287,8 @@ mod tests { deterministic.run_until_parked(); let branch_b = repo_b.load_branch("main").await.unwrap(); - let doc1_b = branch_b.load_document(doc1_a.id).unwrap(); - let doc2_b = branch_b.load_document(doc2_a.id).unwrap(); + let doc1_b = branch_b.load_document(doc1_a.id).await.unwrap(); + let doc2_b = branch_b.load_document(doc2_a.id).await.unwrap(); assert_eq!(doc1_b.text().to_string(), "abc"); assert_eq!(doc2_b.text().to_string(), "def"); @@ -2211,7 +2309,7 @@ mod tests { ); let repo_a2 = client_a2.repo(repo_b.id).await.unwrap(); let branch_a2 = repo_a2.load_branch("main").await.unwrap(); - let doc1_a2 = branch_a2.load_document(doc1_a.id).unwrap(); + let doc1_a2 = branch_a2.load_document(doc1_a.id).await.unwrap(); assert_eq!(doc1_a2.text().to_string(), "aghic"); } @@ -2405,7 +2503,7 @@ mod tests { client_id: usize, repo_id: RepoId, branch_name: Arc, - document_id: OperationId, + document_id: DocumentId, edits: Vec<(Range, String)>, }, } @@ -2602,7 +2700,7 @@ mod tests { .ok_or_else(|| anyhow!("client not found"))?; let repo = client.repo(*repo_id).await?; let branch = repo.load_branch(branch_name.clone()).await?; - let document = branch.load_document(*document_id)?; + let document = branch.load_document(*document_id).await?; document.edit(edits.iter().cloned()); } } diff --git a/crates/crdb/src/messages.rs b/crates/crdb/src/messages.rs index 1a5ea2b696..4e5b31dcf9 100644 --- a/crates/crdb/src/messages.rs +++ b/crates/crdb/src/messages.rs @@ -1,6 +1,6 @@ use crate::{ operations::{CreateBranch, CreateDocument, Edit}, - OperationCount, OperationId, ReplicaId, RepoId, Request, RevisionId, RoomCredentials, + BranchId, OperationCount, OperationId, ReplicaId, RepoId, Request, RevisionId, RoomCredentials, }; use collections::BTreeMap; use serde::{Deserialize, Serialize}; @@ -141,6 +141,14 @@ impl Operation { } } + pub fn branch_id(&self) -> BranchId { + match self { + Operation::CreateBranch(op) => op.id, + Operation::CreateDocument(op) => op.branch_id, + Operation::Edit(op) => op.branch_id, + } + } + pub fn parent(&self) -> &RevisionId { match self { Operation::CreateDocument(op) => &op.parent, diff --git a/crates/crdb/src/operations.rs b/crates/crdb/src/operations.rs index eaa2c21aed..7c5d719f56 100644 --- a/crates/crdb/src/operations.rs +++ b/crates/crdb/src/operations.rs @@ -1,8 +1,8 @@ use crate::{ btree::{self, Bias}, dense_id::DenseId, - AnchorRange, DocumentFragment, DocumentFragmentSummary, DocumentMetadata, InsertionFragment, - OperationId, Revision, RevisionId, RopeBuilder, Tombstone, + AnchorRange, BranchId, DocumentFragment, DocumentFragmentSummary, DocumentId, DocumentMetadata, + InsertionFragment, OperationId, Revision, RevisionId, RopeBuilder, Tombstone, }; use anyhow::Result; use serde::{Deserialize, Serialize}; @@ -11,21 +11,21 @@ use std::{cmp, sync::Arc}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CreateBranch { - pub id: OperationId, + pub id: BranchId, pub parent: RevisionId, pub name: Arc, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CreateDocument { - pub id: OperationId, - pub branch_id: OperationId, + pub id: DocumentId, + pub branch_id: BranchId, pub parent: RevisionId, } impl CreateDocument { pub fn apply(self, revision: &mut Revision) { - let mut cursor = revision.document_fragments.cursor::(); + let mut cursor = revision.document_fragments.cursor::(); let mut new_document_fragments = cursor.slice(&self.id, Bias::Right, &()); new_document_fragments.push( DocumentFragment { @@ -63,8 +63,8 @@ impl CreateDocument { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Edit { pub id: OperationId, - pub document_id: OperationId, - pub branch_id: OperationId, + pub document_id: DocumentId, + pub branch_id: BranchId, pub parent: RevisionId, pub edits: SmallVec<[(AnchorRange, Arc); 2]>, } diff --git a/crates/crdb/src/revision_cache.rs b/crates/crdb/src/revision_cache.rs deleted file mode 100644 index 6117598c2e..0000000000 --- a/crates/crdb/src/revision_cache.rs +++ /dev/null @@ -1,69 +0,0 @@ -use crate::{btree::KvStore, Executor, RepoId, Revision, RevisionId}; -use collections::HashMap; -use futures::{channel::mpsc, StreamExt}; -use parking_lot::Mutex; -use std::sync::Arc; -use util::ResultExt; - -#[derive(Clone, Debug)] -pub struct RevisionCache { - repo_id: RepoId, - revisions: Arc>>, - revisions_to_save: mpsc::UnboundedSender<(RevisionId, Revision)>, -} - -impl RevisionCache { - pub fn new(repo_id: RepoId, executor: &E, kv: Arc) -> Self { - let (revisions_to_save_tx, mut revisions_to_save_rx) = - mpsc::unbounded::<(RevisionId, Revision)>(); - executor.spawn(async move { - while let Some((revision_id, revision)) = revisions_to_save_rx.next().await { - if !Revision::exists(repo_id, &revision_id, &*kv).await { - revision.save(repo_id, &revision_id, &*kv).await.log_err(); - } - } - }); - - Self { - repo_id, - // Always consider the empty revision as cached. - revisions: Arc::new(Mutex::new(HashMap::from_iter([( - RevisionId::default(), - Revision::default(), - )]))), - revisions_to_save: revisions_to_save_tx, - } - } - - pub fn get(&self, revision_id: &RevisionId) -> Option { - self.revisions.lock().get(revision_id).cloned() - } - - pub async fn load(&self, revision_id: &RevisionId, kv: &dyn KvStore) -> Option { - if let Some(revision) = self.get(revision_id) { - Some(revision) - } else if let Some(revision) = Revision::load(self.repo_id, revision_id, kv).await.ok() { - Some( - self.revisions - .lock() - .entry(revision_id.clone()) - .or_insert(revision) - .clone(), - ) - } else { - None - } - } - - pub fn save(&self, revision_id: &RevisionId, revision: Revision) { - self.revisions - .lock() - .entry(revision_id.clone()) - .or_insert_with(|| { - let _ = self - .revisions_to_save - .unbounded_send((revision_id.clone(), revision.clone())); - revision - }); - } -} diff --git a/crates/crdb/src/rope.rs b/crates/crdb/src/rope.rs index 2ce4bfc352..d8eee67b0c 100644 --- a/crates/crdb/src/rope.rs +++ b/crates/crdb/src/rope.rs @@ -43,6 +43,10 @@ impl Rope { Self::default() } + pub fn ptr_eq(this: &Self, other: &Self) -> bool { + Sequence::ptr_eq(&this.chunks, &other.chunks) + } + pub async fn load_root(id: SavedId, kv: &dyn KvStore) -> Result { let chunks = Sequence::load_root(id, kv).await?; Ok(Self { chunks })