Use the same background executor for spawning CPU intensive tasks
Co-Authored-By: Nathan Sobo <nathan@zed.dev> Co-Authored-By: Max Brunsfeld <max@zed.dev>
This commit is contained in:
parent
6abac39eaa
commit
c4e37dc47c
6 changed files with 83 additions and 101 deletions
|
@ -123,7 +123,6 @@ impl App {
|
|||
let cx = Rc::new(RefCell::new(MutableAppContext::new(
|
||||
foreground,
|
||||
Arc::new(executor::Background::new()),
|
||||
Arc::new(executor::Background::new()),
|
||||
Arc::new(platform),
|
||||
Rc::new(foreground_platform),
|
||||
(),
|
||||
|
@ -140,7 +139,6 @@ impl App {
|
|||
let app = Self(Rc::new(RefCell::new(MutableAppContext::new(
|
||||
foreground,
|
||||
Arc::new(executor::Background::new()),
|
||||
Arc::new(executor::Background::new()),
|
||||
platform.clone(),
|
||||
foreground_platform.clone(),
|
||||
asset_source,
|
||||
|
@ -247,7 +245,6 @@ impl TestAppContext {
|
|||
pub fn new(
|
||||
foreground: Rc<executor::Foreground>,
|
||||
background: Arc<executor::Background>,
|
||||
thread_pool: Arc<executor::Background>,
|
||||
first_entity_id: usize,
|
||||
) -> Self {
|
||||
let platform = Arc::new(platform::test::platform());
|
||||
|
@ -255,7 +252,6 @@ impl TestAppContext {
|
|||
let mut cx = MutableAppContext::new(
|
||||
foreground.clone(),
|
||||
background,
|
||||
thread_pool,
|
||||
platform,
|
||||
foreground_platform.clone(),
|
||||
(),
|
||||
|
@ -594,7 +590,6 @@ impl MutableAppContext {
|
|||
fn new(
|
||||
foreground: Rc<executor::Foreground>,
|
||||
background: Arc<executor::Background>,
|
||||
thread_pool: Arc<executor::Background>,
|
||||
platform: Arc<dyn platform::Platform>,
|
||||
foreground_platform: Rc<dyn platform::ForegroundPlatform>,
|
||||
asset_source: impl AssetSource,
|
||||
|
@ -612,7 +607,6 @@ impl MutableAppContext {
|
|||
values: Default::default(),
|
||||
ref_counts: Arc::new(Mutex::new(RefCounts::default())),
|
||||
background,
|
||||
thread_pool,
|
||||
font_cache: Arc::new(FontCache::new(fonts)),
|
||||
},
|
||||
actions: HashMap::new(),
|
||||
|
@ -1490,7 +1484,6 @@ pub struct AppContext {
|
|||
values: RwLock<HashMap<(TypeId, usize), Box<dyn Any>>>,
|
||||
background: Arc<executor::Background>,
|
||||
ref_counts: Arc<Mutex<RefCounts>>,
|
||||
thread_pool: Arc<executor::Background>,
|
||||
font_cache: Arc<FontCache>,
|
||||
}
|
||||
|
||||
|
@ -1535,10 +1528,6 @@ impl AppContext {
|
|||
&self.font_cache
|
||||
}
|
||||
|
||||
pub fn thread_pool(&self) -> &Arc<executor::Background> {
|
||||
&self.thread_pool
|
||||
}
|
||||
|
||||
pub fn value<Tag: 'static, T: 'static + Default>(&self, id: usize) -> ValueHandle<T> {
|
||||
let key = (TypeId::of::<Tag>(), id);
|
||||
let mut values = self.values.write();
|
||||
|
@ -1721,10 +1710,6 @@ impl<'a, T: Entity> ModelContext<'a, T> {
|
|||
&self.app.cx.background
|
||||
}
|
||||
|
||||
pub fn thread_pool(&self) -> &Arc<executor::Background> {
|
||||
&self.app.cx.thread_pool
|
||||
}
|
||||
|
||||
pub fn halt_stream(&mut self) {
|
||||
self.halt_stream = true;
|
||||
}
|
||||
|
|
|
@ -34,7 +34,6 @@ pub enum Background {
|
|||
Deterministic(Arc<Deterministic>),
|
||||
Production {
|
||||
executor: Arc<smol::Executor<'static>>,
|
||||
threads: usize,
|
||||
_stop: channel::Sender<()>,
|
||||
},
|
||||
}
|
||||
|
@ -324,9 +323,8 @@ impl Background {
|
|||
pub fn new() -> Self {
|
||||
let executor = Arc::new(Executor::new());
|
||||
let stop = channel::unbounded::<()>();
|
||||
let threads = num_cpus::get();
|
||||
|
||||
for i in 0..threads {
|
||||
for i in 0..2 * num_cpus::get() {
|
||||
let executor = executor.clone();
|
||||
let stop = stop.1.clone();
|
||||
thread::Builder::new()
|
||||
|
@ -337,16 +335,12 @@ impl Background {
|
|||
|
||||
Self::Production {
|
||||
executor,
|
||||
threads,
|
||||
_stop: stop.0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn threads(&self) -> usize {
|
||||
match self {
|
||||
Self::Deterministic(_) => 1,
|
||||
Self::Production { threads, .. } => *threads,
|
||||
}
|
||||
pub fn num_cpus(&self) -> usize {
|
||||
num_cpus::get()
|
||||
}
|
||||
|
||||
pub fn spawn<T, F>(&self, future: F) -> Task<T>
|
||||
|
|
|
@ -60,7 +60,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
|
|||
let inner_fn_args = (0..inner_fn.sig.inputs.len())
|
||||
.map(|i| {
|
||||
let first_entity_id = i * 100_000;
|
||||
quote!(#namespace::TestAppContext::new(foreground.clone(), background.clone(), background.clone(), #first_entity_id),)
|
||||
quote!(#namespace::TestAppContext::new(foreground.clone(), background.clone(), #first_entity_id),)
|
||||
})
|
||||
.collect::<proc_macro2::TokenStream>();
|
||||
|
||||
|
|
|
@ -399,7 +399,7 @@ impl FileFinder {
|
|||
.map(|tree| tree.read(cx).snapshot())
|
||||
.collect::<Vec<_>>();
|
||||
let search_id = util::post_inc(&mut self.search_count);
|
||||
let pool = cx.as_ref().thread_pool().clone();
|
||||
let background = cx.as_ref().background().clone();
|
||||
self.cancel_flag.store(true, atomic::Ordering::Relaxed);
|
||||
self.cancel_flag = Arc::new(AtomicBool::new(false));
|
||||
let cancel_flag = self.cancel_flag.clone();
|
||||
|
@ -413,7 +413,7 @@ impl FileFinder {
|
|||
false,
|
||||
100,
|
||||
cancel_flag.clone(),
|
||||
pool,
|
||||
background,
|
||||
)
|
||||
.await;
|
||||
let did_cancel = cancel_flag.load(atomic::Ordering::Relaxed);
|
||||
|
|
|
@ -552,11 +552,11 @@ impl Worktree {
|
|||
let tree = tree.as_local_mut().unwrap();
|
||||
let abs_path = tree.snapshot.abs_path.clone();
|
||||
let background_snapshot = tree.background_snapshot.clone();
|
||||
let thread_pool = cx.thread_pool().clone();
|
||||
let background = cx.background().clone();
|
||||
tree._background_scanner_task = Some(cx.background().spawn(async move {
|
||||
let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
|
||||
let scanner =
|
||||
BackgroundScanner::new(background_snapshot, scan_states_tx, fs, thread_pool);
|
||||
BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
|
||||
scanner.run(events).await;
|
||||
}));
|
||||
});
|
||||
|
@ -2295,7 +2295,7 @@ impl BackgroundScanner {
|
|||
|
||||
self.executor
|
||||
.scoped(|scope| {
|
||||
for _ in 0..self.executor.threads() {
|
||||
for _ in 0..self.executor.num_cpus() {
|
||||
scope.spawn(async {
|
||||
while let Ok(job) = rx.recv().await {
|
||||
if let Err(err) = self
|
||||
|
@ -2487,7 +2487,7 @@ impl BackgroundScanner {
|
|||
drop(scan_queue_tx);
|
||||
self.executor
|
||||
.scoped(|scope| {
|
||||
for _ in 0..self.executor.threads() {
|
||||
for _ in 0..self.executor.num_cpus() {
|
||||
scope.spawn(async {
|
||||
while let Ok(job) = scan_queue_rx.recv().await {
|
||||
if let Err(err) = self
|
||||
|
@ -2555,7 +2555,7 @@ impl BackgroundScanner {
|
|||
|
||||
self.executor
|
||||
.scoped(|scope| {
|
||||
for _ in 0..self.executor.threads() {
|
||||
for _ in 0..self.executor.num_cpus() {
|
||||
scope.spawn(async {
|
||||
while let Ok(job) = ignore_queue_rx.recv().await {
|
||||
self.update_ignore_status(job, &snapshot).await;
|
||||
|
@ -3044,7 +3044,7 @@ mod tests {
|
|||
false,
|
||||
10,
|
||||
Default::default(),
|
||||
cx.thread_pool().clone(),
|
||||
cx.background().clone(),
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
|
|
@ -59,7 +59,7 @@ pub async fn match_paths<'a, T>(
|
|||
smart_case: bool,
|
||||
max_results: usize,
|
||||
cancel_flag: Arc<AtomicBool>,
|
||||
pool: Arc<executor::Background>,
|
||||
background: Arc<executor::Background>,
|
||||
) -> Vec<PathMatch>
|
||||
where
|
||||
T: Clone + Send + Iterator<Item = &'a Snapshot> + 'a,
|
||||
|
@ -77,82 +77,85 @@ where
|
|||
snapshots.clone().map(Snapshot::visible_file_count).sum()
|
||||
};
|
||||
|
||||
let segment_size = (path_count + pool.threads() - 1) / pool.threads();
|
||||
let mut segment_results = (0..pool.threads())
|
||||
let num_cpus = background.num_cpus().min(path_count);
|
||||
let segment_size = (path_count + num_cpus - 1) / num_cpus;
|
||||
let mut segment_results = (0..num_cpus)
|
||||
.map(|_| Vec::with_capacity(max_results))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
pool.scoped(|scope| {
|
||||
for (segment_idx, results) in segment_results.iter_mut().enumerate() {
|
||||
let snapshots = snapshots.clone();
|
||||
let cancel_flag = &cancel_flag;
|
||||
scope.spawn(async move {
|
||||
let segment_start = segment_idx * segment_size;
|
||||
let segment_end = segment_start + segment_size;
|
||||
background
|
||||
.scoped(|scope| {
|
||||
for (segment_idx, results) in segment_results.iter_mut().enumerate() {
|
||||
let snapshots = snapshots.clone();
|
||||
let cancel_flag = &cancel_flag;
|
||||
scope.spawn(async move {
|
||||
let segment_start = segment_idx * segment_size;
|
||||
let segment_end = segment_start + segment_size;
|
||||
|
||||
let mut min_score = 0.0;
|
||||
let mut last_positions = Vec::new();
|
||||
last_positions.resize(query.len(), 0);
|
||||
let mut match_positions = Vec::new();
|
||||
match_positions.resize(query.len(), 0);
|
||||
let mut score_matrix = Vec::new();
|
||||
let mut best_position_matrix = Vec::new();
|
||||
let mut min_score = 0.0;
|
||||
let mut last_positions = Vec::new();
|
||||
last_positions.resize(query.len(), 0);
|
||||
let mut match_positions = Vec::new();
|
||||
match_positions.resize(query.len(), 0);
|
||||
let mut score_matrix = Vec::new();
|
||||
let mut best_position_matrix = Vec::new();
|
||||
|
||||
let mut tree_start = 0;
|
||||
for snapshot in snapshots {
|
||||
let tree_end = if include_ignored {
|
||||
tree_start + snapshot.file_count()
|
||||
} else {
|
||||
tree_start + snapshot.visible_file_count()
|
||||
};
|
||||
|
||||
let include_root_name = include_root_name || snapshot.root_entry().is_file();
|
||||
if tree_start < segment_end && segment_start < tree_end {
|
||||
let start = max(tree_start, segment_start) - tree_start;
|
||||
let end = min(tree_end, segment_end) - tree_start;
|
||||
let entries = if include_ignored {
|
||||
snapshot.files(start).take(end - start)
|
||||
let mut tree_start = 0;
|
||||
for snapshot in snapshots {
|
||||
let tree_end = if include_ignored {
|
||||
tree_start + snapshot.file_count()
|
||||
} else {
|
||||
snapshot.visible_files(start).take(end - start)
|
||||
tree_start + snapshot.visible_file_count()
|
||||
};
|
||||
let paths = entries.map(|entry| {
|
||||
if let EntryKind::File(char_bag) = entry.kind {
|
||||
MatchCandidate {
|
||||
path: &entry.path,
|
||||
char_bag,
|
||||
}
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
});
|
||||
|
||||
match_single_tree_paths(
|
||||
snapshot,
|
||||
include_root_name,
|
||||
paths,
|
||||
query,
|
||||
lowercase_query,
|
||||
query_chars,
|
||||
smart_case,
|
||||
results,
|
||||
max_results,
|
||||
&mut min_score,
|
||||
&mut match_positions,
|
||||
&mut last_positions,
|
||||
&mut score_matrix,
|
||||
&mut best_position_matrix,
|
||||
&cancel_flag,
|
||||
);
|
||||
let include_root_name =
|
||||
include_root_name || snapshot.root_entry().is_file();
|
||||
if tree_start < segment_end && segment_start < tree_end {
|
||||
let start = max(tree_start, segment_start) - tree_start;
|
||||
let end = min(tree_end, segment_end) - tree_start;
|
||||
let entries = if include_ignored {
|
||||
snapshot.files(start).take(end - start)
|
||||
} else {
|
||||
snapshot.visible_files(start).take(end - start)
|
||||
};
|
||||
let paths = entries.map(|entry| {
|
||||
if let EntryKind::File(char_bag) = entry.kind {
|
||||
MatchCandidate {
|
||||
path: &entry.path,
|
||||
char_bag,
|
||||
}
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
});
|
||||
|
||||
match_single_tree_paths(
|
||||
snapshot,
|
||||
include_root_name,
|
||||
paths,
|
||||
query,
|
||||
lowercase_query,
|
||||
query_chars,
|
||||
smart_case,
|
||||
results,
|
||||
max_results,
|
||||
&mut min_score,
|
||||
&mut match_positions,
|
||||
&mut last_positions,
|
||||
&mut score_matrix,
|
||||
&mut best_position_matrix,
|
||||
&cancel_flag,
|
||||
);
|
||||
}
|
||||
if tree_end >= segment_end {
|
||||
break;
|
||||
}
|
||||
tree_start = tree_end;
|
||||
}
|
||||
if tree_end >= segment_end {
|
||||
break;
|
||||
}
|
||||
tree_start = tree_end;
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
.await;
|
||||
})
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
let mut results = Vec::new();
|
||||
for segment_result in segment_results {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue