Remove project join requests

This commit is contained in:
Antonio Scandurra 2022-09-30 11:35:50 +02:00
parent b35e8f0164
commit be8990ea78
11 changed files with 284 additions and 1156 deletions

View file

@ -88,11 +88,6 @@ impl<R: RequestMessage> Response<R> {
self.server.peer.respond(self.receipt, payload)?;
Ok(())
}
fn into_receipt(self) -> Receipt<R> {
self.responded.store(true, SeqCst);
self.receipt
}
}
pub struct Server {
@ -160,7 +155,7 @@ impl Server {
.add_request_handler(Server::unregister_project)
.add_request_handler(Server::join_project)
.add_message_handler(Server::leave_project)
.add_message_handler(Server::respond_to_join_project_request)
.add_message_handler(Server::unshare_project)
.add_message_handler(Server::update_project)
.add_message_handler(Server::register_project_activity)
.add_request_handler(Server::update_worktree)
@ -491,21 +486,6 @@ impl Server {
},
)
});
for (_, receipts) in project.join_requests {
for receipt in receipts {
self.peer.respond(
receipt,
proto::JoinProjectResponse {
variant: Some(proto::join_project_response::Variant::Decline(
proto::join_project_response::Decline {
reason: proto::join_project_response::decline::Reason::WentOffline as i32
},
)),
},
)?;
}
}
}
for project_id in removed_connection.guest_project_ids {
@ -519,16 +499,6 @@ impl Server {
},
)
});
if project.guests.is_empty() {
self.peer
.send(
project.host_connection_id,
proto::ProjectUnshared {
project_id: project_id.to_proto(),
},
)
.trace_err();
}
}
}
@ -727,11 +697,9 @@ impl Server {
.await
.user_id_for_connection(request.sender_id)?;
let project_id = self.app_state.db.register_project(user_id).await?;
self.store().await.register_project(
request.sender_id,
project_id,
request.payload.online,
)?;
self.store()
.await
.register_project(request.sender_id, project_id)?;
response.send(proto::RegisterProjectResponse {
project_id: project_id.to_proto(),
@ -746,11 +714,10 @@ impl Server {
response: Response<proto::UnregisterProject>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let (user_id, project) = {
let mut state = self.store().await;
let project = state.unregister_project(project_id, request.sender_id)?;
(state.user_id_for_connection(request.sender_id)?, project)
};
let project = self
.store()
.await
.unregister_project(project_id, request.sender_id)?;
self.app_state.db.unregister_project(project_id).await?;
broadcast(
@ -765,32 +732,27 @@ impl Server {
)
},
);
for (_, receipts) in project.join_requests {
for receipt in receipts {
self.peer.respond(
receipt,
proto::JoinProjectResponse {
variant: Some(proto::join_project_response::Variant::Decline(
proto::join_project_response::Decline {
reason: proto::join_project_response::decline::Reason::Closed
as i32,
},
)),
},
)?;
}
}
// Send out the `UpdateContacts` message before responding to the unregister
// request. This way, when the project's host can keep track of the project's
// remote id until after they've received the `UpdateContacts` message for
// themself.
self.update_user_contacts(user_id).await?;
response.send(proto::Ack {})?;
Ok(())
}
async fn unshare_project(
self: Arc<Server>,
message: TypedEnvelope<proto::UnshareProject>,
) -> Result<()> {
let project_id = ProjectId::from_proto(message.payload.project_id);
let project = self
.store()
.await
.unshare_project(project_id, message.sender_id)?;
broadcast(message.sender_id, project.guest_connection_ids, |conn_id| {
self.peer.send(conn_id, message.payload.clone())
});
Ok(())
}
async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
let contacts = self.app_state.db.get_contacts(user_id).await?;
let store = self.store().await;
@ -849,167 +811,93 @@ impl Server {
return Err(anyhow!("no such project"))?;
}
self.store().await.request_join_project(
guest_user_id,
project_id,
response.into_receipt(),
)?;
self.peer.send(
host_connection_id,
proto::RequestJoinProject {
project_id: project_id.to_proto(),
requester_id: guest_user_id.to_proto(),
},
)?;
Ok(())
}
let mut store = self.store().await;
let (project, replica_id) = store.join_project(request.sender_id, project_id)?;
let peer_count = project.guests.len();
let mut collaborators = Vec::with_capacity(peer_count);
collaborators.push(proto::Collaborator {
peer_id: project.host_connection_id.0,
replica_id: 0,
user_id: project.host.user_id.to_proto(),
});
let worktrees = project
.worktrees
.iter()
.map(|(id, worktree)| proto::WorktreeMetadata {
id: *id,
root_name: worktree.root_name.clone(),
visible: worktree.visible,
})
.collect::<Vec<_>>();
async fn respond_to_join_project_request(
self: Arc<Server>,
request: TypedEnvelope<proto::RespondToJoinProjectRequest>,
) -> Result<()> {
let host_user_id;
{
let mut state = self.store().await;
let project_id = ProjectId::from_proto(request.payload.project_id);
let project = state.project(project_id)?;
if project.host_connection_id != request.sender_id {
Err(anyhow!("no such connection"))?;
}
host_user_id = project.host.user_id;
let guest_user_id = UserId::from_proto(request.payload.requester_id);
if !request.payload.allow {
let receipts = state
.deny_join_project_request(request.sender_id, guest_user_id, project_id)
.ok_or_else(|| anyhow!("no such request"))?;
for receipt in receipts {
self.peer.respond(
receipt,
proto::JoinProjectResponse {
variant: Some(proto::join_project_response::Variant::Decline(
proto::join_project_response::Decline {
reason: proto::join_project_response::decline::Reason::Declined
as i32,
},
)),
},
)?;
}
return Ok(());
}
let (receipts_with_replica_ids, project) = state
.accept_join_project_request(request.sender_id, guest_user_id, project_id)
.ok_or_else(|| anyhow!("no such request"))?;
let peer_count = project.guests.len();
let mut collaborators = Vec::with_capacity(peer_count);
collaborators.push(proto::Collaborator {
peer_id: project.host_connection_id.0,
replica_id: 0,
user_id: project.host.user_id.to_proto(),
});
let worktrees = project
.worktrees
.iter()
.map(|(id, worktree)| proto::WorktreeMetadata {
id: *id,
root_name: worktree.root_name.clone(),
visible: worktree.visible,
})
.collect::<Vec<_>>();
// Add all guests other than the requesting user's own connections as collaborators
for (guest_conn_id, guest) in &project.guests {
if receipts_with_replica_ids
.iter()
.all(|(receipt, _)| receipt.sender_id != *guest_conn_id)
{
collaborators.push(proto::Collaborator {
peer_id: guest_conn_id.0,
replica_id: guest.replica_id as u32,
user_id: guest.user_id.to_proto(),
});
}
}
for conn_id in project.connection_ids() {
for (receipt, replica_id) in &receipts_with_replica_ids {
if conn_id != receipt.sender_id {
self.peer.send(
conn_id,
proto::AddProjectCollaborator {
project_id: project_id.to_proto(),
collaborator: Some(proto::Collaborator {
peer_id: receipt.sender_id.0,
replica_id: *replica_id as u32,
user_id: guest_user_id.to_proto(),
}),
},
)?;
}
}
}
// First, we send the metadata associated with each worktree.
for (receipt, replica_id) in &receipts_with_replica_ids {
self.peer.respond(
*receipt,
proto::JoinProjectResponse {
variant: Some(proto::join_project_response::Variant::Accept(
proto::join_project_response::Accept {
worktrees: worktrees.clone(),
replica_id: *replica_id as u32,
collaborators: collaborators.clone(),
language_servers: project.language_servers.clone(),
},
)),
},
)?;
}
for (worktree_id, worktree) in &project.worktrees {
#[cfg(any(test, feature = "test-support"))]
const MAX_CHUNK_SIZE: usize = 2;
#[cfg(not(any(test, feature = "test-support")))]
const MAX_CHUNK_SIZE: usize = 256;
// Stream this worktree's entries.
let message = proto::UpdateWorktree {
project_id: project_id.to_proto(),
worktree_id: *worktree_id,
root_name: worktree.root_name.clone(),
updated_entries: worktree.entries.values().cloned().collect(),
removed_entries: Default::default(),
scan_id: worktree.scan_id,
is_last_update: worktree.is_complete,
};
for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
for (receipt, _) in &receipts_with_replica_ids {
self.peer.send(receipt.sender_id, update.clone())?;
}
}
// Stream this worktree's diagnostics.
for summary in worktree.diagnostic_summaries.values() {
for (receipt, _) in &receipts_with_replica_ids {
self.peer.send(
receipt.sender_id,
proto::UpdateDiagnosticSummary {
project_id: project_id.to_proto(),
worktree_id: *worktree_id,
summary: Some(summary.clone()),
},
)?;
}
}
// Add all guests other than the requesting user's own connections as collaborators
for (guest_conn_id, guest) in &project.guests {
if request.sender_id != *guest_conn_id {
collaborators.push(proto::Collaborator {
peer_id: guest_conn_id.0,
replica_id: guest.replica_id as u32,
user_id: guest.user_id.to_proto(),
});
}
}
for conn_id in project.connection_ids() {
if conn_id != request.sender_id {
self.peer.send(
conn_id,
proto::AddProjectCollaborator {
project_id: project_id.to_proto(),
collaborator: Some(proto::Collaborator {
peer_id: request.sender_id.0,
replica_id: replica_id as u32,
user_id: guest_user_id.to_proto(),
}),
},
)?;
}
}
// First, we send the metadata associated with each worktree.
response.send(proto::JoinProjectResponse {
worktrees: worktrees.clone(),
replica_id: replica_id as u32,
collaborators: collaborators.clone(),
language_servers: project.language_servers.clone(),
})?;
for (worktree_id, worktree) in &project.worktrees {
#[cfg(any(test, feature = "test-support"))]
const MAX_CHUNK_SIZE: usize = 2;
#[cfg(not(any(test, feature = "test-support")))]
const MAX_CHUNK_SIZE: usize = 256;
// Stream this worktree's entries.
let message = proto::UpdateWorktree {
project_id: project_id.to_proto(),
worktree_id: *worktree_id,
root_name: worktree.root_name.clone(),
updated_entries: worktree.entries.values().cloned().collect(),
removed_entries: Default::default(),
scan_id: worktree.scan_id,
is_last_update: worktree.is_complete,
};
for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
self.peer.send(request.sender_id, update.clone())?;
}
// Stream this worktree's diagnostics.
for summary in worktree.diagnostic_summaries.values() {
self.peer.send(
request.sender_id,
proto::UpdateDiagnosticSummary {
project_id: project_id.to_proto(),
worktree_id: *worktree_id,
summary: Some(summary.clone()),
},
)?;
}
}
self.update_user_contacts(host_user_id).await?;
Ok(())
}
@ -1041,27 +929,8 @@ impl Server {
)
});
}
if let Some(requester_id) = project.cancel_request {
self.peer.send(
project.host_connection_id,
proto::JoinProjectRequestCancelled {
project_id: project_id.to_proto(),
requester_id: requester_id.to_proto(),
},
)?;
}
if project.unshare {
self.peer.send(
project.host_connection_id,
proto::ProjectUnshared {
project_id: project_id.to_proto(),
},
)?;
}
}
self.update_user_contacts(project.host_user_id).await?;
Ok(())
}
@ -1070,61 +939,18 @@ impl Server {
request: TypedEnvelope<proto::UpdateProject>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let user_id;
{
let mut state = self.store().await;
user_id = state.user_id_for_connection(request.sender_id)?;
let guest_connection_ids = state
.read_project(project_id, request.sender_id)?
.guest_connection_ids();
let unshared_project = state.update_project(
project_id,
&request.payload.worktrees,
request.payload.online,
request.sender_id,
)?;
if let Some(unshared_project) = unshared_project {
broadcast(
request.sender_id,
unshared_project.guests.keys().copied(),
|conn_id| {
self.peer.send(
conn_id,
proto::UnregisterProject {
project_id: project_id.to_proto(),
},
)
},
);
for (_, receipts) in unshared_project.pending_join_requests {
for receipt in receipts {
self.peer.respond(
receipt,
proto::JoinProjectResponse {
variant: Some(proto::join_project_response::Variant::Decline(
proto::join_project_response::Decline {
reason:
proto::join_project_response::decline::Reason::Closed
as i32,
},
)),
},
)?;
}
}
} else {
broadcast(request.sender_id, guest_connection_ids, |connection_id| {
self.peer.forward_send(
request.sender_id,
connection_id,
request.payload.clone(),
)
});
}
state.update_project(project_id, &request.payload.worktrees, request.sender_id)?;
broadcast(request.sender_id, guest_connection_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
});
};
self.update_user_contacts(user_id).await?;
Ok(())
}
@ -1146,32 +972,21 @@ impl Server {
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let worktree_id = request.payload.worktree_id;
let (connection_ids, metadata_changed) = {
let mut store = self.store().await;
let (connection_ids, metadata_changed) = store.update_worktree(
request.sender_id,
project_id,
worktree_id,
&request.payload.root_name,
&request.payload.removed_entries,
&request.payload.updated_entries,
request.payload.scan_id,
request.payload.is_last_update,
)?;
(connection_ids, metadata_changed)
};
let connection_ids = self.store().await.update_worktree(
request.sender_id,
project_id,
worktree_id,
&request.payload.root_name,
&request.payload.removed_entries,
&request.payload.updated_entries,
request.payload.scan_id,
request.payload.is_last_update,
)?;
broadcast(request.sender_id, connection_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
});
if metadata_changed {
let user_id = self
.store()
.await
.user_id_for_connection(request.sender_id)?;
self.update_user_contacts(user_id).await?;
}
response.send(proto::Ack {})?;
Ok(())
}