Rip out project registration and use sharing/unsharing instead

This commit is contained in:
Antonio Scandurra 2022-09-30 12:23:57 +02:00
parent be8990ea78
commit 074b8f18d1
7 changed files with 333 additions and 525 deletions

View file

@ -35,7 +35,6 @@ use lsp::{
};
use lsp_command::*;
use parking_lot::Mutex;
use postage::stream::Stream;
use postage::watch;
use rand::prelude::*;
use search::SearchQuery;
@ -153,10 +152,8 @@ enum WorktreeHandle {
enum ProjectClientState {
Local {
is_shared: bool,
remote_id_tx: watch::Sender<Option<u64>>,
remote_id_rx: watch::Receiver<Option<u64>>,
_maintain_remote_id: Task<Option<()>>,
remote_id: Option<u64>,
_detect_unshare: Task<Option<()>>,
},
Remote {
sharing_has_stopped: bool,
@ -382,7 +379,6 @@ impl Project {
client.add_model_message_handler(Self::handle_update_language_server);
client.add_model_message_handler(Self::handle_remove_collaborator);
client.add_model_message_handler(Self::handle_update_project);
client.add_model_message_handler(Self::handle_unregister_project);
client.add_model_message_handler(Self::handle_unshare_project);
client.add_model_message_handler(Self::handle_create_buffer_for_peer);
client.add_model_message_handler(Self::handle_update_buffer_file);
@ -423,24 +419,19 @@ impl Project {
cx: &mut MutableAppContext,
) -> ModelHandle<Self> {
cx.add_model(|cx: &mut ModelContext<Self>| {
let (remote_id_tx, remote_id_rx) = watch::channel();
let _maintain_remote_id = cx.spawn_weak({
let mut status_rx = client.clone().status();
move |this, mut cx| async move {
while let Some(status) = status_rx.recv().await {
let this = this.upgrade(&cx)?;
if status.is_connected() {
this.update(&mut cx, |this, cx| this.register(cx))
.await
.log_err()?;
} else {
this.update(&mut cx, |this, cx| this.unregister(cx))
.await
.log_err();
let mut status = client.status();
let _detect_unshare = cx.spawn_weak(move |this, mut cx| {
async move {
let is_connected = status.next().await.map_or(false, |s| s.is_connected());
// Even if we're initially connected, any future change of the status means we momentarily disconnected.
if !is_connected || status.next().await.is_some() {
if let Some(this) = this.upgrade(&cx) {
let _ = this.update(&mut cx, |this, cx| this.unshare(cx));
}
}
None
Ok(())
}
.log_err()
});
let handle = cx.weak_handle();
@ -456,10 +447,8 @@ impl Project {
loading_local_worktrees: Default::default(),
buffer_snapshots: Default::default(),
client_state: ProjectClientState::Local {
is_shared: false,
remote_id_tx,
remote_id_rx,
_maintain_remote_id,
remote_id: None,
_detect_unshare,
},
opened_buffer: watch::channel(),
client_subscriptions: Vec::new(),
@ -762,113 +751,9 @@ impl Project {
&self.fs
}
fn unregister(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
self.unshare(cx).log_err();
if let ProjectClientState::Local { remote_id_rx, .. } = &mut self.client_state {
if let Some(remote_id) = *remote_id_rx.borrow() {
let request = self.client.request(proto::UnregisterProject {
project_id: remote_id,
});
return cx.spawn(|this, mut cx| async move {
let response = request.await;
// Unregistering the project causes the server to send out a
// contact update removing this project from the host's list
// of online projects. Wait until this contact update has been
// processed before clearing out this project's remote id, so
// that there is no moment where this project appears in the
// contact metadata and *also* has no remote id.
this.update(&mut cx, |this, cx| {
this.user_store()
.update(cx, |store, _| store.contact_updates_done())
})
.await;
this.update(&mut cx, |this, cx| {
if let ProjectClientState::Local { remote_id_tx, .. } =
&mut this.client_state
{
*remote_id_tx.borrow_mut() = None;
}
this.client_subscriptions.clear();
this.metadata_changed(cx);
});
response.map(drop)
});
}
}
Task::ready(Ok(()))
}
fn register(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
if let ProjectClientState::Local { remote_id_rx, .. } = &self.client_state {
if remote_id_rx.borrow().is_some() {
return Task::ready(Ok(()));
}
let response = self.client.request(proto::RegisterProject {});
cx.spawn(|this, mut cx| async move {
let remote_id = response.await?.project_id;
this.update(&mut cx, |this, cx| {
if let ProjectClientState::Local { remote_id_tx, .. } = &mut this.client_state {
*remote_id_tx.borrow_mut() = Some(remote_id);
}
this.metadata_changed(cx);
cx.emit(Event::RemoteIdChanged(Some(remote_id)));
this.client_subscriptions
.push(this.client.add_model_for_remote_entity(remote_id, cx));
Ok(())
})
})
} else {
Task::ready(Err(anyhow!("can't register a remote project")))
}
}
pub fn remote_id(&self) -> Option<u64> {
match &self.client_state {
ProjectClientState::Local { remote_id_rx, .. } => *remote_id_rx.borrow(),
ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
}
}
pub fn next_remote_id(&self) -> impl Future<Output = u64> {
let mut id = None;
let mut watch = None;
match &self.client_state {
ProjectClientState::Local { remote_id_rx, .. } => watch = Some(remote_id_rx.clone()),
ProjectClientState::Remote { remote_id, .. } => id = Some(*remote_id),
}
async move {
if let Some(id) = id {
return id;
}
let mut watch = watch.unwrap();
loop {
let id = *watch.borrow();
if let Some(id) = id {
return id;
}
watch.next().await;
}
}
}
pub fn shared_remote_id(&self) -> Option<u64> {
match &self.client_state {
ProjectClientState::Local {
remote_id_rx,
is_shared,
..
} => {
if *is_shared {
*remote_id_rx.borrow()
} else {
None
}
}
ProjectClientState::Local { remote_id, .. } => *remote_id,
ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
}
}
@ -881,7 +766,7 @@ impl Project {
}
fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
if let ProjectClientState::Local { remote_id_rx, .. } = &self.client_state {
if let ProjectClientState::Local { remote_id, .. } = &self.client_state {
// Broadcast worktrees only if the project is online.
let worktrees = self
.worktrees
@ -892,7 +777,7 @@ impl Project {
.map(|worktree| worktree.read(cx).as_local().unwrap().metadata_proto())
})
.collect();
if let Some(project_id) = *remote_id_rx.borrow() {
if let Some(project_id) = *remote_id {
self.client
.send(proto::UpdateProject {
project_id,
@ -1164,113 +1049,105 @@ impl Project {
}
}
pub fn share(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
let project_id;
if let ProjectClientState::Local {
remote_id_rx,
is_shared,
..
} = &mut self.client_state
{
if *is_shared {
return Task::ready(Ok(()));
}
*is_shared = true;
if let Some(id) = *remote_id_rx.borrow() {
project_id = id;
} else {
return Task::ready(Err(anyhow!("project hasn't been registered")));
pub fn share(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<u64>> {
if let ProjectClientState::Local { remote_id, .. } = &mut self.client_state {
if let Some(remote_id) = remote_id {
return Task::ready(Ok(*remote_id));
}
let response = self.client.request(proto::ShareProject {});
cx.spawn(|this, mut cx| async move {
let project_id = response.await?.project_id;
let mut worktree_share_tasks = Vec::new();
this.update(&mut cx, |this, cx| {
if let ProjectClientState::Local { remote_id, .. } = &mut this.client_state {
*remote_id = Some(project_id);
}
for open_buffer in this.opened_buffers.values_mut() {
match open_buffer {
OpenBuffer::Strong(_) => {}
OpenBuffer::Weak(buffer) => {
if let Some(buffer) = buffer.upgrade(cx) {
*open_buffer = OpenBuffer::Strong(buffer);
}
}
OpenBuffer::Operations(_) => unreachable!(),
}
}
for worktree_handle in this.worktrees.iter_mut() {
match worktree_handle {
WorktreeHandle::Strong(_) => {}
WorktreeHandle::Weak(worktree) => {
if let Some(worktree) = worktree.upgrade(cx) {
*worktree_handle = WorktreeHandle::Strong(worktree);
}
}
}
}
for worktree in this.worktrees(cx).collect::<Vec<_>>() {
worktree.update(cx, |worktree, cx| {
let worktree = worktree.as_local_mut().unwrap();
worktree_share_tasks.push(worktree.share(project_id, cx));
});
}
for (server_id, status) in &this.language_server_statuses {
this.client
.send(proto::StartLanguageServer {
project_id,
server: Some(proto::LanguageServer {
id: *server_id as u64,
name: status.name.clone(),
}),
})
.log_err();
}
this.client_subscriptions
.push(this.client.add_model_for_remote_entity(project_id, cx));
this.metadata_changed(cx);
cx.emit(Event::RemoteIdChanged(Some(project_id)));
cx.notify();
});
futures::future::try_join_all(worktree_share_tasks).await?;
Ok(project_id)
})
} else {
return Task::ready(Err(anyhow!("can't share a remote project")));
};
for open_buffer in self.opened_buffers.values_mut() {
match open_buffer {
OpenBuffer::Strong(_) => {}
OpenBuffer::Weak(buffer) => {
if let Some(buffer) = buffer.upgrade(cx) {
*open_buffer = OpenBuffer::Strong(buffer);
}
}
OpenBuffer::Operations(_) => unreachable!(),
}
Task::ready(Err(anyhow!("can't share a remote project")))
}
for worktree_handle in self.worktrees.iter_mut() {
match worktree_handle {
WorktreeHandle::Strong(_) => {}
WorktreeHandle::Weak(worktree) => {
if let Some(worktree) = worktree.upgrade(cx) {
*worktree_handle = WorktreeHandle::Strong(worktree);
}
}
}
}
let mut tasks = Vec::new();
for worktree in self.worktrees(cx).collect::<Vec<_>>() {
worktree.update(cx, |worktree, cx| {
let worktree = worktree.as_local_mut().unwrap();
tasks.push(worktree.share(project_id, cx));
});
}
for (server_id, status) in &self.language_server_statuses {
self.client
.send(proto::StartLanguageServer {
project_id,
server: Some(proto::LanguageServer {
id: *server_id as u64,
name: status.name.clone(),
}),
})
.log_err();
}
cx.spawn(|this, mut cx| async move {
for task in tasks {
task.await?;
}
this.update(&mut cx, |_, cx| cx.notify());
Ok(())
})
}
pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
if let ProjectClientState::Local {
is_shared,
remote_id_rx,
..
} = &mut self.client_state
{
if !*is_shared {
return Ok(());
}
if let ProjectClientState::Local { remote_id, .. } = &mut self.client_state {
if let Some(project_id) = remote_id.take() {
self.collaborators.clear();
self.shared_buffers.clear();
self.client_subscriptions.clear();
*is_shared = false;
self.collaborators.clear();
self.shared_buffers.clear();
for worktree_handle in self.worktrees.iter_mut() {
if let WorktreeHandle::Strong(worktree) = worktree_handle {
let is_visible = worktree.update(cx, |worktree, _| {
worktree.as_local_mut().unwrap().unshare();
worktree.is_visible()
});
if !is_visible {
*worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
for worktree_handle in self.worktrees.iter_mut() {
if let WorktreeHandle::Strong(worktree) = worktree_handle {
let is_visible = worktree.update(cx, |worktree, _| {
worktree.as_local_mut().unwrap().unshare();
worktree.is_visible()
});
if !is_visible {
*worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
}
}
}
}
for open_buffer in self.opened_buffers.values_mut() {
if let OpenBuffer::Strong(buffer) = open_buffer {
*open_buffer = OpenBuffer::Weak(buffer.downgrade());
for open_buffer in self.opened_buffers.values_mut() {
if let OpenBuffer::Strong(buffer) = open_buffer {
*open_buffer = OpenBuffer::Weak(buffer.downgrade());
}
}
}
cx.notify();
if let Some(project_id) = *remote_id_rx.borrow() {
self.metadata_changed(cx);
cx.notify();
self.client.send(proto::UnshareProject { project_id })?;
}
@ -1750,7 +1627,7 @@ impl Project {
) -> Option<()> {
match event {
BufferEvent::Operation(operation) => {
if let Some(project_id) = self.shared_remote_id() {
if let Some(project_id) = self.remote_id() {
let request = self.client.request(proto::UpdateBuffer {
project_id,
buffer_id: buffer.read(cx).remote_id(),
@ -2155,7 +2032,7 @@ impl Project {
)
.ok();
if let Some(project_id) = this.shared_remote_id() {
if let Some(project_id) = this.remote_id() {
this.client
.send(proto::StartLanguageServer {
project_id,
@ -2562,7 +2439,7 @@ impl Project {
language_server_id: usize,
event: proto::update_language_server::Variant,
) {
if let Some(project_id) = self.shared_remote_id() {
if let Some(project_id) = self.remote_id() {
self.client
.send(proto::UpdateLanguageServer {
project_id,
@ -4273,7 +4150,7 @@ impl Project {
pub fn is_shared(&self) -> bool {
match &self.client_state {
ProjectClientState::Local { is_shared, .. } => *is_shared,
ProjectClientState::Local { remote_id, .. } => remote_id.is_some(),
ProjectClientState::Remote { .. } => false,
}
}
@ -4310,7 +4187,7 @@ impl Project {
let project_id = project.update(&mut cx, |project, cx| {
project.add_worktree(&worktree, cx);
project.shared_remote_id()
project.remote_id()
});
if let Some(project_id) = project_id {
@ -4439,7 +4316,7 @@ impl Project {
renamed_buffers.push((cx.handle(), old_path));
}
if let Some(project_id) = self.shared_remote_id() {
if let Some(project_id) = self.remote_id() {
self.client
.send(proto::UpdateBufferFile {
project_id,
@ -4552,16 +4429,6 @@ impl Project {
// RPC message handlers
async fn handle_unregister_project(
this: ModelHandle<Self>,
_: TypedEnvelope<proto::UnregisterProject>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
this.update(&mut cx, |this, cx| this.disconnected_from_host(cx));
Ok(())
}
async fn handle_unshare_project(
this: ModelHandle<Self>,
_: TypedEnvelope<proto::UnshareProject>,
@ -5987,10 +5854,10 @@ impl Entity for Project {
self.project_store.update(cx, ProjectStore::prune_projects);
match &self.client_state {
ProjectClientState::Local { remote_id_rx, .. } => {
if let Some(project_id) = *remote_id_rx.borrow() {
ProjectClientState::Local { remote_id, .. } => {
if let Some(project_id) = *remote_id {
self.client
.send(proto::UnregisterProject { project_id })
.send(proto::UnshareProject { project_id })
.log_err();
}
}