Compare commits

...
Sign in to create a new pull request.

2 commits

Author SHA1 Message Date
Conrad Irwin
7b0db3ba23 TEMP 2024-03-11 09:00:08 -06:00
Conrad Irwin
e20896ae84 Allow opening files in a hosted project 2024-03-07 21:59:28 -07:00
8 changed files with 200 additions and 34 deletions

View file

@ -1,5 +1,5 @@
use crate::Result; use crate::Result;
use rpc::proto; use rpc::{proto, ConnectionId};
use sea_orm::{entity::prelude::*, DbErr}; use sea_orm::{entity::prelude::*, DbErr};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -267,3 +267,8 @@ impl Into<i32> for ChannelVisibility {
proto.into() proto.into()
} }
} }
pub enum ProjectLocation {
Remote(ConnectionId),
Hosted(HostedProjectId),
}

View file

@ -912,12 +912,28 @@ impl Database {
} }
/// Returns the host connection for a read-only request to join a shared project. /// Returns the host connection for a read-only request to join a shared project.
pub async fn host_for_read_only_project_request( pub async fn location_for_read_only_project_request(
&self, &self,
project_id: ProjectId, project_id: ProjectId,
connection_id: ConnectionId, connection_id: ConnectionId,
) -> Result<ConnectionId> { ) -> Result<ProjectLocation> {
let room_id = self.room_id_for_project(project_id).await?; let project = self
.transaction(|tx| async move {
Ok(project::Entity::find_by_id(project_id)
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("project {} not found", project_id))?)
})
.await?;
if let Some(hosted_project_id) = project.hosted_project_id {
return Ok(ProjectLocation::Hosted(hosted_project_id));
};
let Some(room_id) = project.room_id else {
return Err(anyhow!("project not in room"))?;
};
self.room_transaction(room_id, |tx| async move { self.room_transaction(room_id, |tx| async move {
let current_participant = room_participant::Entity::find() let current_participant = room_participant::Entity::find()
.filter(room_participant::Column::RoomId.eq(room_id)) .filter(room_participant::Column::RoomId.eq(room_id))
@ -943,7 +959,7 @@ impl Database {
.await? .await?
.ok_or_else(|| anyhow!("failed to read project host"))?; .ok_or_else(|| anyhow!("failed to read project host"))?;
Ok(host.connection()) Ok(ProjectLocation::Remote(host.connection()))
}) })
.await .await
.map(|guard| guard.into_inner()) .map(|guard| guard.into_inner())
@ -1097,7 +1113,7 @@ impl Database {
.ok_or_else(|| anyhow!("project {} not found", project_id))?; .ok_or_else(|| anyhow!("project {} not found", project_id))?;
Ok(project Ok(project
.room_id .room_id
.ok_or_else(|| anyhow!("project not in room"))?) .ok_or_else(|| anyhow!("project is not in a room"))?)
}) })
.await .await
} }

125
crates/collab/src/hosted.rs Normal file
View file

