Introduce the ability of creating rooms on the server

This commit is contained in:
Antonio Scandurra 2022-09-23 15:05:32 +02:00
parent 0b1e372d11
commit ebb5ffcedc
12 changed files with 302 additions and 128 deletions

View file

@ -1,9 +1,11 @@
use crate::call::Call;
use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
use anyhow::{anyhow, Context, Result};
use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
use futures::{channel::mpsc, future, AsyncReadExt, Future, Stream, StreamExt};
use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
use postage::{prelude::Stream, sink::Sink, watch};
use postage::{broadcast, sink::Sink, watch};
use rpc::proto::{RequestMessage, UsersResponse};
use std::sync::{Arc, Weak};
use util::TryFutureExt as _;
@ -66,6 +68,7 @@ pub struct UserStore {
outgoing_contact_requests: Vec<Arc<User>>,
pending_contact_requests: HashMap<u64, usize>,
invite_info: Option<InviteInfo>,
incoming_calls: broadcast::Sender<Call>,
client: Weak<Client>,
http: Arc<dyn HttpClient>,
_maintain_contacts: Task<()>,
@ -116,6 +119,7 @@ impl UserStore {
client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
client.add_message_handler(cx.handle(), Self::handle_show_contacts),
];
let (incoming_calls, _) = broadcast::channel(32);
Self {
users: Default::default(),
current_user: current_user_rx,
@ -123,6 +127,7 @@ impl UserStore {
incoming_contact_requests: Default::default(),
outgoing_contact_requests: Default::default(),
invite_info: None,
incoming_calls,
client: Arc::downgrade(&client),
update_contacts_tx,
http,
@ -138,7 +143,7 @@ impl UserStore {
}),
_maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
let mut status = client.status();
while let Some(status) = status.recv().await {
while let Some(status) = status.next().await {
match status {
Status::Connected { .. } => {
if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
@ -198,6 +203,10 @@ impl UserStore {
self.invite_info.as_ref()
}
pub fn incoming_calls(&self) -> impl 'static + Stream<Item = Call> {
self.incoming_calls.subscribe()
}
async fn handle_update_contacts(
this: ModelHandle<Self>,
message: TypedEnvelope<proto::UpdateContacts>,
@ -493,7 +502,7 @@ impl UserStore {
.unbounded_send(UpdateContacts::Clear(tx))
.unwrap();
async move {
rx.recv().await;
rx.next().await;
}
}
@ -503,7 +512,7 @@ impl UserStore {
.unbounded_send(UpdateContacts::Wait(tx))
.unwrap();
async move {
rx.recv().await;
rx.next().await;
}
}