Merge branch 'main' into auto-deploy-collab

This commit is contained in:
Max Brunsfeld 2022-10-24 11:46:01 -07:00
commit c80395fc18
102 changed files with 4493 additions and 1390 deletions

View file

@ -22,7 +22,7 @@ use time::OffsetDateTime;
use tower::ServiceBuilder;
use tracing::instrument;
pub fn routes(rpc_server: &Arc<rpc::Server>, state: Arc<AppState>) -> Router<Body> {
pub fn routes(rpc_server: Arc<rpc::Server>, state: Arc<AppState>) -> Router<Body> {
Router::new()
.route("/user", get(get_authenticated_user))
.route("/users", get(get_users).post(create_user))
@ -50,7 +50,7 @@ pub fn routes(rpc_server: &Arc<rpc::Server>, state: Arc<AppState>) -> Router<Bod
.layer(
ServiceBuilder::new()
.layer(Extension(state))
.layer(Extension(rpc_server.clone()))
.layer(Extension(rpc_server))
.layer(middleware::from_fn(validate_api_token)),
)
}

View file

@ -30,6 +30,7 @@ use language::{
range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
};
use live_kit_client::MacOSDisplay;
use lsp::{self, FakeLanguageServer};
use parking_lot::Mutex;
use project::{
@ -47,14 +48,14 @@ use std::{
path::{Path, PathBuf},
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering::SeqCst},
atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
Arc,
},
time::Duration,
};
use theme::ThemeRegistry;
use unindent::Unindent as _;
use workspace::{Item, SplitDirection, ToggleFollow, Workspace};
use workspace::{shared_screen::SharedScreen, Item, SplitDirection, ToggleFollow, Workspace};
#[ctor::ctor]
fn init_logger() {
@ -185,6 +186,37 @@ async fn test_basic_calls(
}
);
// User A shares their screen
let display = MacOSDisplay::new();
let events_b = active_call_events(cx_b);
active_call_a
.update(cx_a, |call, cx| {
call.room().unwrap().update(cx, |room, cx| {
room.set_display_sources(vec![display.clone()]);
room.share_screen(cx)
})
})
.await
.unwrap();
deterministic.run_until_parked();
assert_eq!(events_b.borrow().len(), 1);
let event = events_b.borrow().first().unwrap().clone();
if let call::room::Event::RemoteVideoTracksChanged { participant_id } = event {
assert_eq!(participant_id, client_a.peer_id().unwrap());
room_b.read_with(cx_b, |room, _| {
assert_eq!(
room.remote_participants()[&client_a.peer_id().unwrap()]
.tracks
.len(),
1
);
});
} else {
panic!("unexpected event")
}
// User A leaves the room.
active_call_a.update(cx_a, |call, cx| {
call.hang_up(cx).unwrap();
@ -206,12 +238,13 @@ async fn test_basic_calls(
}
);
// User B leaves the room.
active_call_b.update(cx_b, |call, cx| {
call.hang_up(cx).unwrap();
assert!(call.room().is_none());
});
deterministic.run_until_parked();
// User B gets disconnected from the LiveKit server, which causes them
// to automatically leave the room.
server
.test_live_kit_server
.disconnect_client(client_b.peer_id().unwrap().to_string())
.await;
active_call_b.update(cx_b, |call, _| assert!(call.room().is_none()));
assert_eq!(
room_participants(&room_a, cx_a),
RoomParticipants {
@ -405,6 +438,63 @@ async fn test_leaving_room_on_disconnection(
pending: Default::default()
}
);
// Call user B again from client A.
active_call_a
.update(cx_a, |call, cx| {
call.invite(client_b.user_id().unwrap(), None, cx)
})
.await
.unwrap();
let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
// User B receives the call and joins the room.
let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming());
incoming_call_b.next().await.unwrap().unwrap();
active_call_b
.update(cx_b, |call, cx| call.accept_incoming(cx))
.await
.unwrap();
let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
deterministic.run_until_parked();
assert_eq!(
room_participants(&room_a, cx_a),
RoomParticipants {
remote: vec!["user_b".to_string()],
pending: Default::default()
}
);
assert_eq!(
room_participants(&room_b, cx_b),
RoomParticipants {
remote: vec!["user_a".to_string()],
pending: Default::default()
}
);
// User B gets disconnected from the LiveKit server, which causes it
// to automatically leave the room.
server
.test_live_kit_server
.disconnect_client(client_b.peer_id().unwrap().to_string())
.await;
deterministic.run_until_parked();
active_call_a.update(cx_a, |call, _| assert!(call.room().is_none()));
active_call_b.update(cx_b, |call, _| assert!(call.room().is_none()));
assert_eq!(
room_participants(&room_a, cx_a),
RoomParticipants {
remote: Default::default(),
pending: Default::default()
}
);
assert_eq!(
room_participants(&room_b, cx_b),
RoomParticipants {
remote: Default::default(),
pending: Default::default()
}
);
}
#[gpui::test(iterations = 10)]
@ -954,21 +1044,21 @@ async fn test_active_call_events(
deterministic.run_until_parked();
assert_eq!(mem::take(&mut *events_a.borrow_mut()), vec![]);
assert_eq!(mem::take(&mut *events_b.borrow_mut()), vec![]);
}
fn active_call_events(cx: &mut TestAppContext) -> Rc<RefCell<Vec<room::Event>>> {
let events = Rc::new(RefCell::new(Vec::new()));
let active_call = cx.read(ActiveCall::global);
cx.update({
let events = events.clone();
|cx| {
cx.subscribe(&active_call, move |_, event, _| {
events.borrow_mut().push(event.clone())
})
.detach()
}
});
events
}
fn active_call_events(cx: &mut TestAppContext) -> Rc<RefCell<Vec<room::Event>>> {
let events = Rc::new(RefCell::new(Vec::new()));
let active_call = cx.read(ActiveCall::global);
cx.update({
let events = events.clone();
|cx| {
cx.subscribe(&active_call, move |_, event, _| {
events.borrow_mut().push(event.clone())
})
.detach()
}
});
events
}
#[gpui::test(iterations = 10)]
@ -984,15 +1074,9 @@ async fn test_room_location(
client_a.fs.insert_tree("/a", json!({})).await;
client_b.fs.insert_tree("/b", json!({})).await;
let (project_a, _) = client_a.build_local_project("/a", cx_a).await;
let (project_b, _) = client_b.build_local_project("/b", cx_b).await;
server
.create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
.await;
let active_call_a = cx_a.read(ActiveCall::global);
let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
let active_call_b = cx_b.read(ActiveCall::global);
let a_notified = Rc::new(Cell::new(false));
cx_a.update({
let notified = a_notified.clone();
@ -1002,8 +1086,6 @@ async fn test_room_location(
}
});
let active_call_b = cx_b.read(ActiveCall::global);
let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
let b_notified = Rc::new(Cell::new(false));
cx_b.update({
let b_notified = b_notified.clone();
@ -1013,10 +1095,18 @@ async fn test_room_location(
}
});
room_a
.update(cx_a, |room, cx| room.set_location(Some(&project_a), cx))
let (project_a, _) = client_a.build_local_project("/a", cx_a).await;
active_call_a
.update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
.await
.unwrap();
let (project_b, _) = client_b.build_local_project("/b", cx_b).await;
server
.create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
.await;
let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
deterministic.run_until_parked();
assert!(a_notified.take());
assert_eq!(
@ -1071,8 +1161,8 @@ async fn test_room_location(
)]
);
room_b
.update(cx_b, |room, cx| room.set_location(Some(&project_b), cx))
active_call_b
.update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
.await
.unwrap();
deterministic.run_until_parked();
@ -1097,8 +1187,8 @@ async fn test_room_location(
)]
);
room_b
.update(cx_b, |room, cx| room.set_location(None, cx))
active_call_b
.update(cx_b, |call, cx| call.set_location(None, cx))
.await
.unwrap();
deterministic.run_until_parked();
@ -4968,7 +5058,11 @@ async fn test_contact_requests(
}
#[gpui::test(iterations = 10)]
async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
async fn test_following(
deterministic: Arc<Deterministic>,
cx_a: &mut TestAppContext,
cx_b: &mut TestAppContext,
) {
cx_a.foreground().forbid_parking();
cx_a.update(editor::init);
cx_b.update(editor::init);
@ -4980,6 +5074,7 @@ async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
.create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
.await;
let active_call_a = cx_a.read(ActiveCall::global);
let active_call_b = cx_b.read(ActiveCall::global);
client_a
.fs
@ -4993,11 +5088,20 @@ async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
)
.await;
let (project_a, worktree_id) = client_a.build_local_project("/a", cx_a).await;
active_call_a
.update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
.await
.unwrap();
let project_id = active_call_a
.update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
.await
.unwrap();
let project_b = client_b.build_remote_project(project_id, cx_b).await;
active_call_b
.update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
.await
.unwrap();
// Client A opens some editors.
let workspace_a = client_a.build_workspace(&project_a, cx_a);
@ -5139,7 +5243,7 @@ async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
workspace_a.update(cx_a, |workspace, cx| {
workspace.activate_item(&editor_a2, cx)
});
cx_a.foreground().run_until_parked();
deterministic.run_until_parked();
assert_eq!(
workspace_b.read_with(cx_b, |workspace, cx| workspace
.active_item(cx)
@ -5169,9 +5273,62 @@ async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
editor_a1.id()
);
// Client B activates an external window, which causes a new screen-sharing item to be added to the pane.
let display = MacOSDisplay::new();
active_call_b
.update(cx_b, |call, cx| call.set_location(None, cx))
.await
.unwrap();
active_call_b
.update(cx_b, |call, cx| {
call.room().unwrap().update(cx, |room, cx| {
room.set_display_sources(vec![display.clone()]);
room.share_screen(cx)
})
})
.await
.unwrap();
deterministic.run_until_parked();
let shared_screen = workspace_a.read_with(cx_a, |workspace, cx| {
workspace
.active_item(cx)
.unwrap()
.downcast::<SharedScreen>()
.unwrap()
});
// Client B activates Zed again, which causes the previous editor to become focused again.
active_call_b
.update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
.await
.unwrap();
deterministic.run_until_parked();
assert_eq!(
workspace_a.read_with(cx_a, |workspace, cx| workspace
.active_item(cx)
.unwrap()
.id()),
editor_a1.id()
);
// Client B activates an external window again, and the previously-opened screen-sharing item
// gets activated.
active_call_b
.update(cx_b, |call, cx| call.set_location(None, cx))
.await
.unwrap();
deterministic.run_until_parked();
assert_eq!(
workspace_a.read_with(cx_a, |workspace, cx| workspace
.active_item(cx)
.unwrap()
.id()),
shared_screen.id()
);
// Following interrupts when client B disconnects.
client_b.disconnect(&cx_b.to_async()).unwrap();
cx_a.foreground().run_until_parked();
deterministic.run_until_parked();
assert_eq!(
workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
None
@ -5191,6 +5348,7 @@ async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut T
.create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
.await;
let active_call_a = cx_a.read(ActiveCall::global);
let active_call_b = cx_b.read(ActiveCall::global);
// Client A shares a project.
client_a
@ -5206,6 +5364,10 @@ async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut T
)
.await;
let (project_a, worktree_id) = client_a.build_local_project("/a", cx_a).await;
active_call_a
.update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
.await
.unwrap();
let project_id = active_call_a
.update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
.await
@ -5213,6 +5375,10 @@ async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut T
// Client B joins the project.
let project_b = client_b.build_remote_project(project_id, cx_b).await;
active_call_b
.update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
.await
.unwrap();
// Client A opens some editors.
let workspace_a = client_a.build_workspace(&project_a, cx_a);
@ -5360,6 +5526,7 @@ async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppCont
.create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
.await;
let active_call_a = cx_a.read(ActiveCall::global);
let active_call_b = cx_b.read(ActiveCall::global);
// Client A shares a project.
client_a
@ -5374,11 +5541,20 @@ async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppCont
)
.await;
let (project_a, worktree_id) = client_a.build_local_project("/a", cx_a).await;
active_call_a
.update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
.await
.unwrap();
let project_id = active_call_a
.update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
.await
.unwrap();
let project_b = client_b.build_remote_project(project_id, cx_b).await;
active_call_b
.update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
.await
.unwrap();
// Client A opens some editors.
let workspace_a = client_a.build_workspace(&project_a, cx_a);
@ -6138,6 +6314,7 @@ struct TestServer {
connection_killers: Arc<Mutex<HashMap<PeerId, Arc<AtomicBool>>>>,
forbid_connections: Arc<AtomicBool>,
_test_db: TestDb,
test_live_kit_server: Arc<live_kit_client::TestServer>,
}
impl TestServer {
@ -6145,8 +6322,18 @@ impl TestServer {
foreground: Rc<executor::Foreground>,
background: Arc<executor::Background>,
) -> Self {
static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
let test_db = TestDb::fake(background.clone());
let app_state = Self::build_app_state(&test_db).await;
let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
let live_kit_server = live_kit_client::TestServer::create(
format!("http://livekit.{}.test", live_kit_server_id),
format!("devkey-{}", live_kit_server_id),
format!("secret-{}", live_kit_server_id),
background.clone(),
)
.unwrap();
let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
let peer = Peer::new();
let notifications = mpsc::unbounded();
let server = Server::new(app_state.clone(), Some(notifications.0));
@ -6159,6 +6346,7 @@ impl TestServer {
connection_killers: Default::default(),
forbid_connections: Default::default(),
_test_db: test_db,
test_live_kit_server: live_kit_server,
}
}
@ -6354,9 +6542,13 @@ impl TestServer {
}
}
async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
async fn build_app_state(
test_db: &TestDb,
fake_server: &live_kit_client::TestServer,
) -> Arc<AppState> {
Arc::new(AppState {
db: test_db.db().clone(),
live_kit_client: Some(Arc::new(fake_server.create_api_client())),
config: Default::default(),
})
}
@ -6388,6 +6580,7 @@ impl Deref for TestServer {
impl Drop for TestServer {
fn drop(&mut self) {
self.peer.reset();
self.test_live_kit_server.teardown().unwrap();
}
}

View file

@ -9,6 +9,7 @@ mod db_tests;
#[cfg(test)]
mod integration_tests;
use crate::rpc::ResultExt as _;
use anyhow::anyhow;
use axum::{routing::get, Router};
use collab::{Error, Result};
@ -21,6 +22,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::signal;
use tracing_log::LogTracer;
use tracing_subscriber::{filter::EnvFilter, fmt::format::JsonFields, Layer};
use util::ResultExt;
@ -33,6 +35,9 @@ pub struct Config {
pub database_url: String,
pub api_token: String,
pub invite_link_prefix: String,
pub live_kit_server: Option<String>,
pub live_kit_key: Option<String>,
pub live_kit_secret: Option<String>,
pub rust_log: Option<String>,
pub log_json: Option<bool>,
}
@ -45,9 +50,37 @@ pub struct MigrateConfig {
pub struct AppState {
db: Arc<dyn Db>,
live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
config: Config,
}
impl AppState {
async fn new(config: Config) -> Result<Arc<Self>> {
let db = PostgresDb::new(&config.database_url, 5).await?;
let live_kit_client = if let Some(((server, key), secret)) = config
.live_kit_server
.as_ref()
.zip(config.live_kit_key.as_ref())
.zip(config.live_kit_secret.as_ref())
{
Some(Arc::new(live_kit_server::api::LiveKitClient::new(
server.clone(),
key.clone(),
secret.clone(),
)) as Arc<dyn live_kit_server::api::Client>)
} else {
None
};
let this = Self {
db: Arc::new(db),
live_kit_client,
config,
};
Ok(Arc::new(this))
}
}
#[tokio::main]
async fn main() -> Result<()> {
if let Err(error) = env::load_dotenv() {
@ -83,14 +116,9 @@ async fn main() -> Result<()> {
}
Some("serve") => {
let config = envy::from_env::<Config>().expect("error loading config");
let db = PostgresDb::new(&config.database_url, 5).await?;
init_tracing(&config);
let state = Arc::new(AppState {
db: Arc::new(db),
config,
});
let state = AppState::new(config).await?;
let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port))
.expect("failed to bind TCP listener");
@ -98,12 +126,13 @@ async fn main() -> Result<()> {
rpc_server
.start_recording_project_activity(Duration::from_secs(5 * 60), rpc::RealExecutor);
let app = api::routes(&rpc_server, state.clone())
.merge(rpc::routes(rpc_server))
let app = api::routes(rpc_server.clone(), state.clone())
.merge(rpc::routes(rpc_server.clone()))
.merge(Router::new().route("/", get(handle_root)));
axum::Server::from_tcp(listener)?
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.with_graceful_shutdown(graceful_shutdown(rpc_server, state))
.await?;
}
_ => {
@ -148,3 +177,52 @@ pub fn init_tracing(config: &Config) -> Option<()> {
None
}
async fn graceful_shutdown(rpc_server: Arc<rpc::Server>, state: Arc<AppState>) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
if let Some(live_kit) = state.live_kit_client.as_ref() {
let deletions = rpc_server
.store()
.await
.rooms()
.values()
.map(|room| {
let name = room.live_kit_room.clone();
async {
live_kit.delete_room(name).await.trace_err();
}
})
.collect::<Vec<_>>();
tracing::info!("deleting all live-kit rooms");
if let Err(_) = tokio::time::timeout(
Duration::from_secs(10),
futures::future::join_all(deletions),
)
.await
{
tracing::error!("timed out waiting for live-kit room deletion");
}
}
}

View file

@ -50,6 +50,7 @@ use std::{
},
time::Duration,
};
pub use store::{Store, Worktree};
use time::OffsetDateTime;
use tokio::{
sync::{Mutex, MutexGuard},
@ -58,8 +59,6 @@ use tokio::{
use tower::ServiceBuilder;
use tracing::{info_span, instrument, Instrument};
pub use store::{Store, Worktree};
lazy_static! {
static ref METRIC_CONNECTIONS: IntGauge =
register_int_gauge!("connections", "number of connections").unwrap();
@ -477,6 +476,7 @@ impl Server {
let mut projects_to_unshare = Vec::new();
let mut contacts_to_update = HashSet::default();
let mut room_left = None;
{
let mut store = self.store().await;
@ -509,23 +509,24 @@ impl Server {
});
}
if let Some(room) = removed_connection.room {
self.room_updated(&room);
room_left = Some(self.room_left(&room, connection_id));
}
contacts_to_update.insert(removed_connection.user_id);
for connection_id in removed_connection.canceled_call_connection_ids {
self.peer
.send(connection_id, proto::CallCanceled {})
.trace_err();
contacts_to_update.extend(store.user_id_for_connection(connection_id).ok());
}
if let Some(room) = removed_connection
.room_id
.and_then(|room_id| store.room(room_id))
{
self.room_updated(room);
}
contacts_to_update.insert(removed_connection.user_id);
};
if let Some(room_left) = room_left {
room_left.await.trace_err();
}
for user_id in contacts_to_update {
self.update_user_contacts(user_id).await.trace_err();
}
@ -607,13 +608,42 @@ impl Server {
response: Response<proto::CreateRoom>,
) -> Result<()> {
let user_id;
let room_id;
let room;
{
let mut store = self.store().await;
user_id = store.user_id_for_connection(request.sender_id)?;
room_id = store.create_room(request.sender_id)?;
room = store.create_room(request.sender_id)?.clone();
}
response.send(proto::CreateRoomResponse { id: room_id })?;
let live_kit_connection_info =
if let Some(live_kit) = self.app_state.live_kit_client.as_ref() {
if let Some(_) = live_kit
.create_room(room.live_kit_room.clone())
.await
.trace_err()
{
if let Some(token) = live_kit
.room_token(&room.live_kit_room, &request.sender_id.to_string())
.trace_err()
{
Some(proto::LiveKitConnectionInfo {
server_url: live_kit.url().into(),
token,
})
} else {
None
}
} else {
None
}
} else {
None
};
response.send(proto::CreateRoomResponse {
room: Some(room),
live_kit_connection_info,
})?;
self.update_user_contacts(user_id).await?;
Ok(())
}
@ -634,8 +664,27 @@ impl Server {
.send(recipient_id, proto::CallCanceled {})
.trace_err();
}
let live_kit_connection_info =
if let Some(live_kit) = self.app_state.live_kit_client.as_ref() {
if let Some(token) = live_kit
.room_token(&room.live_kit_room, &request.sender_id.to_string())
.trace_err()
{
Some(proto::LiveKitConnectionInfo {
server_url: live_kit.url().into(),
token,
})
} else {
None
}
} else {
None
};
response.send(proto::JoinRoomResponse {
room: Some(room.clone()),
live_kit_connection_info,
})?;
self.room_updated(room);
}
@ -645,6 +694,7 @@ impl Server {
async fn leave_room(self: Arc<Server>, message: TypedEnvelope<proto::LeaveRoom>) -> Result<()> {
let mut contacts_to_update = HashSet::default();
let room_left;
{
let mut store = self.store().await;
let user_id = store.user_id_for_connection(message.sender_id)?;
@ -683,9 +733,8 @@ impl Server {
}
}
if let Some(room) = left_room.room {
self.room_updated(room);
}
self.room_updated(&left_room.room);
room_left = self.room_left(&left_room.room, message.sender_id);
for connection_id in left_room.canceled_call_connection_ids {
self.peer
@ -695,6 +744,7 @@ impl Server {
}
}
room_left.await.trace_err();
for user_id in contacts_to_update {
self.update_user_contacts(user_id).await?;
}
@ -843,6 +893,29 @@ impl Server {
}
}
fn room_left(
&self,
room: &proto::Room,
connection_id: ConnectionId,
) -> impl Future<Output = Result<()>> {
let client = self.app_state.live_kit_client.clone();
let room_name = room.live_kit_room.clone();
let participant_count = room.participants.len();
async move {
if let Some(client) = client {
client
.remove_participant(room_name.clone(), connection_id.to_string())
.await?;
if participant_count == 0 {
client.delete_room(room_name).await?;
}
}
Ok(())
}
}
async fn share_project(
self: Arc<Server>,
request: TypedEnvelope<proto::ShareProject>,

View file

@ -1,9 +1,10 @@
use crate::db::{self, ChannelId, ProjectId, UserId};
use anyhow::{anyhow, Result};
use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
use nanoid::nanoid;
use rpc::{proto, ConnectionId};
use serde::Serialize;
use std::{mem, path::PathBuf, str, time::Duration};
use std::{borrow::Cow, mem, path::PathBuf, str, time::Duration};
use time::OffsetDateTime;
use tracing::instrument;
use util::post_inc;
@ -85,12 +86,12 @@ pub struct Channel {
pub type ReplicaId = u16;
#[derive(Default)]
pub struct RemovedConnectionState {
pub struct RemovedConnectionState<'a> {
pub user_id: UserId,
pub hosted_projects: Vec<Project>,
pub guest_projects: Vec<LeftProject>,
pub contact_ids: HashSet<UserId>,
pub room_id: Option<RoomId>,
pub room: Option<Cow<'a, proto::Room>>,
pub canceled_call_connection_ids: Vec<ConnectionId>,
}
@ -103,7 +104,7 @@ pub struct LeftProject {
}
pub struct LeftRoom<'a> {
pub room: Option<&'a proto::Room>,
pub room: Cow<'a, proto::Room>,
pub unshared_projects: Vec<Project>,
pub left_projects: Vec<LeftProject>,
pub canceled_call_connection_ids: Vec<ConnectionId>,
@ -219,11 +220,11 @@ impl Store {
let left_room = self.leave_room(room_id, connection_id)?;
result.hosted_projects = left_room.unshared_projects;
result.guest_projects = left_room.left_projects;
result.room_id = Some(room_id);
result.room = Some(Cow::Owned(left_room.room.into_owned()));
result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
} else if connected_user.connection_ids.len() == 1 {
self.decline_call(room_id, connection_id)?;
result.room_id = Some(room_id);
let (room, _) = self.decline_call(room_id, connection_id)?;
result.room = Some(Cow::Owned(room.clone()));
}
}
@ -345,7 +346,7 @@ impl Store {
}
}
pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<RoomId> {
pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
let connection = self
.connections
.get_mut(&creator_connection_id)
@ -359,19 +360,23 @@ impl Store {
"can't create a room with an active call"
);
let mut room = proto::Room::default();
room.participants.push(proto::Participant {
user_id: connection.user_id.to_proto(),
peer_id: creator_connection_id.0,
projects: Default::default(),
location: Some(proto::ParticipantLocation {
variant: Some(proto::participant_location::Variant::External(
proto::participant_location::External {},
)),
}),
});
let room_id = post_inc(&mut self.next_room_id);
let room = proto::Room {
id: room_id,
participants: vec![proto::Participant {
user_id: connection.user_id.to_proto(),
peer_id: creator_connection_id.0,
projects: Default::default(),
location: Some(proto::ParticipantLocation {
variant: Some(proto::participant_location::Variant::External(
proto::participant_location::External {},
)),
}),
}],
pending_participant_user_ids: Default::default(),
live_kit_room: nanoid!(30),
};
self.rooms.insert(room_id, room);
connected_user.active_call = Some(Call {
caller_user_id: connection.user_id,
@ -379,7 +384,7 @@ impl Store {
connection_id: Some(creator_connection_id),
initial_project_id: None,
});
Ok(room_id)
Ok(self.rooms.get(&room_id).unwrap())
}
pub fn join_room(
@ -496,12 +501,14 @@ impl Store {
}
});
if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() {
self.rooms.remove(&room_id);
}
let room = if room.participants.is_empty() {
Cow::Owned(self.rooms.remove(&room_id).unwrap())
} else {
Cow::Borrowed(self.rooms.get(&room_id).unwrap())
};
Ok(LeftRoom {
room: self.rooms.get(&room_id),
room,
unshared_projects,
left_projects,
canceled_call_connection_ids,
@ -512,6 +519,10 @@ impl Store {
self.rooms.get(&room_id)
}
pub fn rooms(&self) -> &BTreeMap<RoomId, proto::Room> {
&self.rooms
}
pub fn call(
&mut self,
room_id: RoomId,