Move apply_operations to Repo

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
Antonio Scandurra 2023-07-19 19:40:58 +02:00
parent 9e03e9d6df
commit 2c27e875e5

View file

@ -251,7 +251,7 @@ impl<E: Executor, N: ClientNetwork> Checkout<E, N> {
let operations = self let operations = self
.repo .repo
.operations_since(&(&response.max_operation_ids).into()); .read(|snapshot| snapshot.operations_since(&(&response.max_operation_ids).into()));
for chunk in operations.chunks(CHUNK_SIZE) { for chunk in operations.chunks(CHUNK_SIZE) {
client client
@ -368,16 +368,8 @@ impl<E: Executor, N: ClientNetwork> Client<E, N> {
} }
fn handle_remote_operation(self, repo_id: RepoId, operation: Operation) { fn handle_remote_operation(self, repo_id: RepoId, operation: Operation) {
let update = self let repo = self.db.repo(repo_id).expect("repo must exist");
.db repo.apply_operations([operation]);
.snapshot
.lock()
.repos
.update(&repo_id, |repo| repo.apply_operation(operation));
assert!(
update.is_some(),
"received an operation for an unknown repo"
);
} }
fn request<R: Request>(&self, request: R) -> BoxFuture<Result<R::Response>> { fn request<R: Request>(&self, request: R) -> BoxFuture<Result<R::Response>> {
@ -430,6 +422,7 @@ impl<N: ServerNetwork> Server<N> {
this.handle_requests(Self::handle_publish_repo); this.handle_requests(Self::handle_publish_repo);
this.handle_requests(Self::handle_clone_repo); this.handle_requests(Self::handle_clone_repo);
this.handle_requests(Self::handle_sync_repo); this.handle_requests(Self::handle_sync_repo);
this.handle_requests(Self::handle_publish_operations);
let request_handlers = this.request_handlers.clone(); let request_handlers = this.request_handlers.clone();
network.handle_requests(move |user, request_bytes| { network.handle_requests(move |user, request_bytes| {
@ -531,18 +524,29 @@ impl<N: ServerNetwork> Server<N> {
) -> Result<messages::SyncRepoResponse> { ) -> Result<messages::SyncRepoResponse> {
let repo = self let repo = self
.db .db
.snapshot .repo(request.id)
.lock() .ok_or_else(|| anyhow!("repo not found"))?;
.repos
.get(&request.id)
.ok_or_else(|| anyhow!("repo not found"))?
.clone();
Ok(messages::SyncRepoResponse { repo.read(|snapshot| {
operations: repo.operations_since(&(&request.max_operation_ids).into()), Ok(messages::SyncRepoResponse {
max_operation_ids: (&repo.max_operation_ids).into(), operations: snapshot.operations_since(&(&request.max_operation_ids).into()),
max_operation_ids: (&snapshot.max_operation_ids).into(),
})
}) })
} }
async fn handle_publish_operations(
self,
_user: User,
request: messages::PublishOperations,
) -> Result<()> {
let repo = self
.db
.repo(request.repo_id)
.ok_or_else(|| anyhow!("repo not found"))?;
repo.apply_operations(request.operations);
Ok(())
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -593,10 +597,6 @@ impl Repo {
} }
} }
fn operations_since(&self, version: &TreeMap<ReplicaId, OperationCount>) -> Vec<Operation> {
self.read(|repo| repo.operations_since(version))
}
fn read<F, T>(&self, f: F) -> T fn read<F, T>(&self, f: F) -> T
where where
F: FnOnce(&RepoSnapshot) -> T, F: FnOnce(&RepoSnapshot) -> T,
@ -626,10 +626,19 @@ impl Repo {
if let Some(local_operation_created) = self.db.local_operation_created.as_ref() { if let Some(local_operation_created) = self.db.local_operation_created.as_ref() {
local_operation_created(self.id, operation); local_operation_created(self.id, operation);
} }
result result
}) })
.expect("repo must exist") .expect("repo must exist")
} }
fn apply_operations(&self, operations: impl IntoIterator<Item = Operation>) {
self.db
.snapshot
.lock()
.repos
.update(&self.id, |repo| todo!());
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -1280,10 +1289,6 @@ impl RepoSnapshot {
(operation, branch_id) (operation, branch_id)
} }
fn apply_operation(&mut self, operation: Operation) {
todo!()
}
fn operations_since(&self, version: &TreeMap<ReplicaId, OperationCount>) -> Vec<Operation> { fn operations_since(&self, version: &TreeMap<ReplicaId, OperationCount>) -> Vec<Operation> {
let mut new_operations = Vec::new(); let mut new_operations = Vec::new();
for (replica_id, end_op_count) in self.max_operation_ids.iter() { for (replica_id, end_op_count) in self.max_operation_ids.iter() {
@ -1364,7 +1369,7 @@ mod tests {
where where
F: 'static + Send + Future<Output = ()>, F: 'static + Send + Future<Output = ()>,
{ {
todo!() Background::spawn(self, future).detach();
} }
} }
} }