diff --git a/crates/collab/src/tests/randomized_integration_tests.rs b/crates/collab/src/tests/randomized_integration_tests.rs index 243d275e13..4d87ca9ccc 100644 --- a/crates/collab/src/tests/randomized_integration_tests.rs +++ b/crates/collab/src/tests/randomized_integration_tests.rs @@ -7,9 +7,10 @@ use anyhow::{anyhow, Result}; use call::ActiveCall; use client::RECEIVE_TIMEOUT; use collections::{BTreeMap, HashSet}; +use editor::Bias; use fs::{FakeFs, Fs as _}; use futures::StreamExt as _; -use gpui::{executor::Deterministic, ModelHandle, TestAppContext}; +use gpui::{executor::Deterministic, ModelHandle, Task, TestAppContext}; use language::{range_to_lsp, FakeLspAdapter, Language, LanguageConfig, PointUtf16}; use lsp::FakeLanguageServer; use parking_lot::Mutex; @@ -21,7 +22,10 @@ use std::{ ops::Range, path::{Path, PathBuf}, rc::Rc, - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }, }; use util::ResultExt; @@ -101,147 +105,21 @@ async fn test_random_collaboration( let mut next_entity_id = 100000; loop { - let Some(next_operation) = plan.lock().next_operation(&clients).await else { break }; - match next_operation { - Operation::AddConnection { user_id } => { - let username = { - let mut plan = plan.lock(); - let mut user = plan.user(user_id); - user.online = true; - user.username.clone() - }; - log::info!("Adding new connection for {}", username); - next_entity_id += 100000; - let mut client_cx = TestAppContext::new( - cx.foreground_platform(), - cx.platform(), - deterministic.build_foreground(next_entity_id), - deterministic.build_background(), - cx.font_cache(), - cx.leak_detector(), - next_entity_id, - cx.function_name.clone(), - ); - - let (operation_tx, operation_rx) = futures::channel::mpsc::unbounded(); - let client = Rc::new(server.create_client(&mut client_cx, &username).await); - operation_channels.push(operation_tx); - clients.push((client.clone(), client_cx.clone())); - client_tasks.push(client_cx.foreground().spawn(simulate_client( - client, - operation_rx, - plan.clone(), - client_cx, - ))); - - log::info!("Added connection for {}", username); - } - - Operation::RemoveConnection { user_id } => { - log::info!("Simulating full disconnection of user {}", user_id); - let client_ix = clients - .iter() - .position(|(client, cx)| client.current_user_id(cx) == user_id) - .unwrap(); - let user_connection_ids = server - .connection_pool - .lock() - .user_connection_ids(user_id) - .collect::>(); - assert_eq!(user_connection_ids.len(), 1); - let removed_peer_id = user_connection_ids[0].into(); - let (client, mut client_cx) = clients.remove(client_ix); - let client_task = client_tasks.remove(client_ix); - operation_channels.remove(client_ix); - server.forbid_connections(); - server.disconnect_client(removed_peer_id); - deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); - deterministic.start_waiting(); - log::info!("Waiting for user {} to exit...", user_id); - client_task.await; - deterministic.finish_waiting(); - server.allow_connections(); - - for project in client.remote_projects().iter() { - project.read_with(&client_cx, |project, _| { - assert!( - project.is_read_only(), - "project {:?} should be read only", - project.remote_id() - ) - }); - } - - for (client, cx) in &clients { - let contacts = server - .app_state - .db - .get_contacts(client.current_user_id(cx)) - .await - .unwrap(); - let pool = server.connection_pool.lock(); - for contact in contacts { - if let db::Contact::Accepted { user_id: id, .. } = contact { - if pool.is_user_online(id) { - assert_ne!( - id, user_id, - "removed client is still a contact of another peer" - ); - } - } - } - } - - log::info!("{} removed", client.username); - plan.lock().user(user_id).online = false; - client_cx.update(|cx| { - cx.clear_globals(); - drop(client); - }); - } - - Operation::BounceConnection { user_id } => { - log::info!("Simulating temporary disconnection of user {}", user_id); - let user_connection_ids = server - .connection_pool - .lock() - .user_connection_ids(user_id) - .collect::>(); - assert_eq!(user_connection_ids.len(), 1); - let peer_id = user_connection_ids[0].into(); - server.disconnect_client(peer_id); - deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); - } - - Operation::RestartServer => { - log::info!("Simulating server restart"); - server.reset().await; - deterministic.advance_clock(RECEIVE_TIMEOUT); - server.start().await.unwrap(); - deterministic.advance_clock(CLEANUP_TIMEOUT); - let environment = &server.app_state.config.zed_environment; - let stale_room_ids = server - .app_state - .db - .stale_room_ids(environment, server.id()) - .await - .unwrap(); - assert_eq!(stale_room_ids, vec![]); - } - - Operation::MutateClients { user_ids, quiesce } => { - for user_id in user_ids { - let client_ix = clients - .iter() - .position(|(client, cx)| client.current_user_id(cx) == user_id) - .unwrap(); - operation_channels[client_ix].unbounded_send(()).unwrap(); - } - - if quiesce { - deterministic.run_until_parked(); - } - } + let Some((next_operation, skipped)) = plan.lock().next_server_operation(&clients) else { break }; + let applied = apply_server_operation( + deterministic.clone(), + &mut server, + &mut clients, + &mut client_tasks, + &mut operation_channels, + &mut next_entity_id, + plan.clone(), + next_operation, + cx, + ) + .await; + if !applied { + skipped.store(false, SeqCst); } } @@ -430,39 +308,216 @@ async fn test_random_collaboration( } } +async fn apply_server_operation( + deterministic: Arc, + server: &mut TestServer, + clients: &mut Vec<(Rc, TestAppContext)>, + client_tasks: &mut Vec>, + operation_channels: &mut Vec>, + next_entity_id: &mut usize, + plan: Arc>, + operation: Operation, + cx: &mut TestAppContext, +) -> bool { + match operation { + Operation::AddConnection { user_id } => { + let username; + { + let mut plan = plan.lock(); + let mut user = plan.user(user_id); + if user.online { + return false; + } + user.online = true; + username = user.username.clone(); + }; + log::info!("Adding new connection for {}", username); + *next_entity_id += 100000; + let mut client_cx = TestAppContext::new( + cx.foreground_platform(), + cx.platform(), + deterministic.build_foreground(*next_entity_id), + deterministic.build_background(), + cx.font_cache(), + cx.leak_detector(), + *next_entity_id, + cx.function_name.clone(), + ); + + let (operation_tx, operation_rx) = futures::channel::mpsc::unbounded(); + let client = Rc::new(server.create_client(&mut client_cx, &username).await); + operation_channels.push(operation_tx); + clients.push((client.clone(), client_cx.clone())); + client_tasks.push(client_cx.foreground().spawn(simulate_client( + client, + operation_rx, + plan.clone(), + client_cx, + ))); + + log::info!("Added connection for {}", username); + } + + Operation::RemoveConnection { user_id } => { + log::info!("Simulating full disconnection of user {}", user_id); + let client_ix = clients + .iter() + .position(|(client, cx)| client.current_user_id(cx) == user_id); + let Some(client_ix) = client_ix else { return false }; + let user_connection_ids = server + .connection_pool + .lock() + .user_connection_ids(user_id) + .collect::>(); + assert_eq!(user_connection_ids.len(), 1); + let removed_peer_id = user_connection_ids[0].into(); + let (client, mut client_cx) = clients.remove(client_ix); + let client_task = client_tasks.remove(client_ix); + operation_channels.remove(client_ix); + server.forbid_connections(); + server.disconnect_client(removed_peer_id); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + deterministic.start_waiting(); + log::info!("Waiting for user {} to exit...", user_id); + client_task.await; + deterministic.finish_waiting(); + server.allow_connections(); + + for project in client.remote_projects().iter() { + project.read_with(&client_cx, |project, _| { + assert!( + project.is_read_only(), + "project {:?} should be read only", + project.remote_id() + ) + }); + } + + for (client, cx) in clients { + let contacts = server + .app_state + .db + .get_contacts(client.current_user_id(cx)) + .await + .unwrap(); + let pool = server.connection_pool.lock(); + for contact in contacts { + if let db::Contact::Accepted { user_id: id, .. } = contact { + if pool.is_user_online(id) { + assert_ne!( + id, user_id, + "removed client is still a contact of another peer" + ); + } + } + } + } + + log::info!("{} removed", client.username); + plan.lock().user(user_id).online = false; + client_cx.update(|cx| { + cx.clear_globals(); + drop(client); + }); + } + + Operation::BounceConnection { user_id } => { + log::info!("Simulating temporary disconnection of user {}", user_id); + let user_connection_ids = server + .connection_pool + .lock() + .user_connection_ids(user_id) + .collect::>(); + if user_connection_ids.is_empty() { + return false; + } + assert_eq!(user_connection_ids.len(), 1); + let peer_id = user_connection_ids[0].into(); + server.disconnect_client(peer_id); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + } + + Operation::RestartServer => { + log::info!("Simulating server restart"); + server.reset().await; + deterministic.advance_clock(RECEIVE_TIMEOUT); + server.start().await.unwrap(); + deterministic.advance_clock(CLEANUP_TIMEOUT); + let environment = &server.app_state.config.zed_environment; + let stale_room_ids = server + .app_state + .db + .stale_room_ids(environment, server.id()) + .await + .unwrap(); + assert_eq!(stale_room_ids, vec![]); + } + + Operation::MutateClients { user_ids, quiesce } => { + let mut applied = false; + for user_id in user_ids { + let client_ix = clients + .iter() + .position(|(client, cx)| client.current_user_id(cx) == user_id); + let Some(client_ix) = client_ix else { continue }; + applied = true; + if let Err(err) = operation_channels[client_ix].unbounded_send(()) { + // panic!("error signaling user {}, client {}", user_id, client_ix); + } + } + + if quiesce && applied { + deterministic.run_until_parked(); + } + + return applied; + } + } + true +} + async fn apply_client_operation( client: &TestClient, operation: ClientOperation, cx: &mut TestAppContext, -) -> Result<()> { +) -> Result { match operation { ClientOperation::AcceptIncomingCall => { - log::info!("{}: accepting incoming call", client.username); - let active_call = cx.read(ActiveCall::global); + if active_call.read_with(cx, |call, _| call.incoming().borrow().is_none()) { + return Ok(false); + } + + log::info!("{}: accepting incoming call", client.username); active_call .update(cx, |call, cx| call.accept_incoming(cx)) .await?; } ClientOperation::RejectIncomingCall => { - log::info!("{}: declining incoming call", client.username); - let active_call = cx.read(ActiveCall::global); + if active_call.read_with(cx, |call, _| call.incoming().borrow().is_none()) { + return Ok(false); + } + + log::info!("{}: declining incoming call", client.username); active_call.update(cx, |call, _| call.decline_incoming())?; } ClientOperation::LeaveCall => { - log::info!("{}: hanging up", client.username); - let active_call = cx.read(ActiveCall::global); + if active_call.read_with(cx, |call, _| call.room().is_none()) { + return Ok(false); + } + + log::info!("{}: hanging up", client.username); active_call.update(cx, |call, cx| call.hang_up(cx))?; } ClientOperation::InviteContactToCall { user_id } => { - log::info!("{}: inviting {}", client.username, user_id,); - let active_call = cx.read(ActiveCall::global); + + log::info!("{}: inviting {}", client.username, user_id,); active_call .update(cx, |call, cx| call.invite(user_id.to_proto(), None, cx)) .await @@ -492,6 +547,10 @@ async fn apply_client_operation( project_root_name, new_root_path, } => { + let Some(project) = project_for_root_name(client, &project_root_name, cx) else { + return Ok(false) + }; + log::info!( "{}: finding/creating local worktree at {:?} to project with root path {}", client.username, @@ -499,8 +558,6 @@ async fn apply_client_operation( project_root_name ); - let project = project_for_root_name(client, &project_root_name, cx) - .expect("invalid project in test operation"); ensure_project_shared(&project, client, cx).await; if !client.fs.paths().await.contains(&new_root_path) { client.fs.create_dir(&new_root_path).await.unwrap(); @@ -514,21 +571,56 @@ async fn apply_client_operation( } ClientOperation::CloseRemoteProject { project_root_name } => { + let Some(project) = project_for_root_name(client, &project_root_name, cx) else { + return Ok(false) + }; + log::info!( "{}: closing remote project with root path {}", client.username, project_root_name, ); - let ix = project_ix_for_root_name(&*client.remote_projects(), &project_root_name, cx) - .expect("invalid project in test operation"); - cx.update(|_| client.remote_projects_mut().remove(ix)); + let ix = client + .remote_projects() + .iter() + .position(|p| p == &project) + .unwrap(); + cx.update(|_| { + client.remote_projects_mut().remove(ix); + drop(project); + }); } ClientOperation::OpenRemoteProject { host_id, first_root_name, } => { + let active_call = cx.read(ActiveCall::global); + let project = active_call.update(cx, |call, cx| { + let room = call.room().cloned()?; + let participant = room + .read(cx) + .remote_participants() + .get(&host_id.to_proto())?; + let project_id = participant + .projects + .iter() + .find(|project| project.worktree_root_names[0] == first_root_name)? + .id; + Some(room.update(cx, |room, cx| { + room.join_project( + project_id, + client.language_registry.clone(), + FakeFs::new(cx.background().clone()), + cx, + ) + })) + }); + let Some(project) = project else { + return Ok(false) + }; + log::info!( "{}: joining remote project of user {}, root name {}", client.username, @@ -536,30 +628,7 @@ async fn apply_client_operation( first_root_name, ); - let active_call = cx.read(ActiveCall::global); - let project = active_call - .update(cx, |call, cx| { - let room = call.room().cloned()?; - let participant = room - .read(cx) - .remote_participants() - .get(&host_id.to_proto())?; - let project_id = participant - .projects - .iter() - .find(|project| project.worktree_root_names[0] == first_root_name)? - .id; - Some(room.update(cx, |room, cx| { - room.join_project( - project_id, - client.language_registry.clone(), - FakeFs::new(cx.background().clone()), - cx, - ) - })) - }) - .expect("invalid project in test operation") - .await?; + let project = project.await?; client.remote_projects_mut().push(project.clone()); } @@ -569,6 +638,13 @@ async fn apply_client_operation( full_path, is_dir, } => { + let Some(project) = project_for_root_name(client, &project_root_name, cx) else { + return Ok(false); + }; + let Some(project_path) = project_path_for_full_path(&project, &full_path, cx) else { + return Ok(false); + }; + log::info!( "{}: creating {} at path {:?} in {} project {}", client.username, @@ -578,11 +654,7 @@ async fn apply_client_operation( project_root_name, ); - let project = project_for_root_name(client, &project_root_name, cx) - .expect("invalid project in test operation"); ensure_project_shared(&project, client, cx).await; - let project_path = project_path_for_full_path(&project, &full_path, cx) - .expect("invalid worktree path in test operation"); project .update(cx, |p, cx| p.create_entry(project_path, is_dir, cx)) .unwrap() @@ -594,6 +666,13 @@ async fn apply_client_operation( is_local, full_path, } => { + let Some(project) = project_for_root_name(client, &project_root_name, cx) else { + return Ok(false); + }; + let Some(project_path) = project_path_for_full_path(&project, &full_path, cx) else { + return Ok(false); + }; + log::info!( "{}: opening buffer {:?} in {} project {}", client.username, @@ -602,11 +681,7 @@ async fn apply_client_operation( project_root_name, ); - let project = project_for_root_name(client, &project_root_name, cx) - .expect("invalid project in test operation"); ensure_project_shared(&project, client, cx).await; - let project_path = project_path_for_full_path(&project, &full_path, cx) - .expect("invalid buffer path in test operation"); let buffer = project .update(cx, |project, cx| project.open_buffer(project_path, cx)) .await?; @@ -619,6 +694,14 @@ async fn apply_client_operation( full_path, edits, } => { + let Some(project) = project_for_root_name(client, &project_root_name, cx) else { + return Ok(false); + }; + let Some(buffer) = + buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) else { + return Ok(false); + }; + log::info!( "{}: editing buffer {:?} in {} project {} with {:?}", client.username, @@ -628,14 +711,18 @@ async fn apply_client_operation( edits ); - let project = project_for_root_name(client, &project_root_name, cx) - .expect("invalid project in test operation"); ensure_project_shared(&project, client, cx).await; - let buffer = - buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) - .expect("invalid buffer path in test operation"); buffer.update(cx, |buffer, cx| { - buffer.edit(edits, None, cx); + let snapshot = buffer.snapshot(); + buffer.edit( + edits.into_iter().map(|(range, text)| { + let start = snapshot.clip_offset(range.start, Bias::Left); + let end = snapshot.clip_offset(range.end, Bias::Right); + (start..end, text) + }), + None, + cx, + ); }); } @@ -644,20 +731,23 @@ async fn apply_client_operation( is_local, full_path, } => { + let Some(project) = project_for_root_name(client, &project_root_name, cx) else { + return Ok(false); + }; + let Some(buffer) = + buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) else { + return Ok(false); + }; + log::info!( - "{}: dropping buffer {:?} in {} project {}", + "{}: closing buffer {:?} in {} project {}", client.username, full_path, if is_local { "local" } else { "remote" }, project_root_name ); - let project = project_for_root_name(client, &project_root_name, cx) - .expect("invalid project in test operation"); ensure_project_shared(&project, client, cx).await; - let buffer = - buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) - .expect("invalid buffer path in test operation"); cx.update(|_| { client.buffers_for_project(&project).remove(&buffer); drop(buffer); @@ -670,6 +760,14 @@ async fn apply_client_operation( full_path, detach, } => { + let Some(project) = project_for_root_name(client, &project_root_name, cx) else { + return Ok(false); + }; + let Some(buffer) = + buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) else { + return Ok(false); + }; + log::info!( "{}: saving buffer {:?} in {} project {}{}", client.username, @@ -679,12 +777,7 @@ async fn apply_client_operation( if detach { ", detaching" } else { ", awaiting" } ); - let project = project_for_root_name(client, &project_root_name, cx) - .expect("invalid project in test operation"); ensure_project_shared(&project, client, cx).await; - let buffer = - buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) - .expect("invalid buffer path in test operation"); let (requested_version, save) = buffer.update(cx, |buffer, cx| (buffer.version(), buffer.save(cx))); let save = cx.background().spawn(async move { @@ -710,6 +803,14 @@ async fn apply_client_operation( kind, detach, } => { + let Some(project) = project_for_root_name(client, &project_root_name, cx) else { + return Ok(false); + }; + let Some(buffer) = + buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) else { + return Ok(false); + }; + log::info!( "{}: request LSP {:?} for buffer {:?} in {} project {}{}", client.username, @@ -720,11 +821,7 @@ async fn apply_client_operation( if detach { ", detaching" } else { ", awaiting" } ); - let project = project_for_root_name(client, &project_root_name, cx) - .expect("invalid project in test operation"); - let buffer = - buffer_for_full_path(&*client.buffers_for_project(&project), &full_path, cx) - .expect("invalid buffer path in test operation"); + let offset = buffer.read_with(cx, |b, _| b.clip_offset(offset, Bias::Left)); let request = match kind { LspRequestKind::Rename => cx.spawn(|mut cx| async move { project @@ -770,6 +867,10 @@ async fn apply_client_operation( query, detach, } => { + let Some(project) = project_for_root_name(client, &project_root_name, cx) else { + return Ok(false); + }; + log::info!( "{}: search {} project {} for {:?}{}", client.username, @@ -778,8 +879,7 @@ async fn apply_client_operation( query, if detach { ", detaching" } else { ", awaiting" } ); - let project = project_for_root_name(client, &project_root_name, cx) - .expect("invalid project in test operation"); + let search = project.update(cx, |project, cx| { project.search(SearchQuery::text(query, false, false), cx) }); @@ -797,12 +897,17 @@ async fn apply_client_operation( } ClientOperation::CreateFsEntry { path, is_dir } => { + if client.fs.metadata(&path.parent().unwrap()).await?.is_none() { + return Ok(false); + } + log::info!( "{}: creating {} at {:?}", client.username, if is_dir { "dir" } else { "file" }, path ); + if is_dir { client.fs.create_dir(&path).await.unwrap(); } else { @@ -814,13 +919,13 @@ async fn apply_client_operation( } } } - Ok(()) + Ok(true) } struct TestPlan { rng: StdRng, replay: bool, - stored_operations: Vec, + stored_operations: Vec<(StoredOperation, Arc)>, max_operations: usize, operation_ix: usize, users: Vec, @@ -962,51 +1067,57 @@ impl TestPlan { fn load(&mut self, path: &Path) { let json = std::fs::read_to_string(path).unwrap(); self.replay = true; - self.stored_operations = serde_json::from_str(&json).unwrap(); + let stored_operations: Vec = serde_json::from_str(&json).unwrap(); + self.stored_operations = stored_operations + .into_iter() + .map(|operation| (operation, Arc::new(AtomicBool::new(false)))) + .collect() } fn save(&mut self, path: &Path) { // Format each operation as one line let mut json = Vec::new(); json.push(b'['); - for (i, stored_operation) in self.stored_operations.iter().enumerate() { - if i > 0 { + for (operation, skipped) in &self.stored_operations { + if skipped.load(SeqCst) { + continue; + } + if json.len() > 1 { json.push(b','); } json.extend_from_slice(b"\n "); - serde_json::to_writer(&mut json, stored_operation).unwrap(); + serde_json::to_writer(&mut json, operation).unwrap(); } json.extend_from_slice(b"\n]\n"); std::fs::write(path, &json).unwrap(); } - async fn next_operation( + fn next_server_operation( &mut self, clients: &[(Rc, TestAppContext)], - ) -> Option { + ) -> Option<(Operation, Arc)> { if self.replay { while let Some(stored_operation) = self.stored_operations.get(self.operation_ix) { self.operation_ix += 1; - if let StoredOperation::Server(operation) = stored_operation { - return Some(operation.clone()); + if let (StoredOperation::Server(operation), skipped) = stored_operation { + return Some((operation.clone(), skipped.clone())); } } None } else { - let operation = self.generate_operation(clients).await; - if let Some(operation) = &operation { - self.stored_operations - .push(StoredOperation::Server(operation.clone())) - } - operation + let operation = self.generate_server_operation(clients)?; + let skipped = Arc::new(AtomicBool::new(false)); + self.stored_operations + .push((StoredOperation::Server(operation.clone()), skipped.clone())); + Some((operation, skipped)) } } - async fn next_client_operation( + fn next_client_operation( &mut self, client: &TestClient, cx: &TestAppContext, - ) -> Option { + ) -> Option<(ClientOperation, Arc)> { let current_user_id = client.current_user_id(cx); let user_ix = self .users @@ -1018,28 +1129,29 @@ impl TestPlan { if self.replay { while let Some(stored_operation) = self.stored_operations.get(user_plan.operation_ix) { user_plan.operation_ix += 1; - if let StoredOperation::Client { user_id, operation } = stored_operation { + if let (StoredOperation::Client { user_id, operation }, skipped) = stored_operation + { if user_id == ¤t_user_id { - return Some(operation.clone()); + return Some((operation.clone(), skipped.clone())); } } } None } else { - let operation = self - .generate_client_operation(current_user_id, client, cx) - .await; - if let Some(operation) = &operation { - self.stored_operations.push(StoredOperation::Client { + let operation = self.generate_client_operation(current_user_id, client, cx)?; + let skipped = Arc::new(AtomicBool::new(false)); + self.stored_operations.push(( + StoredOperation::Client { user_id: current_user_id, operation: operation.clone(), - }) - } - operation + }, + skipped.clone(), + )); + Some((operation, skipped)) } } - async fn generate_operation( + fn generate_server_operation( &mut self, clients: &[(Rc, TestAppContext)], ) -> Option { @@ -1091,7 +1203,7 @@ impl TestPlan { .collect(); Operation::MutateClients { user_ids, - quiesce: self.rng.gen(), + quiesce: self.rng.gen_bool(0.7), } } _ => continue, @@ -1099,7 +1211,7 @@ impl TestPlan { }) } - async fn generate_client_operation( + fn generate_client_operation( &mut self, user_id: UserId, client: &TestClient, @@ -1221,11 +1333,11 @@ impl TestPlan { // Add a worktree to a local project 0..=50 => { let Some(project) = client - .local_projects() - .choose(&mut self.rng) - .cloned() else { continue }; + .local_projects() + .choose(&mut self.rng) + .cloned() else { continue }; let project_root_name = root_name_for_project(&project, cx); - let mut paths = client.fs.paths().await; + let mut paths = cx.background().block(client.fs.paths()); paths.remove(0); let new_root_path = if paths.is_empty() || self.rng.gen() { Path::new("/").join(&self.next_root_dir_name(user_id)) @@ -1396,10 +1508,9 @@ impl TestPlan { // Create a file or directory 96.. => { let is_dir = self.rng.gen::(); - let mut path = client - .fs - .directories() - .await + let mut path = cx + .background() + .block(client.fs.directories()) .choose(&mut self.rng) .unwrap() .clone(); @@ -1501,10 +1612,9 @@ async fn simulate_client( let plan = plan.clone(); async move { let files = fs.files().await; - let mut plan = plan.lock(); - let count = plan.rng.gen_range::(1..3); + let count = plan.lock().rng.gen_range::(1..3); let files = (0..count) - .map(|_| files.choose(&mut plan.rng).unwrap()) + .map(|_| files.choose(&mut plan.lock().rng).unwrap()) .collect::>(); log::info!("LSP: Returning definitions in files {:?}", &files); Ok(Some(lsp::GotoDefinitionResponse::Array( @@ -1552,9 +1662,16 @@ async fn simulate_client( client.language_registry.add(Arc::new(language)); while operation_rx.next().await.is_some() { - let Some(operation) = plan.lock().next_client_operation(&client, &cx).await else { break }; - if let Err(error) = apply_client_operation(&client, operation, &mut cx).await { - log::error!("{} error: {}", client.username, error); + let Some((operation, skipped)) = plan.lock().next_client_operation(&client, &cx) else { break }; + match apply_client_operation(&client, operation, &mut cx).await { + Err(error) => { + log::error!("{} error: {}", client.username, error); + } + Ok(applied) => { + if !applied { + skipped.store(true, SeqCst); + } + } } cx.background().simulate_random_delay().await; }