Compare commits
2 commits
main
...
hosted-pro
Author | SHA1 | Date | |
---|---|---|---|
![]() |
7b0db3ba23 | ||
![]() |
e20896ae84 |
8 changed files with 200 additions and 34 deletions
|
@ -1,5 +1,5 @@
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
use rpc::proto;
|
use rpc::{proto, ConnectionId};
|
||||||
use sea_orm::{entity::prelude::*, DbErr};
|
use sea_orm::{entity::prelude::*, DbErr};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
@ -267,3 +267,8 @@ impl Into<i32> for ChannelVisibility {
|
||||||
proto.into()
|
proto.into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum ProjectLocation {
|
||||||
|
Remote(ConnectionId),
|
||||||
|
Hosted(HostedProjectId),
|
||||||
|
}
|
||||||
|
|
|
@ -912,12 +912,28 @@ impl Database {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the host connection for a read-only request to join a shared project.
|
/// Returns the host connection for a read-only request to join a shared project.
|
||||||
pub async fn host_for_read_only_project_request(
|
pub async fn location_for_read_only_project_request(
|
||||||
&self,
|
&self,
|
||||||
project_id: ProjectId,
|
project_id: ProjectId,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
) -> Result<ConnectionId> {
|
) -> Result<ProjectLocation> {
|
||||||
let room_id = self.room_id_for_project(project_id).await?;
|
let project = self
|
||||||
|
.transaction(|tx| async move {
|
||||||
|
Ok(project::Entity::find_by_id(project_id)
|
||||||
|
.one(&*tx)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| anyhow!("project {} not found", project_id))?)
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if let Some(hosted_project_id) = project.hosted_project_id {
|
||||||
|
return Ok(ProjectLocation::Hosted(hosted_project_id));
|
||||||
|
};
|
||||||
|
|
||||||
|
let Some(room_id) = project.room_id else {
|
||||||
|
return Err(anyhow!("project not in room"))?;
|
||||||
|
};
|
||||||
|
|
||||||
self.room_transaction(room_id, |tx| async move {
|
self.room_transaction(room_id, |tx| async move {
|
||||||
let current_participant = room_participant::Entity::find()
|
let current_participant = room_participant::Entity::find()
|
||||||
.filter(room_participant::Column::RoomId.eq(room_id))
|
.filter(room_participant::Column::RoomId.eq(room_id))
|
||||||
|
@ -943,7 +959,7 @@ impl Database {
|
||||||
.await?
|
.await?
|
||||||
.ok_or_else(|| anyhow!("failed to read project host"))?;
|
.ok_or_else(|| anyhow!("failed to read project host"))?;
|
||||||
|
|
||||||
Ok(host.connection())
|
Ok(ProjectLocation::Remote(host.connection()))
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.map(|guard| guard.into_inner())
|
.map(|guard| guard.into_inner())
|
||||||
|
@ -1097,7 +1113,7 @@ impl Database {
|
||||||
.ok_or_else(|| anyhow!("project {} not found", project_id))?;
|
.ok_or_else(|| anyhow!("project {} not found", project_id))?;
|
||||||
Ok(project
|
Ok(project
|
||||||
.room_id
|
.room_id
|
||||||
.ok_or_else(|| anyhow!("project not in room"))?)
|
.ok_or_else(|| anyhow!("project is not in a room"))?)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
125
crates/collab/src/hosted.rs
Normal file
125
crates/collab/src/hosted.rs
Normal file
|
@ -0,0 +1,125 @@
|
||||||
|
use anyhow::anyhow;
|
||||||
|
use rpc::proto::{
|
||||||
|
self, create_buffer_for_peer, CreateBufferForPeer, EntityMessage, OpenBufferResponse,
|
||||||
|
RequestMessage,
|
||||||
|
};
|
||||||
|
use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter};
|
||||||
|
|
||||||
|
use crate::db::{buffer, worktree_entry, HostedProjectId, ProjectId};
|
||||||
|
use crate::rpc::{Response, Session};
|
||||||
|
use crate::Result;
|
||||||
|
|
||||||
|
pub(crate) trait ProjectRequest: EntityMessage + RequestMessage {
|
||||||
|
async fn handle_hosted_project_request(
|
||||||
|
self,
|
||||||
|
_hosted_project_id: HostedProjectId,
|
||||||
|
_response: Response<Self>,
|
||||||
|
_session: Session,
|
||||||
|
) -> Result<()> {
|
||||||
|
Err(anyhow!("not supported for hosted projects"))?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl ProjectRequest for proto::GetHover {}
|
||||||
|
impl ProjectRequest for proto::GetDefinition {}
|
||||||
|
impl ProjectRequest for proto::GetTypeDefinition {}
|
||||||
|
impl ProjectRequest for proto::GetReferences {}
|
||||||
|
impl ProjectRequest for proto::SearchProject {}
|
||||||
|
impl ProjectRequest for proto::GetDocumentHighlights {}
|
||||||
|
impl ProjectRequest for proto::GetProjectSymbols {}
|
||||||
|
impl ProjectRequest for proto::OpenBufferForSymbol {}
|
||||||
|
impl ProjectRequest for proto::OpenBufferById {}
|
||||||
|
impl ProjectRequest for proto::SynchronizeBuffers {}
|
||||||
|
impl ProjectRequest for proto::InlayHints {}
|
||||||
|
|
||||||
|
impl ProjectRequest for proto::OpenBufferByPath {
|
||||||
|
async fn handle_hosted_project_request(
|
||||||
|
self,
|
||||||
|
_hosted_project_id: HostedProjectId,
|
||||||
|
response: Response<Self>,
|
||||||
|
session: Session,
|
||||||
|
) -> Result<()> {
|
||||||
|
let project_id = ProjectId(self.project_id as i32);
|
||||||
|
let worktree_id = self.worktree_id as i32;
|
||||||
|
let path = self.path.clone();
|
||||||
|
|
||||||
|
let (entry, buffer) = session
|
||||||
|
.db()
|
||||||
|
.await
|
||||||
|
.transaction({
|
||||||
|
let path = &path;
|
||||||
|
|
||||||
|
move |tx| async move {
|
||||||
|
let entry = worktree_entry::Entity::find()
|
||||||
|
.filter(
|
||||||
|
Condition::all()
|
||||||
|
.add(worktree_entry::Column::ProjectId.eq(project_id))
|
||||||
|
.add(worktree_entry::Column::WorktreeId.eq(worktree_id))
|
||||||
|
.add(worktree_entry::Column::Path.eq(path.clone())),
|
||||||
|
)
|
||||||
|
.one(&*tx)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| anyhow!("no such file"))?;
|
||||||
|
|
||||||
|
/// TODO: use a different column here?
|
||||||
|
let buffer = buffer::Entity::find_by_id(BufferId(entry.inode))
|
||||||
|
.one(&*tx)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| anyhow!("no such buffer"))?;
|
||||||
|
|
||||||
|
Ok((entry, buffer))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
response.send(OpenBufferResponse {
|
||||||
|
buffer_id: entry.inode as u64,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
session.peer.send(
|
||||||
|
session.connection_id,
|
||||||
|
CreateBufferForPeer {
|
||||||
|
project_id: project_id.to_proto(),
|
||||||
|
peer_id: None,
|
||||||
|
variant: Some(proto::create_buffer_for_peer::Variant::State(
|
||||||
|
proto::BufferState {
|
||||||
|
id: entry.inode as u64,
|
||||||
|
file: Some(proto::File {
|
||||||
|
worktree_id: entry.worktree_id as u64,
|
||||||
|
entry_id: Some(entry.inode as u64),
|
||||||
|
path,
|
||||||
|
mtime: Some(proto::Timestamp {
|
||||||
|
seconds: entry.mtime_seconds as u64,
|
||||||
|
nanos: entry.mtime_nanos as u32,
|
||||||
|
}),
|
||||||
|
is_deleted: entry.is_deleted,
|
||||||
|
}),
|
||||||
|
base_text: "Hello world".to_string(),
|
||||||
|
diff_base: Some("Hello world".to_string()),
|
||||||
|
line_ending: proto::LineEnding::Unix.into(),
|
||||||
|
saved_version: vec![],
|
||||||
|
saved_version_fingerprint: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
|
||||||
|
saved_mtime: Some(proto::Timestamp {
|
||||||
|
seconds: entry.mtime_seconds as u64,
|
||||||
|
nanos: entry.mtime_nanos as u32,
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
|
session.peer.send(
|
||||||
|
session.connection_id,
|
||||||
|
CreateBufferForPeer {
|
||||||
|
project_id: project_id.to_proto(),
|
||||||
|
peer_id: None,
|
||||||
|
variant: Some(create_buffer_for_peer::Variant::Chunk(proto::BufferChunk {
|
||||||
|
buffer_id: entry.inode as u64,
|
||||||
|
operations: vec![],
|
||||||
|
is_last: true,
|
||||||
|
})),
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,6 +3,7 @@ pub mod auth;
|
||||||
pub mod db;
|
pub mod db;
|
||||||
pub mod env;
|
pub mod env;
|
||||||
pub mod executor;
|
pub mod executor;
|
||||||
|
pub mod hosted;
|
||||||
pub mod rpc;
|
pub mod rpc;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -5,10 +5,11 @@ use crate::{
|
||||||
db::{
|
db::{
|
||||||
self, BufferId, ChannelId, ChannelRole, ChannelsForUser, CreatedChannelMessage, Database,
|
self, BufferId, ChannelId, ChannelRole, ChannelsForUser, CreatedChannelMessage, Database,
|
||||||
InviteMemberResult, MembershipUpdated, MessageId, NotificationId, Project, ProjectId,
|
InviteMemberResult, MembershipUpdated, MessageId, NotificationId, Project, ProjectId,
|
||||||
RemoveChannelMemberResult, ReplicaId, RespondToChannelInvite, RoomId, ServerId, User,
|
ProjectLocation, RemoveChannelMemberResult, ReplicaId, RespondToChannelInvite, RoomId,
|
||||||
UserId,
|
ServerId, User, UserId,
|
||||||
},
|
},
|
||||||
executor::Executor,
|
executor::Executor,
|
||||||
|
hosted::ProjectRequest,
|
||||||
AppState, Error, Result,
|
AppState, Error, Result,
|
||||||
};
|
};
|
||||||
use anyhow::anyhow;
|
use anyhow::anyhow;
|
||||||
|
@ -75,17 +76,17 @@ const MESSAGE_COUNT_PER_PAGE: usize = 100;
|
||||||
const MAX_MESSAGE_LEN: usize = 1024;
|
const MAX_MESSAGE_LEN: usize = 1024;
|
||||||
const NOTIFICATION_COUNT_PER_PAGE: usize = 50;
|
const NOTIFICATION_COUNT_PER_PAGE: usize = 50;
|
||||||
|
|
||||||
type MessageHandler =
|
pub(crate) type MessageHandler =
|
||||||
Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
|
Box<dyn Send + Sync + Fn(Box<dyn AnyTypedEnvelope>, Session) -> BoxFuture<'static, ()>>;
|
||||||
|
|
||||||
struct Response<R> {
|
pub(crate) struct Response<R> {
|
||||||
peer: Arc<Peer>,
|
peer: Arc<Peer>,
|
||||||
receipt: Receipt<R>,
|
receipt: Receipt<R>,
|
||||||
responded: Arc<AtomicBool>,
|
responded: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: RequestMessage> Response<R> {
|
impl<R: RequestMessage> Response<R> {
|
||||||
fn send(self, payload: R::Response) -> Result<()> {
|
pub(crate) fn send(self, payload: R::Response) -> Result<()> {
|
||||||
self.responded.store(true, SeqCst);
|
self.responded.store(true, SeqCst);
|
||||||
self.peer.respond(self.receipt, payload)?;
|
self.peer.respond(self.receipt, payload)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -93,18 +94,18 @@ impl<R: RequestMessage> Response<R> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct Session {
|
pub(crate) struct Session {
|
||||||
user_id: UserId,
|
pub(crate) user_id: UserId,
|
||||||
connection_id: ConnectionId,
|
pub(crate) connection_id: ConnectionId,
|
||||||
db: Arc<tokio::sync::Mutex<DbHandle>>,
|
pub(crate) db: Arc<tokio::sync::Mutex<DbHandle>>,
|
||||||
peer: Arc<Peer>,
|
pub(crate) peer: Arc<Peer>,
|
||||||
connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
|
pub(crate) connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
|
||||||
live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
|
pub(crate) live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
|
||||||
_executor: Executor,
|
_executor: Executor,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Session {
|
impl Session {
|
||||||
async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
|
pub(crate) async fn db(&self) -> tokio::sync::MutexGuard<DbHandle> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
let guard = self.db.lock().await;
|
let guard = self.db.lock().await;
|
||||||
|
@ -113,7 +114,7 @@ impl Session {
|
||||||
guard
|
guard
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
|
pub(crate) async fn connection_pool(&self) -> ConnectionPoolGuard<'_> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
let guard = self.connection_pool.lock();
|
let guard = self.connection_pool.lock();
|
||||||
|
@ -133,7 +134,7 @@ impl fmt::Debug for Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DbHandle(Arc<Database>);
|
pub(crate) struct DbHandle(Arc<Database>);
|
||||||
|
|
||||||
impl Deref for DbHandle {
|
impl Deref for DbHandle {
|
||||||
type Target = Database;
|
type Target = Database;
|
||||||
|
@ -1933,20 +1934,30 @@ async fn forward_read_only_project_request<T>(
|
||||||
session: Session,
|
session: Session,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where
|
||||||
T: EntityMessage + RequestMessage,
|
T: ProjectRequest,
|
||||||
{
|
{
|
||||||
let project_id = ProjectId::from_proto(request.remote_entity_id());
|
let project_id = ProjectId::from_proto(request.remote_entity_id());
|
||||||
let host_connection_id = session
|
let location = session
|
||||||
.db()
|
.db()
|
||||||
.await
|
.await
|
||||||
.host_for_read_only_project_request(project_id, session.connection_id)
|
.location_for_read_only_project_request(project_id, session.connection_id)
|
||||||
.await?;
|
.await?;
|
||||||
let payload = session
|
|
||||||
.peer
|
match location {
|
||||||
.forward_request(session.connection_id, host_connection_id, request)
|
ProjectLocation::Remote(connection_id) => {
|
||||||
.await?;
|
let payload = session
|
||||||
response.send(payload)?;
|
.peer
|
||||||
Ok(())
|
.forward_request(session.connection_id, connection_id, request)
|
||||||
|
.await?;
|
||||||
|
response.send(payload)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
ProjectLocation::Hosted(hosted_project_id) => {
|
||||||
|
request
|
||||||
|
.handle_hosted_project_request(hosted_project_id, response, session)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// forward a project request to the host. These requests are disallowed
|
/// forward a project request to the host. These requests are disallowed
|
||||||
|
|
|
@ -7839,10 +7839,13 @@ impl Project {
|
||||||
}
|
}
|
||||||
|
|
||||||
let buffer_id = BufferId::new(state.id)?;
|
let buffer_id = BufferId::new(state.id)?;
|
||||||
let buffer = cx.new_model(|_| {
|
let buffer = Buffer::from_proto(
|
||||||
Buffer::from_proto(this.replica_id(), this.capability(), state, buffer_file)
|
this.replica_id(),
|
||||||
.unwrap()
|
this.capability(),
|
||||||
});
|
state,
|
||||||
|
buffer_file,
|
||||||
|
)?;
|
||||||
|
let buffer = cx.new_model(|_| buffer);
|
||||||
this.incomplete_remote_buffers
|
this.incomplete_remote_buffers
|
||||||
.insert(buffer_id, Some(buffer));
|
.insert(buffer_id, Some(buffer));
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,10 @@ macro_rules! messages {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn into_payload(self) -> envelope::Payload {
|
||||||
|
envelope::Payload::$name(self)
|
||||||
|
}
|
||||||
|
|
||||||
fn from_envelope(envelope: Envelope) -> Option<Self> {
|
fn from_envelope(envelope: Envelope) -> Option<Self> {
|
||||||
if let Some(envelope::Payload::$name(msg)) = envelope.payload {
|
if let Some(envelope::Payload::$name(msg)) = envelope.payload {
|
||||||
Some(msg)
|
Some(msg)
|
||||||
|
|
|
@ -28,6 +28,7 @@ pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 's
|
||||||
responding_to: Option<u32>,
|
responding_to: Option<u32>,
|
||||||
original_sender_id: Option<PeerId>,
|
original_sender_id: Option<PeerId>,
|
||||||
) -> Envelope;
|
) -> Envelope;
|
||||||
|
fn into_payload(self) -> envelope::Payload;
|
||||||
fn from_envelope(envelope: Envelope) -> Option<Self>;
|
fn from_envelope(envelope: Envelope) -> Option<Self>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue