feat: cache workspace member & fix some bugs (#127)

* chore: insert collab member when inserting workspace member

* refactor: test directory

* chore: remove triggers

* test: add more test

* chore: cache workspace memeber role

* chore: update test
This commit is contained in:
Nathan.fooo 2023-10-20 19:43:36 +08:00 committed by GitHub
parent cb9cdb9280
commit 417d9f1d41
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 1022 additions and 334 deletions

View file

@ -1,16 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_user (uuid, email, name)\n VALUES ($1, $2, $3)\n ON CONFLICT (email) DO NOTHING;\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Text"
]
},
"nullable": []
},
"hash": "171f55ffb42bc6ae115ebf29f4cee7419690d3b13f1e27b93f03c399dd2838e6"
}

View file

@ -0,0 +1,24 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH ins_user AS (\n INSERT INTO af_user (uuid, email, name)\n VALUES ($1, $2, $3) \n ON CONFLICT(email) DO NOTHING\n RETURNING uid\n ),\n owner_role AS (\n SELECT id FROM af_roles WHERE name = 'Owner'\n ),\n ins_workspace AS (\n INSERT INTO af_workspace (owner_uid)\n SELECT uid FROM ins_user\n RETURNING workspace_id, owner_uid\n ),\n ins_collab_member AS (\n INSERT INTO af_collab_member (uid, oid, permission_id)\n SELECT ins_workspace.owner_uid,\n ins_workspace.workspace_id::TEXT, \n (SELECT permission_id FROM af_role_permissions WHERE role_id = owner_role.id)\n FROM ins_workspace, owner_role\n ),\n ins_workspace_member AS (\n INSERT INTO af_workspace_member (uid, role_id, workspace_id)\n SELECT ins_workspace.owner_uid, owner_role.id, ins_workspace.workspace_id\n FROM ins_workspace, owner_role\n )\n SELECT COUNT(*) FROM ins_user;\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "count",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Uuid",
"Text",
"Text"
]
},
"nullable": [
null
]
},
"hash": "4ffd4dbcc6b90d4cbfa11983b38a5f2a7476a7fcb65aacd63b49793f7cd4521f"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT id\n FROM af_permissions\n WHERE access_level = $1\n ",
"query": "\n SELECT id\n FROM af_permissions\n WHERE access_level = $1\n ",
"describe": {
"columns": [
{
@ -18,5 +18,5 @@
false
]
},
"hash": "54346ddd488f0c8fde7125f17827bcc89ecbf285c4e2270b38bb4d7082b40db2"
"hash": "5752d8de85c68896b5d85d52941c252bedb826c473bf94d9f3945c1b15327acb"
}

View file

@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT af_user.email,\n af_workspace_member.role_id AS role\n FROM public.af_workspace_member\n JOIN public.af_user ON af_workspace_member.uid = af_user.uid\n WHERE af_workspace_member.workspace_id = $1\n ORDER BY af_workspace_member.created_at ASC;\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "email",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "role",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false
]
},
"hash": "5bfc7f18372997516648f96bc027b777bad8f48b47b4d71eafcc6e244327ee1a"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE af_workspace_member\n SET \n role_id = COALESCE($1, role_id)\n WHERE workspace_id = $2 AND uid = (\n SELECT uid FROM af_user WHERE email = $3\n )\n ",
"query": "\n UPDATE af_workspace_member\n SET \n role_id = $1 \n WHERE workspace_id = $2 AND uid = (\n SELECT uid FROM af_user WHERE email = $3\n )\n ",
"describe": {
"columns": [],
"parameters": {
@ -12,5 +12,5 @@
},
"nullable": []
},
"hash": "4162ec00fad0abe726492a5b916205eec1f004bcf71864dd59c84fad6f3b7e98"
"hash": "8069f4f5e77baf980972726cfed7322cf743927c68bcfd364dcf054f8b940c92"
}

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT uid FROM af_user WHERE email = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "uid",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
},
"hash": "b5024138772e13557df973c1c021daf74aab97b5874d7366c478c18ae2e89e58"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT af_user.email, af_workspace_member.role_id AS role\n FROM public.af_workspace_member\n JOIN public.af_user ON af_workspace_member.uid = af_user.uid\n WHERE af_workspace_member.workspace_id = $1\n ORDER BY af_workspace_member.created_at ASC;\n ",
"query": "\n SELECT af_user.email, af_workspace_member.role_id AS role\n FROM public.af_workspace_member\n JOIN public.af_user ON af_workspace_member.uid = af_user.uid\n WHERE af_workspace_member.workspace_id = $1 \n AND af_workspace_member.uid = $2 \n ",
"describe": {
"columns": [
{
@ -16,7 +16,8 @@
],
"parameters": {
"Left": [
"Uuid"
"Uuid",
"Int8"
]
},
"nullable": [
@ -24,5 +25,5 @@
false
]
},
"hash": "7d2f1f6aebc941414e9213a782a23e5c988ccabe2e71bf0e3ae741dda4c753e4"
"hash": "dcc1051df3f343a277e47e6f0b23fef5e15c386724b453a513a9ddaae321abf5"
}

View file

