Compare commits

...
Sign in to create a new pull request.

2 commits

Author SHA1 Message Date
Conrad Irwin
7b0db3ba23 TEMP 2024-03-11 09:00:08 -06:00
Conrad Irwin
e20896ae84 Allow opening files in a hosted project 2024-03-07 21:59:28 -07:00
8 changed files with 200 additions and 34 deletions

View file

@ -1,5 +1,5 @@
use crate::Result;
use rpc::proto;
use rpc::{proto, ConnectionId};
use sea_orm::{entity::prelude::*, DbErr};
use serde::{Deserialize, Serialize};
@ -267,3 +267,8 @@ impl Into<i32> for ChannelVisibility {
proto.into()
}
}
pub enum ProjectLocation {
Remote(ConnectionId),
Hosted(HostedProjectId),
}

View file

@ -912,12 +912,28 @@ impl Database {
}
/// Returns the host connection for a read-only request to join a shared project.
pub async fn host_for_read_only_project_request(
pub async fn location_for_read_only_project_request(
&self,
project_id: ProjectId,
connection_id: ConnectionId,
) -> Result<ConnectionId> {
let room_id = self.room_id_for_project(project_id).await?;
) -> Result<ProjectLocation> {
let project = self
.transaction(|tx| async move {
Ok(project::Entity::find_by_id(project_id)
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("project {} not found", project_id))?)
})
.await?;
if let Some(hosted_project_id) = project.hosted_project_id {
return Ok(ProjectLocation::Hosted(hosted_project_id));
};
let Some(room_id) = project.room_id else {
return Err(anyhow!("project not in room"))?;
};
self.room_transaction(room_id, |tx| async move {
let current_participant = room_participant::Entity::find()
.filter(room_participant::Column::RoomId.eq(room_id))
@ -943,7 +959,7 @@ impl Database {
.await?
.ok_or_else(|| anyhow!("failed to read project host"))?;
Ok(host.connection())
Ok(ProjectLocation::Remote(host.connection()))
})
.await
.map(|guard| guard.into_inner())
@ -1097,7 +1113,7 @@ impl Database {
.ok_or_else(|| anyhow!("project {} not found", project_id))?;
Ok(project
.room_id
.ok_or_else(|| anyhow!("project not in room"))?)
.ok_or_else(|| anyhow!("project is not in a room"))?)
})
.await
}

125
crates/collab/src/hosted.rs Normal file
View file

@ -0,0 +1,125 @@
use anyhow::anyhow;
use rpc::proto::{
self, create_buffer_for_peer, CreateBufferForPeer, EntityMessage, OpenBufferResponse,
RequestMessage,
};
use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
use crate::db::{buffer, worktree_entry, HostedProjectId, ProjectId};
use crate::rpc::{Response, Session};
use crate::Result;
pub(crate) trait ProjectRequest: EntityMessage + RequestMessage {
async fn handle_hosted_project_request(
self,
_hosted_project_id: HostedProjectId,
_response: Response<Self>,
_session: Session,
) -> Result<()> {
Err(anyhow!("not supported for hosted projects"))?
}
}
impl ProjectRequest for proto::GetHover {}
impl ProjectRequest for proto::GetDefinition {}
impl ProjectRequest for proto::GetTypeDefinition {}
impl ProjectRequest for proto::GetReferences {}
impl ProjectRequest for proto::SearchProject {}
impl ProjectRequest for proto::GetDocumentHighlights {}
impl ProjectRequest for proto::GetProjectSymbols {}
impl ProjectRequest for proto::OpenBufferForSymbol {}
impl ProjectRequest for proto::OpenBufferById {}
impl ProjectRequest for proto::SynchronizeBuffers {}
impl ProjectRequest for proto::InlayHints {}
impl ProjectRequest for proto::OpenBufferByPath {
async fn handle_hosted_project_request(
self,
_hosted_project_id: HostedProjectId,
response: Response<Self>,
session: Session,
) -> Result<()> {
let project_id = ProjectId(self.project_id as i32);
let worktree_id = self.worktree_id as i32;
let path = self.path.clone();
let (entry, buffer) = session
.db()
.await
.transaction({
let path = &path;
move |tx| async move {
let entry = worktree_entry::Entity::find()
.filter(
Condition::all()
.add(worktree_entry::Column::ProjectId.eq(project_id))
.add(worktree_entry::Column::WorktreeId.eq(worktree_id))
.add(worktree_entry::Column::Path.eq(path.clone())),
)
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such file"))?;
/// TODO: use a different column here?
let buffer = buffer::Entity::find_by_id(BufferId(entry.inode))
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such buffer"))?;
Ok((entry, buffer))
}
})
.await?;
response.send(OpenBufferResponse {
buffer_id: entry.inode as u64,
})?;
session.peer.send(
session.connection_id,
CreateBufferForPeer {
project_id: project_id.to_proto(),
peer_id: None,
variant: Some(proto::create_buffer_for_peer::Variant::State(
proto::BufferState {
id: entry.inode as u64,
file: Some(proto::File {
worktree_id: entry.worktree_id as u64,
entry_id: Some(entry.inode as u64),
path,
mtime: Some(proto::Timestamp {
seconds: entry.mtime_seconds as u64,
nanos: entry.mtime_nanos as u32,
}),
is_deleted: entry.is_deleted,
}),
base_text: "Hello world".to_string(),
diff_base: Some("Hello world".to_string()),
line_ending: proto::LineEnding::Unix.into(),
saved_version: vec![],
saved_version_fingerprint: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
saved_mtime: Some(proto::Timestamp {
seconds: entry.mtime_seconds as u64,
nanos: entry.mtime_nanos as u32,
}),
},
)),
},
)?;
session.peer.send(
session.connection_id,
CreateBufferForPeer {
project_id: project_id.to_proto(),
peer_id: None,
variant: Some(create_buffer_for_peer::Variant::Chunk(proto::BufferChunk {
buffer_id: entry.inode as u64,
operations: vec![],
is_last: true,
})),
},
)?;
Ok(())
}
}

