remote projects per user (#10594)

Release Notes:

- Made remote projects per-user instead of per-channel. If you'd like to
be part of the remote development alpha, please email hi@zed.dev.

---------

Co-authored-by: Bennet Bo Fenner <53836821+bennetbo@users.noreply.github.com>
Co-authored-by: Bennet <bennetbo@gmx.de>
Co-authored-by: Nate Butler <1714999+iamnbutler@users.noreply.github.com>
Co-authored-by: Nate Butler <iamnbutler@gmail.com>
This commit is contained in:
Conrad Irwin 2024-04-23 15:33:09 -06:00 committed by GitHub
parent 8ae4c3277f
commit e0c83a1d32
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
56 changed files with 2807 additions and 1625 deletions

View file

@ -255,6 +255,13 @@ impl DevServerSession {
pub fn dev_server_id(&self) -> DevServerId {
self.0.dev_server_id().unwrap()
}
fn dev_server(&self) -> &dev_server::Model {
match &self.0.principal {
Principal::DevServer(dev_server) => dev_server,
_ => unreachable!(),
}
}
}
impl Deref for DevServerSession {
@ -405,6 +412,7 @@ impl Server {
.add_request_handler(user_handler(rejoin_remote_projects))
.add_request_handler(user_handler(create_remote_project))
.add_request_handler(user_handler(create_dev_server))
.add_request_handler(user_handler(delete_dev_server))
.add_request_handler(dev_server_handler(share_remote_project))
.add_request_handler(dev_server_handler(shutdown_dev_server))
.add_request_handler(dev_server_handler(reconnect_dev_server))
@ -1044,12 +1052,14 @@ impl Server {
.await?;
}
let (contacts, channels_for_user, channel_invites) = future::try_join3(
self.app_state.db.get_contacts(user.id),
self.app_state.db.get_channels_for_user(user.id),
self.app_state.db.get_channel_invites_for_user(user.id),
)
.await?;
let (contacts, channels_for_user, channel_invites, remote_projects) =
future::try_join4(
self.app_state.db.get_contacts(user.id),
self.app_state.db.get_channels_for_user(user.id),
self.app_state.db.get_channel_invites_for_user(user.id),
self.app_state.db.remote_projects_update(user.id),
)
.await?;
{
let mut pool = self.connection_pool.lock();
@ -1067,9 +1077,10 @@ impl Server {
)?;
self.peer.send(
connection_id,
build_channels_update(channels_for_user, channel_invites, &pool),
build_channels_update(channels_for_user, channel_invites),
)?;
}
send_remote_projects_update(user.id, remote_projects, session).await;
if let Some(incoming_call) =
self.app_state.db.incoming_call_for_user(user.id).await?
@ -1087,9 +1098,6 @@ impl Server {
};
pool.add_dev_server(connection_id, dev_server.id, zed_version);
}
update_dev_server_status(dev_server, proto::DevServerStatus::Online, &session)
.await;
// todo!() allow only one connection.
let projects = self
.app_state
@ -1098,6 +1106,13 @@ impl Server {
.await?;
self.peer
.send(connection_id, proto::DevServerInstructions { projects })?;
let status = self
.app_state
.db
.remote_projects_update(dev_server.user_id)
.await?;
send_remote_projects_update(dev_server.user_id, status, &session).await;
}
}
@ -1401,10 +1416,8 @@ async fn connection_lost(
update_user_contacts(session.user_id(), &session).await?;
},
Principal::DevServer(dev_server) => {
lost_dev_server_connection(&session).await?;
update_dev_server_status(&dev_server, proto::DevServerStatus::Offline, &session)
.await;
Principal::DevServer(_) => {
lost_dev_server_connection(&session.for_dev_server().unwrap()).await?;
},
}
},
@ -1941,6 +1954,9 @@ async fn share_project(
RoomId::from_proto(request.room_id),
session.connection_id,
&request.worktrees,
request
.remote_project_id
.map(|id| RemoteProjectId::from_proto(id)),
)
.await?;
response.send(proto::ShareProjectResponse {
@ -1954,14 +1970,25 @@ async fn share_project(
/// Unshare a project from the room.
async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> {
let project_id = ProjectId::from_proto(message.project_id);
unshare_project_internal(project_id, &session).await
unshare_project_internal(
project_id,
session.connection_id,
session.user_id(),
&session,
)
.await
}
async fn unshare_project_internal(project_id: ProjectId, session: &Session) -> Result<()> {
async fn unshare_project_internal(
project_id: ProjectId,
connection_id: ConnectionId,
user_id: Option<UserId>,
session: &Session,
) -> Result<()> {
let (room, guest_connection_ids) = &*session
.db()
.await
.unshare_project(project_id, session.connection_id)
.unshare_project(project_id, connection_id, user_id)
.await?;
let message = proto::UnshareProject {
@ -1969,7 +1996,7 @@ async fn unshare_project_internal(project_id: ProjectId, session: &Session) -> R
};
broadcast(
Some(session.connection_id),
Some(connection_id),
guest_connection_ids.iter().copied(),
|conn_id| session.peer.send(conn_id, message.clone()),
);
@ -1980,13 +2007,13 @@ async fn unshare_project_internal(project_id: ProjectId, session: &Session) -> R
Ok(())
}
/// Share a project into the room.
/// DevServer makes a project available online
async fn share_remote_project(
request: proto::ShareRemoteProject,
response: Response<proto::ShareRemoteProject>,
session: DevServerSession,
) -> Result<()> {
let remote_project = session
let (remote_project, user_id, status) = session
.db()
.await
.share_remote_project(
@ -2000,22 +2027,7 @@ async fn share_remote_project(
return Err(anyhow!("failed to share remote project"))?;
};
for (connection_id, _) in session
.connection_pool()
.await
.channel_connection_ids(ChannelId::from_proto(remote_project.channel_id))
{
session
.peer
.send(
connection_id,
proto::UpdateChannels {
remote_projects: vec![remote_project.clone()],
..Default::default()
},
)
.trace_err();
}
send_remote_projects_update(user_id, status, &session).await;
response.send(proto::ShareProjectResponse { project_id })?;
@ -2081,19 +2093,21 @@ fn join_project_internal(
})
.collect::<Vec<_>>();
let add_project_collaborator = proto::AddProjectCollaborator {
project_id: project_id.to_proto(),
collaborator: Some(proto::Collaborator {
peer_id: Some(session.connection_id.into()),
replica_id: replica_id.0 as u32,
user_id: guest_user_id.to_proto(),
}),
};
for collaborator in &collaborators {
session
.peer
.send(
collaborator.peer_id.unwrap().into(),
proto::AddProjectCollaborator {
project_id: project_id.to_proto(),
collaborator: Some(proto::Collaborator {
peer_id: Some(session.connection_id.into()),
replica_id: replica_id.0 as u32,
user_id: guest_user_id.to_proto(),
}),
},
add_project_collaborator.clone(),
)
.trace_err();
}
@ -2105,7 +2119,10 @@ fn join_project_internal(
replica_id: replica_id.0 as u32,
collaborators: collaborators.clone(),
language_servers: project.language_servers.clone(),
role: project.role.into(), // todo
role: project.role.into(),
remote_project_id: project
.remote_project_id
.map(|remote_project_id| remote_project_id.0 as u64),
})?;
for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
@ -2188,8 +2205,6 @@ async fn leave_project(request: proto::LeaveProject, session: UserSession) -> Re
let (room, project) = &*db.leave_project(project_id, sender_id).await?;
tracing::info!(
%project_id,
host_user_id = ?project.host_user_id,
host_connection_id = ?project.host_connection_id,
"leave project"
);
@ -2224,13 +2239,33 @@ async fn create_remote_project(
response: Response<proto::CreateRemoteProject>,
session: UserSession,
) -> Result<()> {
let (channel, remote_project) = session
let dev_server_id = DevServerId(request.dev_server_id as i32);
let dev_server_connection_id = session
.connection_pool()
.await
.dev_server_connection_id(dev_server_id);
let Some(dev_server_connection_id) = dev_server_connection_id else {
Err(ErrorCode::DevServerOffline
.message("Cannot create a remote project when the dev server is offline".to_string())
.anyhow())?
};
let path = request.path.clone();
//Check that the path exists on the dev server
session
.peer
.forward_request(
session.connection_id,
dev_server_connection_id,
proto::ValidateRemoteProjectRequest { path: path.clone() },
)
.await?;
let (remote_project, update) = session
.db()
.await
.create_remote_project(
ChannelId(request.channel_id as i32),
DevServerId(request.dev_server_id as i32),
&request.name,
&request.path,
session.user_id(),
)
@ -2242,25 +2277,12 @@ async fn create_remote_project(
.get_remote_projects_for_dev_server(remote_project.dev_server_id)
.await?;
let update = proto::UpdateChannels {
remote_projects: vec![remote_project.to_proto(None)],
..Default::default()
};
let connection_pool = session.connection_pool().await;
for (connection_id, role) in connection_pool.channel_connection_ids(channel.root_id()) {
if role.can_see_all_descendants() {
session.peer.send(connection_id, update.clone())?;
}
}
session.peer.send(
dev_server_connection_id,
proto::DevServerInstructions { projects },
)?;
let dev_server_id = remote_project.dev_server_id;
let dev_server_connection_id = connection_pool.dev_server_connection_id(dev_server_id);
if let Some(dev_server_connection_id) = dev_server_connection_id {
session.peer.send(
dev_server_connection_id,
proto::DevServerInstructions { projects },
)?;
}
send_remote_projects_update(session.user_id(), update, &session).await;
response.send(proto::CreateRemoteProjectResponse {
remote_project: Some(remote_project.to_proto(None)),
@ -2276,37 +2298,56 @@ async fn create_dev_server(
let access_token = auth::random_token();
let hashed_access_token = auth::hash_access_token(&access_token);
let (channel, dev_server) = session
let (dev_server, status) = session
.db()
.await
.create_dev_server(
ChannelId(request.channel_id as i32),
&request.name,
&hashed_access_token,
session.user_id(),
)
.create_dev_server(&request.name, &hashed_access_token, session.user_id())
.await?;
let update = proto::UpdateChannels {
dev_servers: vec![dev_server.to_proto(proto::DevServerStatus::Offline)],
..Default::default()
};
let connection_pool = session.connection_pool().await;
for (connection_id, role) in connection_pool.channel_connection_ids(channel.root_id()) {
if role.can_see_channel(channel.visibility) {
session.peer.send(connection_id, update.clone())?;
}
}
send_remote_projects_update(session.user_id(), status, &session).await;
response.send(proto::CreateDevServerResponse {
dev_server_id: dev_server.id.0 as u64,
channel_id: request.channel_id,
access_token: auth::generate_dev_server_token(dev_server.id.0 as usize, access_token),
name: request.name.clone(),
})?;
Ok(())
}
async fn delete_dev_server(
request: proto::DeleteDevServer,
response: Response<proto::DeleteDevServer>,
session: UserSession,
) -> Result<()> {
let dev_server_id = DevServerId(request.dev_server_id as i32);
let dev_server = session.db().await.get_dev_server(dev_server_id).await?;
if dev_server.user_id != session.user_id() {
return Err(anyhow!(ErrorCode::Forbidden))?;
}
let connection_id = session
.connection_pool()
.await
.dev_server_connection_id(dev_server_id);
if let Some(connection_id) = connection_id {
shutdown_dev_server_internal(dev_server_id, connection_id, &session).await?;
session
.peer
.send(connection_id, proto::ShutdownDevServer {})?;
}
let status = session
.db()
.await
.delete_dev_server(dev_server_id, session.user_id())
.await?;
send_remote_projects_update(session.user_id(), status, &session).await;
response.send(proto::Ack {})?;
Ok(())
}
async fn rejoin_remote_projects(
request: proto::RejoinRemoteProjects,
response: Response<proto::RejoinRemoteProjects>,
@ -2403,8 +2444,15 @@ async fn shutdown_dev_server(
session: DevServerSession,
) -> Result<()> {
response.send(proto::Ack {})?;
shutdown_dev_server_internal(session.dev_server_id(), session.connection_id, &session).await
}
async fn shutdown_dev_server_internal(
dev_server_id: DevServerId,
connection_id: ConnectionId,
session: &Session,
) -> Result<()> {
let (remote_projects, dev_server) = {
let dev_server_id = session.dev_server_id();
let db = session.db().await;
let remote_projects = db.get_remote_projects_for_dev_server(dev_server_id).await?;
let dev_server = db.get_dev_server(dev_server_id).await?;
@ -2412,22 +2460,26 @@ async fn shutdown_dev_server(
};
for project_id in remote_projects.iter().filter_map(|p| p.project_id) {
unshare_project_internal(ProjectId::from_proto(project_id), &session.0).await?;
unshare_project_internal(
ProjectId::from_proto(project_id),
connection_id,
None,
session,
)
.await?;
}
let update = proto::UpdateChannels {
remote_projects,
dev_servers: vec![dev_server.to_proto(proto::DevServerStatus::Offline)],
..Default::default()
};
for (connection_id, _) in session
session
.connection_pool()
.await
.channel_connection_ids(dev_server.channel_id)
{
session.peer.send(connection_id, update.clone()).trace_err();
}
.set_dev_server_offline(dev_server_id);
let status = session
.db()
.await
.remote_projects_update(dev_server.user_id)
.await?;
send_remote_projects_update(dev_server.user_id, status, &session).await;
Ok(())
}
@ -4626,7 +4678,7 @@ fn notify_membership_updated(
..Default::default()
};
let mut update = build_channels_update(result.new_channels, vec![], connection_pool);
let mut update = build_channels_update(result.new_channels, vec![]);
update.delete_channels = result
.removed_channels
.into_iter()
@ -4659,7 +4711,6 @@ fn build_update_user_channels(channels: &ChannelsForUser) -> proto::UpdateUserCh
fn build_channels_update(
channels: ChannelsForUser,
channel_invites: Vec<db::Channel>,
pool: &ConnectionPool,
) -> proto::UpdateChannels {
let mut update = proto::UpdateChannels::default();
@ -4684,13 +4735,6 @@ fn build_channels_update(
}
update.hosted_projects = channels.hosted_projects;
update.dev_servers = channels
.dev_servers
.into_iter()
.map(|dev_server| dev_server.to_proto(pool.dev_server_status(dev_server.id)))
.collect();
update.remote_projects = channels.remote_projects;
update
}
@ -4777,24 +4821,19 @@ fn channel_updated(
);
}
async fn update_dev_server_status(
dev_server: &dev_server::Model,
status: proto::DevServerStatus,
async fn send_remote_projects_update(
user_id: UserId,
mut status: proto::RemoteProjectsUpdate,
session: &Session,
) {
let pool = session.connection_pool().await;
let connections = pool.channel_connection_ids(dev_server.channel_id);
for (connection_id, _) in connections {
session
.peer
.send(
connection_id,
proto::UpdateChannels {
dev_servers: vec![dev_server.to_proto(status)],
..Default::default()
},
)
.trace_err();
for dev_server in &mut status.dev_servers {
dev_server.status =
pool.dev_server_status(DevServerId(dev_server.dev_server_id as i32)) as i32;
}
let connections = pool.user_connection_ids(user_id);
for connection_id in connections {
session.peer.send(connection_id, status.clone()).trace_err();
}
}
@ -4833,7 +4872,7 @@ async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()>
Ok(())
}
async fn lost_dev_server_connection(session: &Session) -> Result<()> {
async fn lost_dev_server_connection(session: &DevServerSession) -> Result<()> {
log::info!("lost dev server connection, unsharing projects");
let project_ids = session
.db()
@ -4843,9 +4882,14 @@ async fn lost_dev_server_connection(session: &Session) -> Result<()> {
for project_id in project_ids {
// not unshare re-checks the connection ids match, so we get away with no transaction
unshare_project_internal(project_id, &session).await?;
unshare_project_internal(project_id, session.connection_id, None, &session).await?;
}
let user_id = session.dev_server().user_id;
let update = session.db().await.remote_projects_update(user_id).await?;
send_remote_projects_update(user_id, update, session).await;
Ok(())
}
@ -4947,7 +4991,7 @@ async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
fn project_left(project: &db::LeftProject, session: &UserSession) {
for connection_id in &project.connection_ids {
if project.host_user_id == Some(session.user_id()) {
if project.should_unshare {
session
.peer
.send(