Compare commits
2 commits
main
...
hosted-pro
Author | SHA1 | Date | |
---|---|---|---|
![]() |
7b0db3ba23 | ||
![]() |
e20896ae84 |
8 changed files with 200 additions and 34 deletions
|
@ -1,5 +1,5 @@
|
|||
use crate::Result;
|
||||
use rpc::proto;
|
||||
use rpc::{proto, ConnectionId};
|
||||
use sea_orm::{entity::prelude::*, DbErr};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
|
@ -267,3 +267,8 @@ impl Into<i32> for ChannelVisibility {
|
|||
proto.into()
|
||||
}
|
||||
}
|
||||
|
||||
pub enum ProjectLocation {
|
||||
Remote(ConnectionId),
|
||||
Hosted(HostedProjectId),
|
||||
}
|
||||
|
|
|
@ -912,12 +912,28 @@ impl Database {
|
|||
}
|
||||
|
||||
/// Returns the host connection for a read-only request to join a shared project.
|
||||
pub async fn host_for_read_only_project_request(
|
||||
pub async fn location_for_read_only_project_request(
|
||||
&self,
|
||||
project_id: ProjectId,
|
||||
connection_id: ConnectionId,
|
||||
) -> Result<ConnectionId> {
|
||||
let room_id = self.room_id_for_project(project_id).await?;
|
||||
) -> Result<ProjectLocation> {
|
||||
let project = self
|
||||
.transaction(|tx| async move {
|
||||
Ok(project::Entity::find_by_id(project_id)
|
||||
.one(&*tx)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow!("project {} not found", project_id))?)
|
||||
})
|
||||
.await?;
|
||||
|
||||
if let Some(hosted_project_id) = project.hosted_project_id {
|
||||
return Ok(ProjectLocation::Hosted(hosted_project_id));
|
||||
};
|
||||
|
||||
let Some(room_id) = project.room_id else {
|
||||
return Err(anyhow!("project not in room"))?;
|
||||
};
|
||||
|
||||
self.room_transaction(room_id, |tx| async move {
|
||||
let current_participant = room_participant::Entity::find()
|
||||
.filter(room_participant::Column::RoomId.eq(room_id))
|
||||
|
@ -943,7 +959,7 @@ impl Database {
|
|||
.await?
|
||||
.ok_or_else(|| anyhow!("failed to read project host"))?;
|
||||
|
||||
Ok(host.connection())
|
||||
Ok(ProjectLocation::Remote(host.connection()))
|
||||
})
|
||||
.await
|
||||
.map(|guard| guard.into_inner())
|
||||
|
@ -1097,7 +1113,7 @@ impl Database {
|
|||
.ok_or_else(|| anyhow!("project {} not found", project_id))?;
|
||||
Ok(project
|
||||
.room_id
|
||||
.ok_or_else(|| anyhow!("project not in room"))?)
|
||||
.ok_or_else(|| anyhow!("project is not in a room"))?)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
|
125
crates/collab/src/hosted.rs
Normal file
125
crates/collab/src/hosted.rs
Normal file
|
@ -0,0 +1,125 @@
|
|||
use anyhow::anyhow;
|
||||
use rpc::proto::{
|
||||
self, create_buffer_for_peer, CreateBufferForPeer, EntityMessage, OpenBufferResponse,
|
||||
RequestMessage,
|
||||
};
|
||||
use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
|
||||
|
||||
use crate::db::{buffer, worktree_entry, HostedProjectId, ProjectId};
|
||||
use crate::rpc::{Response, Session};
|
||||
use crate::Result;
|
||||
|
||||
pub(crate) trait ProjectRequest: EntityMessage + RequestMessage {
|
||||
async fn handle_hosted_project_request(
|
||||
self,
|
||||
_hosted_project_id: HostedProjectId,
|
||||
_response: Response<Self>,
|
||||
_session: Session,
|
||||
) -> Result<()> {
|
||||
Err(anyhow!("not supported for hosted projects"))?
|
||||
}
|
||||
}
|
||||
impl ProjectRequest for proto::GetHover {}
|
||||
impl ProjectRequest for proto::GetDefinition {}
|
||||
impl ProjectRequest for proto::GetTypeDefinition {}
|
||||
impl ProjectRequest for proto::GetReferences {}
|
||||
impl ProjectRequest for proto::SearchProject {}
|
||||
impl ProjectRequest for proto::GetDocumentHighlights {}
|
||||
impl ProjectRequest for proto::GetProjectSymbols {}
|
||||
impl ProjectRequest for proto::OpenBufferForSymbol {}
|
||||
impl ProjectRequest for proto::OpenBufferById {}
|
||||
impl ProjectRequest for proto::SynchronizeBuffers {}
|
||||
impl ProjectRequest for proto::InlayHints {}
|
||||
|
||||
impl ProjectRequest for proto::OpenBufferByPath {
|
||||
async fn handle_hosted_project_request(
|
||||
self,
|
||||
_hosted_project_id: HostedProjectId,
|
||||
response: Response<Self>,
|
||||
session: Session,
|
||||
) -> Result<()> {
|
||||
let project_id = ProjectId(self.project_id as i32);
|
||||
let worktree_id = self.worktree_id as i32;
|
||||
let path = self.path.clone();
|
||||
|
||||
let (entry, buffer) = session
|
||||
.db()
|
||||
.await
|
||||
.transaction({
|
||||
let path = &path;
|
||||
|
||||
move |tx| async move {
|
||||
let entry = worktree_entry::Entity::find()
|
||||
.filter(
|
||||
Condition::all()
|
||||
.add(worktree_entry::Column::ProjectId.eq(project_id))
|
||||
.add(worktree_entry::Column::WorktreeId.eq(worktree_id))
|
||||
.add(worktree_entry::Column::Path.eq(path.clone())),
|
||||
)
|
||||
.one(&*tx)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow!("no such file"))?;
|
||||
|
||||
/// TODO: use a different column here?
|
||||
let buffer = buffer::Entity::find_by_id(BufferId(entry.inode))
|
||||
.one(&*tx)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow!("no such buffer"))?;
|
||||
|
||||
Ok((entry, buffer))
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
|
||||
response.send(OpenBufferResponse {
|
||||
buffer_id: entry.inode as u64,
|
||||
})?;
|
||||
|
||||
session.peer.send(
|
||||
session.connection_id,
|
||||
CreateBufferForPeer {
|
||||
project_id: project_id.to_proto(),
|
||||
peer_id: None,
|
||||
variant: Some(proto::create_buffer_for_peer::Variant::State(
|
||||
proto::BufferState {
|
||||
id: entry.inode as u64,
|
||||
file: Some(proto::File {
|
||||
worktree_id: entry.worktree_id as u64,
|
||||
entry_id: Some(entry.inode as u64),
|
||||
path,
|
||||
mtime: Some(proto::Timestamp {
|
||||
seconds: entry.mtime_seconds as u64,
|
||||
nanos: entry.mtime_nanos as u32,
|
||||
}),
|
||||
is_deleted: entry.is_deleted,
|
||||
}),
|
||||
base_text: "Hello world".to_string(),
|
||||
diff_base: Some("Hello world".to_string()),
|
||||
line_ending: proto::LineEnding::Unix.into(),
|
||||
saved_version: vec![],
|
||||
saved_version_fingerprint: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
|
||||
saved_mtime: Some(proto::Timestamp {
|
||||
seconds: entry.mtime_seconds as u64,
|
||||
nanos: entry.mtime_nanos as u32,
|
||||
}),
|
||||
},
|
||||
)),
|
||||
},
|
||||
)?;
|
||||
|
||||
session.peer.send(
|
||||
session.connection_id,
|
||||
CreateBufferForPeer {
|
||||
project_id: project_id.to_proto(),
|
||||
peer_id: None,
|
||||
variant: Some(create_buffer_for_peer::Variant::Chunk(proto::BufferChunk {
|
||||
buffer_id: entry.inode as u64,
|
||||
operations: vec![],
|
||||
is_last: true,
|
||||
})),
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@ pub mod auth;
|
|||
pub mod db;
|
||||
pub mod env;
|
||||
pub mod executor;
|
||||
pub mod hosted;
|
||||
pub mod rpc;
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -5,10 +5,11 @@ use crate::{
|
|||
db::{
|
||||
self, BufferId, ChannelId, ChannelRole, ChannelsForUser, CreatedChannelMessage, Database,
|
||||
InviteMemberResult, MembershipUpdated, MessageId, NotificationId, Project, ProjectId,
|
||||
RemoveChannelMemberResult, ReplicaId, RespondToChannelInvite, RoomId, ServerId, User,
|
||||
UserId,
|
||||
ProjectLocation, RemoveChannelMemberResult, ReplicaId, RespondToChannelInvite, RoomId,
|
||||
ServerId, User, UserId,
|
||||
},
|
||||
executor::Executor,
|
||||
hosted::ProjectRequest,
|
||||
AppState, Error, Result,
|
||||
};
|
||||
use anyhow::anyhow;
|
||||
|
@ -75,17 +76,17 @@ const MESSAGE_COUNT_PER_PAGE: usize = 100;
|
|||
const MAX_MESSAGE_LEN: usize = 1024;
|
||||
const NOTIFICATION_COUNT_PER_PAGE: usize = 50;
|
||||
|
||||
type MessageHandler =
|
||||
pub(crate) type MessageHandler =
|
||||
Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
|
||||
|
||||
struct Response<R> {
|
||||
pub(crate) struct Response<R> {
|
||||
peer: Arc<Peer>,
|
||||
receipt: Receipt<R>,
|
||||
responded: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl<R: RequestMessage> Response<R> {
|
||||
fn send(self, payload: R::Response) -> Result<()> {
|
||||
pub(crate) fn send(self, payload: R::Response) -> Result<()> {
|
||||
self.responded.store(true, SeqCst);
|
||||
self.peer.respond(self.receipt, payload)?;
|
||||
Ok(())
|
||||
|
@ -93,18 +94,18 @@ impl<R: RequestMessage> Response<R> {
|
|||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Session {
|
||||
user_id: UserId,
|
||||
connection_id: ConnectionId,
|
||||
db: Arc<tokio::sync::Mutex<DbHandle>>,
|
||||
peer: Arc<Peer>,
|
||||
connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
|
||||
live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
|
||||
pub(crate) struct Session {
|
||||
pub(crate) user_id: UserId,
|
||||
pub(crate) connection_id: ConnectionId,
|
||||
pub(crate) db: Arc<tokio::sync::Mutex<DbHandle>>,
|
||||
pub(crate) peer: Arc<Peer>,
|
||||
pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
|
||||
pub(crate) live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
|
||||
_executor: Executor,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
|
||||
pub(crate) async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
|
||||
#[cfg(test)]
|
||||
tokio::task::yield_now().await;
|
||||
let guard = self.db.lock().await;
|
||||
|
@ -113,7 +114,7 @@ impl Session {
|
|||
guard
|
||||
}
|
||||
|
||||
async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
|
||||
pub(crate) async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
|
||||
#[cfg(test)]
|
||||
tokio::task::yield_now().await;
|
||||
let guard = self.connection_pool.lock();
|
||||
|
@ -133,7 +134,7 @@ impl fmt::Debug for Session {
|
|||
}
|
||||
}
|
||||
|
||||
struct DbHandle(Arc<Database>);
|
||||
pub(crate) struct DbHandle(Arc<Database>);
|
||||
|
||||
impl Deref for DbHandle {
|
||||
type Target = Database;
|
||||
|
@ -1933,20 +1934,30 @@ async fn forward_read_only_project_request<T>(
|
|||
session: Session,
|
||||
) -> Result<()>
|
||||
where
|
||||
T: EntityMessage + RequestMessage,
|
||||
T: ProjectRequest,
|
||||
{
|
||||
let project_id = ProjectId::from_proto(request.remote_entity_id());
|
||||
let host_connection_id = session
|
||||
let location = session
|
||||
.db()
|
||||
.await
|
||||
.host_for_read_only_project_request(project_id, session.connection_id)
|
||||
.location_for_read_only_project_request(project_id, session.connection_id)
|
||||
.await?;
|
||||
let payload = session
|
||||
.peer
|
||||
.forward_request(session.connection_id, host_connection_id, request)
|
||||
.await?;
|
||||
response.send(payload)?;
|
||||
Ok(())
|
||||
|
||||
match location {
|
||||
ProjectLocation::Remote(connection_id) => {
|
||||
let payload = session
|
||||
.peer
|
||||
.forward_request(session.connection_id, connection_id, request)
|
||||
.await?;
|
||||
response.send(payload)?;
|
||||
Ok(())
|
||||
}
|
||||
ProjectLocation::Hosted(hosted_project_id) => {
|
||||
request
|
||||
.handle_hosted_project_request(hosted_project_id, response, session)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// forward a project request to the host. These requests are disallowed
|
||||
|
|
|
@ -7839,10 +7839,13 @@ impl Project {
|
|||
}
|
||||
|
||||
let buffer_id = BufferId::new(state.id)?;
|
||||
let buffer = cx.new_model(|_| {
|
||||
Buffer::from_proto(this.replica_id(), this.capability(), state, buffer_file)
|
||||
.unwrap()
|
||||
});
|
||||
let buffer = Buffer::from_proto(
|
||||
this.replica_id(),
|
||||
this.capability(),
|
||||
state,
|
||||
buffer_file,
|
||||
)?;
|
||||
let buffer = cx.new_model(|_| buffer);
|
||||
this.incomplete_remote_buffers
|
||||
.insert(buffer_id, Some(buffer));
|
||||
}
|
||||
|
|
|
@ -38,6 +38,10 @@ macro_rules! messages {
|
|||
}
|
||||
}
|
||||
|
||||
fn into_payload(self) -> envelope::Payload {
|
||||
envelope::Payload::$name(self)
|
||||
}
|
||||
|
||||
fn from_envelope(envelope: Envelope) -> Option<Self> {
|
||||
if let Some(envelope::Payload::$name(msg)) = envelope.payload {
|
||||
Some(msg)
|
||||
|
|
|
@ -28,6 +28,7 @@ pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 's
|
|||
responding_to: Option<u32>,
|
||||
original_sender_id: Option<PeerId>,
|
||||
) -> Envelope;
|
||||
fn into_payload(self) -> envelope::Payload;
|
||||
fn from_envelope(envelope: Envelope) -> Option<Self>;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue