diff --git a/Cargo.lock b/Cargo.lock index 8ae33ebd67..cd9f17b73d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1740,9 +1740,12 @@ dependencies = [ "anyhow", "async-broadcast", "collections", + "ctor", + "env_logger 0.9.3", "futures 0.3.28", "gpui", "lazy_static", + "log", "parking_lot 0.11.2", "rand 0.8.5", "rope", diff --git a/crates/crdb/Cargo.toml b/crates/crdb/Cargo.toml index 078499275a..db365223ce 100644 --- a/crates/crdb/Cargo.toml +++ b/crates/crdb/Cargo.toml @@ -16,6 +16,7 @@ util = { path = "../util" } anyhow.workspace = true futures.workspace = true lazy_static.workspace = true +log.workspace = true parking_lot.workspace = true serde.workspace = true serde_bare = "0.5" @@ -23,6 +24,8 @@ smallvec.workspace = true uuid = { version = "1.3", features = ["v4", "fast-rng", "serde"] } [dev-dependencies] +ctor.workspace = true +env_logger.workspace = true gpui = { path = "../gpui", features = ["test-support"] } async-broadcast = "0.4" rand.workspace = true diff --git a/crates/crdb/src/crdb.rs b/crates/crdb/src/crdb.rs index 2fa22d8cc7..1c3e422f15 100644 --- a/crates/crdb/src/crdb.rs +++ b/crates/crdb/src/crdb.rs @@ -11,7 +11,6 @@ use collections::{btree_map, BTreeMap, Bound, HashMap, VecDeque}; use dense_id::DenseId; use futures::{channel::mpsc, future::BoxFuture, FutureExt, StreamExt}; use messages::{MessageEnvelope, Operation, RequestEnvelope}; -use operations::CreateBranch; use parking_lot::{Mutex, RwLock}; use rope::Rope; use serde::{Deserialize, Serialize}; @@ -646,41 +645,18 @@ struct Branch { impl Branch { pub fn create_document(&self) -> Document { self.update(|document_id, parent, revision| { - let mut cursor = revision.document_fragments.cursor::(); - let mut new_document_fragments = cursor.slice(&document_id, Bias::Right, &()); - new_document_fragments.push( - DocumentFragment { - document_id, - location: DenseId::min(), - insertion_id: document_id, - insertion_subrange: 0..0, - tombstones: Default::default(), - undo_count: 0, - }, - &(), - ); - new_document_fragments.append(cursor.suffix(&()), &()); - drop(cursor); - - revision.document_fragments = new_document_fragments; - revision.document_metadata.insert( - document_id, - DocumentMetadata { - path: None, - last_change: document_id, - }, - ); - - let operation = Operation::CreateDocument(operations::CreateDocument { + let operation = operations::CreateDocument { id: document_id, + branch_id: self.id, parent, - }); + }; + revision.apply_create_document(operation.clone()); let document = Document { id: document_id, branch: self.clone(), }; - (operation, document) + (Operation::CreateDocument(operation), document) }) } @@ -879,6 +855,7 @@ impl Document { let edits = edits.into_iter(); let mut edit_op = operations::Edit { id: operation_id, + branch_id: self.branch.id, parent: parent.clone(), edits: SmallVec::with_capacity(edits.len()), }; @@ -1247,7 +1224,7 @@ impl<'a> RopeBuilder<'a> { } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct RepoSnapshot { last_operation_id: OperationId, branches: btree::Map, @@ -1257,6 +1234,22 @@ pub struct RepoSnapshot { deferred_operations: btree::Sequence, } +impl Default for RepoSnapshot { + fn default() -> Self { + Self { + last_operation_id: Default::default(), + branches: Default::default(), + operations: Default::default(), + revisions: btree::Map::from_ordered_entries([( + RevisionId::default(), + Revision::default(), + )]), + max_operation_ids: Default::default(), + deferred_operations: Default::default(), + } + } +} + impl RepoSnapshot { fn new(replica_id: ReplicaId) -> Self { Self { @@ -1268,21 +1261,17 @@ impl RepoSnapshot { fn create_empty_branch(&mut self, name: impl Into>) -> (Operation, OperationId) { let name = name.into(); let branch_id = self.last_operation_id.tick(); - self.branches.insert( - branch_id, - BranchSnapshot { - name: name.clone(), - head: smallvec![branch_id], - }, - ); - self.revisions - .insert(smallvec![branch_id], Default::default()); - let operation = Operation::CreateBranch(CreateBranch { + let operation = operations::CreateBranch { id: branch_id, name, parent: Default::default(), - }); - (operation, branch_id) + }; + operation + .clone() + .apply(self) + .expect("can always create empty branch"); + + (Operation::CreateBranch(operation), branch_id) } fn operations_since(&self, version: &btree::Map) -> Vec { @@ -1315,27 +1304,30 @@ impl RepoSnapshot { } /// Apply the given operations and any deferred operations that are now applicable. - fn apply_operations(&mut self, operations: impl Into>) -> Result<()> { + fn apply_operations(&mut self, operations: impl Into>) { let mut operations = operations.into(); - while let Some(operation) = operations.pop_front() { self.save_operation(&operation); if operation - .revision() + .parent() .iter() .all(|parent| self.operations.contains_key(&parent)) { let operation_id = operation.id(); - match operation { + let result = match operation { Operation::CreateDocument(op) => op.apply(self), Operation::Edit(op) => op.apply(self), Operation::CreateBranch(op) => op.apply(self), - }?; - - self.flush_deferred_operations(operation_id, &mut operations); + }; + match result { + Ok(_) => self.flush_deferred_operations(operation_id, &mut operations), + Err(error) => { + log::error!("error applying operation {:?}: {:?}", operation_id, error) + } + } } else { - for parent in operation.revision() { + for parent in operation.parent() { self.deferred_operations.insert_or_replace( DeferredOperation { parent: *parent, @@ -1346,7 +1338,6 @@ impl RepoSnapshot { } } } - Ok(()) } /// Remove any operations deferred on the given parent and add them to the @@ -1447,12 +1438,52 @@ struct Revision { hidden_text: Rope, } +impl Revision { + fn apply_create_document(&mut self, operation: operations::CreateDocument) { + let mut cursor = self.document_fragments.cursor::(); + let mut new_document_fragments = cursor.slice(&operation.id, Bias::Right, &()); + new_document_fragments.push( + DocumentFragment { + document_id: operation.id, + location: DenseId::min(), + insertion_id: operation.id, + insertion_subrange: 0..0, + tombstones: Default::default(), + undo_count: 0, + }, + &(), + ); + new_document_fragments.append(cursor.suffix(&()), &()); + drop(cursor); + + self.document_fragments = new_document_fragments; + self.document_metadata.insert( + operation.id, + DocumentMetadata { + path: None, + last_change: operation.id, + }, + ); + } +} + +#[cfg(test)] +#[ctor::ctor] +fn init_logger() { + if std::env::var("RUST_LOG").is_ok() { + env_logger::init(); + } else { + env_logger::builder() + .filter_level(log::LevelFilter::Error) + .init(); + } +} + #[cfg(test)] mod tests { - use gpui::executor::{Background, Deterministic}; - use super::*; use crate::test::TestNetwork; + use gpui::executor::{Background, Deterministic}; #[gpui::test] async fn test_repo(deterministic: Arc) { diff --git a/crates/crdb/src/messages.rs b/crates/crdb/src/messages.rs index dad4e69fc3..b96ea8c321 100644 --- a/crates/crdb/src/messages.rs +++ b/crates/crdb/src/messages.rs @@ -142,7 +142,7 @@ impl Operation { } } - pub fn revision(&self) -> &RevisionId { + pub fn parent(&self) -> &RevisionId { match self { Operation::CreateDocument(op) => &op.parent, Operation::Edit(op) => &op.parent, diff --git a/crates/crdb/src/operations.rs b/crates/crdb/src/operations.rs index 5c08e794a0..cc6bd2eefe 100644 --- a/crates/crdb/src/operations.rs +++ b/crates/crdb/src/operations.rs @@ -1,8 +1,12 @@ -use crate::{AnchorRange, OperationId, RepoSnapshot, RevisionId}; -use anyhow::Result; +use crate::{ + dense_id::DenseId, AnchorRange, BranchSnapshot, DocumentFragment, DocumentMetadata, + OperationId, RepoSnapshot, Revision, RevisionId, +}; +use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; -use smallvec::SmallVec; +use smallvec::{smallvec, SmallVec}; use std::sync::Arc; +use sum_tree::Bias; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CreateBranch { @@ -13,31 +17,76 @@ pub struct CreateBranch { impl CreateBranch { pub fn apply(self, repo: &mut RepoSnapshot) -> Result<()> { - todo!() + let revision = repo + .revisions + .get(&self.parent) + .ok_or_else(|| anyhow!("parent revision not found"))? + .clone(); + repo.branches.insert( + self.id, + BranchSnapshot { + name: self.name, + head: smallvec![self.id], + }, + ); + repo.revisions.insert(smallvec![self.id], revision); + Ok(()) } } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CreateDocument { pub id: OperationId, + pub branch_id: OperationId, pub parent: RevisionId, } impl CreateDocument { pub fn apply(self, repo: &mut RepoSnapshot) -> Result<()> { - todo!() + let branch_id = self.branch_id; + let branch = repo + .branches + .get(&self.branch_id) + .ok_or_else(|| anyhow!("branch {:?} not found", self.branch_id))?; + + let mut revision = repo + .revisions + .get(&branch.head) + .ok_or_else(|| { + anyhow!( + "revision {:?} not found in branch {:?}", + branch.head, + self.branch_id + ) + })? + .clone(); + let new_head = if self.parent == branch.head { + smallvec![self.id] + } else { + let mut head = branch.head.clone(); + head.push(self.id); + head + }; + + revision.apply_create_document(self); + repo.branches + .update(&branch_id, |branch| branch.head = new_head.clone()); + repo.revisions.insert(new_head, revision); + + Ok(()) } } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Edit { pub id: OperationId, + pub branch_id: OperationId, pub parent: RevisionId, pub edits: SmallVec<[(AnchorRange, Arc); 2]>, } impl Edit { pub fn apply(self, repo: &mut RepoSnapshot) -> Result<()> { - todo!() + Err(anyhow!("not implemented")) } }