Move "async move" a few characters to the left in cx.spawn() (#26758)

This is the core change:
https://github.com/zed-industries/zed/pull/26758/files#diff-044302c0d57147af17e68a0009fee3e8dcdfb4f32c27a915e70cfa80e987f765R1052

TODO:
- [x] Use AsyncFn instead of Fn() -> Future in GPUI spawn methods
- [x] Implement it in the whole app
- [x] Implement it in the debugger 
- [x] Glance at the RPC crate, and see if those box future methods can
be switched over. Answer: It can't directly, as you can't make an
AsyncFn* into a trait object. There's ways around that, but they're all
more complex than just keeping the code as is.
- [ ] Fix platform specific code

Release Notes:

- N/A
This commit is contained in:
Mikayla Maki 2025-03-18 19:09:02 -07:00 committed by GitHub
parent 7f2e3fb5bd
commit 1aefa5178b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
256 changed files with 3110 additions and 3200 deletions

View file

@ -47,7 +47,7 @@ impl ChannelBuffer {
client: Arc<Client>,
user_store: Entity<UserStore>,
channel_store: Entity<ChannelStore>,
mut cx: AsyncApp,
cx: &mut AsyncApp,
) -> Result<Entity<Self>> {
let response = client
.request(proto::JoinChannelBuffer {
@ -66,7 +66,7 @@ impl ChannelBuffer {
let capability = channel_store.read(cx).channel_capability(channel.id);
language::Buffer::remote(buffer_id, response.replica_id as u16, capability, base_text)
})?;
buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
let subscription = client.subscribe_to_entity(channel.id.0)?;
@ -208,7 +208,7 @@ impl ChannelBuffer {
let client = self.client.clone();
let epoch = self.epoch();
self.acknowledge_task = Some(cx.spawn(move |_, cx| async move {
self.acknowledge_task = Some(cx.spawn(async move |_, cx| {
cx.background_executor()
.timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
.await;

View file

@ -106,7 +106,7 @@ impl ChannelChat {
channel_store: Entity<ChannelStore>,
user_store: Entity<UserStore>,
client: Arc<Client>,
mut cx: AsyncApp,
cx: &mut AsyncApp,
) -> Result<Entity<Self>> {
let channel_id = channel.id;
let subscription = client.subscribe_to_entity(channel_id.0).unwrap();
@ -132,7 +132,7 @@ impl ChannelChat {
last_acknowledged_id: None,
rng: StdRng::from_entropy(),
first_loaded_message_id: None,
_subscription: subscription.set_entity(&cx.entity(), &mut cx.to_async()),
_subscription: subscription.set_entity(&cx.entity(), &cx.to_async()),
}
})?;
Self::handle_loaded_messages(
@ -141,7 +141,7 @@ impl ChannelChat {
client,
response.messages,
response.done,
&mut cx,
cx,
)
.await?;
Ok(handle)
@ -205,7 +205,7 @@ impl ChannelChat {
let outgoing_messages_lock = self.outgoing_messages_lock.clone();
// todo - handle messages that fail to send (e.g. >1024 chars)
Ok(cx.spawn(move |this, mut cx| async move {
Ok(cx.spawn(async move |this, cx| {
let outgoing_message_guard = outgoing_messages_lock.lock().await;
let request = rpc.request(proto::SendChannelMessage {
channel_id: channel_id.0,
@ -218,8 +218,8 @@ impl ChannelChat {
drop(outgoing_message_guard);
let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
let id = response.id;
let message = ChannelMessage::from_proto(response, &user_store, &mut cx).await?;
this.update(&mut cx, |this, cx| {
let message = ChannelMessage::from_proto(response, &user_store, cx).await?;
this.update(cx, |this, cx| {
this.insert_messages(SumTree::from_item(message, &()), cx);
if this.first_loaded_message_id.is_none() {
this.first_loaded_message_id = Some(id);
@ -234,9 +234,9 @@ impl ChannelChat {
channel_id: self.channel_id.0,
message_id: id,
});
cx.spawn(move |this, mut cx| async move {
cx.spawn(async move |this, cx| {
response.await?;
this.update(&mut cx, |this, cx| {
this.update(cx, |this, cx| {
this.message_removed(id, cx);
})?;
Ok(())
@ -266,7 +266,7 @@ impl ChannelChat {
nonce: Some(nonce.into()),
mentions: mentions_to_proto(&message.mentions),
});
Ok(cx.spawn(move |_, _| async move {
Ok(cx.spawn(async move |_, _| {
request.await?;
Ok(())
}))
@ -281,7 +281,7 @@ impl ChannelChat {
let user_store = self.user_store.clone();
let channel_id = self.channel_id;
let before_message_id = self.first_loaded_message_id()?;
Some(cx.spawn(move |this, mut cx| {
Some(cx.spawn(async move |this, cx| {
async move {
let response = rpc
.request(proto::GetChannelMessages {
@ -295,13 +295,14 @@ impl ChannelChat {
rpc,
response.messages,
response.done,
&mut cx,
cx,
)
.await?;
anyhow::Ok(())
}
.log_err()
.await
}))
}
@ -439,7 +440,7 @@ impl ChannelChat {
let user_store = self.user_store.clone();
let rpc = self.rpc.clone();
let channel_id = self.channel_id;
cx.spawn(move |this, mut cx| {
cx.spawn(async move |this, cx| {
async move {
let response = rpc
.request(proto::JoinChannelChat {
@ -452,11 +453,11 @@ impl ChannelChat {
rpc.clone(),
response.messages,
response.done,
&mut cx,
cx,
)
.await?;
let pending_messages = this.update(&mut cx, |this, _| {
let pending_messages = this.update(cx, |this, _| {
this.pending_messages().cloned().collect::<Vec<_>>()
})?;
@ -472,10 +473,10 @@ impl ChannelChat {
let message = ChannelMessage::from_proto(
response.message.ok_or_else(|| anyhow!("invalid message"))?,
&user_store,
&mut cx,
cx,
)
.await?;
this.update(&mut cx, |this, cx| {
this.update(cx, |this, cx| {
this.insert_messages(SumTree::from_item(message, &()), cx);
})?;
}
@ -483,6 +484,7 @@ impl ChannelChat {
anyhow::Ok(())
}
.log_err()
.await
})
.detach();
}

View file

@ -164,22 +164,22 @@ impl ChannelStore {
let mut connection_status = client.status();
let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
let watch_connection_status = cx.spawn(|this, mut cx| async move {
let watch_connection_status = cx.spawn(async move |this, cx| {
while let Some(status) = connection_status.next().await {
let this = this.upgrade()?;
match status {
client::Status::Connected { .. } => {
this.update(&mut cx, |this, cx| this.handle_connect(cx))
this.update(cx, |this, cx| this.handle_connect(cx))
.ok()?
.await
.log_err()?;
}
client::Status::SignedOut | client::Status::UpgradeRequired => {
this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
this.update(cx, |this, cx| this.handle_disconnect(false, cx))
.ok();
}
_ => {
this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
this.update(cx, |this, cx| this.handle_disconnect(true, cx))
.ok();
}
}
@ -200,13 +200,12 @@ impl ChannelStore {
_rpc_subscriptions: rpc_subscriptions,
_watch_connection_status: watch_connection_status,
disconnect_channel_buffers_task: None,
_update_channels: cx.spawn(|this, mut cx| async move {
_update_channels: cx.spawn(async move |this, cx| {
maybe!(async move {
while let Some(update_channels) = update_channels_rx.next().await {
if let Some(this) = this.upgrade() {
let update_task = this.update(&mut cx, |this, cx| {
this.update_channels(update_channels, cx)
})?;
let update_task = this
.update(cx, |this, cx| this.update_channels(update_channels, cx))?;
if let Some(update_task) = update_task {
update_task.await.log_err();
}
@ -310,7 +309,9 @@ impl ChannelStore {
self.open_channel_resource(
channel_id,
|this| &mut this.opened_buffers,
|channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
async move |channel, cx| {
ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
},
cx,
)
}
@ -328,14 +329,14 @@ impl ChannelStore {
.request(proto::GetChannelMessagesById { message_ids }),
)
};
cx.spawn(|this, mut cx| async move {
cx.spawn(async move |this, cx| {
if let Some(request) = request {
let response = request.await?;
let this = this
.upgrade()
.ok_or_else(|| anyhow!("channel store dropped"))?;
let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
let user_store = this.update(cx, |this, _| this.user_store.clone())?;
ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
} else {
Ok(Vec::new())
}
@ -440,7 +441,7 @@ impl ChannelStore {
self.open_channel_resource(
channel_id,
|this| &mut this.opened_chats,
|channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
cx,
)
}
@ -450,7 +451,7 @@ impl ChannelStore {
/// Make sure that the resource is only opened once, even if this method
/// is called multiple times with the same channel id while the first task
/// is still running.
fn open_channel_resource<T, F, Fut>(
fn open_channel_resource<T, F>(
&mut self,
channel_id: ChannelId,
get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
@ -458,8 +459,7 @@ impl ChannelStore {
cx: &mut Context<Self>,
) -> Task<Result<Entity<T>>>
where
F: 'static + FnOnce(Arc<Channel>, AsyncApp) -> Fut,
Fut: Future<Output = Result<Entity<T>>>,
F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
T: 'static,
{
let task = loop {
@ -479,8 +479,8 @@ impl ChannelStore {
},
hash_map::Entry::Vacant(e) => {
let task = cx
.spawn(move |this, mut cx| async move {
let channel = this.update(&mut cx, |this, _| {
.spawn(async move |this, cx| {
let channel = this.update(cx, |this, _| {
this.channel_for_id(channel_id).cloned().ok_or_else(|| {
Arc::new(anyhow!("no channel for id: {}", channel_id))
})
@ -493,9 +493,9 @@ impl ChannelStore {
e.insert(OpenEntityHandle::Loading(task.clone()));
cx.spawn({
let task = task.clone();
move |this, mut cx| async move {
async move |this, cx| {
let result = task.await;
this.update(&mut cx, |this, _| match result {
this.update(cx, |this, _| match result {
Ok(model) => {
get_map(this).insert(
channel_id,
@ -570,7 +570,7 @@ impl ChannelStore {
) -> Task<Result<ChannelId>> {
let client = self.client.clone();
let name = name.trim_start_matches('#').to_owned();
cx.spawn(move |this, mut cx| async move {
cx.spawn(async move |this, cx| {
let response = client
.request(proto::CreateChannel {
name,
@ -583,7 +583,7 @@ impl ChannelStore {
.ok_or_else(|| anyhow!("missing channel in response"))?;
let channel_id = ChannelId(channel.id);
this.update(&mut cx, |this, cx| {
this.update(cx, |this, cx| {
let task = this.update_channels(
proto::UpdateChannels {
channels: vec![channel],
@ -611,7 +611,7 @@ impl ChannelStore {
cx: &mut Context<Self>,
) -> Task<Result<()>> {
let client = self.client.clone();
cx.spawn(move |_, _| async move {
cx.spawn(async move |_, _| {
let _ = client
.request(proto::MoveChannel {
channel_id: channel_id.0,
@ -630,7 +630,7 @@ impl ChannelStore {
cx: &mut Context<Self>,
) -> Task<Result<()>> {
let client = self.client.clone();
cx.spawn(move |_, _| async move {
cx.spawn(async move |_, _| {
let _ = client
.request(proto::SetChannelVisibility {
channel_id: channel_id.0,
@ -655,7 +655,7 @@ impl ChannelStore {
cx.notify();
let client = self.client.clone();
cx.spawn(move |this, mut cx| async move {
cx.spawn(async move |this, cx| {
let result = client
.request(proto::InviteChannelMember {
channel_id: channel_id.0,
@ -664,7 +664,7 @@ impl ChannelStore {
})
.await;
this.update(&mut cx, |this, cx| {
this.update(cx, |this, cx| {
this.outgoing_invites.remove(&(channel_id, user_id));
cx.notify();
})?;
@ -687,7 +687,7 @@ impl ChannelStore {
cx.notify();
let client = self.client.clone();
cx.spawn(move |this, mut cx| async move {
cx.spawn(async move |this, cx| {
let result = client
.request(proto::RemoveChannelMember {
channel_id: channel_id.0,
@ -695,7 +695,7 @@ impl ChannelStore {
})
.await;
this.update(&mut cx, |this, cx| {
this.update(cx, |this, cx| {
this.outgoing_invites.remove(&(channel_id, user_id));
cx.notify();
})?;
@ -717,7 +717,7 @@ impl ChannelStore {
cx.notify();
let client = self.client.clone();
cx.spawn(move |this, mut cx| async move {
cx.spawn(async move |this, cx| {
let result = client
.request(proto::SetChannelMemberRole {
channel_id: channel_id.0,
@ -726,7 +726,7 @@ impl ChannelStore {
})
.await;
this.update(&mut cx, |this, cx| {
this.update(cx, |this, cx| {
this.outgoing_invites.remove(&(channel_id, user_id));
cx.notify();
})?;
@ -744,7 +744,7 @@ impl ChannelStore {
) -> Task<Result<()>> {
let client = self.client.clone();
let name = new_name.to_string();
cx.spawn(move |this, mut cx| async move {
cx.spawn(async move |this, cx| {
let channel = client
.request(proto::RenameChannel {
channel_id: channel_id.0,
@ -753,7 +753,7 @@ impl ChannelStore {
.await?
.channel
.ok_or_else(|| anyhow!("missing channel in response"))?;
this.update(&mut cx, |this, cx| {
this.update(cx, |this, cx| {
let task = this.update_channels(
proto::UpdateChannels {
channels: vec![channel],
@ -799,7 +799,7 @@ impl ChannelStore {
) -> Task<Result<Vec<ChannelMembership>>> {
let client = self.client.clone();
let user_store = self.user_store.downgrade();
cx.spawn(move |_, mut cx| async move {
cx.spawn(async move |_, cx| {
let response = client
.request(proto::GetChannelMembers {
channel_id: channel_id.0,
@ -807,7 +807,7 @@ impl ChannelStore {
limit: limit as u64,
})
.await?;
user_store.update(&mut cx, |user_store, _| {
user_store.update(cx, |user_store, _| {
user_store.insert(response.users);
response
.members
@ -931,10 +931,10 @@ impl ChannelStore {
buffers: buffer_versions,
});
cx.spawn(|this, mut cx| async move {
cx.spawn(async move |this, cx| {
let mut response = response.await?;
this.update(&mut cx, |this, cx| {
this.update(cx, |this, cx| {
this.opened_buffers.retain(|_, buffer| match buffer {
OpenEntityHandle::Open(channel_buffer) => {
let Some(channel_buffer) = channel_buffer.upgrade() else {
@ -1006,13 +1006,13 @@ impl ChannelStore {
cx.notify();
self.did_subscribe = false;
self.disconnect_channel_buffers_task.get_or_insert_with(|| {
cx.spawn(move |this, mut cx| async move {
cx.spawn(async move |this, cx| {
if wait_for_reconnect {
cx.background_executor().timer(RECONNECT_TIMEOUT).await;
}
if let Some(this) = this.upgrade() {
this.update(&mut cx, |this, cx| {
this.update(cx, |this, cx| {
for (_, buffer) in this.opened_buffers.drain() {
if let OpenEntityHandle::Open(buffer) = buffer {
if let Some(buffer) = buffer.upgrade() {
@ -1136,10 +1136,10 @@ impl ChannelStore {
let users = self
.user_store
.update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
Some(cx.spawn(|this, mut cx| async move {
Some(cx.spawn(async move |this, cx| {
let users = users.await?;
this.update(&mut cx, |this, cx| {
this.update(cx, |this, cx| {
for entry in &channel_participants {
let mut participants: Vec<_> = entry
.participant_user_ids