diff --git a/crates/collab/src/tests/randomized_integration_tests.rs b/crates/collab/src/tests/randomized_integration_tests.rs index cd51a2e1f8..0c2f7ce288 100644 --- a/crates/collab/src/tests/randomized_integration_tests.rs +++ b/crates/collab/src/tests/randomized_integration_tests.rs @@ -322,7 +322,7 @@ async fn apply_server_operation( server: &mut TestServer, clients: &mut Vec<(Rc, TestAppContext)>, client_tasks: &mut Vec>, - operation_channels: &mut Vec>, + operation_channels: &mut Vec>, next_entity_id: &mut usize, plan: Arc>, operation: Operation, @@ -462,7 +462,11 @@ async fn apply_server_operation( assert_eq!(stale_room_ids, vec![]); } - Operation::MutateClients { user_ids, quiesce } => { + Operation::MutateClients { + user_ids, + batch_id, + quiesce, + } => { let mut applied = false; for user_id in user_ids { let client_ix = clients @@ -470,7 +474,7 @@ async fn apply_server_operation( .position(|(client, cx)| client.current_user_id(cx) == user_id); let Some(client_ix) = client_ix else { continue }; applied = true; - if let Err(err) = operation_channels[client_ix].unbounded_send(()) { + if let Err(err) = operation_channels[client_ix].unbounded_send(batch_id) { // panic!("error signaling user {}, client {}", user_id, client_ix); } } @@ -970,6 +974,7 @@ struct TestPlan { max_operations: usize, operation_ix: usize, users: Vec, + next_batch_id: usize, allow_server_restarts: bool, allow_client_reconnection: bool, allow_client_disconnection: bool, @@ -989,6 +994,7 @@ enum StoredOperation { Server(Operation), Client { user_id: UserId, + batch_id: usize, operation: ClientOperation, }, } @@ -1006,6 +1012,9 @@ enum Operation { }, RestartServer, MutateClients { + batch_id: usize, + #[serde(skip_serializing)] + #[serde(skip_deserializing)] user_ids: Vec, quiesce: bool, }, @@ -1103,6 +1112,7 @@ impl TestPlan { allow_client_disconnection: rng.gen_bool(0.1), stored_operations: Vec::new(), operation_ix: 0, + next_batch_id: 0, max_operations, users, rng, @@ -1114,8 +1124,32 @@ impl TestPlan { self.replay = true; let stored_operations: Vec = serde_json::from_str(&json).unwrap(); self.stored_operations = stored_operations - .into_iter() - .map(|operation| (operation, Arc::new(AtomicBool::new(false)))) + .iter() + .cloned() + .enumerate() + .map(|(i, mut operation)| { + if let StoredOperation::Server(Operation::MutateClients { + batch_id: current_batch_id, + user_ids, + .. + }) = &mut operation + { + assert!(user_ids.is_empty()); + user_ids.extend(stored_operations[i + 1..].iter().filter_map(|operation| { + if let StoredOperation::Client { + user_id, batch_id, .. + } = operation + { + if batch_id == current_batch_id { + return Some(user_id); + } + } + None + })); + user_ids.sort_unstable(); + } + (operation, Arc::new(AtomicBool::new(false))) + }) .collect() } @@ -1161,6 +1195,7 @@ impl TestPlan { fn next_client_operation( &mut self, client: &TestClient, + current_batch_id: usize, cx: &TestAppContext, ) -> Option<(ClientOperation, Arc)> { let current_user_id = client.current_user_id(cx); @@ -1174,7 +1209,12 @@ impl TestPlan { if self.replay { while let Some(stored_operation) = self.stored_operations.get(user_plan.operation_ix) { user_plan.operation_ix += 1; - if let (StoredOperation::Client { user_id, operation }, skipped) = stored_operation + if let ( + StoredOperation::Client { + user_id, operation, .. + }, + skipped, + ) = stored_operation { if user_id == ¤t_user_id { return Some((operation.clone(), skipped.clone())); @@ -1188,6 +1228,7 @@ impl TestPlan { self.stored_operations.push(( StoredOperation::Client { user_id: current_user_id, + batch_id: current_batch_id, operation: operation.clone(), }, skipped.clone(), @@ -1239,15 +1280,18 @@ impl TestPlan { .rng .gen_range(1..10) .min(self.max_operations - self.operation_ix); - let user_ids = (0..count) + let batch_id = util::post_inc(&mut self.next_batch_id); + let mut user_ids = (0..count) .map(|_| { let ix = self.rng.gen_range(0..clients.len()); let (client, cx) = &clients[ix]; client.current_user_id(cx) }) - .collect(); + .collect::>(); + user_ids.sort_unstable(); Operation::MutateClients { user_ids, + batch_id, quiesce: self.rng.gen_bool(0.7), } } @@ -1625,7 +1669,7 @@ impl TestPlan { async fn simulate_client( client: Rc, - mut operation_rx: futures::channel::mpsc::UnboundedReceiver<()>, + mut operation_rx: futures::channel::mpsc::UnboundedReceiver, plan: Arc>, mut cx: TestAppContext, ) { @@ -1740,8 +1784,8 @@ async fn simulate_client( .await; client.language_registry.add(Arc::new(language)); - while operation_rx.next().await.is_some() { - let Some((operation, skipped)) = plan.lock().next_client_operation(&client, &cx) else { break }; + while let Some(batch_id) = operation_rx.next().await { + let Some((operation, skipped)) = plan.lock().next_client_operation(&client, batch_id, &cx) else { break }; match apply_client_operation(&client, operation, &mut cx).await { Err(error) => { log::error!("{} error: {}", client.username, error);