mirror of
https://github.com/AppFlowy-IO/AppFlowy-Cloud.git
synced 2025-04-19 03:24:42 -04:00
feat: create collab handlers (#34)
* feat: logged user after sign up/sign in * test: fix test * chore: update sqlx files * chore: add router * chore: parser user uuid from token
This commit is contained in:
parent
0d59211e55
commit
f0f58f98f3
27 changed files with 287 additions and 167 deletions
15
.sqlx/query-4cd579c6421d05807fb8433d14ea312db0977353e34ef04e2bab31e009151bb2.json
generated
Normal file
15
.sqlx/query-4cd579c6421d05807fb8433d14ea312db0977353e34ef04e2bab31e009151bb2.json
generated
Normal file
|
@ -0,0 +1,15 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO af_user (uuid, email)\n SELECT $1, $2\n WHERE NOT EXISTS (\n SELECT 1 FROM public.af_user WHERE email = $2\n )\n AND NOT EXISTS (\n SELECT 1 FROM public.af_user WHERE uuid = $1\n )\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid",
|
||||
"Text"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "4cd579c6421d05807fb8433d14ea312db0977353e34ef04e2bab31e009151bb2"
|
||||
}
|
22
.sqlx/query-5d408d36790ade4da1ceeb68b4a183aa7d9abc27b0ec42c2a3c5af26ad80f128.json
generated
Normal file
22
.sqlx/query-5d408d36790ade4da1ceeb68b4a183aa7d9abc27b0ec42c2a3c5af26ad80f128.json
generated
Normal file
|
@ -0,0 +1,22 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT uid FROM af_user WHERE uuid = $1\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "uid",
|
||||
"type_info": "Int8"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "5d408d36790ade4da1ceeb68b4a183aa7d9abc27b0ec42c2a3c5af26ad80f128"
|
||||
}
|
|
@ -1,14 +0,0 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n INSERT INTO af_user (uuid)\n SELECT $1\n WHERE NOT EXISTS (\n SELECT 1 FROM public.af_user WHERE uuid = $1\n )\n ",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "e09ecf50811c2a1f424ce638e11a4de140168e90be85ef9cdc2223ae48d85239"
|
||||
}
|
3
Cargo.lock
generated
3
Cargo.lock
generated
|
@ -789,8 +789,10 @@ checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877"
|
|||
dependencies = [
|
||||
"android-tzdata",
|
||||
"iana-time-zone",
|
||||
"js-sys",
|
||||
"num-traits",
|
||||
"serde",
|
||||
"wasm-bindgen",
|
||||
"windows-targets",
|
||||
]
|
||||
|
||||
|
@ -3363,6 +3365,7 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"collab",
|
||||
"collab-define",
|
||||
"collab-sync-protocol",
|
||||
|
|
|
@ -42,6 +42,6 @@ pkill -f appflowy_cloud || true
|
|||
# To generate the .sqlx files, we need to run the following command
|
||||
# After the .sqlx files are generated, we build in SQLX_OFFLINE=true
|
||||
# where we don't need to connect to the database
|
||||
cargo sqlx database create && cargo sqlx migrate run && cargo sqlx prepare
|
||||
cargo sqlx database create && cargo sqlx migrate run && cargo sqlx prepare --workspace
|
||||
|
||||
cargo run
|
||||
|
|
|
@ -8,9 +8,9 @@ use gotrue::models::{AccessTokenResponse, User};
|
|||
use shared_entity::error::AppError;
|
||||
|
||||
use shared_entity::server_error::ErrorCode;
|
||||
use storage::entities::AfUserProfileView;
|
||||
use storage::entities::AfWorkspace;
|
||||
use storage::entities::AfWorkspaces;
|
||||
use storage::entities::AFUserProfileView;
|
||||
use storage::entities::AFWorkspace;
|
||||
use storage::entities::AFWorkspaces;
|
||||
|
||||
pub struct Client {
|
||||
http_client: reqwest::Client,
|
||||
|
@ -31,30 +31,30 @@ impl Client {
|
|||
self.token.as_ref()
|
||||
}
|
||||
|
||||
pub async fn profile(&self) -> Result<AfUserProfileView, AppError> {
|
||||
pub async fn profile(&self) -> Result<AFUserProfileView, AppError> {
|
||||
let url = format!("{}/api/user/profile", self.base_url);
|
||||
let resp = self
|
||||
.http_client_with_auth(Method::GET, &url)?
|
||||
.send()
|
||||
.await?;
|
||||
let profile = AppResponse::<AfUserProfileView>::from_response(resp)
|
||||
let profile = AppResponse::<AFUserProfileView>::from_response(resp)
|
||||
.await?
|
||||
.into_data()?
|
||||
.ok_or::<AppError>(ErrorCode::MissingPayload.into())?;
|
||||
Ok(profile)
|
||||
}
|
||||
|
||||
pub async fn workspaces(&mut self) -> Result<AfWorkspaces, AppError> {
|
||||
pub async fn workspaces(&mut self) -> Result<AFWorkspaces, AppError> {
|
||||
let url = format!("{}/api/user/workspaces", self.base_url);
|
||||
let resp = self
|
||||
.http_client_with_auth(Method::GET, &url)?
|
||||
.send()
|
||||
.await?;
|
||||
let workspaces = AppResponse::<Vec<AfWorkspace>>::from_response(resp)
|
||||
let workspaces = AppResponse::<Vec<AFWorkspace>>::from_response(resp)
|
||||
.await?
|
||||
.into_data()?
|
||||
.ok_or::<AppError>(ErrorCode::MissingPayload.into())?;
|
||||
Ok(AfWorkspaces(workspaces))
|
||||
Ok(AFWorkspaces(workspaces))
|
||||
}
|
||||
|
||||
pub async fn sign_in_password(&mut self, email: &str, password: &str) -> Result<(), AppError> {
|
||||
|
|
|
@ -27,9 +27,9 @@ pub struct CollabManager<S> {
|
|||
/// Keep track of all collab groups
|
||||
groups: Arc<CollabGroupCache<S>>,
|
||||
/// Keep track of all object ids that a user is subscribed to
|
||||
edit_collab_by_user: Arc<RwLock<HashMap<i64, HashSet<EditCollab>>>>,
|
||||
edit_collab_by_user: Arc<RwLock<HashMap<String, HashSet<EditCollab>>>>,
|
||||
/// Keep track of all client streams
|
||||
client_stream_by_user: Arc<RwLock<HashMap<i64, RealtimeClientStream>>>,
|
||||
client_stream_by_user: Arc<RwLock<HashMap<String, RealtimeClientStream>>>,
|
||||
}
|
||||
|
||||
impl<S> CollabManager<S>
|
||||
|
@ -69,7 +69,7 @@ where
|
|||
self
|
||||
.client_stream_by_user
|
||||
.write()
|
||||
.insert(*new_conn.user.user_id(), stream);
|
||||
.insert(new_conn.user.id().to_string(), stream);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -83,13 +83,10 @@ where
|
|||
type Result = Result<(), RealtimeError>;
|
||||
fn handle(&mut self, msg: Disconnect<U>, _: &mut Context<Self>) -> Self::Result {
|
||||
tracing::trace!("[💭Server]: {} disconnect", msg.user);
|
||||
self
|
||||
.client_stream_by_user
|
||||
.write()
|
||||
.remove(msg.user.user_id());
|
||||
self.client_stream_by_user.write().remove(msg.user.id());
|
||||
|
||||
// Remove the user from all collab groups that the user is subscribed to
|
||||
let edits = self.edit_collab_by_user.write().remove(msg.user.user_id());
|
||||
let edits = self.edit_collab_by_user.write().remove(msg.user.id());
|
||||
if let Some(edits) = edits {
|
||||
if !edits.is_empty() {
|
||||
let groups = self.groups.clone();
|
||||
|
@ -133,11 +130,11 @@ where
|
|||
|
||||
async fn forward_message_to_collab_group<U>(
|
||||
client_msg: &ClientMessage<U>,
|
||||
client_streams: &Arc<RwLock<HashMap<i64, RealtimeClientStream>>>,
|
||||
client_streams: &Arc<RwLock<HashMap<String, RealtimeClientStream>>>,
|
||||
) where
|
||||
U: RealtimeUser,
|
||||
{
|
||||
if let Some(client_stream) = client_streams.read().get(client_msg.user.user_id()) {
|
||||
if let Some(client_stream) = client_streams.read().get(client_msg.user.id()) {
|
||||
tracing::trace!(
|
||||
"[💭Server]: receives: [oid:{}|msg_id:{:?}]",
|
||||
client_msg.content.object_id(),
|
||||
|
@ -158,8 +155,8 @@ async fn forward_message_to_collab_group<U>(
|
|||
async fn subscribe_collab_group_change_if_need<U, S>(
|
||||
client_msg: &ClientMessage<U>,
|
||||
groups: &Arc<CollabGroupCache<S>>,
|
||||
edit_collab_by_user: &Arc<RwLock<HashMap<i64, HashSet<EditCollab>>>>,
|
||||
client_streams: &Arc<RwLock<HashMap<i64, RealtimeClientStream>>>,
|
||||
edit_collab_by_user: &Arc<RwLock<HashMap<String, HashSet<EditCollab>>>>,
|
||||
client_streams: &Arc<RwLock<HashMap<String, RealtimeClientStream>>>,
|
||||
) -> Result<(), RealtimeError>
|
||||
where
|
||||
U: RealtimeUser,
|
||||
|
@ -204,7 +201,7 @@ where
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
match client_streams.write().get_mut(client_msg.user.user_id()) {
|
||||
match client_streams.write().get_mut(client_msg.user.id()) {
|
||||
None => tracing::error!("🔴The client stream is not found"),
|
||||
Some(client_stream) => {
|
||||
if let Some(collab_group) = groups.write().get_mut(object_id) {
|
||||
|
@ -221,7 +218,7 @@ where
|
|||
|
||||
edit_collab_by_user
|
||||
.write()
|
||||
.entry(*client_msg.user.user_id())
|
||||
.entry(client_msg.user.id().to_string())
|
||||
.or_default()
|
||||
.insert(EditCollab {
|
||||
object_id: object_id.to_string(),
|
||||
|
|
|
@ -9,7 +9,7 @@ use std::fmt::{Debug, Display};
|
|||
use std::hash::Hash;
|
||||
|
||||
pub trait RealtimeUser: Clone + Debug + Send + Sync + 'static + Display {
|
||||
fn user_id(&self) -> &i64;
|
||||
fn id(&self) -> &str;
|
||||
}
|
||||
|
||||
#[derive(Debug, Message, Clone)]
|
||||
|
|
|
@ -179,7 +179,7 @@ pub async fn establish_ws_connection(
|
|||
) -> Result<HttpResponse> {
|
||||
tracing::trace!("{:?}", request);
|
||||
let user = TestLoggedUser {
|
||||
user_id: token.as_str().parse().unwrap(),
|
||||
user_id: token.as_str().to_string(),
|
||||
};
|
||||
let client = CollabSession::new(
|
||||
user,
|
||||
|
@ -248,7 +248,7 @@ pub async fn init_state(config: Config) -> State {
|
|||
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
||||
pub struct TestLoggedUser {
|
||||
pub user_id: i64,
|
||||
pub user_id: String,
|
||||
}
|
||||
|
||||
impl Display for TestLoggedUser {
|
||||
|
@ -258,7 +258,7 @@ impl Display for TestLoggedUser {
|
|||
}
|
||||
|
||||
impl RealtimeUser for TestLoggedUser {
|
||||
fn user_id(&self) -> &i64 {
|
||||
fn id(&self) -> &str {
|
||||
&self.user_id
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ sqlx = { version = "0.7", default-features = false, features = ["postgres", "chr
|
|||
tracing = { version = "0.1.37" }
|
||||
validator = { version = "0.16", features = ["validator_derive", "derive"] }
|
||||
uuid = { version = "1.4.1", features = ["serde", "v4"] }
|
||||
chrono = {version="0.4",features = ["serde"]}
|
||||
|
||||
|
||||
[features]
|
||||
default = []
|
||||
|
|
|
@ -54,7 +54,7 @@ pub struct QueryCollabParams {
|
|||
}
|
||||
|
||||
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
|
||||
pub struct AfWorkspace {
|
||||
pub struct AFWorkspace {
|
||||
pub workspace_id: uuid::Uuid,
|
||||
pub database_storage_id: Option<sqlx::types::uuid::Uuid>,
|
||||
pub owner_uid: Option<i64>,
|
||||
|
@ -65,7 +65,7 @@ pub struct AfWorkspace {
|
|||
}
|
||||
|
||||
#[derive(Debug, sqlx::FromRow, Deserialize, Serialize)]
|
||||
pub struct AfUserProfileView {
|
||||
pub struct AFUserProfileView {
|
||||
pub uid: Option<i64>,
|
||||
pub uuid: Option<uuid::Uuid>,
|
||||
pub email: Option<String>,
|
||||
|
@ -79,10 +79,10 @@ pub struct AfUserProfileView {
|
|||
}
|
||||
|
||||
#[derive(Debug, sqlx::FromRow, Deserialize, Serialize)]
|
||||
pub struct AfWorkspaces(pub Vec<AfWorkspace>);
|
||||
pub struct AFWorkspaces(pub Vec<AFWorkspace>);
|
||||
|
||||
impl AfWorkspaces {
|
||||
pub fn get_latest(&self, profile: AfUserProfileView) -> Option<AfWorkspace> {
|
||||
impl AFWorkspaces {
|
||||
pub fn get_latest(&self, profile: AFUserProfileView) -> Option<AFWorkspace> {
|
||||
match profile.latest_workspace_id {
|
||||
Some(ws_id) => self.0.iter().find(|ws| ws.workspace_id == ws_id).cloned(),
|
||||
None => None,
|
||||
|
|
|
@ -3,33 +3,51 @@ use sqlx::{
|
|||
PgPool,
|
||||
};
|
||||
|
||||
use crate::entities::{AfUserProfileView, AfWorkspace};
|
||||
use crate::entities::{AFUserProfileView, AFWorkspace};
|
||||
|
||||
pub async fn create_user_if_not_exists(
|
||||
pool: &PgPool,
|
||||
gotrue_uuid: uuid::Uuid,
|
||||
gotrue_uuid: &uuid::Uuid,
|
||||
email: &str,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO af_user (uuid)
|
||||
SELECT $1
|
||||
INSERT INTO af_user (uuid, email)
|
||||
SELECT $1, $2
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM public.af_user WHERE email = $2
|
||||
)
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM public.af_user WHERE uuid = $1
|
||||
)
|
||||
"#,
|
||||
gotrue_uuid
|
||||
gotrue_uuid,
|
||||
email
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_user_id(pool: &PgPool, gotrue_uuid: &uuid::Uuid) -> Result<i64, sqlx::Error> {
|
||||
let uid = sqlx::query!(
|
||||
r#"
|
||||
SELECT uid FROM af_user WHERE uuid = $1
|
||||
"#,
|
||||
gotrue_uuid
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await?
|
||||
.uid;
|
||||
Ok(uid)
|
||||
}
|
||||
|
||||
pub async fn select_all_workspaces_owned(
|
||||
pool: &PgPool,
|
||||
owner_uuid: &Uuid,
|
||||
) -> Result<Vec<AfWorkspace>, sqlx::Error> {
|
||||
) -> Result<Vec<AFWorkspace>, sqlx::Error> {
|
||||
sqlx::query_as!(
|
||||
AfWorkspace,
|
||||
AFWorkspace,
|
||||
r#"
|
||||
SELECT * FROM public.af_workspace WHERE owner_uid = (
|
||||
SELECT uid FROM public.af_user WHERE uuid = $1
|
||||
|
@ -44,9 +62,9 @@ pub async fn select_all_workspaces_owned(
|
|||
pub async fn select_user_profile_view_by_uuid(
|
||||
pool: &PgPool,
|
||||
user_uuid: &Uuid,
|
||||
) -> Result<Option<AfUserProfileView>, sqlx::Error> {
|
||||
) -> Result<Option<AFUserProfileView>, sqlx::Error> {
|
||||
sqlx::query_as!(
|
||||
AfUserProfileView,
|
||||
AFUserProfileView,
|
||||
r#"
|
||||
SELECT *
|
||||
FROM public.af_user_profile_view WHERE uuid = $1
|
||||
|
|
|
@ -4,7 +4,7 @@ CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
|
|||
-- user table
|
||||
CREATE TABLE IF NOT EXISTS af_user (
|
||||
uid BIGSERIAL PRIMARY KEY,
|
||||
uuid UUID NOT NULL, -- related to gotrue
|
||||
uuid UUID NOT NULL , -- related to gotrue
|
||||
email TEXT NOT NULL DEFAULT '' UNIQUE, -- not needed when authenticated with gotrue
|
||||
password TEXT NOT NULL DEFAULT '', -- not needed when authenticated with gotrue
|
||||
name TEXT NOT NULL DEFAULT '',
|
||||
|
|
30
src/api/collaborate.rs
Normal file
30
src/api/collaborate.rs
Normal file
|
@ -0,0 +1,30 @@
|
|||
use crate::component::auth::jwt::UserUuid;
|
||||
use crate::state::State;
|
||||
use actix_web::web::Data;
|
||||
use actix_web::{web, HttpResponse, Scope};
|
||||
|
||||
pub fn collab_scope() -> Scope {
|
||||
web::scope("/api/collab").service(
|
||||
web::resource("/")
|
||||
.route(web::post().to(create_collab_handler))
|
||||
.route(web::get().to(retrieve_collab_handler))
|
||||
.route(web::put().to(update_collab_handler))
|
||||
.route(web::delete().to(delete_collab_handler)),
|
||||
)
|
||||
}
|
||||
|
||||
async fn create_collab_handler(_uuid: UserUuid, _state: Data<State>) -> HttpResponse {
|
||||
HttpResponse::Ok().body("create_handler")
|
||||
}
|
||||
|
||||
async fn retrieve_collab_handler() -> HttpResponse {
|
||||
HttpResponse::Ok().body("retrieve_handler")
|
||||
}
|
||||
|
||||
async fn update_collab_handler() -> HttpResponse {
|
||||
HttpResponse::Ok().body("update_handler")
|
||||
}
|
||||
|
||||
async fn delete_collab_handler() -> HttpResponse {
|
||||
HttpResponse::Ok().body("delete_handler")
|
||||
}
|
|
@ -1,5 +1,7 @@
|
|||
mod collaborate;
|
||||
mod user;
|
||||
mod ws;
|
||||
|
||||
pub use collaborate::collab_scope;
|
||||
pub use user::user_scope;
|
||||
pub use ws::ws_scope;
|
||||
|
|
|
@ -9,7 +9,7 @@ use crate::domain::{UserEmail, UserName, UserPassword};
|
|||
use crate::state::State;
|
||||
use gotrue::models::{AccessTokenResponse, User};
|
||||
use shared_entity::data::{AppResponse, JsonAppResponse};
|
||||
use storage::entities::{AfUserProfileView, AfWorkspaces};
|
||||
use storage::entities::{AFUserProfileView, AFWorkspaces};
|
||||
|
||||
use crate::component::auth::jwt::{Authorization, UserUuid};
|
||||
use actix_web::web::{Data, Json};
|
||||
|
@ -39,16 +39,16 @@ pub fn user_scope() -> Scope {
|
|||
async fn profile_handler(
|
||||
uuid: UserUuid,
|
||||
state: Data<State>,
|
||||
) -> Result<JsonAppResponse<AfUserProfileView>> {
|
||||
let profile = biz::user::user_profile(&state.pg_pool, &uuid.0).await?;
|
||||
) -> Result<JsonAppResponse<AFUserProfileView>> {
|
||||
let profile = biz::user::user_profile(&state.pg_pool, &uuid).await?;
|
||||
Ok(AppResponse::Ok().with_data(profile).into())
|
||||
}
|
||||
|
||||
async fn workspaces_handler(
|
||||
uuid: UserUuid,
|
||||
state: Data<State>,
|
||||
) -> Result<JsonAppResponse<AfWorkspaces>> {
|
||||
let workspaces = biz::user::user_workspaces(&state.pg_pool, &uuid.0).await?;
|
||||
) -> Result<JsonAppResponse<AFWorkspaces>> {
|
||||
let workspaces = biz::user::user_workspaces(&state.pg_pool, &uuid).await?;
|
||||
Ok(AppResponse::Ok().with_data(workspaces).into())
|
||||
}
|
||||
|
||||
|
@ -69,6 +69,7 @@ async fn sign_out_handler(auth: Authorization, state: Data<State>) -> Result<Jso
|
|||
.logout(&auth.token)
|
||||
.await
|
||||
.map_err(InternalServerError::new)?;
|
||||
|
||||
Ok(AppResponse::Ok().into())
|
||||
}
|
||||
|
||||
|
@ -84,6 +85,7 @@ async fn sign_in_password_handler(
|
|||
req.password,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(AppResponse::Ok().with_data(token).into())
|
||||
}
|
||||
|
||||
|
@ -92,12 +94,13 @@ async fn sign_up_handler(
|
|||
state: Data<State>,
|
||||
) -> Result<JsonAppResponse<()>> {
|
||||
biz::user::sign_up(
|
||||
&state.pg_pool,
|
||||
&state.gotrue_client,
|
||||
&req.email,
|
||||
&req.password,
|
||||
&state.pg_pool,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(AppResponse::Ok().into())
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use crate::component::auth::LoggedUser;
|
||||
use crate::state::State;
|
||||
use actix::Addr;
|
||||
use actix_web::web::{Data, Path, Payload};
|
||||
|
@ -8,7 +7,8 @@ use realtime::core::{CollabManager, CollabSession};
|
|||
|
||||
use std::time::Duration;
|
||||
|
||||
use realtime::entities::RealtimeUser;
|
||||
use crate::component::auth::jwt::{authorization_from_token, UserUuid};
|
||||
|
||||
use storage::collab::CollabPostgresDBStorageImpl;
|
||||
|
||||
pub fn ws_scope() -> Scope {
|
||||
|
@ -24,9 +24,10 @@ pub async fn establish_ws_connection(
|
|||
server: Data<Addr<CollabManager<CollabPostgresDBStorageImpl>>>,
|
||||
) -> Result<HttpResponse> {
|
||||
tracing::trace!("{:?}", request);
|
||||
let user = LoggedUser::from_token(&state.config.application.server_key, token.as_str())?;
|
||||
let auth = authorization_from_token(token.as_str(), &state)?;
|
||||
let user_uuid = UserUuid::from_auth(auth)?;
|
||||
let client = CollabSession::new(
|
||||
user,
|
||||
user_uuid,
|
||||
server.get_ref().clone(),
|
||||
Duration::from_secs(state.config.websocket.heartbeat_interval as u64),
|
||||
Duration::from_secs(state.config.websocket.client_timeout as u64),
|
||||
|
@ -39,9 +40,3 @@ pub async fn establish_ws_connection(
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
impl RealtimeUser for LoggedUser {
|
||||
fn user_id(&self) -> &i64 {
|
||||
self.expose_secret()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::api::{user_scope, ws_scope};
|
||||
use crate::api::{collab_scope, user_scope, ws_scope};
|
||||
use crate::component::auth::HEADER_TOKEN;
|
||||
use crate::config::config::{Config, DatabaseSetting, GoTrueSetting, TlsConfig};
|
||||
use crate::middleware::cors::default_cors;
|
||||
|
@ -98,6 +98,7 @@ where
|
|||
.app_data(web::JsonConfig::default().limit(4096))
|
||||
.service(user_scope())
|
||||
.service(ws_scope())
|
||||
.service(collab_scope())
|
||||
.app_data(Data::new(collab_server.clone()))
|
||||
.app_data(Data::new(state.clone()))
|
||||
});
|
||||
|
|
|
@ -8,44 +8,48 @@ use gotrue::{
|
|||
};
|
||||
|
||||
use shared_entity::{error::AppError, server_error};
|
||||
use storage::entities::{AfUserProfileView, AfWorkspaces};
|
||||
use storage::entities::{AFUserProfileView, AFWorkspaces};
|
||||
use validator::validate_email;
|
||||
|
||||
use crate::domain::validate_password;
|
||||
use sqlx::{types::uuid, PgPool};
|
||||
use tracing::instrument;
|
||||
|
||||
#[instrument(level = "info", skip_all, err)]
|
||||
pub async fn sign_up(
|
||||
gotrue_client: &Client,
|
||||
email: &str,
|
||||
password: &str,
|
||||
pg_pool: &PgPool,
|
||||
) -> Result<(), AppError> {
|
||||
validate_email_password(email, password)?;
|
||||
let user = gotrue_client.sign_up(email, password).await??;
|
||||
tracing::info!("user sign up: {:?}", user);
|
||||
if user.confirmed_at.is_some() {
|
||||
let gotrue_uuid = uuid::Uuid::from_str(&user.id)?;
|
||||
storage::workspace::create_user_if_not_exists(pg_pool, &gotrue_uuid, &user.email).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
pub async fn user_workspaces(
|
||||
pg_pool: &PgPool,
|
||||
uuid: &uuid::Uuid,
|
||||
) -> Result<AfWorkspaces, AppError> {
|
||||
) -> Result<AFWorkspaces, AppError> {
|
||||
let workspaces = storage::workspace::select_all_workspaces_owned(pg_pool, uuid).await?;
|
||||
Ok(AfWorkspaces(workspaces))
|
||||
Ok(AFWorkspaces(workspaces))
|
||||
}
|
||||
|
||||
pub async fn user_profile(
|
||||
pg_pool: &PgPool,
|
||||
uuid: &uuid::Uuid,
|
||||
) -> Result<AfUserProfileView, AppError> {
|
||||
) -> Result<AFUserProfileView, AppError> {
|
||||
let profile = storage::workspace::select_user_profile_view_by_uuid(pg_pool, uuid)
|
||||
.await?
|
||||
.ok_or(sqlx::Error::RowNotFound)?;
|
||||
Ok(profile)
|
||||
}
|
||||
|
||||
pub async fn sign_up(
|
||||
pg_pool: &PgPool,
|
||||
gotrue_client: &Client,
|
||||
email: &str,
|
||||
password: &str,
|
||||
) -> Result<(), AppError> {
|
||||
validate_email_password(email, password)?;
|
||||
let user = gotrue_client.sign_up(email, password).await??;
|
||||
if user.confirmed_at.is_some() {
|
||||
storage::workspace::create_user_if_not_exists(pg_pool, uuid::Uuid::from_str(&user.id)?).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level = "info", skip_all, err)]
|
||||
pub async fn sign_in(
|
||||
pg_pool: &PgPool,
|
||||
gotrue_client: &Client,
|
||||
|
@ -54,8 +58,9 @@ pub async fn sign_in(
|
|||
) -> Result<AccessTokenResponse, AppError> {
|
||||
let grant = Grant::Password(PasswordGrant { email, password });
|
||||
let token = gotrue_client.token(&grant).await??;
|
||||
storage::workspace::create_user_if_not_exists(pg_pool, uuid::Uuid::from_str(&token.user.id)?)
|
||||
.await?;
|
||||
|
||||
let gotrue_uuid = uuid::Uuid::from_str(&token.user.id)?;
|
||||
storage::workspace::create_user_if_not_exists(pg_pool, &gotrue_uuid, &token.user.email).await?;
|
||||
Ok(token)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
use actix_http::Payload;
|
||||
use actix_web::{web::Data, FromRequest, HttpRequest};
|
||||
use jsonwebtoken::{decode, Algorithm, DecodingKey, Validation};
|
||||
use realtime::entities::RealtimeUser;
|
||||
use secrecy::ExposeSecret;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::ops::Deref;
|
||||
|
||||
use crate::state::State;
|
||||
|
||||
|
@ -10,8 +13,46 @@ lazy_static::lazy_static! {
|
|||
pub static ref VALIDATION: Validation = Validation::new(Algorithm::HS256);
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct UserUuid(pub uuid::Uuid);
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct UserUuid {
|
||||
uuid: uuid::Uuid,
|
||||
uuid_str: String,
|
||||
}
|
||||
|
||||
impl UserUuid {
|
||||
fn new(uuid: uuid::Uuid) -> Self {
|
||||
Self {
|
||||
uuid,
|
||||
uuid_str: uuid.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_auth(auth: Authorization) -> Result<Self, actix_web::Error> {
|
||||
let uuid = auth
|
||||
.claims
|
||||
.sub
|
||||
.ok_or(actix_web::error::ErrorUnauthorized(
|
||||
"Invalid Authorization header, missing sub(uuid)",
|
||||
))
|
||||
.map(|sub| {
|
||||
uuid::Uuid::parse_str(&sub).map_err(|e| {
|
||||
actix_web::error::ErrorUnauthorized(format!(
|
||||
"Invalid Authorization header, invalid sub(uuid): {}",
|
||||
e
|
||||
))
|
||||
})
|
||||
})?;
|
||||
Ok(Self::new(uuid?))
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for UserUuid {
|
||||
type Target = uuid::Uuid;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.uuid
|
||||
}
|
||||
}
|
||||
|
||||
impl FromRequest for UserUuid {
|
||||
type Error = actix_web::Error;
|
||||
|
@ -21,20 +62,27 @@ impl FromRequest for UserUuid {
|
|||
fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future {
|
||||
let auth = get_auth_from_request(req);
|
||||
match auth {
|
||||
Ok(auth) => {
|
||||
let sub = auth.claims.sub.ok_or(actix_web::error::ErrorUnauthorized(
|
||||
"Invalid Authorization header, missing sub(uuid)",
|
||||
));
|
||||
match sub {
|
||||
Ok(sub) => std::future::ready(Ok(UserUuid(uuid::Uuid::parse_str(&sub).unwrap()))),
|
||||
Err(e) => std::future::ready(Err(e)),
|
||||
}
|
||||
Ok(auth) => match UserUuid::from_auth(auth) {
|
||||
Ok(uuid) => std::future::ready(Ok(uuid)),
|
||||
Err(e) => std::future::ready(Err(e)),
|
||||
},
|
||||
Err(e) => std::future::ready(Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for UserUuid {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&self.uuid_str)
|
||||
}
|
||||
}
|
||||
|
||||
impl RealtimeUser for UserUuid {
|
||||
fn id(&self) -> &str {
|
||||
&self.uuid_str
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Authorization {
|
||||
pub token: String,
|
||||
|
@ -63,38 +111,42 @@ impl FromRequest for Authorization {
|
|||
|
||||
fn get_auth_from_request(req: &HttpRequest) -> Result<Authorization, actix_web::Error> {
|
||||
let state = req.app_data::<Data<State>>().unwrap();
|
||||
let bearer = req.headers().get("Authorization");
|
||||
match bearer {
|
||||
None => Err(actix_web::error::ErrorUnauthorized(
|
||||
"No Authorization header",
|
||||
)),
|
||||
Some(bearer) => {
|
||||
let bearer = bearer.to_str();
|
||||
match bearer {
|
||||
Err(e) => Err(actix_web::error::ErrorUnauthorized(e)),
|
||||
Ok(bearer) => {
|
||||
let pair_opt = bearer.split_once("Bearer "); // Authorization: Bearer <token>
|
||||
match pair_opt {
|
||||
None => Err(actix_web::error::ErrorUnauthorized(
|
||||
"Invalid Authorization header, missing Bearer",
|
||||
)),
|
||||
Some(pair) => {
|
||||
match GoTrueJWTClaims::verify(
|
||||
pair.1,
|
||||
state.config.gotrue.jwt_secret.expose_secret().as_bytes(),
|
||||
) {
|
||||
Err(e) => Err(actix_web::error::ErrorUnauthorized(e)),
|
||||
Ok(t) => Ok(Authorization {
|
||||
token: pair.1.to_string(),
|
||||
claims: t,
|
||||
}),
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
let bearer = req
|
||||
.headers()
|
||||
.get("Authorization")
|
||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("No Authorization header"))?;
|
||||
|
||||
let bearer_str = bearer
|
||||
.to_str()
|
||||
.map_err(actix_web::error::ErrorUnauthorized)?;
|
||||
|
||||
let (_, token) = bearer_str.split_once("Bearer ").ok_or_else(|| {
|
||||
actix_web::error::ErrorUnauthorized("Invalid Authorization header, missing Bearer")
|
||||
})?;
|
||||
|
||||
authorization_from_token(token, state)
|
||||
}
|
||||
|
||||
pub fn authorization_from_token(
|
||||
token: &str,
|
||||
state: &Data<State>,
|
||||
) -> Result<Authorization, actix_web::Error> {
|
||||
let claims = gotrue_jwt_claims_from_token(token, state)?;
|
||||
Ok(Authorization {
|
||||
token: token.to_string(),
|
||||
claims,
|
||||
})
|
||||
}
|
||||
|
||||
fn gotrue_jwt_claims_from_token(
|
||||
token: &str,
|
||||
state: &Data<State>,
|
||||
) -> Result<GoTrueJWTClaims, actix_web::Error> {
|
||||
GoTrueJWTClaims::verify(
|
||||
token,
|
||||
state.config.gotrue.jwt_secret.expose_secret().as_bytes(),
|
||||
)
|
||||
.map_err(actix_web::error::ErrorUnauthorized)
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
|
|
@ -328,18 +328,3 @@ pub fn logged_user_from_request(
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn uid_from_request(
|
||||
request: &HttpRequest,
|
||||
server_key: &Secret<String>,
|
||||
) -> Result<Secret<String>, AuthError> {
|
||||
match request.headers().get(HEADER_TOKEN) {
|
||||
Some(header) => match header.to_str() {
|
||||
Ok(val) => {
|
||||
Token::decode_token(server_key, val).map(|claim| Secret::new(claim.uid.to_string()))
|
||||
},
|
||||
Err(_) => Err(AuthError::Unauthorized),
|
||||
},
|
||||
None => Err(AuthError::Unauthorized),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
mod ws;
|
|
@ -1,13 +0,0 @@
|
|||
use crate::util::{spawn_server, TestUser};
|
||||
use collab_ws::{WSClient, WSClientConfig};
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn ws_conn_test() {
|
||||
let server = spawn_server().await;
|
||||
let test_user = TestUser::generate();
|
||||
let token = test_user.register(&server).await;
|
||||
|
||||
let address = format!("{}/{}", server.ws_addr, token);
|
||||
let client = WSClient::new(address, WSClientConfig::default());
|
||||
let _ = client.connect().await;
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
mod constants;
|
||||
pub mod constants;
|
||||
mod sign_in;
|
||||
mod sign_out;
|
||||
mod sign_up;
|
||||
mod update;
|
||||
mod utils;
|
||||
pub mod utils;
|
||||
|
|
1
tests/collab/mod.rs
Normal file
1
tests/collab/mod.rs
Normal file
|
@ -0,0 +1 @@
|
|||
mod storage_test;
|
16
tests/collab/storage_test.rs
Normal file
16
tests/collab/storage_test.rs
Normal file
|
@ -0,0 +1,16 @@
|
|||
use crate::client::constants::LOCALHOST_URL;
|
||||
use crate::client::utils::{REGISTERED_EMAIL, REGISTERED_PASSWORD};
|
||||
use client_api::Client;
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_collab_test() {
|
||||
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
|
||||
c.sign_in_password(®ISTERED_EMAIL, ®ISTERED_PASSWORD)
|
||||
.await
|
||||
.unwrap();
|
||||
let token = c.token().unwrap();
|
||||
assert!(token.user.confirmed_at.is_some());
|
||||
|
||||
let workspaces = c.workspaces().await.unwrap();
|
||||
assert!(!workspaces.0.is_empty());
|
||||
}
|
|
@ -1,3 +1,4 @@
|
|||
// mod api;
|
||||
mod client;
|
||||
mod gotrue;
|
||||
|
||||
mod collab;
|
||||
|
|
Loading…
Add table
Reference in a new issue