diff --git a/crates/crdb/src/crdb.rs b/crates/crdb/src/crdb.rs index c8af454935..9f63489804 100644 --- a/crates/crdb/src/crdb.rs +++ b/crates/crdb/src/crdb.rs @@ -251,7 +251,7 @@ impl Checkout { let operations = self .repo - .operations_since(&(&response.max_operation_ids).into()); + .read(|snapshot| snapshot.operations_since(&(&response.max_operation_ids).into())); for chunk in operations.chunks(CHUNK_SIZE) { client @@ -368,16 +368,8 @@ impl Client { } fn handle_remote_operation(self, repo_id: RepoId, operation: Operation) { - let update = self - .db - .snapshot - .lock() - .repos - .update(&repo_id, |repo| repo.apply_operation(operation)); - assert!( - update.is_some(), - "received an operation for an unknown repo" - ); + let repo = self.db.repo(repo_id).expect("repo must exist"); + repo.apply_operations([operation]); } fn request(&self, request: R) -> BoxFuture> { @@ -430,6 +422,7 @@ impl Server { this.handle_requests(Self::handle_publish_repo); this.handle_requests(Self::handle_clone_repo); this.handle_requests(Self::handle_sync_repo); + this.handle_requests(Self::handle_publish_operations); let request_handlers = this.request_handlers.clone(); network.handle_requests(move |user, request_bytes| { @@ -531,18 +524,29 @@ impl Server { ) -> Result { let repo = self .db - .snapshot - .lock() - .repos - .get(&request.id) - .ok_or_else(|| anyhow!("repo not found"))? - .clone(); + .repo(request.id) + .ok_or_else(|| anyhow!("repo not found"))?; - Ok(messages::SyncRepoResponse { - operations: repo.operations_since(&(&request.max_operation_ids).into()), - max_operation_ids: (&repo.max_operation_ids).into(), + repo.read(|snapshot| { + Ok(messages::SyncRepoResponse { + operations: snapshot.operations_since(&(&request.max_operation_ids).into()), + max_operation_ids: (&snapshot.max_operation_ids).into(), + }) }) } + + async fn handle_publish_operations( + self, + _user: User, + request: messages::PublishOperations, + ) -> Result<()> { + let repo = self + .db + .repo(request.repo_id) + .ok_or_else(|| anyhow!("repo not found"))?; + repo.apply_operations(request.operations); + Ok(()) + } } #[derive(Clone)] @@ -593,10 +597,6 @@ impl Repo { } } - fn operations_since(&self, version: &TreeMap) -> Vec { - self.read(|repo| repo.operations_since(version)) - } - fn read(&self, f: F) -> T where F: FnOnce(&RepoSnapshot) -> T, @@ -626,10 +626,19 @@ impl Repo { if let Some(local_operation_created) = self.db.local_operation_created.as_ref() { local_operation_created(self.id, operation); } + result }) .expect("repo must exist") } + + fn apply_operations(&self, operations: impl IntoIterator) { + self.db + .snapshot + .lock() + .repos + .update(&self.id, |repo| todo!()); + } } #[derive(Clone)] @@ -1280,10 +1289,6 @@ impl RepoSnapshot { (operation, branch_id) } - fn apply_operation(&mut self, operation: Operation) { - todo!() - } - fn operations_since(&self, version: &TreeMap) -> Vec { let mut new_operations = Vec::new(); for (replica_id, end_op_count) in self.max_operation_ids.iter() { @@ -1364,7 +1369,7 @@ mod tests { where F: 'static + Send + Future, { - todo!() + Background::spawn(self, future).detach(); } } }