WIP - Register client RPC handlers on app startup

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
Max Brunsfeld 2022-02-15 18:03:06 -08:00
parent 1ca1595490
commit 71abea728e
6 changed files with 268 additions and 269 deletions

View file

@ -12,7 +12,10 @@ use async_tungstenite::tungstenite::{
http::{Request, StatusCode},
};
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
use gpui::{action, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
use gpui::{
action, AnyModelHandle, AnyWeakModelHandle, AsyncAppContext, Entity, ModelHandle,
MutableAppContext, Task,
};
use http::HttpClient;
use lazy_static::lazy_static;
use parking_lot::RwLock;
@ -20,7 +23,7 @@ use postage::watch;
use rand::prelude::*;
use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage};
use std::{
any::{type_name, TypeId},
any::TypeId,
collections::HashMap,
convert::TryFrom,
fmt::Write as _,
@ -124,19 +127,29 @@ pub enum Status {
ReconnectionError { next_reconnection: Instant },
}
type ModelHandler = Box<
dyn Send
+ Sync
+ FnMut(Box<dyn AnyTypedEnvelope>, &AsyncAppContext) -> LocalBoxFuture<'static, Result<()>>,
>;
struct ClientState {
credentials: Option<Credentials>,
status: (watch::Sender<Status>, watch::Receiver<Status>),
entity_id_extractors: HashMap<TypeId, Box<dyn Send + Sync + Fn(&dyn AnyTypedEnvelope) -> u64>>,
model_handlers: HashMap<(TypeId, Option<u64>), Option<ModelHandler>>,
_maintain_connection: Option<Task<()>>,
heartbeat_interval: Duration,
pending_messages: HashMap<(TypeId, u64), Vec<Box<dyn AnyTypedEnvelope>>>,
models_by_entity_type_and_remote_id: HashMap<(TypeId, u64), AnyWeakModelHandle>,
models_by_message_type: HashMap<TypeId, AnyModelHandle>,
model_types_by_message_type: HashMap<TypeId, TypeId>,
message_handlers: HashMap<
TypeId,
Box<
dyn Send
+ Sync
+ Fn(
AnyModelHandle,
Box<dyn AnyTypedEnvelope>,
AsyncAppContext,
) -> LocalBoxFuture<'static, Result<()>>,
>,
>,
}
#[derive(Clone, Debug)]
@ -151,23 +164,27 @@ impl Default for ClientState {
credentials: None,
status: watch::channel_with(Status::SignedOut),
entity_id_extractors: Default::default(),
model_handlers: Default::default(),
_maintain_connection: None,
heartbeat_interval: Duration::from_secs(5),
models_by_message_type: Default::default(),
models_by_entity_type_and_remote_id: Default::default(),
model_types_by_message_type: Default::default(),
pending_messages: Default::default(),
message_handlers: Default::default(),
}
}
}
pub struct Subscription {
client: Weak<Client>,
id: (TypeId, Option<u64>),
id: (TypeId, u64),
}
impl Drop for Subscription {
fn drop(&mut self) {
if let Some(client) = self.client.upgrade() {
let mut state = client.state.write();
let _ = state.model_handlers.remove(&self.id).unwrap();
let _ = state.models_by_entity_type_and_remote_id.remove(&self.id);
}
}
}
@ -266,125 +283,108 @@ impl Client {
}
}
pub fn add_message_handler<T, M, F, Fut>(
pub fn add_model_for_remote_entity<T: Entity>(
self: &Arc<Self>,
cx: &mut ModelContext<M>,
mut handler: F,
) -> Subscription
handle: ModelHandle<T>,
remote_id: u64,
) -> Subscription {
let mut state = self.state.write();
let id = (TypeId::of::<T>(), remote_id);
state
.models_by_entity_type_and_remote_id
.insert(id, AnyModelHandle::from(handle).downgrade());
Subscription {
client: Arc::downgrade(self),
id,
}
}
pub fn add_message_handler<M, E, H, F>(self: &Arc<Self>, model: ModelHandle<E>, handler: H)
where
T: EnvelopedMessage,
M: Entity,
F: 'static
M: EnvelopedMessage,
E: Entity,
H: 'static
+ Send
+ Sync
+ FnMut(ModelHandle<M>, TypedEnvelope<T>, Arc<Self>, AsyncAppContext) -> Fut,
Fut: 'static + Future<Output = Result<()>>,
+ Fn(ModelHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
F: 'static + Future<Output = Result<()>>,
{
let subscription_id = (TypeId::of::<T>(), None);
let message_type_id = TypeId::of::<M>();
let client = self.clone();
let mut state = self.state.write();
let model = cx.weak_handle();
let prev_handler = state.model_handlers.insert(
subscription_id,
Some(Box::new(move |envelope, cx| {
if let Some(model) = model.upgrade(cx) {
let envelope = envelope.into_any().downcast::<TypedEnvelope<T>>().unwrap();
handler(model, *envelope, client.clone(), cx.clone()).boxed_local()
} else {
async move {
Err(anyhow!(
"received message for {:?} but model was dropped",
type_name::<M>()
))
}
.boxed_local()
}
})),
state
.models_by_message_type
.insert(message_type_id, model.into());
let prev_handler = state.message_handlers.insert(
message_type_id,
Box::new(move |handle, envelope, cx| {
let model = handle.downcast::<E>().unwrap();
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
handler(model, *envelope, client.clone(), cx).boxed_local()
}),
);
if prev_handler.is_some() {
panic!("registered handler for the same message twice");
}
Subscription {
client: Arc::downgrade(self),
id: subscription_id,
}
}
pub fn add_entity_message_handler<T, M, F, Fut>(
self: &Arc<Self>,
remote_id: u64,
cx: &mut ModelContext<M>,
mut handler: F,
) -> Subscription
pub fn add_entity_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
where
T: EntityMessage,
M: Entity,
F: 'static
M: EntityMessage,
E: Entity,
H: 'static
+ Send
+ Sync
+ FnMut(ModelHandle<M>, TypedEnvelope<T>, Arc<Self>, AsyncAppContext) -> Fut,
Fut: 'static + Future<Output = Result<()>>,
+ Fn(ModelHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
F: 'static + Future<Output = Result<()>>,
{
let subscription_id = (TypeId::of::<T>(), Some(remote_id));
let model_type_id = TypeId::of::<E>();
let message_type_id = TypeId::of::<M>();
let client = self.clone();
let mut state = self.state.write();
let model = cx.weak_handle();
state
.model_types_by_message_type
.insert(message_type_id, model_type_id);
state
.entity_id_extractors
.entry(subscription_id.0)
.entry(message_type_id)
.or_insert_with(|| {
Box::new(|envelope| {
let envelope = envelope
.as_any()
.downcast_ref::<TypedEnvelope<T>>()
.downcast_ref::<TypedEnvelope<M>>()
.unwrap();
envelope.payload.remote_entity_id()
})
});
let prev_handler = state.model_handlers.insert(
subscription_id,
Some(Box::new(move |envelope, cx| {
if let Some(model) = model.upgrade(cx) {
let envelope = envelope.into_any().downcast::<TypedEnvelope<T>>().unwrap();
handler(model, *envelope, client.clone(), cx.clone()).boxed_local()
} else {
async move {
Err(anyhow!(
"received message for {:?} but model was dropped",
type_name::<M>()
))
}
.boxed_local()
}
})),
let prev_handler = state.message_handlers.insert(
message_type_id,
Box::new(move |handle, envelope, cx| {
let model = handle.downcast::<E>().unwrap();
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
handler(model, *envelope, client.clone(), cx).boxed_local()
}),
);
if prev_handler.is_some() {
panic!("registered a handler for the same entity twice")
}
Subscription {
client: Arc::downgrade(self),
id: subscription_id,
panic!("registered handler for the same message twice");
}
}
pub fn add_entity_request_handler<T, M, F, Fut>(
self: &Arc<Self>,
remote_id: u64,
cx: &mut ModelContext<M>,
mut handler: F,
) -> Subscription
pub fn add_entity_request_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
where
T: EntityMessage + RequestMessage,
M: Entity,
F: 'static
M: EntityMessage + RequestMessage,
E: Entity,
H: 'static
+ Send
+ Sync
+ FnMut(ModelHandle<M>, TypedEnvelope<T>, Arc<Self>, AsyncAppContext) -> Fut,
Fut: 'static + Future<Output = Result<T::Response>>,
+ Fn(ModelHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
F: 'static + Future<Output = Result<M::Response>>,
{
self.add_entity_message_handler(remote_id, cx, move |model, envelope, client, cx| {
self.add_entity_message_handler(move |model, envelope, client, cx| {
let receipt = envelope.receipt();
let response = handler(model, envelope, client.clone(), cx);
async move {
@ -500,26 +500,37 @@ impl Client {
while let Some(message) = incoming.next().await {
let mut state = this.state.write();
let payload_type_id = message.payload_type_id();
let entity_id = if let Some(extract_entity_id) =
state.entity_id_extractors.get(&message.payload_type_id())
{
Some((extract_entity_id)(message.as_ref()))
} else {
None
};
let type_name = message.payload_type_name();
let handler_key = (payload_type_id, entity_id);
if let Some(handler) = state.model_handlers.get_mut(&handler_key) {
let mut handler = handler.take().unwrap();
let model = state.models_by_message_type.get(&payload_type_id).cloned().or_else(|| {
let extract_entity_id = state.entity_id_extractors.get(&message.payload_type_id())?;
let entity_id = (extract_entity_id)(message.as_ref());
let model_type_id = *state.model_types_by_message_type.get(&payload_type_id)?;
// TODO - if we don't have this model yet, then buffer the message
let model = state.models_by_entity_type_and_remote_id.get(&(model_type_id, entity_id))?;
if let Some(model) = model.upgrade(&cx) {
Some(model)
} else {
state.models_by_entity_type_and_remote_id.remove(&(model_type_id, entity_id));
None
}
});
let model = if let Some(model) = model {
model
} else {
log::info!("unhandled message {}", type_name);
continue;
};
if let Some(handler) = state.message_handlers.remove(&payload_type_id) {
drop(state); // Avoid deadlocks if the handler interacts with rpc::Client
let future = (handler)(message, &cx);
let future = handler(model, message, cx.clone());
{
let mut state = this.state.write();
if state.model_handlers.contains_key(&handler_key) {
state.model_handlers.insert(handler_key, Some(handler));
}
state.message_handlers.insert(payload_type_id, handler);
}
let client_id = this.id;
@ -915,106 +926,107 @@ mod tests {
assert_eq!(decode_worktree_url("not://the-right-format"), None);
}
#[gpui::test]
async fn test_subscribing_to_entity(mut cx: TestAppContext) {
cx.foreground().forbid_parking();
// #[gpui::test]
// async fn test_subscribing_to_entity(mut cx: TestAppContext) {
// cx.foreground().forbid_parking();
let user_id = 5;
let mut client = Client::new(FakeHttpClient::with_404_response());
let server = FakeServer::for_client(user_id, &mut client, &cx).await;
// let user_id = 5;
// let mut client = Client::new(FakeHttpClient::with_404_response());
// let server = FakeServer::for_client(user_id, &mut client, &cx).await;
let model = cx.add_model(|_| Model { subscription: None });
let (mut done_tx1, mut done_rx1) = postage::oneshot::channel();
let (mut done_tx2, mut done_rx2) = postage::oneshot::channel();
let _subscription1 = model.update(&mut cx, |_, cx| {
client.add_entity_message_handler(
1,
cx,
move |_, _: TypedEnvelope<proto::UnshareProject>, _, _| {
postage::sink::Sink::try_send(&mut done_tx1, ()).unwrap();
async { Ok(()) }
},
)
});
let _subscription2 = model.update(&mut cx, |_, cx| {
client.add_entity_message_handler(
2,
cx,
move |_, _: TypedEnvelope<proto::UnshareProject>, _, _| {
postage::sink::Sink::try_send(&mut done_tx2, ()).unwrap();
async { Ok(()) }
},
)
});
// let model = cx.add_model(|_| Model { subscription: None });
// let (mut done_tx1, mut done_rx1) = postage::oneshot::channel();
// let (mut done_tx2, mut done_rx2) = postage::oneshot::channel();
// let _subscription1 = model.update(&mut cx, |_, cx| {
// client.add_entity_message_handler(
// 1,
// cx,
// move |_, _: TypedEnvelope<proto::UnshareProject>, _, _| {
// postage::sink::Sink::try_send(&mut done_tx1, ()).unwrap();
// async { Ok(()) }
// },
// )
// });
// let _subscription2 = model.update(&mut cx, |_, cx| {
// client.add_entity_message_handler(
// 2,
// cx,
// move |_, _: TypedEnvelope<proto::UnshareProject>, _, _| {
// postage::sink::Sink::try_send(&mut done_tx2, ()).unwrap();
// async { Ok(()) }
// },
// )
// });
// Ensure dropping a subscription for the same entity type still allows receiving of
// messages for other entity IDs of the same type.
let subscription3 = model.update(&mut cx, |_, cx| {
client.add_entity_message_handler(
3,
cx,
|_, _: TypedEnvelope<proto::UnshareProject>, _, _| async { Ok(()) },
)
});
drop(subscription3);
// // Ensure dropping a subscription for the same entity type still allows receiving of
// // messages for other entity IDs of the same type.
// let subscription3 = model.update(&mut cx, |_, cx| {
// client.add_entity_message_handler(
// 3,
// cx,
// |_, _: TypedEnvelope<proto::UnshareProject>, _, _| async { Ok(()) },
// )
// });
// drop(subscription3);
server.send(proto::UnshareProject { project_id: 1 });
server.send(proto::UnshareProject { project_id: 2 });
done_rx1.next().await.unwrap();
done_rx2.next().await.unwrap();
}
// server.send(proto::UnshareProject { project_id: 1 });
// server.send(proto::UnshareProject { project_id: 2 });
// done_rx1.next().await.unwrap();
// done_rx2.next().await.unwrap();
// }
#[gpui::test]
async fn test_subscribing_after_dropping_subscription(mut cx: TestAppContext) {
cx.foreground().forbid_parking();
// #[gpui::test]
// async fn test_subscribing_after_dropping_subscription(mut cx: TestAppContext) {
// cx.foreground().forbid_parking();
let user_id = 5;
let mut client = Client::new(FakeHttpClient::with_404_response());
let server = FakeServer::for_client(user_id, &mut client, &cx).await;
// let user_id = 5;
// let mut client = Client::new(FakeHttpClient::with_404_response());
// let server = FakeServer::for_client(user_id, &mut client, &cx).await;
let model = cx.add_model(|_| Model { subscription: None });
let (mut done_tx1, _done_rx1) = postage::oneshot::channel();
let (mut done_tx2, mut done_rx2) = postage::oneshot::channel();
let subscription1 = model.update(&mut cx, |_, cx| {
client.add_message_handler(cx, move |_, _: TypedEnvelope<proto::Ping>, _, _| {
postage::sink::Sink::try_send(&mut done_tx1, ()).unwrap();
async { Ok(()) }
})
});
drop(subscription1);
let _subscription2 = model.update(&mut cx, |_, cx| {
client.add_message_handler(cx, move |_, _: TypedEnvelope<proto::Ping>, _, _| {
postage::sink::Sink::try_send(&mut done_tx2, ()).unwrap();
async { Ok(()) }
})
});
server.send(proto::Ping {});
done_rx2.next().await.unwrap();
}
// let model = cx.add_model(|_| Model { subscription: None });
// let (mut done_tx1, _done_rx1) = postage::oneshot::channel();
// let (mut done_tx2, mut done_rx2) = postage::oneshot::channel();
// let subscription1 = model.update(&mut cx, |_, cx| {
// client.add_message_handler(cx, move |_, _: TypedEnvelope<proto::Ping>, _, _| {
// postage::sink::Sink::try_send(&mut done_tx1, ()).unwrap();
// async { Ok(()) }
// })
// });
// drop(subscription1);
// let _subscription2 = model.update(&mut cx, |_, cx| {
// client.add_message_handler(cx, move |_, _: TypedEnvelope<proto::Ping>, _, _| {
// postage::sink::Sink::try_send(&mut done_tx2, ()).unwrap();
// async { Ok(()) }
// })
// });
// server.send(proto::Ping {});
// done_rx2.next().await.unwrap();
// }
#[gpui::test]
async fn test_dropping_subscription_in_handler(mut cx: TestAppContext) {
cx.foreground().forbid_parking();
// #[gpui::test]
// async fn test_dropping_subscription_in_handler(mut cx: TestAppContext) {
// cx.foreground().forbid_parking();
let user_id = 5;
let mut client = Client::new(FakeHttpClient::with_404_response());
let server = FakeServer::for_client(user_id, &mut client, &cx).await;
// let user_id = 5;
// let mut client = Client::new(FakeHttpClient::with_404_response());
// let server = FakeServer::for_client(user_id, &mut client, &cx).await;
let model = cx.add_model(|_| Model { subscription: None });
let (mut done_tx, mut done_rx) = postage::oneshot::channel();
model.update(&mut cx, |model, cx| {
model.subscription = Some(client.add_message_handler(
cx,
move |model, _: TypedEnvelope<proto::Ping>, _, mut cx| {
model.update(&mut cx, |model, _| model.subscription.take());
postage::sink::Sink::try_send(&mut done_tx, ()).unwrap();
async { Ok(()) }
},
));
});
server.send(proto::Ping {});
done_rx.next().await.unwrap();
}
// let model = cx.add_model(|_| Model { subscription: None });
// let (mut done_tx, mut done_rx) = postage::oneshot::channel();
// client.add_message_handler(
// model.clone(),
// move |model, _: TypedEnvelope<proto::Ping>, _, mut cx| {
// model.update(&mut cx, |model, _| model.subscription.take());
// postage::sink::Sink::try_send(&mut done_tx, ()).unwrap();
// async { Ok(()) }
// },
// );
// model.update(&mut cx, |model, cx| {
// model.subscription = Some();
// });
// server.send(proto::Ping {});
// done_rx.next().await.unwrap();
// }
struct Model {
subscription: Option<Subscription>,