Remove 2 suffix for client, call, channel

Co-authored-by: Mikayla <mikayla@zed.dev>
This commit is contained in:
Max Brunsfeld 2024-01-03 12:02:14 -08:00
parent 9f99e58834
commit 53bdf6beb3
57 changed files with 962 additions and 9116 deletions

View file

@ -2,13 +2,12 @@ use super::{proto, Client, Status, TypedEnvelope};
use anyhow::{anyhow, Context, Result};
use collections::{hash_map::Entry, HashMap, HashSet};
use feature_flags::FeatureFlagAppExt;
use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
use futures::{channel::mpsc, Future, StreamExt};
use gpui::{AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, Task};
use postage::{sink::Sink, watch};
use rpc::proto::{RequestMessage, UsersResponse};
use std::sync::{Arc, Weak};
use text::ReplicaId;
use util::http::HttpClient;
use util::TryFutureExt as _;
pub type UserId = u64;
@ -20,7 +19,7 @@ pub struct ParticipantIndex(pub u32);
pub struct User {
pub id: UserId,
pub github_login: String,
pub avatar: Option<Arc<ImageData>>,
pub avatar_uri: SharedString,
}
#[derive(Clone, Debug, PartialEq, Eq)]
@ -76,9 +75,8 @@ pub struct UserStore {
pending_contact_requests: HashMap<u64, usize>,
invite_info: Option<InviteInfo>,
client: Weak<Client>,
http: Arc<dyn HttpClient>,
_maintain_contacts: Task<()>,
_maintain_current_user: Task<()>,
_maintain_current_user: Task<Result<()>>,
}
#[derive(Clone)]
@ -103,9 +101,7 @@ pub enum ContactEventKind {
Cancelled,
}
impl Entity for UserStore {
type Event = Event;
}
impl EventEmitter<Event> for UserStore {}
enum UpdateContacts {
Update(proto::UpdateContacts),
@ -114,17 +110,13 @@ enum UpdateContacts {
}
impl UserStore {
pub fn new(
client: Arc<Client>,
http: Arc<dyn HttpClient>,
cx: &mut ModelContext<Self>,
) -> Self {
pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
let (mut current_user_tx, current_user_rx) = watch::channel();
let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
let rpc_subscriptions = vec![
client.add_message_handler(cx.handle(), Self::handle_update_contacts),
client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
client.add_message_handler(cx.handle(), Self::handle_show_contacts),
client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
];
Self {
users: Default::default(),
@ -136,76 +128,71 @@ impl UserStore {
invite_info: None,
client: Arc::downgrade(&client),
update_contacts_tx,
http,
_maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
_maintain_contacts: cx.spawn(|this, mut cx| async move {
let _subscriptions = rpc_subscriptions;
while let Some(message) = update_contacts_rx.next().await {
if let Some(this) = this.upgrade(&cx) {
if let Ok(task) =
this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
.log_err()
.await;
{
task.log_err().await;
} else {
break;
}
}
}),
_maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
_maintain_current_user: cx.spawn(|this, mut cx| async move {
let mut status = client.status();
while let Some(status) = status.next().await {
match status {
Status::Connected { .. } => {
if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
let fetch_user = this
.update(&mut cx, |this, cx| this.get_user(user_id, cx))
.log_err();
if let Some(user_id) = client.user_id() {
let fetch_user = if let Ok(fetch_user) = this
.update(&mut cx, |this, cx| {
this.get_user(user_id, cx).log_err()
}) {
fetch_user
} else {
break;
};
let fetch_metrics_id =
client.request(proto::GetPrivateUserInfo {}).log_err();
let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
if let Some(info) = info {
cx.update(|cx| {
cx.update(|cx| {
if let Some(info) = info {
cx.update_flags(info.staff, info.flags);
client.telemetry.set_authenticated_user_info(
Some(info.metrics_id.clone()),
info.staff,
cx,
)
});
} else {
cx.read(|cx| {
client
.telemetry
.set_authenticated_user_info(None, false, cx)
});
}
}
})?;
current_user_tx.send(user).await.ok();
this.update(&mut cx, |_, cx| {
cx.notify();
});
this.update(&mut cx, |_, cx| cx.notify())?;
}
}
Status::SignedOut => {
current_user_tx.send(None).await.ok();
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
cx.notify();
this.clear_contacts()
})
.await;
}
this.update(&mut cx, |this, cx| {
cx.notify();
this.clear_contacts()
})?
.await;
}
Status::ConnectionLost => {
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
cx.notify();
this.clear_contacts()
})
.await;
}
this.update(&mut cx, |this, cx| {
cx.notify();
this.clear_contacts()
})?
.await;
}
_ => {}
}
}
Ok(())
}),
pending_contact_requests: Default::default(),
}
@ -217,7 +204,7 @@ impl UserStore {
}
async fn handle_update_invite_info(
this: ModelHandle<Self>,
this: Model<Self>,
message: TypedEnvelope<proto::UpdateInviteInfo>,
_: Arc<Client>,
mut cx: AsyncAppContext,
@ -228,17 +215,17 @@ impl UserStore {
count: message.payload.count,
});
cx.notify();
});
})?;
Ok(())
}
async fn handle_show_contacts(
this: ModelHandle<Self>,
this: Model<Self>,
_: TypedEnvelope<proto::ShowContacts>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
Ok(())
}
@ -247,7 +234,7 @@ impl UserStore {
}
async fn handle_update_contacts(
this: ModelHandle<Self>,
this: Model<Self>,
message: TypedEnvelope<proto::UpdateContacts>,
_: Arc<Client>,
mut cx: AsyncAppContext,
@ -256,7 +243,7 @@ impl UserStore {
this.update_contacts_tx
.unbounded_send(UpdateContacts::Update(message.payload))
.unwrap();
});
})?;
Ok(())
}
@ -292,6 +279,9 @@ impl UserStore {
// Users are fetched in parallel above and cached in call to get_users
// No need to paralellize here
let mut updated_contacts = Vec::new();
let this = this
.upgrade()
.ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
for contact in message.contacts {
updated_contacts.push(Arc::new(
Contact::from_proto(contact, &this, &mut cx).await?,
@ -300,18 +290,18 @@ impl UserStore {
let mut incoming_requests = Vec::new();
for request in message.incoming_requests {
incoming_requests.push(
incoming_requests.push({
this.update(&mut cx, |this, cx| {
this.get_user(request.requester_id, cx)
})
.await?,
);
})?
.await?
});
}
let mut outgoing_requests = Vec::new();
for requested_user_id in message.outgoing_requests {
outgoing_requests.push(
this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
.await?,
);
}
@ -378,7 +368,7 @@ impl UserStore {
}
cx.notify();
});
})?;
Ok(())
})
@ -400,12 +390,6 @@ impl UserStore {
&self.incoming_contact_requests
}
pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
self.incoming_contact_requests
.iter()
.any(|user| user.id == user_id)
}
pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
&self.outgoing_contact_requests
}
@ -454,6 +438,12 @@ impl UserStore {
self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
}
pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
self.incoming_contact_requests
.iter()
.any(|user| user.id == user_id)
}
pub fn respond_to_contact_request(
&mut self,
requester_id: u64,
@ -480,7 +470,7 @@ impl UserStore {
cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
let client = self.client.upgrade();
cx.spawn_weak(|_, _| async move {
cx.spawn(move |_, _| async move {
client
.ok_or_else(|| anyhow!("can't upgrade client reference"))?
.request(proto::RespondToContactRequest {
@ -502,7 +492,7 @@ impl UserStore {
*self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
cx.notify();
cx.spawn(|this, mut cx| async move {
cx.spawn(move |this, mut cx| async move {
let response = client
.ok_or_else(|| anyhow!("can't upgrade client reference"))?
.request(request)
@ -517,7 +507,7 @@ impl UserStore {
}
}
cx.notify();
});
})?;
response?;
Ok(())
})
@ -560,11 +550,11 @@ impl UserStore {
},
cx,
)
})
})?
.await?;
}
this.read_with(&cx, |this, _| {
this.update(&mut cx, |this, _| {
user_ids
.iter()
.map(|user_id| {
@ -574,7 +564,7 @@ impl UserStore {
.ok_or_else(|| anyhow!("user {} not found", user_id))
})
.collect()
})
})?
})
}
@ -596,18 +586,18 @@ impl UserStore {
cx: &mut ModelContext<Self>,
) -> Task<Result<Arc<User>>> {
if let Some(user) = self.users.get(&user_id).cloned() {
return cx.foreground().spawn(async move { Ok(user) });
return Task::ready(Ok(user));
}
let load_users = self.get_users(vec![user_id], cx);
cx.spawn(|this, mut cx| async move {
cx.spawn(move |this, mut cx| async move {
load_users.await?;
this.update(&mut cx, |this, _| {
this.users
.get(&user_id)
.cloned()
.ok_or_else(|| anyhow!("server responded with no users"))
})
})?
})
}
@ -625,25 +615,22 @@ impl UserStore {
cx: &mut ModelContext<Self>,
) -> Task<Result<Vec<Arc<User>>>> {
let client = self.client.clone();
let http = self.http.clone();
cx.spawn_weak(|this, mut cx| async move {
cx.spawn(|this, mut cx| async move {
if let Some(rpc) = client.upgrade() {
let response = rpc.request(request).await.context("error loading users")?;
let users = future::join_all(
response
.users
.into_iter()
.map(|user| User::new(user, http.as_ref())),
)
.await;
let users = response
.users
.into_iter()
.map(|user| User::new(user))
.collect::<Vec<_>>();
this.update(&mut cx, |this, _| {
for user in &users {
this.users.insert(user.id, user.clone());
}
})
.ok();
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, _| {
for user in &users {
this.users.insert(user.id, user.clone());
}
});
}
Ok(users)
} else {
Ok(Vec::new())
@ -668,11 +655,11 @@ impl UserStore {
}
impl User {
async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
fn new(message: proto::User) -> Arc<Self> {
Arc::new(User {
id: message.id,
github_login: message.github_login,
avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
avatar_uri: message.avatar_url.into(),
})
}
}
@ -680,13 +667,13 @@ impl User {
impl Contact {
async fn from_proto(
contact: proto::Contact,
user_store: &ModelHandle<UserStore>,
user_store: &Model<UserStore>,
cx: &mut AsyncAppContext,
) -> Result<Self> {
let user = user_store
.update(cx, |user_store, cx| {
user_store.get_user(contact.user_id, cx)
})
})?
.await?;
Ok(Self {
user,
@ -705,24 +692,3 @@ impl Collaborator {
})
}
}
async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
let mut response = http
.get(url, Default::default(), true)
.await
.map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
if !response.status().is_success() {
return Err(anyhow!("avatar request failed {:?}", response.status()));
}
let mut body = Vec::new();
response
.body_mut()
.read_to_end(&mut body)
.await
.map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
let format = image::guess_format(&body)?;
let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
Ok(ImageData::new(image))
}