@ -9,6 +9,7 @@ use database_entity::{
use sqlx::postgres::PgRow;
use sqlx::{Error, PgPool, Row, Transaction};
use std::collections::HashMap;
use std::fmt::Debug;
use std::{ops::DerefMut, str::FromStr};
use tracing::{error, event, instrument};
use uuid::Uuid;
@ -49,7 +50,7 @@ pub async fn collab_exists(pg_pool: &PgPool, oid: &str) -> Result<bool, sqlx::Er
///
#[instrument(level = "trace", skip(tx, params), fields(oid=%params.object_id), err)]
pub async fn insert_af_collab(
pub async fn insert_into_af_collab(
tx: &mut Transaction<'_, sqlx::Postgres>,
uid: &i64,
params: &InsertCollabParams,
@ -67,12 +68,6 @@ pub async fn insert_af_collab(
match existing_workspace_id {
Some(existing_workspace_id) => {
if existing_workspace_id == workspace_id {
event!(
tracing::Level::TRACE,
"Update collab row:{}",
params.object_id
);
sqlx::query!(
"UPDATE af_collab \
SET blob = $2, len = $3, partition_key = $4, encrypt = $5, owner_uid = $6 WHERE oid = $1",
@ -89,6 +84,11 @@ pub async fn insert_af_collab(
"user:{} update af_collab:{} failed",
uid, params.object_id
))?;
event!(
tracing::Level::TRACE,
"did update collab row:{}",
params.object_id
);
} else {
return Err(DatabaseError::Internal(anyhow::anyhow!(
"Inserting a row with an existing object_id but different workspace_id"
@ -108,14 +108,6 @@ pub async fn insert_af_collab(
.fetch_one(tx.deref_mut())
.await?;
event!(
tracing::Level::TRACE,
"Insert new collab row: {}:{}:{}",
uid,
params.object_id,
params.workspace_id
);
sqlx::query!(
r#"
INSERT INTO af_collab_member (uid, oid, permission_id)
@ -152,13 +144,21 @@ pub async fn insert_af_collab(
"Insert new af_collab failed: {}:{}:{}",
uid, params.object_id, params.collab_type
))?;
event!(
tracing::Level::TRACE,
"did insert new collab row: {}:{}:{}",
uid,
params.object_id,
params.workspace_id
);
},
}
Ok(())
}
pub async fn select_collab_blob(
pub async fn select_blob_from_af_collab(
pg_pool: &PgPool,
collab_type: &CollabType,
object_id: &str,
@ -311,25 +311,21 @@ pub async fn get_all_snapshots(
Ok(AFCollabSnapshots(snapshots))
}
#[instrument(skip(pg_pool), err)]
pub async fn insert_collab_member(
#[instrument(skip(txn), err)]
pub async fn upsert_collab_member_with_txn<T: AsRef<str> + Debug>(
uid: i64,
oid: &str,
oid: T,
access_level: &AFAccessLevel,
pg_pool: &PgPool,
txn: &mut Transaction<'_, sqlx::Postgres>,
) -> Result<(), DatabaseError> {
let oid = oid.as_ref();
let access_level: i32 = access_level.clone().into();
let mut txn = pg_pool
.begin()
.await
.context("failed to acquire a transaction to insert collab member")?;
let permission_id = sqlx::query_scalar!(
r#"
SELECT id
FROM af_permissions
WHERE access_level = $1
"#,
SELECT id
FROM af_permissions
WHERE access_level = $1
"#,
access_level
)
.fetch_one(txn.deref_mut())
@ -355,11 +351,27 @@ pub async fn insert_collab_member(
uid, oid
))?;
Ok(())
}
#[instrument(skip(pg_pool), err)]
pub async fn insert_collab_member(
uid: i64,
oid: &str,
access_level: &AFAccessLevel,
pg_pool: &PgPool,
) -> Result<(), DatabaseError> {
let mut txn = pg_pool
.begin()
.await
.context("failed to acquire a transaction to insert collab member")?;
upsert_collab_member_with_txn(uid, oid, access_level, &mut txn).await?;
txn
.commit()
.await
.context("failed to commit the transaction to insert collab member")?;
Ok(())
}

View file

@ -216,7 +216,7 @@ impl CollabStorage for CollabStoragePgImpl {
.begin()
.await
.context("Failed to acquire a Postgres transaction to insert collab")?;
collab_db_ops::insert_af_collab(&mut transaction, uid, &params).await?;
collab_db_ops::insert_into_af_collab(&mut transaction, uid, &params).await?;
transaction
.commit()
.await
@ -225,8 +225,12 @@ impl CollabStorage for CollabStoragePgImpl {
}
async fn get_collab(&self, _uid: &i64, params: QueryCollabParams) -> DatabaseResult<RawData> {
match collab_db_ops::select_collab_blob(&self.pg_pool, &params.collab_type, &params.object_id)
.await
match collab_db_ops::select_blob_from_af_collab(
&self.pg_pool,
&params.collab_type,
&params.object_id,
)
.await
{
Ok(data) => {
debug_assert!(!data.is_empty());

View file

@ -23,6 +23,15 @@ pub async fn update_user_name(
Ok(())
}
/// Attempts to create a new user in the database if they do not already exist.
///
/// This function will:
/// - Insert a new user record into the `af_user` table if the email is unique.
/// - If the user is newly created, it will also:
/// - Create a new workspace for the user in the `af_workspace` table.
/// - Assign the user a role in the `af_workspace_member` table.
/// - Add the user to the `af_collab_member` table with the appropriate permissions.
///
#[instrument(skip_all, err)]
pub async fn create_user_if_not_exists(
pool: &PgPool,
@ -32,23 +41,45 @@ pub async fn create_user_if_not_exists(
) -> Result<bool, DatabaseError> {
let affected_rows = sqlx::query!(
r#"
INSERT INTO af_user (uuid, email, name)
VALUES ($1, $2, $3)
ON CONFLICT (email) DO NOTHING;
"#,
WITH ins_user AS (
INSERT INTO af_user (uuid, email, name)
VALUES ($1, $2, $3)
ON CONFLICT(email) DO NOTHING
RETURNING uid
),
owner_role AS (
SELECT id FROM af_roles WHERE name = 'Owner'
),
ins_workspace AS (
INSERT INTO af_workspace (owner_uid)
SELECT uid FROM ins_user
RETURNING workspace_id, owner_uid
),
ins_collab_member AS (
INSERT INTO af_collab_member (uid, oid, permission_id)
SELECT ins_workspace.owner_uid,
ins_workspace.workspace_id::TEXT,
(SELECT permission_id FROM af_role_permissions WHERE role_id = owner_role.id)
FROM ins_workspace, owner_role
),
ins_workspace_member AS (
INSERT INTO af_workspace_member (uid, role_id, workspace_id)
SELECT ins_workspace.owner_uid, owner_role.id, ins_workspace.workspace_id
FROM ins_workspace, owner_role
)
SELECT COUNT(*) FROM ins_user;
"#,
user_uuid,
email,
name
)
.execute(pool)
.fetch_one(pool)
.await
.context(format!(
"Fail to insert user with uuid: {}, name: {}, email: {}",
user_uuid, name, email
))?
.rows_affected();
Ok(affected_rows > 0)
))?;
Ok(affected_rows.count.unwrap_or(0) > 0)
}
pub async fn select_uid_from_uuid<'a, E: Executor<'a, Database = Postgres>>(
@ -66,3 +97,18 @@ pub async fn select_uid_from_uuid<'a, E: Executor<'a, Database = Postgres>>(
.uid;
Ok(uid)
}
pub async fn select_uid_from_email<'a, E: Executor<'a, Database = Postgres>>(
executor: E,
email: &str,
) -> Result<i64, DatabaseError> {
let uid = sqlx::query!(
r#"
SELECT uid FROM af_user WHERE email = $1
"#,
email
)
.fetch_one(executor)
.await?
.uid;
Ok(uid)
}

View file

@ -3,7 +3,9 @@ use sqlx::{
Executor, PgPool, Postgres, Transaction,
};
use std::ops::DerefMut;
use tracing::{event, instrument};
use crate::user::select_uid_from_email;
use database_entity::database_error::DatabaseError;
use database_entity::{AFRole, AFUserProfileView, AFWorkspace, AFWorkspaceMember};
@ -126,10 +128,10 @@ pub async fn select_user_can_edit_collab(
Ok(permission_check.unwrap_or(false))
}
pub async fn insert_workspace_member(
pub async fn insert_workspace_member_with_txn(
txn: &mut Transaction<'_, sqlx::Postgres>,
workspace_id: &uuid::Uuid,
member_email: String,
member_email: &str,
role: AFRole,
) -> Result<(), DatabaseError> {
let role_id: i32 = role.into();
@ -153,6 +155,8 @@ pub async fn insert_workspace_member(
Ok(())
}
#[inline]
#[instrument(level = "trace", skip(pool, email, role), err)]
pub async fn upsert_workspace_member(
pool: &PgPool,
workspace_id: &Uuid,
@ -163,12 +167,20 @@ pub async fn upsert_workspace_member(
return Ok(());
}
let role_id: Option<i32> = role.map(|role| role.into());
event!(
tracing::Level::TRACE,
"update workspace member: workspace_id:{}, uid {:?}, role:{:?}",
workspace_id,
select_uid_from_email(pool, email).await,
role
);
let role_id: i32 = role.unwrap().into();
sqlx::query!(
r#"
UPDATE af_workspace_member
SET
role_id = COALESCE($1, role_id)
role_id = $1
WHERE workspace_id = $2 AND uid = (
SELECT uid FROM af_user WHERE email = $3
)
@ -187,7 +199,7 @@ pub async fn delete_workspace_members(
_user_uuid: &Uuid,
txn: &mut Transaction<'_, sqlx::Postgres>,
workspace_id: &Uuid,
member_email: String,
member_email: &str,
) -> Result<(), DatabaseError> {
let is_owner = sqlx::query_scalar!(
r#"
@ -237,16 +249,17 @@ pub async fn delete_workspace_members(
}
/// returns a list of workspace members, sorted by their creation time.
pub async fn select_workspace_members(
pub async fn select_workspace_member_list(
pg_pool: &PgPool,
workspace_id: &uuid::Uuid,
) -> Result<Vec<AFWorkspaceMember>, DatabaseError> {
let members = sqlx::query_as!(
AFWorkspaceMember,
r#"
SELECT af_user.email, af_workspace_member.role_id AS role
SELECT af_user.email,
af_workspace_member.role_id AS role
FROM public.af_workspace_member
JOIN public.af_user ON af_workspace_member.uid = af_user.uid
JOIN public.af_user ON af_workspace_member.uid = af_user.uid
WHERE af_workspace_member.workspace_id = $1
ORDER BY af_workspace_member.created_at ASC;
"#,
@ -257,6 +270,28 @@ pub async fn select_workspace_members(
Ok(members)
}
pub async fn select_workspace_member(
pg_pool: &PgPool,
uid: &i64,
workspace_id: &Uuid,
) -> Result<AFWorkspaceMember, DatabaseError> {
let member = sqlx::query_as!(
AFWorkspaceMember,
r#"
SELECT af_user.email, af_workspace_member.role_id AS role
FROM public.af_workspace_member
JOIN public.af_user ON af_workspace_member.uid = af_user.uid
WHERE af_workspace_member.workspace_id = $1
AND af_workspace_member.uid = $2
"#,
workspace_id,
uid,
)
.fetch_one(pg_pool)
.await?;
Ok(member)
}
pub async fn select_user_profile_view_by_uuid(
pool: &PgPool,
user_uuid: &Uuid,

View file

@ -16,7 +16,7 @@ use actix_web_actors::ws::ProtocolError;
use database::collab::CollabStorage;
use realtime_entity::collab_msg::CollabMessage;
use std::time::{Duration, Instant};
use tracing::error;
use tracing::{error, warn};
pub struct ClientSession<
U: Unpin + RealtimeUser,
@ -67,21 +67,27 @@ where
fn forward_binary(&self, bytes: Bytes) -> Result<(), RealtimeError> {
tracing::debug!("Receive binary message with len: {}", bytes.len());
let message = RealtimeMessage::from_vec(bytes.to_vec())?;
match CollabMessage::from_vec(&message.payload) {
Ok(collab_msg) => {
self.server.do_send(ClientMessage {
business_id: message.business_id,
user: self.user.clone(),
content: collab_msg,
});
match RealtimeMessage::from_vec(bytes.to_vec()) {
Ok(message) => {
match CollabMessage::from_vec(&message.payload) {
Ok(collab_msg) => {
self.server.do_send(ClientMessage {
business_id: message.business_id,
user: self.user.clone(),
content: collab_msg,
});
},
Err(e) => {
warn!("Parser realtime payload failed: {:?}", e);
},
}
Ok(())
},
Err(e) => {
tracing::warn!("Parser realtime payload failed: {:?}", e);
Err(err) => {
error!("Parse realtime message failed: {:?}", err);
Ok(())
},
}
Ok(())
}
}

View file

@ -16,17 +16,6 @@ CREATE POLICY af_workspace_policy ON af_workspace FOR ALL TO anon,
authenticated USING (true);
ALTER TABLE af_workspace FORCE ROW LEVEL SECURITY;
-- This trigger is fired after an insert operation on the af_user table. It automatically creates a workspace
-- in the af_workspace table with the uid of the new user profile as the owner_uid
CREATE OR REPLACE FUNCTION create_af_workspace_func() RETURNS TRIGGER AS $$BEGIN
INSERT INTO af_workspace (owner_uid)
VALUES (NEW.uid);
RETURN NEW;
END $$LANGUAGE plpgsql;
CREATE TRIGGER create_af_workspace_trigger
AFTER
INSERT ON af_user FOR EACH ROW EXECUTE FUNCTION create_af_workspace_func();
-- af_workspace_member contains all the members associated with a workspace and their roles.
CREATE TABLE IF NOT EXISTS af_workspace_member (
uid BIGINT NOT NULL,
@ -72,27 +61,6 @@ CREATE TRIGGER af_workspace_member_change_trigger
-- Index
CREATE UNIQUE INDEX idx_af_workspace_member ON af_workspace_member (uid, workspace_id, role_id);
-- This trigger is fired after an insert operation on the af_workspace table. It automatically creates a workspace
-- member in the af_workspace_member table. If the user is the owner of the workspace, they are given the role 'Owner'.
CREATE OR REPLACE FUNCTION manage_af_workspace_member_role_func() RETURNS TRIGGER AS $$ BEGIN
INSERT INTO af_workspace_member (uid, role_id, workspace_id)
VALUES (
NEW.owner_uid,
(
SELECT id
FROM af_roles
WHERE name = 'Owner'
),
NEW.workspace_id
) ON CONFLICT (uid, workspace_id) DO
UPDATE
SET role_id = EXCLUDED.role_id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER manage_af_workspace_member_role_trigger
AFTER
INSERT ON af_workspace FOR EACH ROW EXECUTE FUNCTION manage_af_workspace_member_role_func();
-- Insert a workspace member if the user with given uid is the owner of the workspace
CREATE OR REPLACE FUNCTION insert_af_workspace_member_if_owner(
p_uid BIGINT,

View file

@ -7,7 +7,7 @@ use actix_web::web::{Data, Json};
use actix_web::Result;
use actix_web::{web, Scope};
use database::collab::CollabStorage;
use database::user::select_uid_from_uuid;
use database::user::{select_uid_from_email, select_uid_from_uuid};
use database_entity::database_error::DatabaseError;
use database_entity::*;
use shared_entity::app_error::AppError;
@ -68,6 +68,7 @@ async fn list_handler(
#[instrument(skip(payload, state), err)]
async fn add_workspace_members_handler(
required_id: RequestId,
user_uuid: UserUuid,
workspace_id: web::Path<Uuid>,
payload: Json<CreateWorkspaceMembers>,
@ -78,9 +79,19 @@ async fn add_workspace_members_handler(
&state.pg_pool,
&user_uuid,
&workspace_id,
create_members.0,
&create_members.0,
)
.await?;
for member in create_members.0 {
let uid = select_uid_from_email(&state.pg_pool, &member.email)
.await
.map_err(AppError::from)?;
state
.workspace_access_control
.update_member(&uid, &workspace_id, member.role)
.await;
}
Ok(AppResponse::Ok().into())
}
@ -107,14 +118,25 @@ async fn remove_workspace_member_handler(
.0
.into_iter()
.map(|member| member.0)
.collect();
.collect::<Vec<String>>();
workspace::ops::remove_workspace_members(
&user_uuid,
&state.pg_pool,
workspace_id.into_inner(),
member_emails,
&workspace_id,
&member_emails,
)
.await?;
for email in member_emails {
let uid = select_uid_from_email(&state.pg_pool, &email)
.await
.map_err(AppError::from)?;
state
.workspace_access_control
.remove_member(&uid, &workspace_id)
.await;
}
Ok(AppResponse::Ok().into())
}
@ -126,7 +148,18 @@ async fn update_workspace_member_handler(
) -> Result<JsonAppResponse<()>> {
let workspace_id = workspace_id.into_inner();
let changeset = payload.into_inner();
workspace::ops::update_workspace_member(&state.pg_pool, &workspace_id, changeset).await?;
workspace::ops::update_workspace_member(&state.pg_pool, &workspace_id, &changeset).await?;
if let Some(role) = changeset.role {
let uid = select_uid_from_email(&state.pg_pool, &changeset.email)
.await
.map_err(AppError::from)?;
state
.workspace_access_control
.update_member(&uid, &workspace_id, role)
.await;
}
Ok(AppResponse::Ok().into())
}
@ -234,7 +267,12 @@ async fn add_collab_member_handler(
payload: Json<InsertCollabMemberParams>,
state: Data<AppState>,
) -> Result<Json<AppResponse<()>>> {
biz::collab::ops::create_collab_member(&state.pg_pool, &payload.into_inner()).await?;
let payload = payload.into_inner();
biz::collab::ops::create_collab_member(&state.pg_pool, &payload).await?;
state
.collab_access_control
.update_member(&payload.uid, &payload.object_id, payload.access_level)
.await;
Ok(Json(AppResponse::Ok()))
}
@ -245,7 +283,14 @@ async fn update_collab_member_handler(
payload: Json<UpdateCollabMemberParams>,
state: Data<AppState>,
) -> Result<Json<AppResponse<()>>> {
biz::collab::ops::upsert_collab_member(&state.pg_pool, &user_uuid, &payload.into_inner()).await?;
let payload = payload.into_inner();
biz::collab::ops::upsert_collab_member(&state.pg_pool, &user_uuid, &payload).await?;
state
.collab_access_control
.update_member(&payload.uid, &payload.object_id, payload.access_level)
.await;
Ok(Json(AppResponse::Ok()))
}
#[instrument(skip(state, payload), err)]
@ -255,7 +300,8 @@ async fn get_collab_member_handler(
payload: Json<CollabMemberIdentify>,
state: Data<AppState>,
) -> Result<Json<AppResponse<AFCollabMember>>> {
let member = biz::collab::ops::get_collab_member(&state.pg_pool, &payload.into_inner()).await?;
let payload = payload.into_inner();
let member = biz::collab::ops::get_collab_member(&state.pg_pool, &payload).await?;
Ok(Json(AppResponse::Ok().with_data(member)))
}
@ -266,7 +312,13 @@ async fn remove_collab_member_handler(
payload: Json<CollabMemberIdentify>,
state: Data<AppState>,
) -> Result<Json<AppResponse<()>>> {
biz::collab::ops::delete_collab_member(&state.pg_pool, &payload.into_inner()).await?;
let payload = payload.into_inner();
biz::collab::ops::delete_collab_member(&state.pg_pool, &payload).await?;
state
.collab_access_control
.remove_member(&payload.uid, &payload.object_id)
.await;
Ok(Json(AppResponse::Ok()))
}

View file

@ -1,70 +1,44 @@
use crate::biz::collab::member_listener::{CollabMemberAction, CollabMemberChange};
use crate::biz::workspace::access_control::WorkspaceAccessControl;
use crate::middleware::access_control_mw::{AccessResource, HttpAccessControlService};
use actix_router::{Path, Url};
use actix_web::http::Method;
use async_trait::async_trait;
use database::collab::CollabStorageAccessControl;
use database::user::select_uid_from_uuid;
use database_entity::database_error::DatabaseError;
use database_entity::{AFAccessLevel, AFRole};
use realtime::collaborate::{CollabAccessControl, CollabUserId};
use shared_entity::app_error::AppError;
use shared_entity::error_code::ErrorCode;
use sqlx::PgPool;
use crate::biz::workspace::access_control::WorkspaceAccessControl;
use actix_router::{Path, Url};
use actix_web::http::Method;
use database::collab::CollabStorageAccessControl;
use database::user::select_uid_from_uuid;
use database_entity::database_error::DatabaseError;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::{instrument, trace, warn};
use tracing::{instrument, warn};
use uuid::Uuid;
#[derive(Clone)]
pub struct CollabHttpAccessControl<AC: CollabAccessControl>(pub Arc<AC>);
#[async_trait]
impl<AC> HttpAccessControlService for CollabHttpAccessControl<AC>
where
AC: CollabAccessControl,
AppError: From<<AC as CollabAccessControl>::Error>,
{
fn resource(&self) -> AccessResource {
AccessResource::Collab
}
async fn check_collab_permission(
&self,
oid: &str,
user_uuid: &Uuid,
method: Method,
_path: Path<Url>,
) -> Result<(), AppError> {
trace!("oid: {:?}, user_uuid: {:?}", oid, user_uuid);
let can_access = self
.0
.can_access_http_method(CollabUserId::UserUuid(user_uuid), oid, &method)
.await
.map_err(AppError::from)?;
if !can_access {
return Err(AppError::new(
ErrorCode::NotEnoughPermissions,
format!(
"Not enough permissions to access the collab: {} with http method: {}",
oid, method
),
));
}
Ok(())
}
}
type MemberStatusByUid = HashMap<i64, CollabMemberStatusByOid>;
/// Represents the access level of a collaboration object identified by its OID.
/// - Key: OID of the collaboration object.
/// - Value: The user's role within the collaboration.
///
type CollabMemberStatusByOid = HashMap<String, MemberStatus>;
/// Use to check if the user is allowed to send or receive the [CollabMessage]
/// Represents the access levels of various collaboration objects for a user.
/// - Key: User's UID.
/// - Value: A mapping between the collaboration object's OID and the user's access level within that collaboration.
///
/// uid -> oid -> access level of the user in the collab
///
type MemberStatusByUid = HashMap<i64, CollabMemberStatusByOid>;
/// Used to cache the access level of a user for collaboration objects.
/// The cache will be updated after the user's access level for a collaboration object is changed.
/// The change is broadcasted by the `CollabMemberListener` or set by the [CollabAccessControlImpl::update_member] method.
///
/// TODO(nathan): broadcast the member access level changes to all connected devices
///
pub struct CollabAccessControlImpl {
pg_pool: PgPool,
member_status_by_uid: Arc<RwLock<MemberStatusByUid>>,
@ -72,55 +46,39 @@ pub struct CollabAccessControlImpl {
#[derive(Clone, Debug)]
enum MemberStatus {
/// Mark the user is not the member of the collab.
/// it don't need to query the database to get the access level of the user in the collab
/// when the user is not the member of the collab
Deleted,
/// The user is the member of the collab
Valid(AFAccessLevel),
}
impl CollabAccessControlImpl {
pub fn new(pg_pool: PgPool, mut listener: broadcast::Receiver<CollabMemberChange>) -> Self {
pub fn new(pg_pool: PgPool, listener: broadcast::Receiver<CollabMemberChange>) -> Self {
let member_status_by_uid = Arc::new(RwLock::new(HashMap::new()));
// Update the role of the user when the role of the collab member is changed
let cloned_pg_pool = pg_pool.clone();
let cloned_member_status_by_uid = member_status_by_uid.clone();
tokio::spawn(async move {
while let Ok(change) = listener.recv().await {
match change.action_type {
CollabMemberAction::INSERT | CollabMemberAction::UPDATE => {
if let (Some(oid), Some(uid)) = (change.new_oid(), change.new_uid()) {
let _ =
refresh_from_db(uid, oid, &cloned_pg_pool, &cloned_member_status_by_uid).await;
} else {
warn!("The oid or uid is None")
}
},
CollabMemberAction::DELETE => {
if let (Some(oid), Some(uid)) = (change.old_oid(), change.old_uid()) {
if let Some(inner_map) = cloned_member_status_by_uid.write().await.get_mut(uid) {
inner_map.insert(oid.to_string(), MemberStatus::Deleted);
}
} else {
warn!("The oid or uid is None")
}
},
}
}
});
// Listen to the changes of the collab member and update the memory cache
spawn_listen_on_collab_member_change(listener, pg_pool.clone(), member_status_by_uid.clone());
Self {
pg_pool,
member_status_by_uid,
}
}
/// Return the role of the user in the collab
async fn get_role_state(&self, uid: &i64, oid: &str) -> Option<MemberStatus> {
self
.member_status_by_uid
.read()
.await
.get(uid)
.map(|map| map.get(oid).cloned())?
/// The member's access level may be altered by PostgreSQL notifications. However, there are instances
/// where these notifications aren't received promptly, leading to potential inconsistencies in the user's access level.
/// Therefore, it's essential to update the user's access level in the cache whenever there's a change.
pub async fn update_member(&self, uid: &i64, oid: &str, access_level: AFAccessLevel) {
update_collab_member_status(uid, oid, access_level, &self.member_status_by_uid).await;
}
pub async fn remove_member(&self, uid: &i64, oid: &str) {
if let Some(inner_map) = self.member_status_by_uid.write().await.get_mut(uid) {
if let Entry::Occupied(mut entry) = inner_map.entry(oid.to_string()) {
entry.insert(MemberStatus::Deleted);
}
}
}
#[inline]
@ -129,35 +87,98 @@ impl CollabAccessControlImpl {
uid: &i64,
oid: &str,
) -> Result<AFAccessLevel, AppError> {
let member_status = match self.get_role_state(uid, oid).await {
None => refresh_from_db(uid, oid, &self.pg_pool, &self.member_status_by_uid).await?,
let member_status = self
.member_status_by_uid
.read()
.await
.get(uid)
.and_then(|map| map.get(oid).cloned());
let member_status = match member_status {
None => {
reload_collab_member_status_from_db(uid, oid, &self.pg_pool, &self.member_status_by_uid)
.await?
},
Some(status) => status,
};
match member_status {
MemberStatus::Deleted => Err(AppError::new(
ErrorCode::NotEnoughPermissions,
"The user is not the member of the collab",
format!("user:{} is not a member of collab:{}", uid, oid),
)),
MemberStatus::Valid(access_level) => Ok(access_level),
}
}
}
fn spawn_listen_on_collab_member_change(
mut listener: broadcast::Receiver<CollabMemberChange>,
pg_pool: PgPool,
member_status_by_uid: Arc<RwLock<MemberStatusByUid>>,
) {
tokio::spawn(async move {
while let Ok(change) = listener.recv().await {
match change.action_type {
CollabMemberAction::INSERT | CollabMemberAction::UPDATE => {
if let (Some(oid), Some(uid)) = (change.new_oid(), change.new_uid()) {
if let Err(err) =
reload_collab_member_status_from_db(uid, oid, &pg_pool, &member_status_by_uid).await
{
warn!(
"Failed to reload the collab member status from db: {:?}, error: {}",
change, err
);
}
} else {
warn!("The oid or uid is None")
}
},
CollabMemberAction::DELETE => {
if let (Some(oid), Some(uid)) = (change.old_oid(), change.old_uid()) {
if let Some(inner_map) = member_status_by_uid.write().await.get_mut(uid) {
inner_map.insert(oid.to_string(), MemberStatus::Deleted);
}
} else {
warn!("The oid or uid is None")
}
},
}
}
});
}
#[inline]
async fn refresh_from_db(
async fn update_collab_member_status(
uid: &i64,
oid: &str,
access_level: AFAccessLevel,
member_status_by_uid: &Arc<RwLock<MemberStatusByUid>>,
) {
let mut outer_map = member_status_by_uid.write().await;
let inner_map = outer_map.entry(*uid).or_insert_with(HashMap::new);
inner_map.insert(oid.to_string(), MemberStatus::Valid(access_level));
}
#[inline]
async fn reload_collab_member_status_from_db(
uid: &i64,
oid: &str,
pg_pool: &PgPool,
member_status_by_uid: &Arc<RwLock<MemberStatusByUid>>,
) -> Result<MemberStatus, AppError> {
let member = database::collab::select_collab_member(uid, oid, pg_pool).await?;
let mut outer_map = member_status_by_uid.write().await;
let inner_map = outer_map.entry(*uid).or_insert_with(HashMap::new);
let status = MemberStatus::Valid(member.permission.access_level);
inner_map.insert(member.oid, status.clone());
let status = MemberStatus::Valid(member.permission.access_level.clone());
update_collab_member_status(
uid,
oid,
member.permission.access_level,
member_status_by_uid,
)
.await;
Ok(status)
}
#[async_trait]
impl CollabAccessControl for CollabAccessControlImpl {
type Error = AppError;
@ -235,6 +256,45 @@ impl CollabAccessControl for CollabAccessControlImpl {
}
}
#[derive(Clone)]
pub struct CollabHttpAccessControl<AC: CollabAccessControl>(pub Arc<AC>);
#[async_trait]
impl<AC> HttpAccessControlService for CollabHttpAccessControl<AC>
where
AC: CollabAccessControl,
AppError: From<<AC as CollabAccessControl>::Error>,
{
fn resource(&self) -> AccessResource {
AccessResource::Collab
}
async fn check_collab_permission(
&self,
oid: &str,
user_uuid: &Uuid,
method: Method,
_path: Path<Url>,
) -> Result<(), AppError> {
let can_access = self
.0
.can_access_http_method(CollabUserId::UserUuid(user_uuid), oid, &method)
.await
.map_err(AppError::from)?;
if !can_access {
return Err(AppError::new(
ErrorCode::NotEnoughPermissions,
format!(
"Not enough permissions to access the collab: {} with http method: {}",
oid, method
),
));
}
Ok(())
}
}
#[derive(Clone)]
pub struct CollabStorageAccessControlImpl<CollabAC, WorkspaceAC> {
pub(crate) collab_access_control: Arc<CollabAC>,

View file

@ -31,7 +31,7 @@ pub async fn upsert_collab(
let owner_uid = user::select_uid_from_uuid(pg_pool, user_uuid).await?;
let mut tx = pg_pool.begin().await?;
database::collab::insert_af_collab(&mut tx, &owner_uid, params).await?;
database::collab::insert_into_af_collab(&mut tx, &owner_uid, params).await?;
tx.commit().await?;
Ok(())
}

View file

@ -92,22 +92,24 @@ where
// Check if the user has enough permissions to insert collab
let has_permission = if self.is_collab_exist(&params.object_id).await? {
event!(
tracing::Level::TRACE,
"user:{} try to update exist collab:{}",
uid,
params.object_id
);
// If the collab already exists, check if the user has enough permissions to update collab
self
let level = self
.access_control
.get_collab_access_level(uid, &params.object_id)
.await
.context(format!(
"Can't find the access level when user:{} try to insert collab",
uid
))?
.can_write()
))?;
event!(
tracing::Level::TRACE,
"user:{} with {:?} try to update exist collab:{}",
uid,
level,
params.object_id
);
level.can_write()
} else {
// If the collab doesn't exist, check if the user has enough permissions to create collab.
// If the user is the owner or member of the workspace, the user can create collab.
@ -117,9 +119,9 @@ where
.await?;
event!(
tracing::Level::TRACE,
"user:{}:{:?} insert new collab:{}",
uid,
"[{:?}]user:{} try to insert new collab:{}",
role,
uid,
params.object_id
);
role.can_create_collab()

View file

@ -1,19 +1,19 @@
use crate::biz::workspace::member_listener::WorkspaceMemberChange;
use crate::biz::workspace::member_listener::{WorkspaceMemberAction, WorkspaceMemberChange};
use crate::component::auth::jwt::UserUuid;
use crate::middleware::access_control_mw::{AccessResource, HttpAccessControlService};
use actix_http::Method;
use anyhow::Context;
use async_trait::async_trait;
use database::user::select_uid_from_uuid;
use database::workspace::select_user_role;
use database_entity::AFRole;
use shared_entity::app_error::AppError;
use shared_entity::error_code::ErrorCode;
use sqlx::PgPool;
use std::ops::DerefMut;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{instrument, trace};
use tokio::sync::{broadcast, RwLock};
use tracing::{instrument, trace, warn};
use uuid::Uuid;
#[async_trait]
@ -26,14 +26,155 @@ pub trait WorkspaceAccessControl: Send + Sync + 'static {
async fn get_role_from_uid(&self, uid: &i64, workspace_id: &Uuid) -> Result<AFRole, AppError>;
}
/// Represents the role of the user in the workspace by the workspace id.
/// - Key: workspace id of
/// - Value: the member status of the user in the workspace
type MemberStatusByWorkspaceId = HashMap<Uuid, MemberStatus>;
/// Represents the role of the various workspaces for a user
/// - Key: User's UID
/// - Value: A mapping between the workspace id and the member status of the user in the workspace
type MemberStatusByUid = HashMap<i64, MemberStatusByWorkspaceId>;
#[derive(Clone, Debug)]
enum MemberStatus {
/// Mark the user is not the member of the workspace.
/// it don't need to query the database to get the role of the user in the workspace
/// when the user is not the member of the workspace.
Deleted,
/// The user is the member of the workspace
Valid(AFRole),
}
pub struct WorkspaceAccessControlImpl {
pg_pool: PgPool,
member_status_by_uid: Arc<RwLock<MemberStatusByUid>>,
}
impl WorkspaceAccessControlImpl {
pub fn new(pg_pool: PgPool, _listener: broadcast::Receiver<WorkspaceMemberChange>) -> Self {
WorkspaceAccessControlImpl { pg_pool }
pub fn new(pg_pool: PgPool, listener: broadcast::Receiver<WorkspaceMemberChange>) -> Self {
let member_status_by_uid = Arc::new(RwLock::new(HashMap::new()));
spawn_listen_on_workspace_member_change(
listener,
pg_pool.clone(),
member_status_by_uid.clone(),
);
WorkspaceAccessControlImpl {
pg_pool,
member_status_by_uid,
}
}
pub async fn update_member(&self, uid: &i64, workspace_id: &Uuid, role: AFRole) {
update_workspace_member_status(uid, workspace_id, role, &self.member_status_by_uid).await;
}
pub async fn remove_member(&self, uid: &i64, workspace_id: &Uuid) {
if let Some(inner_map) = self.member_status_by_uid.write().await.get_mut(uid) {
if let Entry::Occupied(mut entry) = inner_map.entry(*workspace_id) {
entry.insert(MemberStatus::Deleted);
}
}
}
#[inline]
async fn get_user_workspace_role(
&self,
uid: &i64,
workspace_id: &Uuid,
) -> Result<AFRole, AppError> {
let member_status = self
.member_status_by_uid
.read()
.await
.get(uid)
.and_then(|map| map.get(workspace_id).cloned());
let member_status = match member_status {
None => {
reload_workspace_member_status_from_db(
uid,
workspace_id,
&self.pg_pool,
&self.member_status_by_uid,
)
.await?
},
Some(status) => status,
};
match member_status {
MemberStatus::Deleted => Err(AppError::new(
ErrorCode::NotEnoughPermissions,
format!("user:{} is not a member of workspace:{}", uid, workspace_id),
)),
MemberStatus::Valid(access_level) => Ok(access_level),
}
}
}
#[inline]
async fn update_workspace_member_status(
uid: &i64,
workspace_id: &Uuid,
role: AFRole,
member_status_by_uid: &Arc<RwLock<MemberStatusByUid>>,
) {
let mut outer_map = member_status_by_uid.write().await;
let inner_map = outer_map.entry(*uid).or_insert_with(HashMap::new);
inner_map.insert(*workspace_id, MemberStatus::Valid(role));
}
#[inline]
async fn reload_workspace_member_status_from_db(
uid: &i64,
workspace_id: &Uuid,
pg_pool: &PgPool,
member_status_by_uid: &Arc<RwLock<MemberStatusByUid>>,
) -> Result<MemberStatus, AppError> {
let member = database::workspace::select_workspace_member(pg_pool, uid, workspace_id).await?;
let status = MemberStatus::Valid(member.role.clone());
update_workspace_member_status(uid, workspace_id, member.role, member_status_by_uid).await;
Ok(status)
}
fn spawn_listen_on_workspace_member_change(
mut listener: broadcast::Receiver<WorkspaceMemberChange>,
pg_pool: PgPool,
member_status_by_uid: Arc<RwLock<MemberStatusByUid>>,
) {
tokio::spawn(async move {
while let Ok(change) = listener.recv().await {
match change.action_type {
WorkspaceMemberAction::INSERT | WorkspaceMemberAction::UPDATE => match change.new {
None => {
warn!("The workspace member change can't be None when the action is INSERT or UPDATE")
},
Some(change) => {
if let Err(err) = reload_workspace_member_status_from_db(
&change.uid,
&change.workspace_id,
&pg_pool,
&member_status_by_uid,
)
.await
{
warn!("Failed to reload workspace member status from db: {}", err);
}
},
},
WorkspaceMemberAction::DELETE => match change.old {
None => warn!("The workspace member change can't be None when the action is DELETE"),
Some(change) => {
if let Some(inner_map) = member_status_by_uid.write().await.get_mut(&change.uid) {
inner_map.insert(change.workspace_id, MemberStatus::Deleted);
}
},
},
}
}
});
}
#[async_trait]
@ -43,22 +184,13 @@ impl WorkspaceAccessControl for WorkspaceAccessControlImpl {
user_uuid: &Uuid,
workspace_id: &Uuid,
) -> Result<AFRole, AppError> {
let mut txn = self
.pg_pool
.begin()
.await
.context("failed to acquire a transaction to query role")?;
let uid = select_uid_from_uuid(txn.deref_mut(), user_uuid).await?;
let role = select_user_role(txn.deref_mut(), &uid, workspace_id).await?;
txn
.commit()
.await
.context("failed to commit transaction to query role")?;
let uid = select_uid_from_uuid(&self.pg_pool, user_uuid).await?;
let role = self.get_user_workspace_role(&uid, workspace_id).await?;
Ok(role)
}
async fn get_role_from_uid(&self, uid: &i64, workspace_id: &Uuid) -> Result<AFRole, AppError> {
let role = select_user_role(&self.pg_pool, uid, workspace_id).await?;
let role = self.get_user_workspace_role(uid, workspace_id).await?;
Ok(role)
}
}

View file

@ -1,6 +1,6 @@
use crate::biz::pg_listener::PostgresDBListener;
use collab::preclude::Uuid;
use serde::Deserialize;
use uuid::Uuid;
#[allow(clippy::upper_case_acronyms)]
#[derive(Deserialize, Clone, Debug)]

View file

@ -1,34 +1,45 @@
use crate::component::auth::jwt::UserUuid;
use anyhow::Context;
use database::collab::upsert_collab_member_with_txn;
use database::user::select_uid_from_email;
use database::workspace::{
delete_workspace_members, insert_workspace_member, select_all_workspaces_owned,
select_workspace_members, upsert_workspace_member,
delete_workspace_members, insert_workspace_member_with_txn, select_all_workspaces_owned,
select_workspace_member_list, upsert_workspace_member,
};
use database_entity::{AFWorkspaceMember, AFWorkspaces};
use database_entity::{AFAccessLevel, AFRole, AFWorkspaceMember, AFWorkspaces};
use shared_entity::app_error::AppError;
use shared_entity::dto::workspace_dto::{CreateWorkspaceMember, WorkspaceMemberChangeset};
use sqlx::{types::uuid, PgPool};
use std::ops::DerefMut;
use uuid::Uuid;
pub async fn get_workspaces(
pg_pool: &PgPool,
user_uuid: &uuid::Uuid,
) -> Result<AFWorkspaces, AppError> {
pub async fn get_workspaces(pg_pool: &PgPool, user_uuid: &Uuid) -> Result<AFWorkspaces, AppError> {
let workspaces = select_all_workspaces_owned(pg_pool, user_uuid).await?;
Ok(AFWorkspaces(workspaces))
}
pub async fn add_workspace_members(
pg_pool: &PgPool,
_user_uuid: &uuid::Uuid,
workspace_id: &uuid::Uuid,
members: Vec<CreateWorkspaceMember>,
_user_uuid: &Uuid,
workspace_id: &Uuid,
members: &[CreateWorkspaceMember],
) -> Result<(), AppError> {
let mut txn = pg_pool
.begin()
.await
.context("Begin transaction to insert workspace members")?;
for member in members {
insert_workspace_member(&mut txn, workspace_id, member.email, member.role).await?;
let access_level = match &member.role {
AFRole::Owner => AFAccessLevel::FullAccess,
AFRole::Member => AFAccessLevel::ReadAndWrite,
AFRole::Guest => AFAccessLevel::ReadOnly,
};
let uid = select_uid_from_email(txn.deref_mut(), &member.email).await?;
insert_workspace_member_with_txn(&mut txn, workspace_id, &member.email, member.role.clone())
.await?;
upsert_collab_member_with_txn(uid, workspace_id.to_string(), &access_level, &mut txn).await?;
}
txn
@ -41,8 +52,8 @@ pub async fn add_workspace_members(
pub async fn remove_workspace_members(
user_uuid: &UserUuid,
pg_pool: &PgPool,
workspace_id: uuid::Uuid,
member_emails: Vec<String>,
workspace_id: &Uuid,
member_emails: &[String],
) -> Result<(), AppError> {
let mut txn = pg_pool
.begin()
@ -50,7 +61,7 @@ pub async fn remove_workspace_members(
.context("Begin transaction to delete workspace members")?;
for email in member_emails {
delete_workspace_members(user_uuid, &mut txn, &workspace_id, email).await?;
delete_workspace_members(user_uuid, &mut txn, workspace_id, email.as_str()).await?;
}
txn
@ -62,18 +73,23 @@ pub async fn remove_workspace_members(
pub async fn get_workspace_members(
pg_pool: &PgPool,
_user_uuid: &uuid::Uuid,
workspace_id: &uuid::Uuid,
_user_uuid: &Uuid,
workspace_id: &Uuid,
) -> Result<Vec<AFWorkspaceMember>, AppError> {
Ok(select_workspace_members(pg_pool, workspace_id).await?)
Ok(select_workspace_member_list(pg_pool, workspace_id).await?)
}
#[allow(dead_code)]
pub async fn update_workspace_member(
pg_pool: &PgPool,
workspace_id: &uuid::Uuid,
changeset: WorkspaceMemberChangeset,
workspace_id: &Uuid,
changeset: &WorkspaceMemberChangeset,
) -> Result<(), AppError> {
upsert_workspace_member(pg_pool, workspace_id, &changeset.email, changeset.role).await?;
upsert_workspace_member(
pg_pool,
workspace_id,
&changeset.email,
changeset.role.clone(),
)
.await?;
Ok(())
}

View file

@ -23,6 +23,12 @@ pub enum AccessResource {
Collab,
}
/// The access control service for http request.
/// It is used to check the permission of the request if the request is related to workspace or collab.
/// If the request is not related to workspace or collab, it will be skipped.
///
/// The collab and workspace access control can be separated into different traits. Currently, they are
/// combined into one trait.
#[async_trait]
pub trait HttpAccessControlService: Send + Sync {
fn resource(&self) -> AccessResource;
@ -84,11 +90,14 @@ where
}
}
pub type AccessControlServices = Arc<HashMap<AccessResource, Arc<dyn HttpAccessControlService>>>;
pub type HttpAccessControlServices =
Arc<HashMap<AccessResource, Arc<dyn HttpAccessControlService>>>;
/// Implement the access control for the workspace and collab.
/// It will check the permission of the request if the request is related to workspace or collab.
#[derive(Clone, Default)]
pub struct WorkspaceAccessControl {
access_control_services: AccessControlServices,
access_control_services: HttpAccessControlServices,
}
impl WorkspaceAccessControl {
@ -108,7 +117,7 @@ impl WorkspaceAccessControl {
}
impl Deref for WorkspaceAccessControl {
type Target = AccessControlServices;
type Target = HttpAccessControlServices;
fn deref(&self) -> &Self::Target {
&self.access_control_services
@ -141,9 +150,17 @@ where
}
}
/// Each request will be handled by this middleware. It will check the permission of the request
/// if the request is related to workspace or collab. The [WORKSPACE_ID_PATH] and [COLLAB_OBJECT_ID_PATH]
/// are used to identify the workspace and collab.
///
/// For example, if the request path is `/api/workspace/{workspace_id}/collab/{object_id}`, then the
/// [WorkspaceAccessControlMiddleware] will check the permission of the workspace and collab.
///
///
pub struct WorkspaceAccessControlMiddleware<S> {
service: S,
access_control_service: AccessControlServices,
access_control_service: HttpAccessControlServices,
}
impl<S, B> Service<ServiceRequest> for WorkspaceAccessControlMiddleware<S>

View file

@ -1,4 +1,4 @@
use crate::realtime::test_client::{assert_client_collab, assert_remote_collab, TestClient};
use crate::util::test_client::{assert_client_collab, assert_server_collab, TestClient};
use collab_entity::CollabType;
use database_entity::AFAccessLevel;
use serde_json::json;
@ -32,7 +32,7 @@ async fn recv_updates_without_permission_test() {
}
#[tokio::test]
async fn recv_editing_updates_with_readonly_permission_test() {
async fn recv_remote_updates_with_readonly_permission_test() {
let collab_type = CollabType::Document;
let mut client_1 = TestClient::new_user().await;
let mut client_2 = TestClient::new_user().await;
@ -70,7 +70,7 @@ async fn recv_editing_updates_with_readonly_permission_test() {
"name": "AppFlowy"
});
assert_client_collab(&mut client_2, &object_id, expected.clone(), 10).await;
assert_remote_collab(
assert_server_collab(
&workspace_id,
&mut client_1.api_client,
&object_id,
@ -82,7 +82,7 @@ async fn recv_editing_updates_with_readonly_permission_test() {
}
#[tokio::test]
async fn recv_remote_updates_with_readonly_permission_test() {
async fn init_sync_with_readonly_permission_test() {
let collab_type = CollabType::Document;
let mut client_1 = TestClient::new_user().await;
let mut client_2 = TestClient::new_user().await;
@ -104,7 +104,7 @@ async fn recv_remote_updates_with_readonly_permission_test() {
let expected = json!({
"name": "AppFlowy"
});
assert_remote_collab(
assert_server_collab(
&workspace_id,
&mut client_1.api_client,
&object_id,
@ -115,7 +115,7 @@ async fn recv_remote_updates_with_readonly_permission_test() {
.await;
// Add client 2 as the member of the collab with readonly permission.
// client 2 can pull the latest updates but it's not allowed to send local changes.
// client 2 can pull the latest updates via the init sync. But it's not allowed to send local changes.
client_1
.add_client_as_collab_member(
&workspace_id,
@ -174,7 +174,7 @@ async fn edit_collab_with_readonly_permission_test() {
)
.await;
assert_remote_collab(
assert_server_collab(
&workspace_id,
&mut client_1.api_client,
&object_id,
@ -185,6 +185,56 @@ async fn edit_collab_with_readonly_permission_test() {
.await;
}
#[tokio::test]
async fn edit_collab_with_read_and_write_permission_test() {
let collab_type = CollabType::Document;
let mut client_1 = TestClient::new_user().await;
let mut client_2 = TestClient::new_user().await;
let workspace_id = client_1.current_workspace_id().await;
let object_id = client_1
.create_collab(&workspace_id, collab_type.clone())
.await;
// Add client 2 as the member of the collab then the client 2 will receive the update.
client_1
.add_client_as_collab_member(
&workspace_id,
&object_id,
&client_2,
AFAccessLevel::ReadAndWrite,
)
.await;
client_2
.open_collab(&workspace_id, &object_id, collab_type.clone())
.await;
// client 2 edit the collab and then the server will broadcast the update
client_2
.collab_by_object_id
.get_mut(&object_id)
.unwrap()
.collab
.lock()
.insert("name", "AppFlowy");
let expected = json!({
"name": "AppFlowy"
});
assert_client_collab(&mut client_2, &object_id, expected.clone(), 5).await;
assert_server_collab(
&workspace_id,
&mut client_1.api_client,
&object_id,
&collab_type,
5,
expected,
)
.await;
}
#[tokio::test]
async fn edit_collab_with_full_access_permission_test() {
let collab_type = CollabType::Document;
@ -224,7 +274,7 @@ async fn edit_collab_with_full_access_permission_test() {
});
assert_client_collab(&mut client_2, &object_id, expected.clone(), 5).await;
assert_remote_collab(
assert_server_collab(
&workspace_id,
&mut client_1.api_client,
&object_id,
@ -234,3 +284,82 @@ async fn edit_collab_with_full_access_permission_test() {
)
.await;
}
#[tokio::test]
async fn edit_collab_with_full_access_then_readonly_permission() {
let collab_type = CollabType::Document;
let mut client_1 = TestClient::new_user().await;
let mut client_2 = TestClient::new_user().await;
let workspace_id = client_1.current_workspace_id().await;
let object_id = client_1
.create_collab(&workspace_id, collab_type.clone())
.await;
// Add client 2 as the member of the collab then the client 2 will receive the update.
client_1
.add_client_as_collab_member(
&workspace_id,
&object_id,
&client_2,
AFAccessLevel::FullAccess,
)
.await;
// client 2 edit the collab and then the server will broadcast the update
{
client_2
.open_collab(&workspace_id, &object_id, collab_type.clone())
.await;
client_2
.collab_by_object_id
.get_mut(&object_id)
.unwrap()
.collab
.lock()
.insert("title", "hello world");
client_2.wait_object_sync_complete(&object_id).await;
}
// update the permission from full access to readonly, then the server will reject the subsequent
// updates generated by client 2
{
client_1
.update_collab_member_access_level(
&workspace_id,
&object_id,
&client_2,
AFAccessLevel::ReadOnly,
)
.await;
client_2
.collab_by_object_id
.get_mut(&object_id)
.unwrap()
.collab
.lock()
.insert("subtitle", "Writing Rust, fun");
}
assert_client_collab(
&mut client_2,
&object_id,
json!({
"title": "hello world",
"subtitle": "Writing Rust, fun"
}),
5,
)
.await;
assert_server_collab(
&workspace_id,
&mut client_1.api_client,
&object_id,
&collab_type,
5,
json!({
"title": "hello world"
}),
)
.await;
}

View file

@ -1,8 +1,13 @@
use client_api::Client;
mod member_test;
mod member_crud;
mod storage_test;
mod edit_permission;
mod multi_devices_edit;
mod single_device_edit;
mod workspace_collab;
pub(crate) async fn workspace_id_from_client(c: &Client) -> String {
c.get_workspaces()
.await

View file

@ -1,5 +1,5 @@
use crate::realtime::test_client::{assert_client_collab, assert_remote_collab, TestClient};
use crate::user::utils::generate_unique_registered_user;
use crate::util::test_client::{assert_client_collab, assert_server_collab, TestClient};
use std::time::Duration;
use collab_entity::CollabType;
@ -49,7 +49,7 @@ async fn edit_collab_with_ws_reconnect_sync_test() {
// Wait for the messages to be sent
test_client.wait_object_sync_complete(&object_id).await;
assert_remote_collab(
assert_server_collab(
&workspace_id,
&mut test_client.api_client,
&object_id,

View file

@ -1,4 +1,4 @@
use crate::realtime::test_client::{assert_remote_collab, TestClient};
use crate::util::test_client::{assert_server_collab, TestClient};
use collab_entity::CollabType;
use serde_json::json;
@ -35,7 +35,7 @@ async fn realtime_write_single_collab_test() {
});
test_client.wait_object_sync_complete(&object_id).await;
assert_remote_collab(
assert_server_collab(
&workspace_id,
&mut test_client.api_client,
&object_id,
@ -73,7 +73,7 @@ async fn realtime_write_multiple_collab_test() {
// Wait for the messages to be sent
for object_id in object_ids {
assert_remote_collab(
assert_server_collab(
&workspace_id,
&mut test_client.api_client,
&object_id,
@ -252,7 +252,7 @@ async fn multiple_collab_edit_test() {
.insert("title", "I am client 2");
client_2.wait_object_sync_complete(&object_id_2).await;
assert_remote_collab(
assert_server_collab(
&workspace_id_1,
&mut client_1.api_client,
&object_id_1,
@ -264,7 +264,7 @@ async fn multiple_collab_edit_test() {
)
.await;
assert_remote_collab(
assert_server_collab(
&workspace_id_2,
&mut client_2.api_client,
&object_id_2,

View file

@ -0,0 +1,106 @@
use crate::util::test_client::{assert_client_collab, assert_server_collab, TestClient};
use collab_entity::CollabType;
use database_entity::AFRole;
use serde_json::json;
#[tokio::test]
async fn edit_workspace_without_permission() {
let mut client_1 = TestClient::new_user().await;
let mut client_2 = TestClient::new_user().await;
let workspace_id = client_1.current_workspace_id().await;
client_1.open_workspace(&workspace_id).await;
client_2.open_workspace(&workspace_id).await;
client_1
.collab_by_object_id
.get_mut(&workspace_id)
.unwrap()
.collab
.lock()
.insert("name", "AppFlowy");
client_1.wait_object_sync_complete(&workspace_id).await;
assert_client_collab(&mut client_1, &workspace_id, json!({"name": "AppFlowy"}), 3).await;
assert_client_collab(&mut client_2, &workspace_id, json!({}), 3).await;
}
#[tokio::test]
async fn init_sync_workspace_with_guest_permission() {
let mut client_1 = TestClient::new_user().await;
let mut client_2 = TestClient::new_user().await;
let workspace_id = client_1.current_workspace_id().await;
client_1.open_workspace(&workspace_id).await;
// add client 2 as the member of the workspace then the client 2 will receive the update.
client_1
.add_client_as_workspace_member(&workspace_id, &client_2, AFRole::Guest)
.await;
client_2.open_workspace(&workspace_id).await;
client_1
.collab_by_object_id
.get_mut(&workspace_id)
.unwrap()
.collab
.lock()
.insert("name", "AppFlowy");
client_1.wait_object_sync_complete(&workspace_id).await;
assert_client_collab(&mut client_1, &workspace_id, json!({"name": "AppFlowy"}), 3).await;
assert_client_collab(&mut client_2, &workspace_id, json!({"name": "AppFlowy"}), 3).await;
}
#[tokio::test]
async fn edit_workspace_with_guest_permission() {
let mut client_1 = TestClient::new_user().await;
let mut client_2 = TestClient::new_user().await;
let workspace_id = client_1.current_workspace_id().await;
client_1.open_workspace(&workspace_id).await;
// add client 2 as the member of the workspace then the client 2 will receive the update.
client_1
.add_client_as_workspace_member(&workspace_id, &client_2, AFRole::Guest)
.await;
client_1
.collab_by_object_id
.get_mut(&workspace_id)
.unwrap()
.collab
.lock()
.insert("name", "zack");
client_1.wait_object_sync_complete(&workspace_id).await;
client_2.open_workspace(&workspace_id).await;
// make sure the client 2 has received the remote updates before the client 2 edits the collab
client_2
.wait_object_sync_complete_with_secs(&workspace_id, 10)
.await;
client_2
.collab_by_object_id
.get_mut(&workspace_id)
.unwrap()
.collab
.lock()
.insert("name", "nathan");
client_2
.wait_object_sync_complete_with_secs(&workspace_id, 5)
.await;
assert_client_collab(&mut client_1, &workspace_id, json!({"name": "zack"}), 3).await;
assert_client_collab(&mut client_2, &workspace_id, json!({"name": "nathan"}), 3).await;
assert_server_collab(
&workspace_id,
&mut client_1.api_client,
&workspace_id,
&CollabType::Folder,
5,
json!({
"name": "zack"
}),
)
.await;
}

View file

@ -1,15 +1,12 @@
use client_api::Client;
use std::sync::Once;
use tracing_subscriber::fmt::Subscriber;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
mod collab;
mod gotrue;
mod realtime;
mod user;
mod util;
mod file_storage;
mod websocket;
mod workspace;
pub const LOCALHOST_URL: &str = "http://localhost:8000";
@ -32,19 +29,3 @@ pub const DEV_GOTRUE: &str = "https://test.appflowy.cloud/gotrue";
pub fn test_appflowy_cloud_client() -> Client {
Client::new(DEV_URL, DEV_WS, DEV_GOTRUE)
}
pub fn setup_log() {
static START: Once = Once::new();
START.call_once(|| {
let level = "trace";
let mut filters = vec![];
filters.push(format!("client_api={}", level));
std::env::set_var("RUST_LOG", filters.join(","));
let subscriber = Subscriber::builder()
.with_ansi(true)
.with_env_filter(EnvFilter::from_default_env())
.finish();
subscriber.try_init().unwrap();
});
}

View file

@ -1,5 +0,0 @@
mod collab_permisson;
mod edit_collab;
mod multi_devices_edit_collab;
mod test_client;
mod ws_connect;

22
tests/util/mod.rs Normal file
View file

@ -0,0 +1,22 @@
use std::sync::Once;
use tracing_subscriber::fmt::Subscriber;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
pub(crate) mod test_client;
pub fn setup_log() {
static START: Once = Once::new();
START.call_once(|| {
let level = "trace";
let mut filters = vec![];
filters.push(format!("client_api={}", level));
std::env::set_var("RUST_LOG", filters.join(","));
let subscriber = Subscriber::builder()
.with_ansi(true)
.with_env_filter(EnvFilter::from_default_env())
.finish();
subscriber.try_init().unwrap();
});
}

View file

@ -6,7 +6,9 @@ use collab::core::collab_state::SyncState;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::preclude::Collab;
use collab_entity::CollabType;
use database_entity::{AFAccessLevel, AFRole, InsertCollabMemberParams, QueryCollabParams};
use database_entity::{
AFAccessLevel, AFRole, InsertCollabMemberParams, QueryCollabParams, UpdateCollabMemberParams,
};
use serde_json::Value;
use shared_entity::dto::workspace_dto::CreateWorkspaceMember;
use sqlx::types::Uuid;
@ -14,10 +16,10 @@ use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{timeout, Duration};
use tokio_stream::StreamExt;
use tracing::debug;
use crate::localhost_client;
use crate::user::utils::{generate_unique_registered_user, User};
use crate::{localhost_client, setup_log};
use crate::util::setup_log;
pub(crate) struct TestClient {
pub ws_client: WSClient,
@ -39,7 +41,6 @@ impl TestClient {
Self::new(device_id, registered_user).await
}
#[allow(dead_code)]
pub(crate) async fn add_client_as_workspace_member(
&self,
workspace_id: &str,
@ -47,7 +48,6 @@ impl TestClient {
role: AFRole,
) {
let profile = other_client.api_client.get_profile().await.unwrap();
debug!("profile: {:?}", profile);
let email = profile.email.unwrap();
self
.api_client
@ -82,6 +82,32 @@ impl TestClient {
.unwrap();
}
pub(crate) async fn update_collab_member_access_level(
&self,
workspace_id: &str,
object_id: &str,
other_client: &TestClient,
access_level: AFAccessLevel,
) {
let uid = other_client
.api_client
.get_profile()
.await
.unwrap()
.uid
.unwrap();
self
.api_client
.update_collab_member(UpdateCollabMemberParams {
uid,
workspace_id: workspace_id.to_string(),
object_id: object_id.to_string(),
access_level,
})
.await
.unwrap();
}
pub(crate) async fn user_with_new_device(registered_user: User) -> Self {
let device_id = Uuid::new_v4().to_string();
Self::new(device_id, registered_user).await
@ -114,6 +140,12 @@ impl TestClient {
}
pub(crate) async fn wait_object_sync_complete(&self, object_id: &str) {
self
.wait_object_sync_complete_with_secs(object_id, 20)
.await;
}
pub(crate) async fn wait_object_sync_complete_with_secs(&self, object_id: &str, secs: u64) {
let mut sync_state = self
.collab_by_object_id
.get(object_id)
@ -122,8 +154,8 @@ impl TestClient {
.lock()
.subscribe_sync_state();
const TIMEOUT_DURATION: Duration = Duration::from_secs(20);
while let Ok(Some(state)) = timeout(TIMEOUT_DURATION, sync_state.next()).await {
let duration = Duration::from_secs(secs);
while let Ok(Some(state)) = timeout(duration, sync_state.next()).await {
if state == SyncState::SyncFinished {
break;
}
@ -195,6 +227,12 @@ impl TestClient {
object_id
}
pub(crate) async fn open_workspace(&mut self, workspace_id: &str) {
self
.open_collab(workspace_id, workspace_id, CollabType::Folder)
.await;
}
#[allow(clippy::await_holding_lock)]
pub(crate) async fn open_collab(
&mut self,
@ -247,7 +285,7 @@ impl TestClient {
}
}
pub async fn assert_remote_collab(
pub async fn assert_server_collab(
workspace_id: &str,
client: &mut client_api::Client,
object_id: &str,
@ -274,11 +312,13 @@ pub async fn assert_remote_collab(
Ok(data) => {
let json = Collab::new_with_raw_data(CollabOrigin::Empty, &object_id, vec![data.to_vec()], vec![]).unwrap().to_json_value();
if retry_count > 10 {
dbg!(workspace_id, object_id);
assert_json_eq!(json, expected);
break;
break;
}
if json == expected {
dbg!(workspace_id, object_id);
break;
}
tokio::time::sleep(Duration::from_millis(1000)).await;

1
tests/websocket/mod.rs Normal file
View file

@ -0,0 +1 @@
mod connect;

View file

@ -1 +1 @@
mod member;
mod member_crud;