wip
This commit is contained in:
parent
57ffa8201e
commit
dd1a2a9e44
16 changed files with 138 additions and 181 deletions
|
@ -11,7 +11,8 @@ use async_tungstenite::tungstenite::{
|
|||
http::{Request, StatusCode},
|
||||
};
|
||||
use futures::{
|
||||
future::BoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryFutureExt as _, TryStreamExt,
|
||||
future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryFutureExt as _,
|
||||
TryStreamExt,
|
||||
};
|
||||
use gpui2::{
|
||||
serde_json, AnyModel, AnyWeakModel, AppContext, AsyncAppContext, Model, SemanticVersion, Task,
|
||||
|
@ -233,14 +234,12 @@ struct ClientState {
|
|||
message_handlers: HashMap<
|
||||
TypeId,
|
||||
Arc<
|
||||
dyn Send
|
||||
+ Sync
|
||||
+ Fn(
|
||||
AnyModel,
|
||||
Box<dyn AnyTypedEnvelope>,
|
||||
&Arc<Client>,
|
||||
AsyncAppContext,
|
||||
) -> BoxFuture<'static, Result<()>>,
|
||||
dyn Fn(
|
||||
AnyModel,
|
||||
Box<dyn AnyTypedEnvelope>,
|
||||
&Arc<Client>,
|
||||
AsyncAppContext,
|
||||
) -> LocalBoxFuture<'static, Result<()>>,
|
||||
>,
|
||||
>,
|
||||
}
|
||||
|
@ -310,10 +309,7 @@ pub struct PendingEntitySubscription<T: 'static> {
|
|||
consumed: bool,
|
||||
}
|
||||
|
||||
impl<T> PendingEntitySubscription<T>
|
||||
where
|
||||
T: 'static + Send,
|
||||
{
|
||||
impl<T: 'static> PendingEntitySubscription<T> {
|
||||
pub fn set_model(mut self, model: &Model<T>, cx: &mut AsyncAppContext) -> Subscription {
|
||||
self.consumed = true;
|
||||
let mut state = self.client.state.write();
|
||||
|
@ -341,10 +337,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for PendingEntitySubscription<T>
|
||||
where
|
||||
T: 'static,
|
||||
{
|
||||
impl<T: 'static> Drop for PendingEntitySubscription<T> {
|
||||
fn drop(&mut self) {
|
||||
if !self.consumed {
|
||||
let mut state = self.client.state.write();
|
||||
|
@ -529,7 +522,7 @@ impl Client {
|
|||
remote_id: u64,
|
||||
) -> Result<PendingEntitySubscription<T>>
|
||||
where
|
||||
T: 'static + Send,
|
||||
T: 'static,
|
||||
{
|
||||
let id = (TypeId::of::<T>(), remote_id);
|
||||
|
||||
|
@ -557,9 +550,9 @@ impl Client {
|
|||
) -> Subscription
|
||||
where
|
||||
M: EnvelopedMessage,
|
||||
E: 'static + Send,
|
||||
H: 'static + Send + Sync + Fn(Model<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
|
||||
F: 'static + Future<Output = Result<()>> + Send,
|
||||
E: 'static,
|
||||
H: 'static + Sync + Fn(Model<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
|
||||
F: 'static + Future<Output = Result<()>>,
|
||||
{
|
||||
let message_type_id = TypeId::of::<M>();
|
||||
|
||||
|
@ -573,7 +566,7 @@ impl Client {
|
|||
Arc::new(move |subscriber, envelope, client, cx| {
|
||||
let subscriber = subscriber.downcast::<E>().unwrap();
|
||||
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
|
||||
handler(subscriber, *envelope, client.clone(), cx).boxed()
|
||||
handler(subscriber, *envelope, client.clone(), cx).boxed_local()
|
||||
}),
|
||||
);
|
||||
if prev_handler.is_some() {
|
||||
|
@ -599,9 +592,9 @@ impl Client {
|
|||
) -> Subscription
|
||||
where
|
||||
M: RequestMessage,
|
||||
E: 'static + Send,
|
||||
H: 'static + Send + Sync + Fn(Model<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
|
||||
F: 'static + Future<Output = Result<M::Response>> + Send,
|
||||
E: 'static,
|
||||
H: 'static + Sync + Fn(Model<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
|
||||
F: 'static + Future<Output = Result<M::Response>>,
|
||||
{
|
||||
self.add_message_handler(model, move |handle, envelope, this, cx| {
|
||||
Self::respond_to_request(
|
||||
|
@ -615,9 +608,9 @@ impl Client {
|
|||
pub fn add_model_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
|
||||
where
|
||||
M: EntityMessage,
|
||||
E: 'static + Send,
|
||||
H: 'static + Send + Sync + Fn(Model<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
|
||||
F: 'static + Future<Output = Result<()>> + Send,
|
||||
E: 'static,
|
||||
H: 'static + Fn(Model<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
|
||||
F: 'static + Future<Output = Result<()>>,
|
||||
{
|
||||
self.add_entity_message_handler::<M, E, _, _>(move |subscriber, message, client, cx| {
|
||||
handler(subscriber.downcast::<E>().unwrap(), message, client, cx)
|
||||
|
@ -627,9 +620,9 @@ impl Client {
|
|||
fn add_entity_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
|
||||
where
|
||||
M: EntityMessage,
|
||||
E: 'static + Send,
|
||||
H: 'static + Send + Sync + Fn(AnyModel, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
|
||||
F: 'static + Future<Output = Result<()>> + Send,
|
||||
E: 'static,
|
||||
H: 'static + Fn(AnyModel, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
|
||||
F: 'static + Future<Output = Result<()>>,
|
||||
{
|
||||
let model_type_id = TypeId::of::<E>();
|
||||
let message_type_id = TypeId::of::<M>();
|
||||
|
@ -655,7 +648,7 @@ impl Client {
|
|||
message_type_id,
|
||||
Arc::new(move |handle, envelope, client, cx| {
|
||||
let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
|
||||
handler(handle, *envelope, client.clone(), cx).boxed()
|
||||
handler(handle, *envelope, client.clone(), cx).boxed_local()
|
||||
}),
|
||||
);
|
||||
if prev_handler.is_some() {
|
||||
|
@ -666,9 +659,9 @@ impl Client {
|
|||
pub fn add_model_request_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
|
||||
where
|
||||
M: EntityMessage + RequestMessage,
|
||||
E: 'static + Send,
|
||||
H: 'static + Send + Sync + Fn(Model<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
|
||||
F: 'static + Future<Output = Result<M::Response>> + Send,
|
||||
E: 'static,
|
||||
H: 'static + Fn(Model<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
|
||||
F: 'static + Future<Output = Result<M::Response>>,
|
||||
{
|
||||
self.add_model_message_handler(move |entity, envelope, client, cx| {
|
||||
Self::respond_to_request::<M, _>(
|
||||
|
@ -705,7 +698,7 @@ impl Client {
|
|||
read_credentials_from_keychain(cx).await.is_some()
|
||||
}
|
||||
|
||||
#[async_recursion]
|
||||
#[async_recursion(?Send)]
|
||||
pub async fn authenticate_and_connect(
|
||||
self: &Arc<Self>,
|
||||
try_keychain: bool,
|
||||
|
@ -1050,7 +1043,7 @@ impl Client {
|
|||
write!(&mut url, "&impersonate={}", impersonate_login).unwrap();
|
||||
}
|
||||
|
||||
cx.run_on_main(move |cx| cx.open_url(&url))?.await;
|
||||
cx.update(|cx| cx.open_url(&url))?;
|
||||
|
||||
// Receive the HTTP request from the user's browser. Retrieve the user id and encrypted
|
||||
// access token from the query params.
|
||||
|
@ -1101,7 +1094,7 @@ impl Client {
|
|||
let access_token = private_key
|
||||
.decrypt_string(&access_token)
|
||||
.context("failed to decrypt access token")?;
|
||||
cx.run_on_main(|cx| cx.activate(true))?.await;
|
||||
cx.update(|cx| cx.activate(true))?;
|
||||
|
||||
Ok(Credentials {
|
||||
user_id: user_id.parse()?,
|
||||
|
@ -1293,7 +1286,7 @@ impl Client {
|
|||
sender_id,
|
||||
type_name
|
||||
);
|
||||
cx.spawn_on_main(move |_| async move {
|
||||
cx.spawn(move |_| async move {
|
||||
match future.await {
|
||||
Ok(()) => {
|
||||
log::debug!(
|
||||
|
@ -1332,9 +1325,8 @@ async fn read_credentials_from_keychain(cx: &AsyncAppContext) -> Option<Credenti
|
|||
}
|
||||
|
||||
let (user_id, access_token) = cx
|
||||
.run_on_main(|cx| cx.read_credentials(&ZED_SERVER_URL).log_err().flatten())
|
||||
.ok()?
|
||||
.await?;
|
||||
.update(|cx| cx.read_credentials(&ZED_SERVER_URL).log_err().flatten())
|
||||
.ok()??;
|
||||
|
||||
Some(Credentials {
|
||||
user_id: user_id.parse().ok()?,
|
||||
|
@ -1346,19 +1338,17 @@ async fn write_credentials_to_keychain(
|
|||
credentials: Credentials,
|
||||
cx: &AsyncAppContext,
|
||||
) -> Result<()> {
|
||||
cx.run_on_main(move |cx| {
|
||||
cx.update(move |cx| {
|
||||
cx.write_credentials(
|
||||
&ZED_SERVER_URL,
|
||||
&credentials.user_id.to_string(),
|
||||
credentials.access_token.as_bytes(),
|
||||
)
|
||||
})?
|
||||
.await
|
||||
}
|
||||
|
||||
async fn delete_credentials_from_keychain(cx: &AsyncAppContext) -> Result<()> {
|
||||
cx.run_on_main(move |cx| cx.delete_credentials(&ZED_SERVER_URL))?
|
||||
.await
|
||||
cx.update(move |cx| cx.delete_credentials(&ZED_SERVER_URL))?
|
||||
}
|
||||
|
||||
const WORKTREE_URL_PREFIX: &str = "zed://worktrees/";
|
||||
|
@ -1430,7 +1420,7 @@ mod tests {
|
|||
|
||||
// Time out when client tries to connect.
|
||||
client.override_authenticate(move |cx| {
|
||||
cx.executor().spawn(async move {
|
||||
cx.background_executor().spawn(async move {
|
||||
Ok(Credentials {
|
||||
user_id,
|
||||
access_token: "token".into(),
|
||||
|
@ -1438,7 +1428,7 @@ mod tests {
|
|||
})
|
||||
});
|
||||
client.override_establish_connection(|_, cx| {
|
||||
cx.executor().spawn(async move {
|
||||
cx.background_executor().spawn(async move {
|
||||
future::pending::<()>().await;
|
||||
unreachable!()
|
||||
})
|
||||
|
@ -1472,7 +1462,7 @@ mod tests {
|
|||
// Time out when re-establishing the connection.
|
||||
server.allow_connections();
|
||||
client.override_establish_connection(|_, cx| {
|
||||
cx.executor().spawn(async move {
|
||||
cx.background_executor().spawn(async move {
|
||||
future::pending::<()>().await;
|
||||
unreachable!()
|
||||
})
|
||||
|
@ -1504,7 +1494,7 @@ mod tests {
|
|||
move |cx| {
|
||||
let auth_count = auth_count.clone();
|
||||
let dropped_auth_count = dropped_auth_count.clone();
|
||||
cx.executor().spawn(async move {
|
||||
cx.background_executor().spawn(async move {
|
||||
*auth_count.lock() += 1;
|
||||
let _drop = util::defer(move || *dropped_auth_count.lock() += 1);
|
||||
future::pending::<()>().await;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
|
||||
use gpui2::{serde_json, AppContext, AppMetadata, Executor, Task};
|
||||
use gpui2::{serde_json, AppContext, AppMetadata, BackgroundExecutor, Task};
|
||||
use lazy_static::lazy_static;
|
||||
use parking_lot::Mutex;
|
||||
use serde::Serialize;
|
||||
|
@ -14,7 +14,7 @@ use util::{channel::ReleaseChannel, TryFutureExt};
|
|||
|
||||
pub struct Telemetry {
|
||||
http_client: Arc<dyn HttpClient>,
|
||||
executor: Executor,
|
||||
executor: BackgroundExecutor,
|
||||
state: Mutex<TelemetryState>,
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue