This commit is contained in:
Max Brunsfeld 2022-03-17 17:53:49 -07:00
parent eda06ee408
commit 0fdaa1d715
10 changed files with 245 additions and 58 deletions

View file

@ -13,8 +13,8 @@ use async_tungstenite::tungstenite::{
};
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
use gpui::{
action, AnyModelHandle, AnyWeakModelHandle, AsyncAppContext, Entity, ModelContext, ModelHandle,
MutableAppContext, Task,
action, AnyModelHandle, AnyViewHandle, AnyWeakModelHandle, AnyWeakViewHandle, AsyncAppContext,
Entity, ModelContext, ModelHandle, MutableAppContext, Task, View, ViewContext, ViewHandle,
};
use http::HttpClient;
use lazy_static::lazy_static;
@ -139,16 +139,16 @@ struct ClientState {
entity_id_extractors: HashMap<TypeId, Box<dyn Send + Sync + Fn(&dyn AnyTypedEnvelope) -> u64>>,
_reconnect_task: Option<Task<()>>,
reconnect_interval: Duration,
models_by_entity_type_and_remote_id: HashMap<(TypeId, u64), AnyWeakModelHandle>,
entities_by_type_and_remote_id: HashMap<(TypeId, u64), AnyWeakEntityHandle>,
models_by_message_type: HashMap<TypeId, AnyWeakModelHandle>,
model_types_by_message_type: HashMap<TypeId, TypeId>,
entity_types_by_message_type: HashMap<TypeId, TypeId>,
message_handlers: HashMap<
TypeId,
Arc<
dyn Send
+ Sync
+ Fn(
AnyModelHandle,
AnyEntityHandle,
Box<dyn AnyTypedEnvelope>,
AsyncAppContext,
) -> LocalBoxFuture<'static, Result<()>>,
@ -156,6 +156,16 @@ struct ClientState {
>,
}
enum AnyWeakEntityHandle {
Model(AnyWeakModelHandle),
View(AnyWeakViewHandle),
}
enum AnyEntityHandle {
Model(AnyModelHandle),
View(AnyViewHandle),
}
#[derive(Clone, Debug)]
pub struct Credentials {
pub user_id: u64,
@ -171,8 +181,8 @@ impl Default for ClientState {
_reconnect_task: None,
reconnect_interval: Duration::from_secs(5),
models_by_message_type: Default::default(),
models_by_entity_type_and_remote_id: Default::default(),
model_types_by_message_type: Default::default(),
entities_by_type_and_remote_id: Default::default(),
entity_types_by_message_type: Default::default(),
message_handlers: Default::default(),
}
}
@ -195,13 +205,13 @@ impl Drop for Subscription {
Subscription::Entity { client, id } => {
if let Some(client) = client.upgrade() {
let mut state = client.state.write();
let _ = state.models_by_entity_type_and_remote_id.remove(id);
let _ = state.entities_by_type_and_remote_id.remove(id);
}
}
Subscription::Message { client, id } => {
if let Some(client) = client.upgrade() {
let mut state = client.state.write();
let _ = state.model_types_by_message_type.remove(id);
let _ = state.entity_types_by_message_type.remove(id);
let _ = state.message_handlers.remove(id);
}
}
@ -239,7 +249,7 @@ impl Client {
state._reconnect_task.take();
state.message_handlers.clear();
state.models_by_message_type.clear();
state.models_by_entity_type_and_remote_id.clear();
state.entities_by_type_and_remote_id.clear();
state.entity_id_extractors.clear();
self.peer.reset();
}
@ -313,6 +323,23 @@ impl Client {
}
}
pub fn add_view_for_remote_entity<T: View>(
self: &Arc<Self>,
remote_id: u64,
cx: &mut ViewContext<T>,
) -> Subscription {
let handle = AnyViewHandle::from(cx.handle());
let mut state = self.state.write();
let id = (TypeId::of::<T>(), remote_id);
state
.entities_by_type_and_remote_id
.insert(id, AnyWeakEntityHandle::View(handle.downgrade()));
Subscription::Entity {
client: Arc::downgrade(self),
id,
}
}
pub fn add_model_for_remote_entity<T: Entity>(
self: &Arc<Self>,
remote_id: u64,
@ -322,8 +349,8 @@ impl Client {
let mut state = self.state.write();
let id = (TypeId::of::<T>(), remote_id);
state
.models_by_entity_type_and_remote_id
.insert(id, handle.downgrade());
.entities_by_type_and_remote_id
.insert(id, AnyWeakEntityHandle::Model(handle.downgrade()));
Subscription::Entity {
client: Arc::downgrade(self),
id,
@ -355,6 +382,11 @@ impl Client {
let prev_handler = state.message_handlers.insert(
message_type_id,
Arc::new(move |handle, envelope, cx| {
let handle = if let AnyEntityHandle::Model(handle) = handle {
handle
} else {
unreachable!();
};
let model = handle.downcast::<E>().unwrap();
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
if let Some(client) = client.upgrade() {
@ -374,7 +406,60 @@ impl Client {
}
}
pub fn add_entity_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
pub fn add_view_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
where
M: EntityMessage,
E: View,
H: 'static
+ Send
+ Sync
+ Fn(ViewHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
F: 'static + Future<Output = Result<()>>,
{
let entity_type_id = TypeId::of::<E>();
let message_type_id = TypeId::of::<M>();
let client = Arc::downgrade(self);
let mut state = self.state.write();
state
.entity_types_by_message_type
.insert(message_type_id, entity_type_id);
state
.entity_id_extractors
.entry(message_type_id)
.or_insert_with(|| {
Box::new(|envelope| {
let envelope = envelope
.as_any()
.downcast_ref::<TypedEnvelope<M>>()
.unwrap();
envelope.payload.remote_entity_id()
})
});
let prev_handler = state.message_handlers.insert(
message_type_id,
Arc::new(move |handle, envelope, cx| {
let handle = if let AnyEntityHandle::View(handle) = handle {
handle
} else {
unreachable!();
};
let model = handle.downcast::<E>().unwrap();
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
if let Some(client) = client.upgrade() {
handler(model, *envelope, client.clone(), cx).boxed_local()
} else {
async move { Ok(()) }.boxed_local()
}
}),
);
if prev_handler.is_some() {
panic!("registered handler for the same message twice");
}
}
pub fn add_model_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
where
M: EntityMessage,
E: Entity,
@ -390,7 +475,7 @@ impl Client {
let client = Arc::downgrade(self);
let mut state = self.state.write();
state
.model_types_by_message_type
.entity_types_by_message_type
.insert(message_type_id, model_type_id);
state
.entity_id_extractors
@ -408,9 +493,15 @@ impl Client {
let prev_handler = state.message_handlers.insert(
message_type_id,
Arc::new(move |handle, envelope, cx| {
let model = handle.downcast::<E>().unwrap();
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
if let Some(client) = client.upgrade() {
let model = handle.downcast::<E>().unwrap();
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
let handle = if let AnyEntityHandle::Model(handle) = handle {
handle
} else {
unreachable!();
};
handler(model, *envelope, client.clone(), cx).boxed_local()
} else {
async move { Ok(()) }.boxed_local()
@ -432,7 +523,7 @@ impl Client {
+ Fn(ModelHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
F: 'static + Future<Output = Result<M::Response>>,
{
self.add_entity_message_handler(move |model, envelope, client, cx| {
self.add_model_message_handler(move |model, envelope, client, cx| {
let receipt = envelope.receipt();
let response = handler(model, envelope, client.clone(), cx);
async move {
@ -561,24 +652,26 @@ impl Client {
.models_by_message_type
.get(&payload_type_id)
.and_then(|model| model.upgrade(&cx))
.map(AnyEntityHandle::Model)
.or_else(|| {
let model_type_id =
*state.model_types_by_message_type.get(&payload_type_id)?;
let entity_type_id =
*state.entity_types_by_message_type.get(&payload_type_id)?;
let entity_id = state
.entity_id_extractors
.get(&message.payload_type_id())
.map(|extract_entity_id| {
(extract_entity_id)(message.as_ref())
})?;
let model = state
.models_by_entity_type_and_remote_id
.get(&(model_type_id, entity_id))?;
if let Some(model) = model.upgrade(&cx) {
Some(model)
let entity = state
.entities_by_type_and_remote_id
.get(&(entity_type_id, entity_id))?;
if let Some(entity) = entity.upgrade(&cx) {
Some(entity)
} else {
state
.models_by_entity_type_and_remote_id
.remove(&(model_type_id, entity_id));
.entities_by_type_and_remote_id
.remove(&(entity_type_id, entity_id));
None
}
});
@ -891,6 +984,15 @@ impl Client {
}
}
impl AnyWeakEntityHandle {
fn upgrade(&self, cx: &AsyncAppContext) -> Option<AnyEntityHandle> {
match self {
AnyWeakEntityHandle::Model(handle) => handle.upgrade(cx).map(AnyEntityHandle::Model),
AnyWeakEntityHandle::View(handle) => handle.upgrade(cx).map(AnyEntityHandle::View),
}
}
}
fn read_credentials_from_keychain(cx: &AsyncAppContext) -> Option<Credentials> {
if IMPERSONATE_LOGIN.is_some() {
return None;
@ -994,7 +1096,7 @@ mod tests {
let (done_tx1, mut done_rx1) = smol::channel::unbounded();
let (done_tx2, mut done_rx2) = smol::channel::unbounded();
client.add_entity_message_handler(
client.add_model_message_handler(
move |model: ModelHandle<Model>, _: TypedEnvelope<proto::UnshareProject>, _, cx| {
match model.read_with(&cx, |model, _| model.id) {
1 => done_tx1.try_send(()).unwrap(),