View file

@ -3,6 +3,7 @@ pub mod auth;
pub mod db;
pub mod env;
pub mod executor;
pub mod hosted;
pub mod rpc;
#[cfg(test)]

View file

@ -5,10 +5,11 @@ use crate::{
db::{
self, BufferId, ChannelId, ChannelRole, ChannelsForUser, CreatedChannelMessage, Database,
InviteMemberResult, MembershipUpdated, MessageId, NotificationId, Project, ProjectId,
RemoveChannelMemberResult, ReplicaId, RespondToChannelInvite, RoomId, ServerId, User,
UserId,
ProjectLocation, RemoveChannelMemberResult, ReplicaId, RespondToChannelInvite, RoomId,
ServerId, User, UserId,
},
executor::Executor,
hosted::ProjectRequest,
AppState, Error, Result,
};
use anyhow::anyhow;
@ -75,17 +76,17 @@ const MESSAGE_COUNT_PER_PAGE: usize = 100;
const MAX_MESSAGE_LEN: usize = 1024;
const NOTIFICATION_COUNT_PER_PAGE: usize = 50;
type MessageHandler =
pub(crate) type MessageHandler =
Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
struct Response<R> {
pub(crate) struct Response<R> {
peer: Arc<Peer>,
receipt: Receipt<R>,
responded: Arc<AtomicBool>,
}
impl<R: RequestMessage> Response<R> {
fn send(self, payload: R::Response) -> Result<()> {
pub(crate) fn send(self, payload: R::Response) -> Result<()> {
self.responded.store(true, SeqCst);
self.peer.respond(self.receipt, payload)?;
Ok(())
@ -93,18 +94,18 @@ impl<R: RequestMessage> Response<R> {
}
#[derive(Clone)]
struct Session {
user_id: UserId,
connection_id: ConnectionId,
db: Arc<tokio::sync::Mutex<DbHandle>>,
peer: Arc<Peer>,
connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
pub(crate) struct Session {
pub(crate) user_id: UserId,
pub(crate) connection_id: ConnectionId,
pub(crate) db: Arc<tokio::sync::Mutex<DbHandle>>,
pub(crate) peer: Arc<Peer>,
pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
pub(crate) live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
_executor: Executor,
}
impl Session {
async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
pub(crate) async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
#[cfg(test)]
tokio::task::yield_now().await;
let guard = self.db.lock().await;
@ -113,7 +114,7 @@ impl Session {
guard
}
async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
pub(crate) async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
#[cfg(test)]
tokio::task::yield_now().await;
let guard = self.connection_pool.lock();
@ -133,7 +134,7 @@ impl fmt::Debug for Session {
}
}
struct DbHandle(Arc<Database>);
pub(crate) struct DbHandle(Arc<Database>);
impl Deref for DbHandle {
type Target = Database;
@ -1933,20 +1934,30 @@ async fn forward_read_only_project_request<T>(
session: Session,
) -> Result<()>
where
T: EntityMessage + RequestMessage,
T: ProjectRequest,
{
let project_id = ProjectId::from_proto(request.remote_entity_id());
let host_connection_id = session
let location = session
.db()
.await
.host_for_read_only_project_request(project_id, session.connection_id)
.location_for_read_only_project_request(project_id, session.connection_id)
.await?;
let payload = session
.peer
.forward_request(session.connection_id, host_connection_id, request)
.await?;
response.send(payload)?;
Ok(())
match location {
ProjectLocation::Remote(connection_id) => {
let payload = session
.peer
.forward_request(session.connection_id, connection_id, request)
.await?;
response.send(payload)?;
Ok(())
}
ProjectLocation::Hosted(hosted_project_id) => {
request
.handle_hosted_project_request(hosted_project_id, response, session)
.await
}
}
}
/// forward a project request to the host. These requests are disallowed

View file

@ -7839,10 +7839,13 @@ impl Project {
}
let buffer_id = BufferId::new(state.id)?;
let buffer = cx.new_model(|_| {
Buffer::from_proto(this.replica_id(), this.capability(), state, buffer_file)
.unwrap()
});
let buffer = Buffer::from_proto(
this.replica_id(),
this.capability(),
state,
buffer_file,
)?;
let buffer = cx.new_model(|_| buffer);
this.incomplete_remote_buffers
.insert(buffer_id, Some(buffer));
}

View file

@ -38,6 +38,10 @@ macro_rules! messages {
}
}
fn into_payload(self) -> envelope::Payload {
envelope::Payload::$name(self)
}
fn from_envelope(envelope: Envelope) -> Option<Self> {
if let Some(envelope::Payload::$name(msg)) = envelope.payload {
Some(msg)

View file

@ -28,6 +28,7 @@ pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 's
responding_to: Option<u32>,
original_sender_id: Option<PeerId>,
) -> Envelope;
fn into_payload(self) -> envelope::Payload;
fn from_envelope(envelope: Envelope) -> Option<Self>;
}