Use the same background executor for spawning CPU intensive tasks

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
Co-Authored-By: Max Brunsfeld <max@zed.dev>
This commit is contained in:
Antonio Scandurra 2021-07-13 18:13:25 +02:00
parent 6abac39eaa
commit c4e37dc47c
6 changed files with 83 additions and 101 deletions

View file

@ -123,7 +123,6 @@ impl App {
let cx = Rc::new(RefCell::new(MutableAppContext::new( let cx = Rc::new(RefCell::new(MutableAppContext::new(
foreground, foreground,
Arc::new(executor::Background::new()), Arc::new(executor::Background::new()),
Arc::new(executor::Background::new()),
Arc::new(platform), Arc::new(platform),
Rc::new(foreground_platform), Rc::new(foreground_platform),
(), (),
@ -140,7 +139,6 @@ impl App {
let app = Self(Rc::new(RefCell::new(MutableAppContext::new( let app = Self(Rc::new(RefCell::new(MutableAppContext::new(
foreground, foreground,
Arc::new(executor::Background::new()), Arc::new(executor::Background::new()),
Arc::new(executor::Background::new()),
platform.clone(), platform.clone(),
foreground_platform.clone(), foreground_platform.clone(),
asset_source, asset_source,
@ -247,7 +245,6 @@ impl TestAppContext {
pub fn new( pub fn new(
foreground: Rc<executor::Foreground>, foreground: Rc<executor::Foreground>,
background: Arc<executor::Background>, background: Arc<executor::Background>,
thread_pool: Arc<executor::Background>,
first_entity_id: usize, first_entity_id: usize,
) -> Self { ) -> Self {
let platform = Arc::new(platform::test::platform()); let platform = Arc::new(platform::test::platform());
@ -255,7 +252,6 @@ impl TestAppContext {
let mut cx = MutableAppContext::new( let mut cx = MutableAppContext::new(
foreground.clone(), foreground.clone(),
background, background,
thread_pool,
platform, platform,
foreground_platform.clone(), foreground_platform.clone(),
(), (),
@ -594,7 +590,6 @@ impl MutableAppContext {
fn new( fn new(
foreground: Rc<executor::Foreground>, foreground: Rc<executor::Foreground>,
background: Arc<executor::Background>, background: Arc<executor::Background>,
thread_pool: Arc<executor::Background>,
platform: Arc<dyn platform::Platform>, platform: Arc<dyn platform::Platform>,
foreground_platform: Rc<dyn platform::ForegroundPlatform>, foreground_platform: Rc<dyn platform::ForegroundPlatform>,
asset_source: impl AssetSource, asset_source: impl AssetSource,
@ -612,7 +607,6 @@ impl MutableAppContext {
values: Default::default(), values: Default::default(),
ref_counts: Arc::new(Mutex::new(RefCounts::default())), ref_counts: Arc::new(Mutex::new(RefCounts::default())),
background, background,
thread_pool,
font_cache: Arc::new(FontCache::new(fonts)), font_cache: Arc::new(FontCache::new(fonts)),
}, },
actions: HashMap::new(), actions: HashMap::new(),
@ -1490,7 +1484,6 @@ pub struct AppContext {
values: RwLock<HashMap<(TypeId, usize), Box<dyn Any>>>, values: RwLock<HashMap<(TypeId, usize), Box<dyn Any>>>,
background: Arc<executor::Background>, background: Arc<executor::Background>,
ref_counts: Arc<Mutex<RefCounts>>, ref_counts: Arc<Mutex<RefCounts>>,
thread_pool: Arc<executor::Background>,
font_cache: Arc<FontCache>, font_cache: Arc<FontCache>,
} }
@ -1535,10 +1528,6 @@ impl AppContext {
&self.font_cache &self.font_cache
} }
pub fn thread_pool(&self) -> &Arc<executor::Background> {
&self.thread_pool
}
pub fn value<Tag: 'static, T: 'static + Default>(&self, id: usize) -> ValueHandle<T> { pub fn value<Tag: 'static, T: 'static + Default>(&self, id: usize) -> ValueHandle<T> {
let key = (TypeId::of::<Tag>(), id); let key = (TypeId::of::<Tag>(), id);
let mut values = self.values.write(); let mut values = self.values.write();
@ -1721,10 +1710,6 @@ impl<'a, T: Entity> ModelContext<'a, T> {
&self.app.cx.background &self.app.cx.background
} }
pub fn thread_pool(&self) -> &Arc<executor::Background> {
&self.app.cx.thread_pool
}
pub fn halt_stream(&mut self) { pub fn halt_stream(&mut self) {
self.halt_stream = true; self.halt_stream = true;
} }

View file

@ -34,7 +34,6 @@ pub enum Background {
Deterministic(Arc<Deterministic>), Deterministic(Arc<Deterministic>),
Production { Production {
executor: Arc<smol::Executor<'static>>, executor: Arc<smol::Executor<'static>>,
threads: usize,
_stop: channel::Sender<()>, _stop: channel::Sender<()>,
}, },
} }
@ -324,9 +323,8 @@ impl Background {
pub fn new() -> Self { pub fn new() -> Self {
let executor = Arc::new(Executor::new()); let executor = Arc::new(Executor::new());
let stop = channel::unbounded::<()>(); let stop = channel::unbounded::<()>();
let threads = num_cpus::get();
for i in 0..threads { for i in 0..2 * num_cpus::get() {
let executor = executor.clone(); let executor = executor.clone();
let stop = stop.1.clone(); let stop = stop.1.clone();
thread::Builder::new() thread::Builder::new()
@ -337,16 +335,12 @@ impl Background {
Self::Production { Self::Production {
executor, executor,
threads,
_stop: stop.0, _stop: stop.0,
} }
} }
pub fn threads(&self) -> usize { pub fn num_cpus(&self) -> usize {
match self { num_cpus::get()
Self::Deterministic(_) => 1,
Self::Production { threads, .. } => *threads,
}
} }
pub fn spawn<T, F>(&self, future: F) -> Task<T> pub fn spawn<T, F>(&self, future: F) -> Task<T>

View file

@ -60,7 +60,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
let inner_fn_args = (0..inner_fn.sig.inputs.len()) let inner_fn_args = (0..inner_fn.sig.inputs.len())
.map(|i| { .map(|i| {
let first_entity_id = i * 100_000; let first_entity_id = i * 100_000;
quote!(#namespace::TestAppContext::new(foreground.clone(), background.clone(), background.clone(), #first_entity_id),) quote!(#namespace::TestAppContext::new(foreground.clone(), background.clone(), #first_entity_id),)
}) })
.collect::<proc_macro2::TokenStream>(); .collect::<proc_macro2::TokenStream>();

View file

@ -399,7 +399,7 @@ impl FileFinder {
.map(|tree| tree.read(cx).snapshot()) .map(|tree| tree.read(cx).snapshot())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let search_id = util::post_inc(&mut self.search_count); let search_id = util::post_inc(&mut self.search_count);
let pool = cx.as_ref().thread_pool().clone(); let background = cx.as_ref().background().clone();
self.cancel_flag.store(true, atomic::Ordering::Relaxed); self.cancel_flag.store(true, atomic::Ordering::Relaxed);
self.cancel_flag = Arc::new(AtomicBool::new(false)); self.cancel_flag = Arc::new(AtomicBool::new(false));
let cancel_flag = self.cancel_flag.clone(); let cancel_flag = self.cancel_flag.clone();
@ -413,7 +413,7 @@ impl FileFinder {
false, false,
100, 100,
cancel_flag.clone(), cancel_flag.clone(),
pool, background,
) )
.await; .await;
let did_cancel = cancel_flag.load(atomic::Ordering::Relaxed); let did_cancel = cancel_flag.load(atomic::Ordering::Relaxed);

View file

@ -552,11 +552,11 @@ impl Worktree {
let tree = tree.as_local_mut().unwrap(); let tree = tree.as_local_mut().unwrap();
let abs_path = tree.snapshot.abs_path.clone(); let abs_path = tree.snapshot.abs_path.clone();
let background_snapshot = tree.background_snapshot.clone(); let background_snapshot = tree.background_snapshot.clone();
let thread_pool = cx.thread_pool().clone(); let background = cx.background().clone();
tree._background_scanner_task = Some(cx.background().spawn(async move { tree._background_scanner_task = Some(cx.background().spawn(async move {
let events = fs.watch(&abs_path, Duration::from_millis(100)).await; let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
let scanner = let scanner =
BackgroundScanner::new(background_snapshot, scan_states_tx, fs, thread_pool); BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
scanner.run(events).await; scanner.run(events).await;
})); }));
}); });
@ -2295,7 +2295,7 @@ impl BackgroundScanner {
self.executor self.executor
.scoped(|scope| { .scoped(|scope| {
for _ in 0..self.executor.threads() { for _ in 0..self.executor.num_cpus() {
scope.spawn(async { scope.spawn(async {
while let Ok(job) = rx.recv().await { while let Ok(job) = rx.recv().await {
if let Err(err) = self if let Err(err) = self
@ -2487,7 +2487,7 @@ impl BackgroundScanner {
drop(scan_queue_tx); drop(scan_queue_tx);
self.executor self.executor
.scoped(|scope| { .scoped(|scope| {
for _ in 0..self.executor.threads() { for _ in 0..self.executor.num_cpus() {
scope.spawn(async { scope.spawn(async {
while let Ok(job) = scan_queue_rx.recv().await { while let Ok(job) = scan_queue_rx.recv().await {
if let Err(err) = self if let Err(err) = self
@ -2555,7 +2555,7 @@ impl BackgroundScanner {
self.executor self.executor
.scoped(|scope| { .scoped(|scope| {
for _ in 0..self.executor.threads() { for _ in 0..self.executor.num_cpus() {
scope.spawn(async { scope.spawn(async {
while let Ok(job) = ignore_queue_rx.recv().await { while let Ok(job) = ignore_queue_rx.recv().await {
self.update_ignore_status(job, &snapshot).await; self.update_ignore_status(job, &snapshot).await;
@ -3044,7 +3044,7 @@ mod tests {
false, false,
10, 10,
Default::default(), Default::default(),
cx.thread_pool().clone(), cx.background().clone(),
) )
}) })
.await; .await;

View file

@ -59,7 +59,7 @@ pub async fn match_paths<'a, T>(
smart_case: bool, smart_case: bool,
max_results: usize, max_results: usize,
cancel_flag: Arc<AtomicBool>, cancel_flag: Arc<AtomicBool>,
pool: Arc<executor::Background>, background: Arc<executor::Background>,
) -> Vec<PathMatch> ) -> Vec<PathMatch>
where where
T: Clone + Send + Iterator<Item = &'a Snapshot> + 'a, T: Clone + Send + Iterator<Item = &'a Snapshot> + 'a,
@ -77,82 +77,85 @@ where
snapshots.clone().map(Snapshot::visible_file_count).sum() snapshots.clone().map(Snapshot::visible_file_count).sum()
}; };
let segment_size = (path_count + pool.threads() - 1) / pool.threads(); let num_cpus = background.num_cpus().min(path_count);
let mut segment_results = (0..pool.threads()) let segment_size = (path_count + num_cpus - 1) / num_cpus;
let mut segment_results = (0..num_cpus)
.map(|_| Vec::with_capacity(max_results)) .map(|_| Vec::with_capacity(max_results))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
pool.scoped(|scope| { background
for (segment_idx, results) in segment_results.iter_mut().enumerate() { .scoped(|scope| {
let snapshots = snapshots.clone(); for (segment_idx, results) in segment_results.iter_mut().enumerate() {
let cancel_flag = &cancel_flag; let snapshots = snapshots.clone();
scope.spawn(async move { let cancel_flag = &cancel_flag;
let segment_start = segment_idx * segment_size; scope.spawn(async move {
let segment_end = segment_start + segment_size; let segment_start = segment_idx * segment_size;
let segment_end = segment_start + segment_size;
let mut min_score = 0.0; let mut min_score = 0.0;
let mut last_positions = Vec::new(); let mut last_positions = Vec::new();
last_positions.resize(query.len(), 0); last_positions.resize(query.len(), 0);
let mut match_positions = Vec::new(); let mut match_positions = Vec::new();
match_positions.resize(query.len(), 0); match_positions.resize(query.len(), 0);
let mut score_matrix = Vec::new(); let mut score_matrix = Vec::new();
let mut best_position_matrix = Vec::new(); let mut best_position_matrix = Vec::new();
let mut tree_start = 0; let mut tree_start = 0;
for snapshot in snapshots { for snapshot in snapshots {
let tree_end = if include_ignored { let tree_end = if include_ignored {
tree_start + snapshot.file_count() tree_start + snapshot.file_count()
} else {
tree_start + snapshot.visible_file_count()
};
let include_root_name = include_root_name || snapshot.root_entry().is_file();
if tree_start < segment_end && segment_start < tree_end {
let start = max(tree_start, segment_start) - tree_start;
let end = min(tree_end, segment_end) - tree_start;
let entries = if include_ignored {
snapshot.files(start).take(end - start)
} else { } else {
snapshot.visible_files(start).take(end - start) tree_start + snapshot.visible_file_count()
}; };
let paths = entries.map(|entry| {
if let EntryKind::File(char_bag) = entry.kind {
MatchCandidate {
path: &entry.path,
char_bag,
}
} else {
unreachable!()
}
});
match_single_tree_paths( let include_root_name =
snapshot, include_root_name || snapshot.root_entry().is_file();
include_root_name, if tree_start < segment_end && segment_start < tree_end {
paths, let start = max(tree_start, segment_start) - tree_start;
query, let end = min(tree_end, segment_end) - tree_start;
lowercase_query, let entries = if include_ignored {
query_chars, snapshot.files(start).take(end - start)
smart_case, } else {
results, snapshot.visible_files(start).take(end - start)
max_results, };
&mut min_score, let paths = entries.map(|entry| {
&mut match_positions, if let EntryKind::File(char_bag) = entry.kind {
&mut last_positions, MatchCandidate {
&mut score_matrix, path: &entry.path,
&mut best_position_matrix, char_bag,
&cancel_flag, }
); } else {
unreachable!()
}
});
match_single_tree_paths(
snapshot,
include_root_name,
paths,
query,
lowercase_query,
query_chars,
smart_case,
results,
max_results,
&mut min_score,
&mut match_positions,
&mut last_positions,
&mut score_matrix,
&mut best_position_matrix,
&cancel_flag,
);
}
if tree_end >= segment_end {
break;
}
tree_start = tree_end;
} }
if tree_end >= segment_end { })
break; }
} })
tree_start = tree_end; .await;
}
})
}
})
.await;
let mut results = Vec::new(); let mut results = Vec::new();
for segment_result in segment_results { for segment_result in segment_results {