@ -0,0 +1,125 @@
use anyhow::anyhow;
use rpc::proto::{
self, create_buffer_for_peer, CreateBufferForPeer, EntityMessage, OpenBufferResponse,
RequestMessage,
};
use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
use crate::db::{buffer, worktree_entry, HostedProjectId, ProjectId};
use crate::rpc::{Response, Session};
use crate::Result;
pub(crate) trait ProjectRequest: EntityMessage + RequestMessage {
async fn handle_hosted_project_request(
self,
_hosted_project_id: HostedProjectId,
_response: Response<Self>,
_session: Session,
) -> Result<()> {
Err(anyhow!("not supported for hosted projects"))?
}
}
impl ProjectRequest for proto::GetHover {}
impl ProjectRequest for proto::GetDefinition {}
impl ProjectRequest for proto::GetTypeDefinition {}
impl ProjectRequest for proto::GetReferences {}
impl ProjectRequest for proto::SearchProject {}
impl ProjectRequest for proto::GetDocumentHighlights {}
impl ProjectRequest for proto::GetProjectSymbols {}
impl ProjectRequest for proto::OpenBufferForSymbol {}
impl ProjectRequest for proto::OpenBufferById {}
impl ProjectRequest for proto::SynchronizeBuffers {}
impl ProjectRequest for proto::InlayHints {}
impl ProjectRequest for proto::OpenBufferByPath {
async fn handle_hosted_project_request(
self,
_hosted_project_id: HostedProjectId,
response: Response<Self>,
session: Session,
) -> Result<()> {
let project_id = ProjectId(self.project_id as i32);
let worktree_id = self.worktree_id as i32;
let path = self.path.clone();
let (entry, buffer) = session
.db()
.await
.transaction({
let path = &path;
move |tx| async move {
let entry = worktree_entry::Entity::find()
.filter(
Condition::all()
.add(worktree_entry::Column::ProjectId.eq(project_id))
.add(worktree_entry::Column::WorktreeId.eq(worktree_id))
.add(worktree_entry::Column::Path.eq(path.clone())),
)
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such file"))?;
/// TODO: use a different column here?
let buffer = buffer::Entity::find_by_id(BufferId(entry.inode))
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such buffer"))?;
Ok((entry, buffer))
}
})
.await?;
response.send(OpenBufferResponse {
buffer_id: entry.inode as u64,
})?;
session.peer.send(
session.connection_id,
CreateBufferForPeer {
project_id: project_id.to_proto(),
peer_id: None,
variant: Some(proto::create_buffer_for_peer::Variant::State(
proto::BufferState {
id: entry.inode as u64,
file: Some(proto::File {
worktree_id: entry.worktree_id as u64,
entry_id: Some(entry.inode as u64),
path,
mtime: Some(proto::Timestamp {
seconds: entry.mtime_seconds as u64,
nanos: entry.mtime_nanos as u32,
}),
is_deleted: entry.is_deleted,
}),
base_text: "Hello world".to_string(),
diff_base: Some("Hello world".to_string()),
line_ending: proto::LineEnding::Unix.into(),
saved_version: vec![],
saved_version_fingerprint: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
saved_mtime: Some(proto::Timestamp {
seconds: entry.mtime_seconds as u64,
nanos: entry.mtime_nanos as u32,
}),
},
)),
},
)?;
session.peer.send(
session.connection_id,
CreateBufferForPeer {
project_id: project_id.to_proto(),
peer_id: None,
variant: Some(create_buffer_for_peer::Variant::Chunk(proto::BufferChunk {
buffer_id: entry.inode as u64,
operations: vec![],
is_last: true,
})),
},
)?;
Ok(())
}
}

View file

@ -3,6 +3,7 @@ pub mod auth;
pub mod db; pub mod db;
pub mod env; pub mod env;
pub mod executor; pub mod executor;
pub mod hosted;
pub mod rpc; pub mod rpc;
#[cfg(test)] #[cfg(test)]

View file

