Use the same background executor for spawning CPU intensive tasks

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
Co-Authored-By: Max Brunsfeld <max@zed.dev>
Antonio Scandurra 2021-07-13 18:13:25 +02:00
parent 6abac39eaa
commit c4e37dc47c
6 changed files with 83 additions and 101 deletions
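In broad strokes, the change collapses the two executor handles that gpui's contexts used to carry: `MutableAppContext`, `AppContext`, and `ModelContext` previously held both a `background` and a separately constructed `thread_pool`, each an `Arc<executor::Background>`, and this commit deletes the second one so CPU-intensive work is spawned on the same background executor as everything else. A toy sketch of the resulting shape, with stand-in types rather than the real gpui ones:

use std::sync::Arc;

// Toy stand-in; the real executor::Background lives in gpui and is not reproduced here.
struct Background;

struct AppContext {
    background: Arc<Background>,
    // Before this commit there was a second, separately constructed handle:
    // thread_pool: Arc<Background>,
}

impl AppContext {
    fn background(&self) -> &Arc<Background> {
        &self.background
    }

    // The accessor this commit deletes:
    // fn thread_pool(&self) -> &Arc<Background> {
    //     &self.thread_pool
    // }
}

fn main() {
    let cx = AppContext {
        background: Arc::new(Background),
    };
    // Call sites that used to clone cx.thread_pool() now clone cx.background().
    let _cpu_and_io_executor = cx.background().clone();
}

The diffs below ripple that change through the constructors, the `thread_pool()` accessors, and every call site that cloned the old handle.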

View file

@@ -123,7 +123,6 @@ impl App {
         let cx = Rc::new(RefCell::new(MutableAppContext::new(
             foreground,
             Arc::new(executor::Background::new()),
-            Arc::new(executor::Background::new()),
             Arc::new(platform),
             Rc::new(foreground_platform),
             (),
@@ -140,7 +139,6 @@ impl App {
         let app = Self(Rc::new(RefCell::new(MutableAppContext::new(
             foreground,
             Arc::new(executor::Background::new()),
-            Arc::new(executor::Background::new()),
             platform.clone(),
             foreground_platform.clone(),
             asset_source,
@@ -247,7 +245,6 @@ impl TestAppContext {
     pub fn new(
         foreground: Rc<executor::Foreground>,
         background: Arc<executor::Background>,
-        thread_pool: Arc<executor::Background>,
         first_entity_id: usize,
     ) -> Self {
         let platform = Arc::new(platform::test::platform());
@@ -255,7 +252,6 @@ impl TestAppContext {
         let mut cx = MutableAppContext::new(
             foreground.clone(),
             background,
-            thread_pool,
             platform,
             foreground_platform.clone(),
             (),
@@ -594,7 +590,6 @@ impl MutableAppContext {
     fn new(
         foreground: Rc<executor::Foreground>,
         background: Arc<executor::Background>,
-        thread_pool: Arc<executor::Background>,
         platform: Arc<dyn platform::Platform>,
         foreground_platform: Rc<dyn platform::ForegroundPlatform>,
         asset_source: impl AssetSource,
@@ -612,7 +607,6 @@ impl MutableAppContext {
                 values: Default::default(),
                 ref_counts: Arc::new(Mutex::new(RefCounts::default())),
                 background,
-                thread_pool,
                 font_cache: Arc::new(FontCache::new(fonts)),
             },
             actions: HashMap::new(),
@@ -1490,7 +1484,6 @@ pub struct AppContext {
     values: RwLock<HashMap<(TypeId, usize), Box<dyn Any>>>,
     background: Arc<executor::Background>,
     ref_counts: Arc<Mutex<RefCounts>>,
-    thread_pool: Arc<executor::Background>,
     font_cache: Arc<FontCache>,
 }
@@ -1535,10 +1528,6 @@ impl AppContext {
         &self.font_cache
     }
 
-    pub fn thread_pool(&self) -> &Arc<executor::Background> {
-        &self.thread_pool
-    }
-
     pub fn value<Tag: 'static, T: 'static + Default>(&self, id: usize) -> ValueHandle<T> {
         let key = (TypeId::of::<Tag>(), id);
         let mut values = self.values.write();
@@ -1721,10 +1710,6 @@ impl<'a, T: Entity> ModelContext<'a, T> {
         &self.app.cx.background
     }
 
-    pub fn thread_pool(&self) -> &Arc<executor::Background> {
-        &self.app.cx.thread_pool
-    }
-
     pub fn halt_stream(&mut self) {
         self.halt_stream = true;
     }

View file

@@ -34,7 +34,6 @@ pub enum Background {
     Deterministic(Arc<Deterministic>),
     Production {
         executor: Arc<smol::Executor<'static>>,
-        threads: usize,
         _stop: channel::Sender<()>,
     },
 }
@@ -324,9 +323,8 @@ impl Background {
     pub fn new() -> Self {
         let executor = Arc::new(Executor::new());
         let stop = channel::unbounded::<()>();
-        let threads = num_cpus::get();
-        for i in 0..threads {
+        for i in 0..2 * num_cpus::get() {
             let executor = executor.clone();
             let stop = stop.1.clone();
             thread::Builder::new()
@@ -337,16 +335,12 @@ impl Background {
         Self::Production {
             executor,
-            threads,
             _stop: stop.0,
         }
     }
 
-    pub fn threads(&self) -> usize {
-        match self {
-            Self::Deterministic(_) => 1,
-            Self::Production { threads, .. } => *threads,
-        }
+    pub fn num_cpus(&self) -> usize {
+        num_cpus::get()
     }
 
     pub fn spawn<T, F>(&self, future: F) -> Task<T>
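executor.rs is where the merge actually happens: `Background::new()` now spawns `2 * num_cpus::get()` worker threads for the single shared executor, and the stored `threads` count plus the `threads()` accessor give way to a `num_cpus()` method that callers use to size CPU-bound fan-out. The 2x oversubscription presumably leaves headroom so a CPU-heavy batch sharing the executor does not starve other background tasks. A std-only sketch of just the sizing arithmetic (the real code uses the `num_cpus` crate and smol executor threads):

use std::thread;

// Rough std-only analogue of the sizing after this commit; only the
// arithmetic is mirrored here, not the executor itself.
fn cpu_count() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}

fn main() {
    // Worker threads backing the single shared executor: 2 * CPU count.
    let worker_threads = 2 * cpu_count();
    // What Background::num_cpus() reports, used by callers to size
    // CPU-bound fan-out (the scanner and fuzzy matcher below).
    let fan_out = cpu_count();
    println!("spawning {worker_threads} workers, fanning out over {fan_out} CPUs");
}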

View file

@@ -60,7 +60,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
         let inner_fn_args = (0..inner_fn.sig.inputs.len())
             .map(|i| {
                 let first_entity_id = i * 100_000;
-                quote!(#namespace::TestAppContext::new(foreground.clone(), background.clone(), background.clone(), #first_entity_id),)
+                quote!(#namespace::TestAppContext::new(foreground.clone(), background.clone(), #first_entity_id),)
             })
             .collect::<proc_macro2::TokenStream>();

View file

@@ -399,7 +399,7 @@ impl FileFinder {
             .map(|tree| tree.read(cx).snapshot())
             .collect::<Vec<_>>();
         let search_id = util::post_inc(&mut self.search_count);
-        let pool = cx.as_ref().thread_pool().clone();
+        let background = cx.as_ref().background().clone();
         self.cancel_flag.store(true, atomic::Ordering::Relaxed);
         self.cancel_flag = Arc::new(AtomicBool::new(false));
         let cancel_flag = self.cancel_flag.clone();
@@ -413,7 +413,7 @@ impl FileFinder {
                 false,
                 100,
                 cancel_flag.clone(),
-                pool,
+                background,
             )
             .await;
             let did_cancel = cancel_flag.load(atomic::Ordering::Relaxed);

View file

@@ -552,11 +552,11 @@ impl Worktree {
             let tree = tree.as_local_mut().unwrap();
             let abs_path = tree.snapshot.abs_path.clone();
             let background_snapshot = tree.background_snapshot.clone();
-            let thread_pool = cx.thread_pool().clone();
+            let background = cx.background().clone();
             tree._background_scanner_task = Some(cx.background().spawn(async move {
                 let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
                 let scanner =
-                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, thread_pool);
+                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
                 scanner.run(events).await;
             }));
         });
@@ -2295,7 +2295,7 @@ impl BackgroundScanner {
         self.executor
             .scoped(|scope| {
-                for _ in 0..self.executor.threads() {
+                for _ in 0..self.executor.num_cpus() {
                     scope.spawn(async {
                         while let Ok(job) = rx.recv().await {
                             if let Err(err) = self
@@ -2487,7 +2487,7 @@ impl BackgroundScanner {
         drop(scan_queue_tx);
         self.executor
             .scoped(|scope| {
-                for _ in 0..self.executor.threads() {
+                for _ in 0..self.executor.num_cpus() {
                     scope.spawn(async {
                         while let Ok(job) = scan_queue_rx.recv().await {
                             if let Err(err) = self
@@ -2555,7 +2555,7 @@ impl BackgroundScanner {
         self.executor
             .scoped(|scope| {
-                for _ in 0..self.executor.threads() {
+                for _ in 0..self.executor.num_cpus() {
                     scope.spawn(async {
                         while let Ok(job) = ignore_queue_rx.recv().await {
                             self.update_ignore_status(job, &snapshot).await;
@@ -3044,7 +3044,7 @@ mod tests {
                         false,
                         10,
                         Default::default(),
-                        cx.thread_pool().clone(),
+                        cx.background().clone(),
                     )
                 })
                 .await;
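Within `BackgroundScanner`, the scoped fan-out loops now ask the executor how many CPUs are available (`self.executor.num_cpus()`) rather than how many threads back it, and spawn that many workers draining a shared job queue. A rough std-only analogue of that pattern, assuming a simple channel of integer jobs; none of these names come from the Zed codebase:

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    // Size the fan-out by CPU count, mirroring self.executor.num_cpus().
    let num_cpus = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);

    let (tx, rx) = mpsc::channel::<u32>();
    for job in 0..32 {
        tx.send(job).unwrap();
    }
    drop(tx); // close the queue so workers stop once it drains

    let rx = Arc::new(Mutex::new(rx));
    thread::scope(|scope| {
        for _ in 0..num_cpus {
            let rx = Arc::clone(&rx);
            scope.spawn(move || loop {
                // Take the lock only long enough to pull one job.
                let job = match rx.lock().unwrap().recv() {
                    Ok(job) => job,
                    Err(_) => break,
                };
                let _ = job * job; // stand-in for processing a scan job
            });
        }
    });
}

Releasing the mutex before processing each job keeps the workers from serializing on one another, which is roughly what the async channel gives the real scanner for free.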

View file

@@ -59,7 +59,7 @@ pub async fn match_paths<'a, T>(
     smart_case: bool,
     max_results: usize,
     cancel_flag: Arc<AtomicBool>,
-    pool: Arc<executor::Background>,
+    background: Arc<executor::Background>,
 ) -> Vec<PathMatch>
 where
     T: Clone + Send + Iterator<Item = &'a Snapshot> + 'a,
@@ -77,82 +77,85 @@ where
         snapshots.clone().map(Snapshot::visible_file_count).sum()
     };
 
-    let segment_size = (path_count + pool.threads() - 1) / pool.threads();
-    let mut segment_results = (0..pool.threads())
+    let num_cpus = background.num_cpus().min(path_count);
+    let segment_size = (path_count + num_cpus - 1) / num_cpus;
+    let mut segment_results = (0..num_cpus)
         .map(|_| Vec::with_capacity(max_results))
         .collect::<Vec<_>>();
 
-    pool.scoped(|scope| {
-        for (segment_idx, results) in segment_results.iter_mut().enumerate() {
-            let snapshots = snapshots.clone();
-            let cancel_flag = &cancel_flag;
-            scope.spawn(async move {
-                let segment_start = segment_idx * segment_size;
-                let segment_end = segment_start + segment_size;
+    background
+        .scoped(|scope| {
+            for (segment_idx, results) in segment_results.iter_mut().enumerate() {
+                let snapshots = snapshots.clone();
+                let cancel_flag = &cancel_flag;
+                scope.spawn(async move {
+                    let segment_start = segment_idx * segment_size;
+                    let segment_end = segment_start + segment_size;
 
-                let mut min_score = 0.0;
-                let mut last_positions = Vec::new();
-                last_positions.resize(query.len(), 0);
-                let mut match_positions = Vec::new();
-                match_positions.resize(query.len(), 0);
-                let mut score_matrix = Vec::new();
-                let mut best_position_matrix = Vec::new();
+                    let mut min_score = 0.0;
+                    let mut last_positions = Vec::new();
+                    last_positions.resize(query.len(), 0);
+                    let mut match_positions = Vec::new();
+                    match_positions.resize(query.len(), 0);
+                    let mut score_matrix = Vec::new();
+                    let mut best_position_matrix = Vec::new();

-                let mut tree_start = 0;
-                for snapshot in snapshots {
-                    let tree_end = if include_ignored {
-                        tree_start + snapshot.file_count()
-                    } else {
-                        tree_start + snapshot.visible_file_count()
-                    };
-                    let include_root_name = include_root_name || snapshot.root_entry().is_file();
-                    if tree_start < segment_end && segment_start < tree_end {
-                        let start = max(tree_start, segment_start) - tree_start;
-                        let end = min(tree_end, segment_end) - tree_start;
-                        let entries = if include_ignored {
-                            snapshot.files(start).take(end - start)
-                        } else {
-                            snapshot.visible_files(start).take(end - start)
-                        };
-                        let paths = entries.map(|entry| {
-                            if let EntryKind::File(char_bag) = entry.kind {
-                                MatchCandidate {
-                                    path: &entry.path,
-                                    char_bag,
-                                }
-                            } else {
-                                unreachable!()
-                            }
-                        });
-                        match_single_tree_paths(
-                            snapshot,
-                            include_root_name,
-                            paths,
-                            query,
-                            lowercase_query,
-                            query_chars,
-                            smart_case,
-                            results,
-                            max_results,
-                            &mut min_score,
-                            &mut match_positions,
-                            &mut last_positions,
-                            &mut score_matrix,
-                            &mut best_position_matrix,
-                            &cancel_flag,
-                        );
-                    }
-                    if tree_end >= segment_end {
-                        break;
-                    }
-                    tree_start = tree_end;
-                }
-            })
-        }
-    })
-    .await;
+                    let mut tree_start = 0;
+                    for snapshot in snapshots {
+                        let tree_end = if include_ignored {
+                            tree_start + snapshot.file_count()
+                        } else {
+                            tree_start + snapshot.visible_file_count()
+                        };
+                        let include_root_name =
+                            include_root_name || snapshot.root_entry().is_file();
+                        if tree_start < segment_end && segment_start < tree_end {
+                            let start = max(tree_start, segment_start) - tree_start;
+                            let end = min(tree_end, segment_end) - tree_start;
+                            let entries = if include_ignored {
+                                snapshot.files(start).take(end - start)
+                            } else {
+                                snapshot.visible_files(start).take(end - start)
+                            };
+                            let paths = entries.map(|entry| {
+                                if let EntryKind::File(char_bag) = entry.kind {
+                                    MatchCandidate {
+                                        path: &entry.path,
+                                        char_bag,
+                                    }
+                                } else {
+                                    unreachable!()
+                                }
+                            });
+                            match_single_tree_paths(
+                                snapshot,
+                                include_root_name,
+                                paths,
+                                query,
+                                lowercase_query,
+                                query_chars,
+                                smart_case,
+                                results,
+                                max_results,
+                                &mut min_score,
+                                &mut match_positions,
+                                &mut last_positions,
+                                &mut score_matrix,
+                                &mut best_position_matrix,
+                                &cancel_flag,
+                            );
+                        }
+                        if tree_end >= segment_end {
+                            break;
+                        }
+                        tree_start = tree_end;
+                    }
+                })
+            }
+        })
+        .await;
 
     let mut results = Vec::new();
     for segment_result in segment_results {
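The fuzzy matcher's partitioning is where `num_cpus()` earns its keep: the worker count becomes `background.num_cpus().min(path_count)` so a query over only a few paths does not fan out into per-CPU segments, and `segment_size` is a ceiling division so every path lands in exactly one segment. A small self-contained check of that arithmetic; the `segments` helper, the `.max(1)` guard, and the end clamp are illustrative additions rather than part of the diff:

// Illustrative only: the real code divides snapshot paths, not integers.
fn segments(path_count: usize, cpu_count: usize) -> Vec<(usize, usize)> {
    // Never create more segments than paths; .max(1) is added here so the
    // toy function tolerates zero paths.
    let num_cpus = cpu_count.min(path_count).max(1);
    // Ceiling division: every path falls into exactly one segment.
    let segment_size = (path_count + num_cpus - 1) / num_cpus;
    (0..num_cpus)
        .map(|i| {
            let start = i * segment_size;
            let end = (start + segment_size).min(path_count);
            (start, end)
        })
        .collect()
}

fn main() {
    // 10 paths across 4 CPUs -> segments of 3: [0,3), [3,6), [6,9), [9,10)
    assert_eq!(segments(10, 4), vec![(0, 3), (3, 6), (6, 9), (9, 10)]);
    // 2 paths across 8 CPUs -> only 2 single-path segments, not 8 workers
    assert_eq!(segments(2, 8), vec![(0, 1), (1, 2)]);
    println!("segment math checks out");
}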