Register projects in the database and record worktree extensions

This commit is contained in:
Antonio Scandurra 2022-06-21 10:29:26 +02:00
parent 44160869eb
commit 7acebc4eb8
6 changed files with 312 additions and 140 deletions

View file

@ -2,7 +2,7 @@ mod store;
use crate::{
auth,
db::{self, ChannelId, MessageId, User, UserId},
db::{self, ChannelId, MessageId, ProjectId, User, UserId},
AppState, Result,
};
use anyhow::anyhow;
@ -299,7 +299,7 @@ impl Server {
let executor = executor.clone();
async move {
let mut period_start = OffsetDateTime::now_utc();
let mut active_projects = Vec::<(UserId, u64)>::new();
let mut active_projects = Vec::<(UserId, ProjectId)>::new();
loop {
let sleep = executor.sleep(interval);
sleep.await;
@ -458,8 +458,12 @@ impl Server {
for (project_id, project) in removed_connection.hosted_projects {
broadcast(connection_id, project.guests.keys().copied(), |conn_id| {
self.peer
.send(conn_id, proto::UnregisterProject { project_id })
self.peer.send(
conn_id,
proto::UnregisterProject {
project_id: project_id.to_proto(),
},
)
});
for (_, receipts) in project.join_requests {
@ -484,7 +488,7 @@ impl Server {
self.peer.send(
conn_id,
proto::RemoveProjectCollaborator {
project_id,
project_id: project_id.to_proto(),
peer_id: connection_id.0,
},
)
@ -493,7 +497,9 @@ impl Server {
self.peer
.send(
project.host_connection_id,
proto::ProjectUnshared { project_id },
proto::ProjectUnshared {
project_id: project_id.to_proto(),
},
)
.trace_err();
}
@ -567,14 +573,18 @@ impl Server {
request: TypedEnvelope<proto::RegisterProject>,
response: Response<proto::RegisterProject>,
) -> Result<()> {
let project_id;
{
let mut state = self.store_mut().await;
let user_id = state.user_id_for_connection(request.sender_id)?;
project_id = state.register_project(request.sender_id, user_id);
};
let user_id = self
.store()
.await
.user_id_for_connection(request.sender_id)?;
let project_id = self.app_state.db.register_project(user_id).await?;
self.store_mut()
.await
.register_project(request.sender_id, project_id)?;
response.send(proto::RegisterProjectResponse { project_id })?;
response.send(proto::RegisterProjectResponse {
project_id: project_id.to_proto(),
})?;
Ok(())
}
@ -586,8 +596,10 @@ impl Server {
) -> Result<()> {
let (user_id, project) = {
let mut state = self.store_mut().await;
let project =
state.unregister_project(request.payload.project_id, request.sender_id)?;
let project = state.unregister_project(
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
)?;
(state.user_id_for_connection(request.sender_id)?, project)
};
@ -664,7 +676,7 @@ impl Server {
request: TypedEnvelope<proto::JoinProject>,
response: Response<proto::JoinProject>,
) -> Result<()> {
let project_id = request.payload.project_id;
let project_id = ProjectId::from_proto(request.payload.project_id);
let host_user_id;
let guest_user_id;
@ -677,7 +689,7 @@ impl Server {
guest_user_id = state.user_id_for_connection(request.sender_id)?;
};
tracing::info!(project_id, %host_user_id, %host_connection_id, "join project");
tracing::info!(%project_id, %host_user_id, %host_connection_id, "join project");
let has_contact = self
.app_state
.db
@ -695,7 +707,7 @@ impl Server {
self.peer.send(
host_connection_id,
proto::RequestJoinProject {
project_id,
project_id: project_id.to_proto(),
requester_id: guest_user_id.to_proto(),
},
)?;
@ -710,7 +722,7 @@ impl Server {
{
let mut state = self.store_mut().await;
let project_id = request.payload.project_id;
let project_id = ProjectId::from_proto(request.payload.project_id);
let project = state.project(project_id)?;
if project.host_connection_id != request.sender_id {
Err(anyhow!("no such connection"))?;
@ -790,7 +802,7 @@ impl Server {
self.peer.send(
conn_id,
proto::AddProjectCollaborator {
project_id,
project_id: project_id.to_proto(),
collaborator: Some(proto::Collaborator {
peer_id: receipt.sender_id.0,
replica_id: *replica_id as u32,
@ -828,13 +840,13 @@ impl Server {
request: TypedEnvelope<proto::LeaveProject>,
) -> Result<()> {
let sender_id = request.sender_id;
let project_id = request.payload.project_id;
let project_id = ProjectId::from_proto(request.payload.project_id);
let project;
{
let mut store = self.store_mut().await;
project = store.leave_project(sender_id, project_id)?;
tracing::info!(
project_id,
%project_id,
host_user_id = %project.host_user_id,
host_connection_id = %project.host_connection_id,
"leave project"
@ -845,7 +857,7 @@ impl Server {
self.peer.send(
conn_id,
proto::RemoveProjectCollaborator {
project_id,
project_id: project_id.to_proto(),
peer_id: sender_id.0,
},
)
@ -856,7 +868,7 @@ impl Server {
self.peer.send(
project.host_connection_id,
proto::JoinProjectRequestCancelled {
project_id,
project_id: project_id.to_proto(),
requester_id: requester_id.to_proto(),
},
)?;
@ -865,7 +877,9 @@ impl Server {
if project.unshare {
self.peer.send(
project.host_connection_id,
proto::ProjectUnshared { project_id },
proto::ProjectUnshared {
project_id: project_id.to_proto(),
},
)?;
}
}
@ -877,18 +891,15 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::UpdateProject>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let user_id;
{
let mut state = self.store_mut().await;
user_id = state.user_id_for_connection(request.sender_id)?;
let guest_connection_ids = state
.read_project(request.payload.project_id, request.sender_id)?
.read_project(project_id, request.sender_id)?
.guest_connection_ids();
state.update_project(
request.payload.project_id,
&request.payload.worktrees,
request.sender_id,
)?;
state.update_project(project_id, &request.payload.worktrees, request.sender_id)?;
broadcast(request.sender_id, guest_connection_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
@ -902,9 +913,10 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::RegisterProjectActivity>,
) -> Result<()> {
self.store_mut()
.await
.register_project_activity(request.payload.project_id, request.sender_id)?;
self.store_mut().await.register_project_activity(
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
)?;
Ok(())
}
@ -913,28 +925,25 @@ impl Server {
request: TypedEnvelope<proto::UpdateWorktree>,
response: Response<proto::UpdateWorktree>,
) -> Result<()> {
let (connection_ids, metadata_changed) = {
let project_id = ProjectId::from_proto(request.payload.project_id);
let worktree_id = request.payload.worktree_id;
let (connection_ids, metadata_changed, extension_counts) = {
let mut store = self.store_mut().await;
let (connection_ids, metadata_changed, extension_counts) = store.update_worktree(
request.sender_id,
request.payload.project_id,
request.payload.worktree_id,
project_id,
worktree_id,
&request.payload.root_name,
&request.payload.removed_entries,
&request.payload.updated_entries,
request.payload.scan_id,
)?;
for (extension, count) in extension_counts {
tracing::info!(
project_id = request.payload.project_id,
worktree_id = request.payload.worktree_id,
?extension,
%count,
"worktree updated"
);
}
(connection_ids, metadata_changed)
(connection_ids, metadata_changed, extension_counts.clone())
};
self.app_state
.db
.update_worktree_extensions(project_id, worktree_id, extension_counts)
.await?;
broadcast(request.sender_id, connection_ids, |connection_id| {
self.peer
@ -961,7 +970,7 @@ impl Server {
.clone()
.ok_or_else(|| anyhow!("invalid summary"))?;
let receiver_ids = self.store_mut().await.update_diagnostic_summary(
request.payload.project_id,
ProjectId::from_proto(request.payload.project_id),
request.payload.worktree_id,
request.sender_id,
summary,
@ -979,7 +988,7 @@ impl Server {
request: TypedEnvelope<proto::StartLanguageServer>,
) -> Result<()> {
let receiver_ids = self.store_mut().await.start_language_server(
request.payload.project_id,
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
request
.payload
@ -998,10 +1007,10 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::UpdateLanguageServer>,
) -> Result<()> {
let receiver_ids = self
.store()
.await
.project_connection_ids(request.payload.project_id, request.sender_id)?;
let receiver_ids = self.store().await.project_connection_ids(
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
)?;
broadcast(request.sender_id, receiver_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
@ -1020,7 +1029,10 @@ impl Server {
let host_connection_id = self
.store()
.await
.read_project(request.payload.remote_entity_id(), request.sender_id)?
.read_project(
ProjectId::from_proto(request.payload.remote_entity_id()),
request.sender_id,
)?
.host_connection_id;
response.send(
@ -1036,10 +1048,11 @@ impl Server {
request: TypedEnvelope<proto::SaveBuffer>,
response: Response<proto::SaveBuffer>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let host = self
.store()
.await
.read_project(request.payload.project_id, request.sender_id)?
.read_project(project_id, request.sender_id)?
.host_connection_id;
let response_payload = self
.peer
@ -1049,7 +1062,7 @@ impl Server {
let mut guests = self
.store()
.await
.read_project(request.payload.project_id, request.sender_id)?
.read_project(project_id, request.sender_id)?
.connection_ids();
guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
broadcast(host, guests, |conn_id| {
@ -1065,10 +1078,11 @@ impl Server {
request: TypedEnvelope<proto::UpdateBuffer>,
response: Response<proto::UpdateBuffer>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let receiver_ids = {
let mut store = self.store_mut().await;
store.register_project_activity(request.payload.project_id, request.sender_id)?;
store.project_connection_ids(request.payload.project_id, request.sender_id)?
store.register_project_activity(project_id, request.sender_id)?;
store.project_connection_ids(project_id, request.sender_id)?
};
broadcast(request.sender_id, receiver_ids, |connection_id| {
@ -1083,10 +1097,10 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::UpdateBufferFile>,
) -> Result<()> {
let receiver_ids = self
.store()
.await
.project_connection_ids(request.payload.project_id, request.sender_id)?;
let receiver_ids = self.store().await.project_connection_ids(
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
)?;
broadcast(request.sender_id, receiver_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
@ -1098,10 +1112,10 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::BufferReloaded>,
) -> Result<()> {
let receiver_ids = self
.store()
.await
.project_connection_ids(request.payload.project_id, request.sender_id)?;
let receiver_ids = self.store().await.project_connection_ids(
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
)?;
broadcast(request.sender_id, receiver_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
@ -1113,10 +1127,10 @@ impl Server {
self: Arc<Server>,
request: TypedEnvelope<proto::BufferSaved>,
) -> Result<()> {
let receiver_ids = self
.store()
.await
.project_connection_ids(request.payload.project_id, request.sender_id)?;
let receiver_ids = self.store().await.project_connection_ids(
ProjectId::from_proto(request.payload.project_id),
request.sender_id,
)?;
broadcast(request.sender_id, receiver_ids, |connection_id| {
self.peer
.forward_send(request.sender_id, connection_id, request.payload.clone())
@ -1129,18 +1143,19 @@ impl Server {
request: TypedEnvelope<proto::Follow>,
response: Response<proto::Follow>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let leader_id = ConnectionId(request.payload.leader_id);
let follower_id = request.sender_id;
{
let mut store = self.store_mut().await;
if !store
.project_connection_ids(request.payload.project_id, follower_id)?
.project_connection_ids(project_id, follower_id)?
.contains(&leader_id)
{
Err(anyhow!("no such peer"))?;
}
store.register_project_activity(request.payload.project_id, follower_id)?;
store.register_project_activity(project_id, follower_id)?;
}
let mut response_payload = self
@ -1155,15 +1170,16 @@ impl Server {
}
async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let leader_id = ConnectionId(request.payload.leader_id);
let mut store = self.store_mut().await;
if !store
.project_connection_ids(request.payload.project_id, request.sender_id)?
.project_connection_ids(project_id, request.sender_id)?
.contains(&leader_id)
{
Err(anyhow!("no such peer"))?;
}
store.register_project_activity(request.payload.project_id, request.sender_id)?;
store.register_project_activity(project_id, request.sender_id)?;
self.peer
.forward_send(request.sender_id, leader_id, request.payload)?;
Ok(())
@ -1173,10 +1189,10 @@ impl Server {
self: Arc<Self>,
request: TypedEnvelope<proto::UpdateFollowers>,
) -> Result<()> {
let project_id = ProjectId::from_proto(request.payload.project_id);
let mut store = self.store_mut().await;
store.register_project_activity(request.payload.project_id, request.sender_id)?;
let connection_ids =
store.project_connection_ids(request.payload.project_id, request.sender_id)?;
store.register_project_activity(project_id, request.sender_id)?;
let connection_ids = store.project_connection_ids(project_id, request.sender_id)?;
let leader_id = request
.payload
.variant