From 2495d6581efcef080cc33eaa3928ee9d487dbc34 Mon Sep 17 00:00:00 2001
From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com>
Date: Sat, 26 Aug 2023 01:31:52 +0200
Subject: [PATCH] Un serialize project search (#2857)

This is the first batch of improvements to the current project search.
There are still a few things we can do better, but I want to get this out
in the next Preview. Most of the slowness at this point seems to stem from
updating the UI too often.

Release Notes:

- Improved project search by making it report results sooner.

---------

Co-authored-by: Julia Risley
---
 crates/collab/src/tests/integration_tests.rs  |  21 +-
 .../src/tests/randomized_integration_tests.rs |  18 +-
 crates/editor/src/multi_buffer.rs             |  94 ++-
 crates/project/src/project.rs                 | 583 ++++++++++++------
 crates/project/src/project_tests.rs           |  11 +-
 crates/search/src/project_search.rs           |  62 +-
 6 files changed, 489 insertions(+), 300 deletions(-)

diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs
index 9bee8d434c..b1227b9501 100644
--- a/crates/collab/src/tests/integration_tests.rs
+++ b/crates/collab/src/tests/integration_tests.rs
@@ -4,7 +4,7 @@ use crate::{
 };
 use call::{room, ActiveCall, ParticipantLocation, Room};
 use client::{User, RECEIVE_TIMEOUT};
-use collections::HashSet;
+use collections::{HashMap, HashSet};
 use editor::{
     test::editor_test_context::EditorTestContext, ConfirmCodeAction, ConfirmCompletion,
     ConfirmRename, Editor, ExcerptRange, MultiBuffer, Redo, Rename, ToggleCodeActions, Undo,
@@ -4821,15 +4821,16 @@
     let project_b = client_b.build_remote_project(project_id, cx_b).await;
 
     // Perform a search as the guest.
-    let results = project_b
-        .update(cx_b, |project, cx| {
-            project.search(
-                SearchQuery::text("world", false, false, Vec::new(), Vec::new()),
-                cx,
-            )
-        })
-        .await
-        .unwrap();
+    let mut results = HashMap::default();
+    let mut search_rx = project_b.update(cx_b, |project, cx| {
+        project.search(
+            SearchQuery::text("world", false, false, Vec::new(), Vec::new()),
+            cx,
+        )
+    });
+    while let Some((buffer, ranges)) = search_rx.next().await {
+        results.entry(buffer).or_insert(ranges);
+    }
 
     let mut ranges_by_path = results
         .into_iter()
diff --git a/crates/collab/src/tests/randomized_integration_tests.rs b/crates/collab/src/tests/randomized_integration_tests.rs
index 3557843828..814f248b6d 100644
--- a/crates/collab/src/tests/randomized_integration_tests.rs
+++ b/crates/collab/src/tests/randomized_integration_tests.rs
@@ -6,7 +6,7 @@ use crate::{
 use anyhow::{anyhow, Result};
 use call::ActiveCall;
 use client::RECEIVE_TIMEOUT;
-use collections::BTreeMap;
+use collections::{BTreeMap, HashMap};
 use editor::Bias;
 use fs::{repository::GitFileStatus, FakeFs, Fs as _};
 use futures::StreamExt as _;
@@ -722,7 +722,7 @@
                     if detach { "detaching" } else { "awaiting" }
                 );
 
-                let search = project.update(cx, |project, cx| {
+                let mut search = project.update(cx, |project, cx| {
                     project.search(
                         SearchQuery::text(query, false, false, Vec::new(), Vec::new()),
                         cx,
@@ -730,15 +730,13 @@
                 });
                 drop(project);
                 let search = cx.background().spawn(async move {
-                    search
-                        .await
-                        .map_err(|err| anyhow!("search request failed: {:?}", err))
+                    let mut results = HashMap::default();
+                    while let Some((buffer, ranges)) = search.next().await {
+                        results.entry(buffer).or_insert(ranges);
+                    }
+                    results
                });
-                if detach {
-                    cx.update(|cx| search.detach_and_log_err(cx));
-                } else {
-                    search.await?;
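Both test updates above follow the same consumer-side pattern: `Project::search` now hands back a channel receiver instead of one big awaited `HashMap`, and callers drain it incrementally. A minimal sketch of that pattern using `smol` channels (the helper name and the `usize`-range payload are illustrative, not part of this patch):

use std::{collections::HashMap, ops::Range};

// Drain a streaming search into a map, mirroring how the updated
// tests accumulate (buffer, ranges) pairs as they arrive.
async fn collect_search_results<K: std::hash::Hash + Eq>(
    rx: smol::channel::Receiver<(K, Vec<Range<usize>>)>,
) -> HashMap<K, Vec<Range<usize>>> {
    let mut results = HashMap::new();
    // recv() resolves as soon as one buffer's matches are ready, so
    // partial results are available long before the whole scan ends.
    while let Ok((buffer, ranges)) = rx.recv().await {
        results.entry(buffer).or_insert(ranges);
    }
    results
}

Because the receiver yields one buffer's worth of matches at a time, a UI consumer can render partial results while the scan is still running.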
- } + search.await; } ClientOperation::WriteFsEntry { diff --git a/crates/editor/src/multi_buffer.rs b/crates/editor/src/multi_buffer.rs index b9bf4f52a1..5c0d8b641c 100644 --- a/crates/editor/src/multi_buffer.rs +++ b/crates/editor/src/multi_buffer.rs @@ -6,7 +6,7 @@ use clock::ReplicaId; use collections::{BTreeMap, Bound, HashMap, HashSet}; use futures::{channel::mpsc, SinkExt}; use git::diff::DiffHunk; -use gpui::{AppContext, Entity, ModelContext, ModelHandle, Task}; +use gpui::{AppContext, Entity, ModelContext, ModelHandle}; pub use language::Completion; use language::{ char_kind, @@ -788,59 +788,59 @@ impl MultiBuffer { pub fn stream_excerpts_with_context_lines( &mut self, - excerpts: Vec<(ModelHandle, Vec>)>, + buffer: ModelHandle, + ranges: Vec>, context_line_count: u32, cx: &mut ModelContext, - ) -> (Task<()>, mpsc::Receiver>) { + ) -> mpsc::Receiver> { let (mut tx, rx) = mpsc::channel(256); - let task = cx.spawn(|this, mut cx| async move { - for (buffer, ranges) in excerpts { - let (buffer_id, buffer_snapshot) = - buffer.read_with(&cx, |buffer, _| (buffer.remote_id(), buffer.snapshot())); + cx.spawn(|this, mut cx| async move { + let (buffer_id, buffer_snapshot) = + buffer.read_with(&cx, |buffer, _| (buffer.remote_id(), buffer.snapshot())); - let mut excerpt_ranges = Vec::new(); - let mut range_counts = Vec::new(); - cx.background() - .scoped(|scope| { - scope.spawn(async { - let (ranges, counts) = - build_excerpt_ranges(&buffer_snapshot, &ranges, context_line_count); - excerpt_ranges = ranges; - range_counts = counts; - }); - }) - .await; - - let mut ranges = ranges.into_iter(); - let mut range_counts = range_counts.into_iter(); - for excerpt_ranges in excerpt_ranges.chunks(100) { - let excerpt_ids = this.update(&mut cx, |this, cx| { - this.push_excerpts(buffer.clone(), excerpt_ranges.iter().cloned(), cx) + let mut excerpt_ranges = Vec::new(); + let mut range_counts = Vec::new(); + cx.background() + .scoped(|scope| { + scope.spawn(async { + let (ranges, counts) = + build_excerpt_ranges(&buffer_snapshot, &ranges, context_line_count); + excerpt_ranges = ranges; + range_counts = counts; }); + }) + .await; - for (excerpt_id, range_count) in - excerpt_ids.into_iter().zip(range_counts.by_ref()) - { - for range in ranges.by_ref().take(range_count) { - let start = Anchor { - buffer_id: Some(buffer_id), - excerpt_id: excerpt_id.clone(), - text_anchor: range.start, - }; - let end = Anchor { - buffer_id: Some(buffer_id), - excerpt_id: excerpt_id.clone(), - text_anchor: range.end, - }; - if tx.send(start..end).await.is_err() { - break; - } + let mut ranges = ranges.into_iter(); + let mut range_counts = range_counts.into_iter(); + for excerpt_ranges in excerpt_ranges.chunks(100) { + let excerpt_ids = this.update(&mut cx, |this, cx| { + this.push_excerpts(buffer.clone(), excerpt_ranges.iter().cloned(), cx) + }); + + for (excerpt_id, range_count) in excerpt_ids.into_iter().zip(range_counts.by_ref()) + { + for range in ranges.by_ref().take(range_count) { + let start = Anchor { + buffer_id: Some(buffer_id), + excerpt_id: excerpt_id.clone(), + text_anchor: range.start, + }; + let end = Anchor { + buffer_id: Some(buffer_id), + excerpt_id: excerpt_id.clone(), + text_anchor: range.end, + }; + if tx.send(start..end).await.is_err() { + break; } } } } - }); - (task, rx) + }) + .detach(); + + rx } pub fn push_excerpts( @@ -4438,7 +4438,7 @@ mod tests { async fn test_stream_excerpts_with_context_lines(cx: &mut TestAppContext) { let buffer = cx.add_model(|cx| Buffer::new(0, sample_text(20, 3, 'a'), cx)); 
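The reworked `stream_excerpts_with_context_lines` above returns only the receiver and detaches the producing task; the task's lifetime is tied to the channel, since a failed `tx.send` (receiver dropped) breaks out of the loop. A standalone sketch of that ownership pattern with `futures` channels (the function, payload, and thread-based executor are illustrative, not gpui):

use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};

// Return only a receiver; the detached producer stops on its own once
// the consumer drops the other end and send() starts failing.
fn stream_chunks(items: Vec<u32>) -> mpsc::Receiver<u32> {
    let (mut tx, rx) = mpsc::channel(256);
    std::thread::spawn(move || {
        block_on(async move {
            // Like the excerpt loop above, work in batches of 100 so the
            // consumer sees early results while later ones are computed.
            for chunk in items.chunks(100) {
                for &item in chunk {
                    if tx.send(item).await.is_err() {
                        return; // receiver dropped; stop producing
                    }
                }
            }
        })
    });
    rx
}

fn main() {
    let received = block_on(stream_chunks((0..500).collect()).collect::<Vec<_>>());
    assert_eq!(received.len(), 500);
}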
let multibuffer = cx.add_model(|_| MultiBuffer::new(0)); - let (task, anchor_ranges) = multibuffer.update(cx, |multibuffer, cx| { + let anchor_ranges = multibuffer.update(cx, |multibuffer, cx| { let snapshot = buffer.read(cx); let ranges = vec![ snapshot.anchor_before(Point::new(3, 2))..snapshot.anchor_before(Point::new(4, 2)), @@ -4446,12 +4446,10 @@ mod tests { snapshot.anchor_before(Point::new(15, 0)) ..snapshot.anchor_before(Point::new(15, 0)), ]; - multibuffer.stream_excerpts_with_context_lines(vec![(buffer.clone(), ranges)], 2, cx) + multibuffer.stream_excerpts_with_context_lines(buffer.clone(), ranges, 2, cx) }); let anchor_ranges = anchor_ranges.collect::>().await; - // Ensure task is finished when stream completes. - task.await; let snapshot = multibuffer.read_with(cx, |multibuffer, cx| multibuffer.snapshot(cx)); assert_eq!( diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index c7765bf55a..bb18a41ad4 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -26,8 +26,8 @@ use futures::{ }; use globset::{Glob, GlobSet, GlobSetBuilder}; use gpui::{ - AnyModelHandle, AppContext, AsyncAppContext, BorrowAppContext, Entity, ModelContext, - ModelHandle, Task, WeakModelHandle, + executor::Background, AnyModelHandle, AppContext, AsyncAppContext, BorrowAppContext, Entity, + ModelContext, ModelHandle, Task, WeakModelHandle, }; use itertools::Itertools; use language::{ @@ -37,11 +37,11 @@ use language::{ deserialize_anchor, deserialize_fingerprint, deserialize_line_ending, deserialize_version, serialize_anchor, serialize_version, }, - range_from_lsp, range_to_lsp, Bias, Buffer, CachedLspAdapter, CodeAction, CodeLabel, - Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Event as BufferEvent, File as _, - Language, LanguageRegistry, LanguageServerName, LocalFile, LspAdapterDelegate, OffsetRangeExt, - Operation, Patch, PendingLanguageServer, PointUtf16, TextBufferSnapshot, ToOffset, - ToPointUtf16, Transaction, Unclipped, + range_from_lsp, range_to_lsp, Bias, Buffer, BufferSnapshot, CachedLspAdapter, CodeAction, + CodeLabel, Completion, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Event as BufferEvent, + File as _, Language, LanguageRegistry, LanguageServerName, LocalFile, LspAdapterDelegate, + OffsetRangeExt, Operation, Patch, PendingLanguageServer, PointUtf16, TextBufferSnapshot, + ToOffset, ToPointUtf16, Transaction, Unclipped, }; use log::error; use lsp::{ @@ -57,8 +57,8 @@ use serde::Serialize; use settings::SettingsStore; use sha2::{Digest, Sha256}; use similar::{ChangeTag, TextDiff}; +use smol::channel::{Receiver, Sender}; use std::{ - cell::RefCell, cmp::{self, Ordering}, convert::TryInto, hash::Hash, @@ -67,7 +67,6 @@ use std::{ ops::Range, path::{self, Component, Path, PathBuf}, process::Stdio, - rc::Rc, str, sync::{ atomic::{AtomicUsize, Ordering::SeqCst}, @@ -525,6 +524,28 @@ impl FormatTrigger { } } } +#[derive(Clone, Debug, PartialEq)] +enum SearchMatchCandidate { + OpenBuffer { + buffer: ModelHandle, + // This might be an unnamed file without representation on filesystem + path: Option>, + }, + Path { + worktree_id: WorktreeId, + path: Arc, + }, +} + +type SearchMatchCandidateIndex = usize; +impl SearchMatchCandidate { + fn path(&self) -> Option> { + match self { + SearchMatchCandidate::OpenBuffer { path, .. } => path.clone(), + SearchMatchCandidate::Path { path, .. 
} => Some(path.clone()), + } + } +} impl Project { pub fn init_settings(cx: &mut AppContext) { @@ -5099,187 +5120,11 @@ impl Project { &self, query: SearchQuery, cx: &mut ModelContext, - ) -> Task, Vec>>>> { + ) -> Receiver<(ModelHandle, Vec>)> { if self.is_local() { - let snapshots = self - .visible_worktrees(cx) - .filter_map(|tree| { - let tree = tree.read(cx).as_local()?; - Some(tree.snapshot()) - }) - .collect::>(); - - let background = cx.background().clone(); - let path_count: usize = snapshots.iter().map(|s| s.visible_file_count()).sum(); - if path_count == 0 { - return Task::ready(Ok(Default::default())); - } - let workers = background.num_cpus().min(path_count); - let (matching_paths_tx, mut matching_paths_rx) = smol::channel::bounded(1024); - cx.background() - .spawn({ - let fs = self.fs.clone(); - let background = cx.background().clone(); - let query = query.clone(); - async move { - let fs = &fs; - let query = &query; - let matching_paths_tx = &matching_paths_tx; - let paths_per_worker = (path_count + workers - 1) / workers; - let snapshots = &snapshots; - background - .scoped(|scope| { - for worker_ix in 0..workers { - let worker_start_ix = worker_ix * paths_per_worker; - let worker_end_ix = worker_start_ix + paths_per_worker; - scope.spawn(async move { - let mut snapshot_start_ix = 0; - let mut abs_path = PathBuf::new(); - for snapshot in snapshots { - let snapshot_end_ix = - snapshot_start_ix + snapshot.visible_file_count(); - if worker_end_ix <= snapshot_start_ix { - break; - } else if worker_start_ix > snapshot_end_ix { - snapshot_start_ix = snapshot_end_ix; - continue; - } else { - let start_in_snapshot = worker_start_ix - .saturating_sub(snapshot_start_ix); - let end_in_snapshot = - cmp::min(worker_end_ix, snapshot_end_ix) - - snapshot_start_ix; - - for entry in snapshot - .files(false, start_in_snapshot) - .take(end_in_snapshot - start_in_snapshot) - { - if matching_paths_tx.is_closed() { - break; - } - let matches = if query - .file_matches(Some(&entry.path)) - { - abs_path.clear(); - abs_path.push(&snapshot.abs_path()); - abs_path.push(&entry.path); - if let Some(file) = - fs.open_sync(&abs_path).await.log_err() - { - query.detect(file).unwrap_or(false) - } else { - false - } - } else { - false - }; - - if matches { - let project_path = - (snapshot.id(), entry.path.clone()); - if matching_paths_tx - .send(project_path) - .await - .is_err() - { - break; - } - } - } - - snapshot_start_ix = snapshot_end_ix; - } - } - }); - } - }) - .await; - } - }) - .detach(); - - let (buffers_tx, buffers_rx) = smol::channel::bounded(1024); - let open_buffers = self - .opened_buffers - .values() - .filter_map(|b| b.upgrade(cx)) - .collect::>(); - cx.spawn(|this, cx| async move { - for buffer in &open_buffers { - let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot()); - buffers_tx.send((buffer.clone(), snapshot)).await?; - } - - let open_buffers = Rc::new(RefCell::new(open_buffers)); - while let Some(project_path) = matching_paths_rx.next().await { - if buffers_tx.is_closed() { - break; - } - - let this = this.clone(); - let open_buffers = open_buffers.clone(); - let buffers_tx = buffers_tx.clone(); - cx.spawn(|mut cx| async move { - if let Some(buffer) = this - .update(&mut cx, |this, cx| this.open_buffer(project_path, cx)) - .await - .log_err() - { - if open_buffers.borrow_mut().insert(buffer.clone()) { - let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot()); - buffers_tx.send((buffer, snapshot)).await?; - } - } - - Ok::<_, anyhow::Error>(()) - }) - 
                    .detach();
-                }
-
-                Ok::<_, anyhow::Error>(())
-            })
-            .detach_and_log_err(cx);
-
-            let background = cx.background().clone();
-            cx.background().spawn(async move {
-                let query = &query;
-                let mut matched_buffers = Vec::new();
-                for _ in 0..workers {
-                    matched_buffers.push(HashMap::default());
-                }
-                background
-                    .scoped(|scope| {
-                        for worker_matched_buffers in matched_buffers.iter_mut() {
-                            let mut buffers_rx = buffers_rx.clone();
-                            scope.spawn(async move {
-                                while let Some((buffer, snapshot)) = buffers_rx.next().await {
-                                    let buffer_matches = if query.file_matches(
-                                        snapshot.file().map(|file| file.path().as_ref()),
-                                    ) {
-                                        query
-                                            .search(&snapshot, None)
-                                            .await
-                                            .iter()
-                                            .map(|range| {
-                                                snapshot.anchor_before(range.start)
-                                                    ..snapshot.anchor_after(range.end)
-                                            })
-                                            .collect()
-                                    } else {
-                                        Vec::new()
-                                    };
-                                    if !buffer_matches.is_empty() {
-                                        worker_matched_buffers
-                                            .insert(buffer.clone(), buffer_matches);
-                                    }
-                                }
-                            });
-                        }
-                    })
-                    .await;
-                Ok(matched_buffers.into_iter().flatten().collect())
-            })
+            self.search_local(query, cx)
         } else if let Some(project_id) = self.remote_id() {
+            let (tx, rx) = smol::channel::unbounded();
             let request = self.client.request(query.to_proto(project_id));
             cx.spawn(|this, mut cx| async move {
                 let response = request.await?;
@@ -5303,13 +5148,303 @@
                         .or_insert(Vec::new())
                         .push(start..end)
                 }
-                Ok(result)
+                for (buffer, ranges) in result {
+                    let _ = tx.send((buffer, ranges)).await;
+                }
+                Result::<(), anyhow::Error>::Ok(())
             })
+            .detach_and_log_err(cx);
+            rx
         } else {
-            Task::ready(Ok(Default::default()))
+            unimplemented!();
         }
     }
 
+    pub fn search_local(
+        &self,
+        query: SearchQuery,
+        cx: &mut ModelContext<Self>,
+    ) -> Receiver<(ModelHandle<Buffer>, Vec<Range<Anchor>>)> {
+        // Local search is split into several phases.
+        // TL;DR is that we do two passes: an initial pass to pick the files which contain at least one match,
+        // and a second pass that finds the positions of all the matches in those candidate files.
+        // The Receiver obtained from this function returns matches sorted by buffer path. Files without a buffer path are reported first.
+        //
+        // It gets a bit hairy though, because we must account for files that do not have a persistent representation
+        // on the FS. Namely, if you have an untitled buffer or unsaved changes in a buffer, we want to scan those too.
+        //
+        // 1. We initialize a queue of match candidates and feed all opened buffers into it (== unsaved files / untitled buffers).
+        //    Then, we go through a worktree and check for files that match a predicate. If a file has an opened version, we skip the scan
+        //    of the FS version for that file altogether - after all, what we have in memory is more up-to-date than what's on the FS.
+        // 2. At this point, we have a list of all potentially matching buffers/files.
+        //    We sort that list by buffer path - this list is retained for later use.
+        //    We ensure that all buffers are now opened and available in the project.
+        // 3. We run a scan over all the candidate buffers on multiple background threads.
+        //    We cannot assume that there will even be a match - while at least one match
+        //    is guaranteed for files obtained from the FS, the buffers we got from memory (unsaved files / unnamed buffers) might not have a match at all.
+        //    There is also an auxiliary background thread responsible for result gathering.
+        //    This is where the sorted list of buffers comes into play to maintain sorted order; whenever this background thread receives a notification (a buffer has/doesn't have matches),
+        //    it keeps it around.
It reports matches in sorted order, though it accepts them in unsorted order as well. + // As soon as the match info on next position in sorted order becomes available, it reports it (if it's a match) or skips to the next + // entry - which might already be available thanks to out-of-order processing. + // + // We could also report matches fully out-of-order, without maintaining a sorted list of matching paths. + // This however would mean that project search (that is the main user of this function) would have to do the sorting itself, on the go. + // This isn't as straightforward as running an insertion sort sadly, and would also mean that it would have to care about maintaining match index + // in face of constantly updating list of sorted matches. + // Meanwhile, this implementation offers index stability, since the matches are already reported in a sorted order. + let snapshots = self + .visible_worktrees(cx) + .filter_map(|tree| { + let tree = tree.read(cx).as_local()?; + Some(tree.snapshot()) + }) + .collect::>(); + + let background = cx.background().clone(); + let path_count: usize = snapshots.iter().map(|s| s.visible_file_count()).sum(); + if path_count == 0 { + let (_, rx) = smol::channel::bounded(1024); + return rx; + } + let workers = background.num_cpus().min(path_count); + let (matching_paths_tx, matching_paths_rx) = smol::channel::bounded(1024); + let mut unnamed_files = vec![]; + let opened_buffers = self + .opened_buffers + .iter() + .filter_map(|(_, b)| { + let buffer = b.upgrade(cx)?; + let snapshot = buffer.read_with(cx, |buffer, _| buffer.snapshot()); + if let Some(path) = snapshot.file().map(|file| file.path()) { + Some((path.clone(), (buffer, snapshot))) + } else { + unnamed_files.push(buffer); + None + } + }) + .collect(); + cx.background() + .spawn(Self::background_search( + unnamed_files, + opened_buffers, + cx.background().clone(), + self.fs.clone(), + workers, + query.clone(), + path_count, + snapshots, + matching_paths_tx, + )) + .detach(); + + let (buffers, buffers_rx) = Self::sort_candidates_and_open_buffers(matching_paths_rx, cx); + let background = cx.background().clone(); + let (result_tx, result_rx) = smol::channel::bounded(1024); + cx.background() + .spawn(async move { + let Ok(buffers) = buffers.await else { + return; + }; + + let buffers_len = buffers.len(); + if buffers_len == 0 { + return; + } + let query = &query; + let (finished_tx, mut finished_rx) = smol::channel::unbounded(); + background + .scoped(|scope| { + #[derive(Clone)] + struct FinishedStatus { + entry: Option<(ModelHandle, Vec>)>, + buffer_index: SearchMatchCandidateIndex, + } + + for _ in 0..workers { + let finished_tx = finished_tx.clone(); + let mut buffers_rx = buffers_rx.clone(); + scope.spawn(async move { + while let Some((entry, buffer_index)) = buffers_rx.next().await { + let buffer_matches = if let Some((_, snapshot)) = entry.as_ref() + { + if query.file_matches( + snapshot.file().map(|file| file.path().as_ref()), + ) { + query + .search(&snapshot, None) + .await + .iter() + .map(|range| { + snapshot.anchor_before(range.start) + ..snapshot.anchor_after(range.end) + }) + .collect() + } else { + Vec::new() + } + } else { + Vec::new() + }; + + let status = if !buffer_matches.is_empty() { + let entry = if let Some((buffer, _)) = entry.as_ref() { + Some((buffer.clone(), buffer_matches)) + } else { + None + }; + FinishedStatus { + entry, + buffer_index, + } + } else { + FinishedStatus { + entry: None, + buffer_index, + } + }; + if finished_tx.send(status).await.is_err() { + break; + } 
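                            // Note: the status is sent unconditionally - a candidate with
                            // no matches still reports `entry: None` for its buffer_index.
                            // The sorted reporter below never advances past an index until
                            // some status for it has arrived, so skipping non-matching
                            // buffers here would stall every result sorting after them.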
+ } + }); + } + // Report sorted matches + scope.spawn(async move { + let mut current_index = 0; + let mut scratch = vec![None; buffers_len]; + while let Some(status) = finished_rx.next().await { + debug_assert!( + scratch[status.buffer_index].is_none(), + "Got match status of position {} twice", + status.buffer_index + ); + let index = status.buffer_index; + scratch[index] = Some(status); + while current_index < buffers_len { + let Some(current_entry) = scratch[current_index].take() else { + // We intentionally **do not** increment `current_index` here. When next element arrives + // from `finished_rx`, we will inspect the same position again, hoping for it to be Some(_) + // this time. + break; + }; + if let Some(entry) = current_entry.entry { + result_tx.send(entry).await.log_err(); + } + current_index += 1; + } + if current_index == buffers_len { + break; + } + } + }); + }) + .await; + }) + .detach(); + result_rx + } + /// Pick paths that might potentially contain a match of a given search query. + async fn background_search( + unnamed_buffers: Vec>, + opened_buffers: HashMap, (ModelHandle, BufferSnapshot)>, + background: Arc, + fs: Arc, + workers: usize, + query: SearchQuery, + path_count: usize, + snapshots: Vec, + matching_paths_tx: Sender, + ) { + let fs = &fs; + let query = &query; + let matching_paths_tx = &matching_paths_tx; + let snapshots = &snapshots; + let paths_per_worker = (path_count + workers - 1) / workers; + for buffer in unnamed_buffers { + matching_paths_tx + .send(SearchMatchCandidate::OpenBuffer { + buffer: buffer.clone(), + path: None, + }) + .await + .log_err(); + } + for (path, (buffer, _)) in opened_buffers.iter() { + matching_paths_tx + .send(SearchMatchCandidate::OpenBuffer { + buffer: buffer.clone(), + path: Some(path.clone()), + }) + .await + .log_err(); + } + background + .scoped(|scope| { + for worker_ix in 0..workers { + let worker_start_ix = worker_ix * paths_per_worker; + let worker_end_ix = worker_start_ix + paths_per_worker; + let unnamed_buffers = opened_buffers.clone(); + scope.spawn(async move { + let mut snapshot_start_ix = 0; + let mut abs_path = PathBuf::new(); + for snapshot in snapshots { + let snapshot_end_ix = snapshot_start_ix + snapshot.visible_file_count(); + if worker_end_ix <= snapshot_start_ix { + break; + } else if worker_start_ix > snapshot_end_ix { + snapshot_start_ix = snapshot_end_ix; + continue; + } else { + let start_in_snapshot = + worker_start_ix.saturating_sub(snapshot_start_ix); + let end_in_snapshot = + cmp::min(worker_end_ix, snapshot_end_ix) - snapshot_start_ix; + + for entry in snapshot + .files(false, start_in_snapshot) + .take(end_in_snapshot - start_in_snapshot) + { + if matching_paths_tx.is_closed() { + break; + } + if unnamed_buffers.contains_key(&entry.path) { + continue; + } + let matches = if query.file_matches(Some(&entry.path)) { + abs_path.clear(); + abs_path.push(&snapshot.abs_path()); + abs_path.push(&entry.path); + if let Some(file) = fs.open_sync(&abs_path).await.log_err() + { + query.detect(file).unwrap_or(false) + } else { + false + } + } else { + false + }; + + if matches { + let project_path = SearchMatchCandidate::Path { + worktree_id: snapshot.id(), + path: entry.path.clone(), + }; + if matching_paths_tx.send(project_path).await.is_err() { + break; + } + } + } + + snapshot_start_ix = snapshot_end_ix; + } + } + }); + } + }) + .await; + } + // TODO: Wire this up to allow selecting a server? 
fn request_lsp( &self, @@ -5384,6 +5519,61 @@ impl Project { Task::ready(Ok(Default::default())) } + fn sort_candidates_and_open_buffers( + mut matching_paths_rx: Receiver, + cx: &mut ModelContext, + ) -> ( + futures::channel::oneshot::Receiver>, + Receiver<( + Option<(ModelHandle, BufferSnapshot)>, + SearchMatchCandidateIndex, + )>, + ) { + let (buffers_tx, buffers_rx) = smol::channel::bounded(1024); + let (sorted_buffers_tx, sorted_buffers_rx) = futures::channel::oneshot::channel(); + cx.spawn(|this, cx| async move { + let mut buffers = vec![]; + while let Some(entry) = matching_paths_rx.next().await { + buffers.push(entry); + } + buffers.sort_by_key(|candidate| candidate.path()); + let matching_paths = buffers.clone(); + let _ = sorted_buffers_tx.send(buffers); + for (index, candidate) in matching_paths.into_iter().enumerate() { + if buffers_tx.is_closed() { + break; + } + let this = this.clone(); + let buffers_tx = buffers_tx.clone(); + cx.spawn(|mut cx| async move { + let buffer = match candidate { + SearchMatchCandidate::OpenBuffer { buffer, .. } => Some(buffer), + SearchMatchCandidate::Path { worktree_id, path } => this + .update(&mut cx, |this, cx| { + this.open_buffer((worktree_id, path), cx) + }) + .await + .log_err(), + }; + if let Some(buffer) = buffer { + let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot()); + buffers_tx + .send((Some((buffer, snapshot)), index)) + .await + .log_err(); + } else { + buffers_tx.send((None, index)).await.log_err(); + } + + Ok::<_, anyhow::Error>(()) + }) + .detach(); + } + }) + .detach(); + (sorted_buffers_rx, buffers_rx) + } + pub fn find_or_create_local_worktree( &mut self, abs_path: impl AsRef, @@ -7006,17 +7196,17 @@ impl Project { ) -> Result { let peer_id = envelope.original_sender_id()?; let query = SearchQuery::from_proto(envelope.payload)?; - let result = this - .update(&mut cx, |this, cx| this.search(query, cx)) - .await?; + let mut result = this.update(&mut cx, |this, cx| this.search(query, cx)); - this.update(&mut cx, |this, cx| { + cx.spawn(|mut cx| async move { let mut locations = Vec::new(); - for (buffer, ranges) in result { + while let Some((buffer, ranges)) = result.next().await { for range in ranges { let start = serialize_anchor(&range.start); let end = serialize_anchor(&range.end); - let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx); + let buffer_id = this.update(&mut cx, |this, cx| { + this.create_buffer_for_peer(&buffer, peer_id, cx) + }); locations.push(proto::Location { buffer_id, start: Some(start), @@ -7026,6 +7216,7 @@ impl Project { } Ok(proto::SearchProjectResponse { locations }) }) + .await } async fn handle_open_buffer_for_symbol( diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index a504900c83..7c5983a0a9 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -3953,11 +3953,12 @@ async fn search( query: SearchQuery, cx: &mut gpui::TestAppContext, ) -> Result>>> { - let results = project - .update(cx, |project, cx| project.search(query, cx)) - .await?; - - Ok(results + let mut search_rx = project.update(cx, |project, cx| project.search(query, cx)); + let mut result = HashMap::default(); + while let Some((buffer, range)) = search_rx.next().await { + result.entry(buffer).or_insert(range); + } + Ok(result .into_iter() .map(|(buffer, ranges)| { buffer.read_with(cx, |buffer, _| { diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index d7633a45e4..6364183877 100644 --- 
a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -185,28 +185,26 @@ impl ProjectSearch { self.active_query = Some(query); self.match_ranges.clear(); self.pending_search = Some(cx.spawn_weak(|this, mut cx| async move { - let matches = search.await.log_err()?; + let mut matches = search; let this = this.upgrade(&cx)?; - let mut matches = matches.into_iter().collect::>(); - let (_task, mut match_ranges) = this.update(&mut cx, |this, cx| { + this.update(&mut cx, |this, cx| { this.match_ranges.clear(); + this.excerpts.update(cx, |this, cx| this.clear(cx)); this.no_results = Some(true); - matches.sort_by_key(|(buffer, _)| buffer.read(cx).file().map(|file| file.path())); - this.excerpts.update(cx, |excerpts, cx| { - excerpts.clear(cx); - excerpts.stream_excerpts_with_context_lines(matches, 1, cx) - }) }); - while let Some(match_range) = match_ranges.next().await { - this.update(&mut cx, |this, cx| { - this.match_ranges.push(match_range); - while let Ok(Some(match_range)) = match_ranges.try_next() { - this.match_ranges.push(match_range); - } + while let Some((buffer, anchors)) = matches.next().await { + let mut ranges = this.update(&mut cx, |this, cx| { this.no_results = Some(false); - cx.notify(); + this.excerpts.update(cx, |excerpts, cx| { + excerpts.stream_excerpts_with_context_lines(buffer, anchors, 1, cx) + }) }); + + while let Some(range) = ranges.next().await { + this.update(&mut cx, |this, _| this.match_ranges.push(range)); + } + this.update(&mut cx, |_, cx| cx.notify()); } this.update(&mut cx, |this, cx| { @@ -238,29 +236,31 @@ impl ProjectSearch { self.no_results = Some(true); self.pending_search = Some(cx.spawn(|this, mut cx| async move { let results = search?.await.log_err()?; + let matches = results + .into_iter() + .map(|result| (result.buffer, vec![result.range.start..result.range.start])); - let (_task, mut match_ranges) = this.update(&mut cx, |this, cx| { + this.update(&mut cx, |this, cx| { this.excerpts.update(cx, |excerpts, cx| { excerpts.clear(cx); - - let matches = results - .into_iter() - .map(|result| (result.buffer, vec![result.range.start..result.range.start])) - .collect(); - - excerpts.stream_excerpts_with_context_lines(matches, 3, cx) }) }); - - while let Some(match_range) = match_ranges.next().await { - this.update(&mut cx, |this, cx| { - this.match_ranges.push(match_range); - while let Ok(Some(match_range)) = match_ranges.try_next() { - this.match_ranges.push(match_range); - } + for (buffer, ranges) in matches { + let mut match_ranges = this.update(&mut cx, |this, cx| { this.no_results = Some(false); - cx.notify(); + this.excerpts.update(cx, |excerpts, cx| { + excerpts.stream_excerpts_with_context_lines(buffer, ranges, 3, cx) + }) }); + while let Some(match_range) = match_ranges.next().await { + this.update(&mut cx, |this, cx| { + this.match_ranges.push(match_range); + while let Ok(Some(match_range)) = match_ranges.try_next() { + this.match_ranges.push(match_range); + } + cx.notify(); + }); + } } this.update(&mut cx, |this, cx| {
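The semantic-search arm above keeps the older batching idiom: await one match range, then opportunistically drain whatever is already queued with `try_next` before calling `cx.notify()`, so a single notification covers a burst of results rather than one per match. A reduced sketch of that idiom over a `futures` channel (names are illustrative):

use futures::{channel::mpsc, StreamExt};

// Await one item, then drain everything already buffered; each pass
// through the outer loop ends with a single notify for the burst.
async fn drain_in_bursts(mut rx: mpsc::Receiver<usize>, mut notify: impl FnMut()) -> Vec<usize> {
    let mut collected = Vec::new();
    while let Some(item) = rx.next().await {
        collected.push(item);
        // try_next() only yields items already sitting in the channel;
        // it returns Err when empty-but-open and Ok(None) when closed.
        while let Ok(Some(item)) = rx.try_next() {
            collected.push(item);
        }
        notify(); // one notification per burst, not per item
    }
    collected
}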