deps: Bump smol to 2.0 (#22956)

The collateral damage of this is that code size increases by ~300kB, but I think we can stomach it.
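
Most of the churn below follows two mechanical patterns: the receiver returned by `smol::channel` no longer seems to be `Unpin` in smol 2.0, so `futures::StreamExt::next` can't be called on it directly; call sites either switch to the channel's own `recv()` (which yields `Result<T, RecvError>` instead of `Option<T>`) or wrap the receiver in `pin!` before iterating it as a stream. A minimal standalone sketch of both patterns, not taken from this diff and assuming the `futures` crate for `StreamExt`:

use futures::StreamExt as _;
use std::pin::pin;

fn main() {
    smol::block_on(async {
        let (tx, rx) = smol::channel::unbounded::<i32>();
        tx.send(1).await.unwrap();
        drop(tx); // close the channel so the loops below terminate

        // Pattern 1: use the channel API directly; `recv()` returns
        // `Err(RecvError)` once the channel is closed and drained.
        while let Ok(value) = rx.recv().await {
            println!("received {value}");
        }

        // Pattern 2: keep treating the receiver as a `Stream` by pinning it.
        let (tx, rx) = smol::channel::unbounded::<i32>();
        tx.send(2).await.unwrap();
        drop(tx);
        let mut rx = pin!(rx);
        while let Some(value) = rx.next().await {
            println!("streamed {value}");
        }
    });
}

Which form a call site uses below mostly depends on whether it still needs `Stream` combinators such as `ready_chunks` or `chunks_timeout`.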

Release Notes:

- N/A
Piotr Osiewicz 2025-01-10 14:38:00 +01:00 committed by GitHub
parent 1f84c1b6c7
commit 9e113bccd0
25 changed files with 187 additions and 330 deletions

View file

@@ -1958,8 +1958,8 @@ mod tests {
});
let server = FakeServer::for_client(user_id, &client, cx).await;
- let (done_tx1, mut done_rx1) = smol::channel::unbounded();
- let (done_tx2, mut done_rx2) = smol::channel::unbounded();
+ let (done_tx1, done_rx1) = smol::channel::unbounded();
+ let (done_tx2, done_rx2) = smol::channel::unbounded();
AnyProtoClient::from(client.clone()).add_model_message_handler(
move |model: Model<TestModel>, _: TypedEnvelope<proto::JoinProject>, mut cx| {
match model.update(&mut cx, |model, _| model.id).unwrap() {
@@ -2001,8 +2001,8 @@ mod tests {
server.send(proto::JoinProject { project_id: 1 });
server.send(proto::JoinProject { project_id: 2 });
- done_rx1.next().await.unwrap();
- done_rx2.next().await.unwrap();
+ done_rx1.recv().await.unwrap();
+ done_rx2.recv().await.unwrap();
}
#[gpui::test]
@@ -2020,7 +2020,7 @@ mod tests {
let model = cx.new_model(|_| TestModel::default());
let (done_tx1, _done_rx1) = smol::channel::unbounded();
- let (done_tx2, mut done_rx2) = smol::channel::unbounded();
+ let (done_tx2, done_rx2) = smol::channel::unbounded();
let subscription1 = client.add_message_handler(
model.downgrade(),
move |_, _: TypedEnvelope<proto::Ping>, _| {
@@ -2037,7 +2037,7 @@ mod tests {
},
);
server.send(proto::Ping {});
- done_rx2.next().await.unwrap();
+ done_rx2.recv().await.unwrap();
}
#[gpui::test]
@@ -2054,7 +2054,7 @@ mod tests {
let server = FakeServer::for_client(user_id, &client, cx).await;
let model = cx.new_model(|_| TestModel::default());
- let (done_tx, mut done_rx) = smol::channel::unbounded();
+ let (done_tx, done_rx) = smol::channel::unbounded();
let subscription = client.add_message_handler(
model.clone().downgrade(),
move |model: Model<TestModel>, _: TypedEnvelope<proto::Ping>, mut cx| {
@@ -2069,7 +2069,7 @@ mod tests {
model.subscription = Some(subscription);
});
server.send(proto::Ping {});
- done_rx.next().await.unwrap();
+ done_rx.recv().await.unwrap();
}
#[derive(Default)]

View file

@@ -4936,7 +4936,7 @@ async fn test_project_search(
// Perform a search as the guest.
let mut results = HashMap::default();
- let mut search_rx = project_b.update(cx_b, |project, cx| {
+ let search_rx = project_b.update(cx_b, |project, cx| {
project.search(
SearchQuery::text(
"world",
@@ -4951,7 +4951,7 @@ async fn test_project_search(
cx,
)
});
- while let Some(result) = search_rx.next().await {
+ while let Ok(result) = search_rx.recv().await {
match result {
SearchResult::Buffer { buffer, ranges } => {
results.entry(buffer).or_insert(ranges);

View file

@@ -6,7 +6,6 @@ use call::ActiveCall;
use collections::{BTreeMap, HashMap};
use editor::Bias;
use fs::{FakeFs, Fs as _};
- use futures::StreamExt;
use git::repository::GitFileStatus;
use gpui::{BackgroundExecutor, Model, TestAppContext};
use language::{
@@ -873,7 +872,7 @@ impl RandomizedTest for ProjectCollaborationTest {
if detach { "detaching" } else { "awaiting" }
);
- let mut search = project.update(cx, |project, cx| {
+ let search = project.update(cx, |project, cx| {
project.search(
SearchQuery::text(
query,
@@ -891,7 +890,7 @@ impl RandomizedTest for ProjectCollaborationTest {
drop(project);
let search = cx.executor().spawn(async move {
let mut results = HashMap::default();
- while let Some(result) = search.next().await {
+ while let Ok(result) = search.recv().await {
if let SearchResult::Buffer { buffer, ranges } = result {
results.entry(buffer).or_insert(ranges);
}

View file

@@ -1007,7 +1007,7 @@ impl FakeFs {
const SYSTEMTIME_INTERVAL: Duration = Duration::from_nanos(100);
pub fn new(executor: gpui::BackgroundExecutor) -> Arc<Self> {
- let (tx, mut rx) = smol::channel::bounded::<PathBuf>(10);
+ let (tx, rx) = smol::channel::bounded::<PathBuf>(10);
let this = Arc::new_cyclic(|this| Self {
this: this.clone(),
@@ -1035,7 +1035,7 @@ impl FakeFs {
executor.spawn({
let this = this.clone();
async move {
- while let Some(git_event) = rx.next().await {
+ while let Ok(git_event) = rx.recv().await {
if let Some(mut state) = this.state.try_lock() {
state.emit_event([(git_event, None)]);
} else {

View file

@@ -118,7 +118,7 @@ util = { workspace = true, features = ["test-support"] }
http_client = { workspace = true, features = ["test-support"] }
unicode-segmentation.workspace = true
- [build-dependencies]
+ [target.'cfg(target_os = "windows")'.build-dependencies]
embed-resource = "3.0"
[target.'cfg(target_os = "macos")'.build-dependencies]

View file

@@ -13,6 +13,7 @@ fn main() {
#[cfg(target_os = "macos")]
macos::build();
}
+ #[cfg(target_os = "windows")]
Ok("windows") => {
let manifest = std::path::Path::new("resources/windows/gpui.manifest.xml");
let rc_file = std::path::Path::new("resources/windows/gpui.rc");

View file

@@ -32,6 +32,7 @@ use smol::channel;
use std::{
env,
panic::{self, RefUnwindSafe},
+ pin::Pin,
};
/// Run the given test function with the configured parameters.
@@ -85,7 +86,7 @@ pub fn run_test(
/// A test struct for converting an observation callback into a stream.
pub struct Observation<T> {
- rx: channel::Receiver<T>,
+ rx: Pin<Box<channel::Receiver<T>>>,
_subscription: Subscription,
}
@@ -108,6 +109,7 @@ pub fn observe<T: 'static>(entity: &impl Entity<T>, cx: &mut TestAppContext) ->
let _ = smol::block_on(tx.send(()));
})
});
+ let rx = Box::pin(rx);
Observation { rx, _subscription }
}
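
Where a receiver is stored in a struct and still consumed through `StreamExt`, as with `Observation` above, the receiver is boxed and pinned instead. A rough illustration of why that works (the struct and method names here are made up, not gpui's API): `Pin<Box<Receiver<T>>>` is `Unpin`, so `next()` is available again without pinning at every call site.

use futures::StreamExt as _;
use std::pin::Pin;

struct PinnedEvents<T> {
    // Boxing and pinning once keeps the field freely movable afterwards.
    rx: Pin<Box<smol::channel::Receiver<T>>>,
}

impl<T> PinnedEvents<T> {
    fn new(rx: smol::channel::Receiver<T>) -> Self {
        Self { rx: Box::pin(rx) }
    }

    async fn next(&mut self) -> Option<T> {
        // `Pin<Box<Receiver<T>>>` implements both `Stream` and `Unpin`,
        // so `StreamExt::next` applies to the field directly.
        self.rx.next().await
    }
}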

View file

@@ -1396,10 +1396,8 @@ impl FakeLanguageServer {
pub async fn try_receive_notification<T: notification::Notification>(
&mut self,
) -> Option<T::Params> {
- use futures::StreamExt as _;
loop {
- let (method, params) = self.notifications_rx.next().await?;
+ let (method, params) = self.notifications_rx.recv().await.ok()?;
if method == T::METHOD {
return Some(serde_json::from_str::<T::Params>(&params).unwrap());
} else {

View file

@@ -30,6 +30,7 @@ use std::{
io,
ops::Range,
path::{Path, PathBuf},
+ pin::pin,
str::FromStr as _,
sync::Arc,
time::Instant,
@@ -1483,7 +1484,7 @@ impl BufferStore {
}
const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
- let mut project_paths_rx = self
+ let project_paths_rx = self
.worktree_store
.update(cx, |worktree_store, cx| {
worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
@@ -1495,6 +1496,7 @@ impl BufferStore {
tx.send(buffer).await.ok();
}
+ let mut project_paths_rx = pin!(project_paths_rx);
while let Some(project_paths) = project_paths_rx.next().await {
let buffers = this.update(&mut cx, |this, cx| {
project_paths

View file

@@ -818,7 +818,7 @@ impl LocalLspStore {
let name = name.to_string();
async move {
let actions = params.actions.unwrap_or_default();
- let (tx, mut rx) = smol::channel::bounded(1);
+ let (tx, rx) = smol::channel::bounded(1);
let request = LanguageServerPromptRequest {
level: match params.typ {
lsp::MessageType::ERROR => PromptLevel::Critical,
@@ -837,9 +837,9 @@ impl LocalLspStore {
})
.is_ok();
if did_update {
- let response = rx.next().await;
+ let response = rx.recv().await?;
- Ok(response)
+ Ok(Some(response))
} else {
Ok(None)
}

View file

@@ -79,6 +79,7 @@ use std::{
borrow::Cow,
ops::Range,
path::{Component, Path, PathBuf},
+ pin::pin,
str,
sync::Arc,
time::Duration,
@@ -3019,6 +3020,7 @@ impl Project {
// 64 buffers at a time to avoid overwhelming the main thread. For each
// opened buffer, we will spawn a background task that retrieves all the
// ranges in the buffer matched by the query.
+ let mut chunks = pin!(chunks);
'outer: while let Some(matching_buffer_chunk) = chunks.next().await {
let mut chunk_results = Vec::new();
for buffer in matching_buffer_chunk {
@@ -3748,6 +3750,7 @@ impl Project {
// next `flush_effects()` call.
drop(this);
+ let mut rx = pin!(rx);
let answer = rx.next().await;
Ok(LanguageServerPromptResponse {
@@ -3889,7 +3892,7 @@ impl Project {
.query
.ok_or_else(|| anyhow!("missing query field"))?,
)?;
- let mut results = this.update(&mut cx, |this, cx| {
+ let results = this.update(&mut cx, |this, cx| {
this.find_search_candidate_buffers(&query, message.limit as _, cx)
})?;
@@ -3897,7 +3900,7 @@ impl Project {
buffer_ids: Vec::new(),
};
- while let Some(buffer) = results.next().await {
+ while let Ok(buffer) = results.recv().await {
this.update(&mut cx, |this, cx| {
let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
response.buffer_ids.push(buffer_id.to_proto());

View file

@@ -5657,9 +5657,9 @@ async fn search(
query: SearchQuery,
cx: &mut gpui::TestAppContext,
) -> Result<HashMap<String, Vec<Range<usize>>>> {
- let mut search_rx = project.update(cx, |project, cx| project.search(query, cx));
+ let search_rx = project.update(cx, |project, cx| project.search(query, cx));
let mut results = HashMap::default();
- while let Some(search_result) = search_rx.next().await {
+ while let Ok(search_result) = search_rx.recv().await {
match search_result {
SearchResult::Buffer { buffer, ranges } => {
results.entry(buffer).or_insert(ranges);

View file

@@ -1,5 +1,6 @@
use std::{
path::{Path, PathBuf},
+ pin::pin,
sync::{atomic::AtomicUsize, Arc},
};
@@ -648,7 +649,7 @@ impl WorktreeStore {
// We spawn a number of workers that take items from the filter channel and check the query
// against the version of the file on disk.
let (filter_tx, filter_rx) = smol::channel::bounded(64);
- let (output_tx, mut output_rx) = smol::channel::bounded(64);
+ let (output_tx, output_rx) = smol::channel::bounded(64);
let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
let input = cx.background_executor().spawn({
@@ -685,7 +686,7 @@ impl WorktreeStore {
cx.background_executor()
.spawn(async move {
let mut matched = 0;
- while let Some(mut receiver) = output_rx.next().await {
+ while let Ok(mut receiver) = output_rx.recv().await {
let Some(path) = receiver.next().await else {
continue;
};
@@ -990,6 +991,7 @@ impl WorktreeStore {
mut input: Receiver<MatchingEntry>,
query: &SearchQuery,
) -> Result<()> {
+ let mut input = pin!(input);
while let Some(mut entry) = input.next().await {
let abs_path = entry.worktree_path.join(&entry.path.path);
let Some(file) = fs.open_sync(&abs_path).await.log_err() else {

View file

@@ -515,7 +515,7 @@ impl HeadlessProject {
.query
.ok_or_else(|| anyhow!("missing query field"))?,
)?;
- let mut results = this.update(&mut cx, |this, cx| {
+ let results = this.update(&mut cx, |this, cx| {
this.buffer_store.update(cx, |buffer_store, cx| {
buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
})
@@ -527,7 +527,7 @@ impl HeadlessProject {
let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
- while let Some(buffer) = results.next().await {
+ while let Ok(buffer) = results.recv().await {
let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
response.buffer_ids.push(buffer_id.to_proto());
buffer_store

View file

@@ -187,7 +187,7 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes
cx.run_until_parked();
async fn do_search(project: &Model<Project>, mut cx: TestAppContext) -> Model<Buffer> {
- let mut receiver = project.update(&mut cx, |project, cx| {
+ let receiver = project.update(&mut cx, |project, cx| {
project.search(
SearchQuery::text(
"project",
@@ -203,7 +203,7 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes
)
});
- let first_response = receiver.next().await.unwrap();
+ let first_response = receiver.recv().await.unwrap();
let SearchResult::Buffer { buffer, .. } = first_response else {
panic!("incorrect result");
};
@@ -214,7 +214,7 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes
)
});
- assert!(receiver.next().await.is_none());
+ assert!(receiver.recv().await.is_err());
buffer
}

View file

@@ -249,7 +249,7 @@ impl ServerListeners {
fn start_server(
listeners: ServerListeners,
- mut log_rx: Receiver<Vec<u8>>,
+ log_rx: Receiver<Vec<u8>>,
cx: &mut AppContext,
) -> Arc<ChannelClient> {
// This is the server idle timeout. If no connection comes in in this timeout, the server will shut down.
@@ -351,8 +351,8 @@ fn start_server(
}
}
- log_message = log_rx.next().fuse() => {
- if let Some(log_message) = log_message {
+ log_message = log_rx.recv().fuse() => {
+ if let Ok(log_message) = log_message {
if let Err(error) = stderr_stream.write_all(&log_message).await {
log::error!("failed to write log message to stderr: {:?}", error);
break;
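
In the select loop above, the stream arm becomes a `recv()` future fused with `FutureExt::fuse`, and its result is matched with `Ok` rather than `Some`. A small self-contained sketch of that shape; the names and the timer arm are illustrative, not from the Zed code:

use futures::FutureExt as _;
use std::time::Duration;

fn main() {
    smol::block_on(async {
        let (tx, rx) = smol::channel::unbounded::<Vec<u8>>();
        tx.send(b"log line".to_vec()).await.unwrap();

        futures::select_biased! {
            message = rx.recv().fuse() => {
                // `recv()` resolves to `Err` once the channel is closed,
                // mirroring the `if let Ok(..)` check above.
                if let Ok(message) = message {
                    println!("{} bytes received", message.len());
                }
            },
            _ = smol::Timer::after(Duration::from_secs(1)).fuse() => {
                println!("no message within a second");
            },
        }
    });
}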

View file

@@ -29,6 +29,7 @@ use std::{
mem,
ops::{Not, Range},
path::Path,
+ pin::pin,
};
use theme::ThemeSettings;
use ui::{
@@ -249,7 +250,7 @@ impl ProjectSearch {
self.active_query = Some(query);
self.match_ranges.clear();
self.pending_search = Some(cx.spawn(|this, mut cx| async move {
- let mut matches = search.ready_chunks(1024);
+ let mut matches = pin!(search.ready_chunks(1024));
let this = this.upgrade()?;
this.update(&mut cx, |this, cx| {
this.match_ranges.clear();

View file

@@ -8,7 +8,7 @@ use collections::Bound;
use feature_flags::FeatureFlagAppExt;
use fs::Fs;
use fs::MTime;
- use futures::stream::StreamExt;
+ use futures::{stream::StreamExt, FutureExt as _};
use futures_batch::ChunksTimeoutStreamExt;
use gpui::{AppContext, Model, Task};
use heed::types::{SerdeBincode, Str};
@@ -17,8 +17,7 @@ use log;
use project::{Entry, UpdatedEntriesSet, Worktree};
use serde::{Deserialize, Serialize};
use smol::channel;
- use smol::future::FutureExt;
- use std::{cmp::Ordering, future::Future, iter, path::Path, sync::Arc, time::Duration};
+ use std::{cmp::Ordering, future::Future, iter, path::Path, pin::pin, sync::Arc, time::Duration};
use util::ResultExt;
use worktree::Snapshot;
@@ -284,7 +283,7 @@ impl EmbeddingIndex {
let (embedded_files_tx, embedded_files_rx) = channel::bounded(512);
let task = cx.background_executor().spawn(async move {
let mut chunked_file_batches =
- chunked_files.chunks_timeout(512, Duration::from_secs(2));
+ pin!(chunked_files.chunks_timeout(512, Duration::from_secs(2)));
while let Some(chunked_files) = chunked_file_batches.next().await {
// View the batch of files as a vec of chunks
// Flatten out to a vec of chunks that we can subdivide into batch sized pieces
@@ -358,14 +357,16 @@ impl EmbeddingIndex {
fn persist_embeddings(
&self,
- mut deleted_entry_ranges: channel::Receiver<(Bound<String>, Bound<String>)>,
- mut embedded_files: channel::Receiver<(EmbeddedFile, IndexingEntryHandle)>,
+ deleted_entry_ranges: channel::Receiver<(Bound<String>, Bound<String>)>,
+ embedded_files: channel::Receiver<(EmbeddedFile, IndexingEntryHandle)>,
cx: &AppContext,
) -> Task<Result<()>> {
let db_connection = self.db_connection.clone();
let db = self.db;
cx.background_executor().spawn(async move {
+ let mut deleted_entry_ranges = pin!(deleted_entry_ranges);
+ let mut embedded_files = pin!(embedded_files);
loop {
// Interleave deletions and persists of embedded files
futures::select_biased! {

View file

@@ -6,7 +6,7 @@ use crate::{
use anyhow::{anyhow, Context, Result};
use collections::HashMap;
use fs::Fs;
- use futures::{stream::StreamExt, FutureExt};
+ use futures::FutureExt;
use gpui::{
AppContext, Entity, EntityId, EventEmitter, Model, ModelContext, Subscription, Task, WeakModel,
};
@@ -80,7 +80,7 @@
) -> Self {
let language_registry = project.read(cx).languages().clone();
let fs = project.read(cx).fs().clone();
- let (status_tx, mut status_rx) = channel::unbounded();
+ let (status_tx, status_rx) = channel::unbounded();
let mut this = ProjectIndex {
db_connection,
project: project.downgrade(),
@@ -92,7 +92,7 @@
embedding_provider,
_subscription: cx.subscribe(&project, Self::handle_project_event),
_maintain_status: cx.spawn(|this, mut cx| async move {
- while status_rx.next().await.is_some() {
+ while status_rx.recv().await.is_ok() {
if this
.update(&mut cx, |this, cx| this.update_status(cx))
.is_err()

View file

@@ -278,7 +278,7 @@ mod tests {
use project::{Project, ProjectEntryId};
use serde_json::json;
use settings::SettingsStore;
- use smol::{channel, stream::StreamExt};
+ use smol::channel;
use std::{future, path::Path, sync::Arc};
fn init_test(cx: &mut TestAppContext) {
@@ -496,9 +496,9 @@ mod tests {
cx.update(|cx| EmbeddingIndex::embed_files(provider.clone(), chunked_files_rx, cx));
embed_files_task.task.await.unwrap();
- let mut embedded_files_rx = embed_files_task.files;
+ let embedded_files_rx = embed_files_task.files;
let mut embedded_files = Vec::new();
- while let Some((embedded_file, _)) = embedded_files_rx.next().await {
+ while let Ok((embedded_file, _)) = embedded_files_rx.recv().await {
embedded_files.push(embedded_file);
}

View file

@@ -20,6 +20,7 @@ use smol::channel;
use std::{
future::Future,
path::Path,
+ pin::pin,
sync::Arc,
time::{Duration, Instant},
};
@@ -247,13 +248,14 @@ impl SummaryIndex {
fn check_summary_cache(
&self,
- mut might_need_summary: channel::Receiver<UnsummarizedFile>,
+ might_need_summary: channel::Receiver<UnsummarizedFile>,
cx: &AppContext,
) -> NeedsSummary {
let db_connection = self.db_connection.clone();
let db = self.summary_db;
let (needs_summary_tx, needs_summary_rx) = channel::bounded(512);
let task = cx.background_executor().spawn(async move {
+ let mut might_need_summary = pin!(might_need_summary);
while let Some(file) = might_need_summary.next().await {
let tx = db_connection
.read_txn()
@@ -484,12 +486,12 @@ impl SummaryIndex {
fn summarize_files(
&self,
- mut unsummarized_files: channel::Receiver<UnsummarizedFile>,
+ unsummarized_files: channel::Receiver<UnsummarizedFile>,
cx: &AppContext,
) -> SummarizeFiles {
let (summarized_tx, summarized_rx) = channel::bounded(512);
let task = cx.spawn(|cx| async move {
- while let Some(file) = unsummarized_files.next().await {
+ while let Ok(file) = unsummarized_files.recv().await {
log::debug!("Summarizing {:?}", file);
let summary = cx
.update(|cx| Self::summarize_code(&file.contents, &file.path, cx))?
@@ -607,7 +609,7 @@ impl SummaryIndex {
let digest_db = self.file_digest_db;
let summary_db = self.summary_db;
cx.background_executor().spawn(async move {
- let mut summaries = summaries.chunks_timeout(4096, Duration::from_secs(2));
+ let mut summaries = pin!(summaries.chunks_timeout(4096, Duration::from_secs(2)));
while let Some(summaries) = summaries.next().await {
let mut txn = db_connection.write_txn()?;
for file in &summaries {

View file

@@ -1731,9 +1731,9 @@ impl Terminal {
pub fn wait_for_completed_task(&self, cx: &AppContext) -> Task<()> {
if let Some(task) = self.task() {
if task.status == TaskStatus::Running {
- let mut completion_receiver = task.completion_rx.clone();
+ let completion_receiver = task.completion_rx.clone();
return cx.spawn(|_| async move {
- completion_receiver.next().await;
+ let _ = completion_receiver.recv().await;
});
}
}

View file

@@ -4411,7 +4411,7 @@ impl BackgroundScanner {
}
async fn forcibly_load_paths(&self, paths: &[Arc<Path>]) -> bool {
- let (scan_job_tx, mut scan_job_rx) = channel::unbounded();
+ let (scan_job_tx, scan_job_rx) = channel::unbounded();
{
let mut state = self.state.lock();
let root_path = state.snapshot.abs_path.clone();
@@ -4429,7 +4429,7 @@
}
drop(scan_job_tx);
}
- while let Some(job) = scan_job_rx.next().await {
+ while let Ok(job) = scan_job_rx.recv().await {
self.scan_dir(&job).await.log_err();
}