@ -5,10 +5,11 @@ use crate::{
db::{ db::{
self, BufferId, ChannelId, ChannelRole, ChannelsForUser, CreatedChannelMessage, Database, self, BufferId, ChannelId, ChannelRole, ChannelsForUser, CreatedChannelMessage, Database,
InviteMemberResult, MembershipUpdated, MessageId, NotificationId, Project, ProjectId, InviteMemberResult, MembershipUpdated, MessageId, NotificationId, Project, ProjectId,
RemoveChannelMemberResult, ReplicaId, RespondToChannelInvite, RoomId, ServerId, User, ProjectLocation, RemoveChannelMemberResult, ReplicaId, RespondToChannelInvite, RoomId,
UserId, ServerId, User, UserId,
}, },
executor::Executor, executor::Executor,
hosted::ProjectRequest,
AppState, Error, Result, AppState, Error, Result,
}; };
use anyhow::anyhow; use anyhow::anyhow;
@ -75,17 +76,17 @@ const MESSAGE_COUNT_PER_PAGE: usize = 100;
const MAX_MESSAGE_LEN: usize = 1024; const MAX_MESSAGE_LEN: usize = 1024;
const NOTIFICATION_COUNT_PER_PAGE: usize = 50; const NOTIFICATION_COUNT_PER_PAGE: usize = 50;
type MessageHandler = pub(crate) type MessageHandler =
Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>; Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
struct Response<R> { pub(crate) struct Response<R> {
peer: Arc<Peer>, peer: Arc<Peer>,
receipt: Receipt<R>, receipt: Receipt<R>,
responded: Arc<AtomicBool>, responded: Arc<AtomicBool>,
} }
impl<R: RequestMessage> Response<R> { impl<R: RequestMessage> Response<R> {
fn send(self, payload: R::Response) -> Result<()> { pub(crate) fn send(self, payload: R::Response) -> Result<()> {
self.responded.store(true, SeqCst); self.responded.store(true, SeqCst);
self.peer.respond(self.receipt, payload)?; self.peer.respond(self.receipt, payload)?;
Ok(()) Ok(())
@ -93,18 +94,18 @@ impl<R: RequestMessage> Response<R> {
} }
#[derive(Clone)] #[derive(Clone)]
struct Session { pub(crate) struct Session {
user_id: UserId, pub(crate) user_id: UserId,
connection_id: ConnectionId, pub(crate) connection_id: ConnectionId,
db: Arc<tokio::sync::Mutex<DbHandle>>, pub(crate) db: Arc<tokio::sync::Mutex<DbHandle>>,
peer: Arc<Peer>, pub(crate) peer: Arc<Peer>,
connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>, pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>, pub(crate) live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
_executor: Executor, _executor: Executor,
} }
impl Session { impl Session {
async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> { pub(crate) async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
#[cfg(test)] #[cfg(test)]
tokio::task::yield_now().await; tokio::task::yield_now().await;
let guard = self.db.lock().await; let guard = self.db.lock().await;
@ -113,7 +114,7 @@ impl Session {
guard guard
} }
async fn connection_pool(&self) -> ConnectionPoolGuard<'_> { pub(crate) async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
#[cfg(test)] #[cfg(test)]
tokio::task::yield_now().await; tokio::task::yield_now().await;
let guard = self.connection_pool.lock(); let guard = self.connection_pool.lock();
@ -133,7 +134,7 @@ impl fmt::Debug for Session {
} }
} }
struct DbHandle(Arc<Database>); pub(crate) struct DbHandle(Arc<Database>);
impl Deref for DbHandle { impl Deref for DbHandle {
type Target = Database; type Target = Database;
@ -1933,20 +1934,30 @@ async fn forward_read_only_project_request<T>(
session: Session, session: Session,
) -> Result<()> ) -> Result<()>
where where
T: EntityMessage + RequestMessage, T: ProjectRequest,
{ {
let project_id = ProjectId::from_proto(request.remote_entity_id()); let project_id = ProjectId::from_proto(request.remote_entity_id());
let host_connection_id = session let location = session
.db() .db()
.await .await
.host_for_read_only_project_request(project_id, session.connection_id) .location_for_read_only_project_request(project_id, session.connection_id)
.await?; .await?;
let payload = session
.peer match location {
.forward_request(session.connection_id, host_connection_id, request) ProjectLocation::Remote(connection_id) => {
.await?; let payload = session
response.send(payload)?; .peer
Ok(()) .forward_request(session.connection_id, connection_id, request)
.await?;
response.send(payload)?;
Ok(())
}
ProjectLocation::Hosted(hosted_project_id) => {
request
.handle_hosted_project_request(hosted_project_id, response, session)
.await
}
}
} }
/// forward a project request to the host. These requests are disallowed /// forward a project request to the host. These requests are disallowed

View file

@ -7839,10 +7839,13 @@ impl Project {
} }
let buffer_id = BufferId::new(state.id)?; let buffer_id = BufferId::new(state.id)?;
let buffer = cx.new_model(|_| { let buffer = Buffer::from_proto(
Buffer::from_proto(this.replica_id(), this.capability(), state, buffer_file) this.replica_id(),
.unwrap() this.capability(),
}); state,
buffer_file,
)?;
let buffer = cx.new_model(|_| buffer);
this.incomplete_remote_buffers this.incomplete_remote_buffers
.insert(buffer_id, Some(buffer)); .insert(buffer_id, Some(buffer));
} }

View file

@ -38,6 +38,10 @@ macro_rules! messages {
} }
} }
fn into_payload(self) -> envelope::Payload {
envelope::Payload::$name(self)
}
fn from_envelope(envelope: Envelope) -> Option<Self> { fn from_envelope(envelope: Envelope) -> Option<Self> {
if let Some(envelope::Payload::$name(msg)) = envelope.payload { if let Some(envelope::Payload::$name(msg)) = envelope.payload {
Some(msg) Some(msg)

View file

@ -28,6 +28,7 @@ pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 's
responding_to: Option<u32>, responding_to: Option<u32>,
original_sender_id: Option<PeerId>, original_sender_id: Option<PeerId>,
) -> Envelope; ) -> Envelope;
fn into_payload(self) -> envelope::Payload;
fn from_envelope(envelope: Envelope) -> Option<Self>; fn from_envelope(envelope: Envelope) -> Option<Self>;
} }