refactor: reduce select all workspace memebers call (#216)

* refactor: reduce select all workspace memebers call

* chore: docker build

* refactor: move tests

* chore: expose enforcer on tests

* refactor: update workspace member when recv pg notification

* chore: update test

* chore: commit sqlx file
This commit is contained in:
Nathan.fooo 2023-12-17 02:46:05 +08:00 committed by GitHub
parent 377d7ad8f7
commit b39621e389
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 1563 additions and 1086 deletions

View file

@ -0,0 +1,40 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT * FROM public.af_permissions WHERE id = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
},
{
"ordinal": 1,
"name": "name",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "access_level",
"type_info": "Int4"
},
{
"ordinal": 3,
"name": "description",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Int4"
]
},
"nullable": [
false,
false,
false,
true
]
},
"hash": "2593b975fcf2dcf0129a1390fd8e2888d440e07c904d7eb3ca14957be8bc6069"
}

View file

@ -0,0 +1,40 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT p.id, p.name, p.access_level, p.description FROM af_permissions p\n JOIN af_role_permissions rp ON p.id = rp.permission_id\n WHERE rp.role_id = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
},
{
"ordinal": 1,
"name": "name",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "access_level",
"type_info": "Int4"
},
{
"ordinal": 3,
"name": "description",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Int4"
]
},
"nullable": [
false,
false,
false,
true
]
},
"hash": "f05042dd22f862603e63f63d47b93e579545c79cabe15d32304a47ca7665a55f"
}

107
Cargo.lock generated
View file

@ -736,6 +736,17 @@ dependencies = [
"webpki-roots 0.22.6",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi 0.1.19",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -1136,6 +1147,9 @@ dependencies = [
"rhai",
"ritelinked",
"serde",
"slog",
"slog-async",
"slog-term",
"thiserror",
"tokio",
]
@ -1855,6 +1869,16 @@ dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-next"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1"
dependencies = [
"cfg-if",
"dirs-sys-next",
]
[[package]]
name = "dirs-sys"
version = "0.3.7"
@ -1866,6 +1890,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "dirs-sys-next"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d"
dependencies = [
"libc",
"redox_users",
"winapi",
]
[[package]]
name = "displaydoc"
version = "0.2.4"
@ -2386,6 +2421,15 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.2"
@ -2684,7 +2728,7 @@ version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.2",
"rustix",
"windows-sys",
]
@ -3156,7 +3200,16 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.2",
"libc",
]
[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]
@ -4745,6 +4798,37 @@ dependencies = [
"autocfg",
]
[[package]]
name = "slog"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8347046d4ebd943127157b94d63abb990fcf729dc4e9978927fdf4ac3c998d06"
[[package]]
name = "slog-async"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72c8038f898a2c79507940990f05386455b3a317d8f18d4caea7cbc3d5096b84"
dependencies = [
"crossbeam-channel",
"slog",
"take_mut",
"thread_local",
]
[[package]]
name = "slog-term"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87d29185c55b7b258b4f120eab00f48557d4d9bc814f41713f449d35b0f8977c"
dependencies = [
"atty",
"slog",
"term",
"thread_local",
"time",
]
[[package]]
name = "smallstr"
version = "0.3.0"
@ -5161,6 +5245,12 @@ dependencies = [
"libc",
]
[[package]]
name = "take_mut"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
[[package]]
name = "tap"
version = "1.0.1"
@ -5191,6 +5281,17 @@ dependencies = [
"utf-8",
]
[[package]]
name = "term"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f"
dependencies = [
"dirs-next",
"rustversion",
"winapi",
]
[[package]]
name = "thiserror"
version = "1.0.48"
@ -5240,6 +5341,8 @@ checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48"
dependencies = [
"deranged",
"itoa",
"libc",
"num_threads",
"serde",
"time-core",
"time-macros",

View file

@ -66,7 +66,7 @@ axum_session = "0.7.0"
uuid = "1.4.1"
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }
prost = "0.12.1"
casbin = "2.0.9"
casbin = { version = "2.0.9", features = ["logging"] }
# collab
collab = { version = "0.1.0", features = ["async-plugin"] }
@ -145,6 +145,7 @@ bincode = "1.3.3"
lto = true
opt-level = 3
codegen-units = 1
debug = true
[profile.profiling]
inherits = "release"

View file

@ -222,6 +222,12 @@ impl From<i32> for AFRole {
}
}
impl From<i64> for AFRole {
fn from(value: i64) -> Self {
Self::from(value as i32)
}
}
impl From<Option<i32>> for AFRole {
fn from(value: Option<i32>) -> Self {
match value {
@ -291,7 +297,7 @@ impl From<i32> for AFAccessLevel {
30 => AFAccessLevel::ReadAndWrite,
50 => AFAccessLevel::FullAccess,
_ => {
error!("Invalid role id: {}", value);
error!("Invalid access level: {}", value);
AFAccessLevel::ReadOnly
},
}

View file

@ -1,4 +1,4 @@
use crate::dto::AFRole;
use crate::dto::{AFAccessLevel, AFRole};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
@ -82,3 +82,11 @@ pub struct AFBlobMetadataRow {
pub struct AFUserNotification {
pub payload: Option<AFUserRow>,
}
#[derive(FromRow, Debug, Clone)]
pub struct AFPermissionRow {
pub id: i32,
pub name: String,
pub access_level: AFAccessLevel,
pub description: Option<String>,
}

View file

@ -382,7 +382,7 @@ pub async fn delete_collab_member(uid: i64, oid: &str, pg_pool: &PgPool) -> Resu
Ok(())
}
#[inline]
#[instrument(level = "info", skip_all, err)]
pub async fn select_all_collab_members(
pg_pool: &PgPool,
) -> Result<Vec<(String, Vec<AFCollabMember>)>, AppError> {
@ -446,7 +446,6 @@ pub async fn select_collab_member(
Ok(member)
}
#[instrument(level = "trace", skip(row), err)]
fn collab_member_try_from_row(row: PgRow) -> Result<AFCollabMember, sqlx::Error> {
let access_level = AFAccessLevel::from(row.try_get::<i32, _>(4)?);
let permission = AFPermission {

View file

@ -34,7 +34,11 @@ pub trait CollabStorageAccessControl: Send + Sync + 'static {
) -> Result<(), AppError>;
/// Returns the role of the user in the workspace.
async fn get_user_role(&self, uid: &i64, workspace_id: &str) -> Result<AFRole, AppError>;
async fn get_user_workspace_role(
&self,
uid: &i64,
workspace_id: &str,
) -> Result<AFRole, AppError>;
}
/// Represents a storage mechanism for collaborations.

View file

@ -9,7 +9,9 @@ use tracing::{event, instrument};
use crate::user::select_uid_from_email;
use app_error::AppError;
use database_entity::pg_row::{AFUserProfileRow, AFWorkspaceMemberRow, AFWorkspaceRow};
use database_entity::pg_row::{
AFPermissionRow, AFUserProfileRow, AFWorkspaceMemberRow, AFWorkspaceRow,
};
/// Checks whether a user, identified by a UUID, is an 'Owner' of a workspace, identified by its
/// workspace_id.
@ -390,3 +392,37 @@ pub async fn select_all_user_workspaces(
.await?;
Ok(workspaces)
}
pub async fn select_permission(
pool: &PgPool,
permission_id: &i64,
) -> Result<Option<AFPermissionRow>, AppError> {
let permission = sqlx::query_as!(
AFPermissionRow,
r#"
SELECT * FROM public.af_permissions WHERE id = $1
"#,
*permission_id as i32
)
.fetch_optional(pool)
.await?;
Ok(permission)
}
pub async fn select_permission_from_role_id(
pool: &PgPool,
role_id: &i64,
) -> Result<Option<AFPermissionRow>, AppError> {
let permission = sqlx::query_as!(
AFPermissionRow,
r#"
SELECT p.id, p.name, p.access_level, p.description FROM af_permissions p
JOIN af_role_permissions rp ON p.id = rp.permission_id
WHERE rp.role_id = $1
"#,
*role_id as i32
)
.fetch_optional(pool)
.await?;
Ok(permission)
}

View file

@ -5,6 +5,7 @@ use app_error::AppError;
use async_trait::async_trait;
use crate::collaborate::{CollabAccessControl, CollabUserId};
use anyhow::anyhow;
use collab::core::awareness::Awareness;
use collab::core::collab::TransactionMutExt;
use collab::core::collab_plugin::EncodedCollabV1;
@ -17,7 +18,7 @@ use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::time::interval;
use tracing::{debug, error, info, trace};
use tracing::{debug, error, info, instrument, trace};
use yrs::updates::decoder::Decode;
use yrs::updates::encoder::Encode;
use yrs::{ReadTxn, StateVector, Transact, Update};
@ -36,6 +37,7 @@ impl<S, U, AC> CollabStoragePlugin<S, U, AC>
where
S: CollabStorage,
U: RealtimeUser,
AC: CollabAccessControl,
{
pub fn new(
uid: i64,
@ -60,6 +62,47 @@ where
spawn_period_check(&plugin);
plugin
}
#[instrument(level = "trace", skip_all)]
async fn insert_new_collab(&self, doc: &Doc, object_id: &str) -> Result<(), AppError> {
debug!(
"create new collab, cache full access of {} for user:{}",
object_id, self.uid
);
match encoded_v1_from_doc(doc).encode_to_bytes() {
Ok(encoded_collab_v1) => {
let _ = self
.access_control
.cache_collab_access_level(
CollabUserId::from(&self.uid),
object_id,
AFAccessLevel::FullAccess,
)
.await;
let params = InsertCollabParams::from_raw_data(
object_id.to_string(),
self.collab_type.clone(),
encoded_collab_v1,
&self.workspace_id,
);
self
.storage
.insert_collab(&self.uid, params)
.await
.map_err(|err| {
error!("fail to create new collab in plugin: {:?}", err);
err
})
},
Err(err) => Err(AppError::Internal(anyhow!(
"fail to encode doc to bytes: {:?}",
err
))),
}
}
}
/// Spawns an asynchronous task to periodically check and perform flush operations for collaboration groups.
@ -135,39 +178,11 @@ where
},
Err(err) => match &err {
AppError::RecordNotFound(_) => {
debug!(
"create new collab, cache full access of {} for user:{}",
object_id, self.uid
);
let _ = self
.access_control
.cache_collab_access_level(
CollabUserId::from(&self.uid),
object_id,
AFAccessLevel::FullAccess,
)
.await;
//
match encoded_v1_from_doc(doc).encode_to_bytes() {
Ok(encoded_collab_v1) => {
let params = InsertCollabParams::from_raw_data(
object_id.to_string(),
self.collab_type.clone(),
encoded_collab_v1,
&self.workspace_id,
);
if let Err(err) = self.storage.insert_collab(&self.uid, params).await {
error!("fail to create new collab in plugin: {:?}", err);
}
},
Err(err) => {
error!("fail to encode EncodedCollabV1 to bytes: {:?}", err);
},
if let Err(err) = self.insert_new_collab(doc, object_id).await {
error!("Insert collab {:?}", err);
}
},
_ => error!("{:?}", err),
_ => error!("{}", err),
},
}
}
@ -192,23 +207,31 @@ where
}
fn flush(&self, object_id: &str, doc: &Doc) {
let encoded_collab_v1 = match encoded_v1_from_doc(doc).encode_to_bytes() {
Ok(data) => data,
Err(err) => {
error!("Error encoding: {:?}", err);
return;
},
};
let params = InsertCollabParams::from_raw_data(
object_id.to_string(),
self.collab_type.clone(),
encoded_collab_v1,
&self.workspace_id,
);
let storage = self.storage.clone();
let uid = self.uid;
let workspace_id = self.workspace_id.clone();
let collab_type = self.collab_type.clone();
let object_id = object_id.to_string();
if let Ok(encoded_collab_v1) = encoded_v1_from_doc(doc).encode_to_bytes() {
let params =
InsertCollabParams::from_raw_data(object_id, collab_type, encoded_collab_v1, &workspace_id);
let object_id = params.object_id.clone();
tokio::spawn(async move {
match storage.insert_collab(&uid, params).await {
Ok(_) => info!("[realtime] flushing collab: {}", object_id),
Err(err) => error!("save collab failed: {:?}", err),
}
});
}
tokio::spawn(async move {
info!("[realtime] Flushed collab: {}", params.object_id);
match storage.insert_collab(&uid, params).await {
Ok(_) => {},
Err(err) => error!("Failed to save collab: {:?}", err),
}
});
}
}

View file

@ -7,8 +7,9 @@ use casbin::{CoreApi, MgmtApi};
use database::user::select_uid_from_uuid;
use sqlx::PgPool;
use tokio::sync::{broadcast, RwLock};
use tracing::instrument;
use tracing::log::warn;
use tracing::{event, instrument};
use uuid::Uuid;
use crate::biz::{
@ -19,8 +20,10 @@ use crate::biz::{
},
};
use app_error::AppError;
use database::workspace::select_permission;
use database_entity::dto::{AFAccessLevel, AFCollabMember, AFRole};
use crate::biz::casbin::enforcer_ext::{enforcer_remove, enforcer_update};
use realtime::collaborate::{CollabAccessControl, CollabUserId};
use super::{
@ -63,7 +66,7 @@ impl CasbinAccessControl {
) -> Self {
let enforcer = Arc::new(RwLock::new(enforcer));
spawn_listen_on_workspace_member_change(workspace_listener, enforcer.clone());
spawn_listen_on_collab_member_change(collab_listener, enforcer.clone());
spawn_listen_on_collab_member_change(pg_pool.clone(), collab_listener, enforcer.clone());
Self { pg_pool, enforcer }
}
pub fn new_collab_access_control(&self) -> CasbinCollabAccessControl {
@ -78,73 +81,24 @@ impl CasbinAccessControl {
}
}
/// Update permission for a user.
///
/// [`ObjectType::Workspace`] has to be paired with [`ActionType::Role`],
/// [`ObjectType::Collab`] has to be paired with [`ActionType::Level`],
#[instrument(level = "trace", skip(self, obj, act), err)]
async fn update(
/// Only expose this method for testing
#[cfg(debug_assertions)]
pub fn get_enforcer(&self) -> &Arc<RwLock<casbin::Enforcer>> {
&self.enforcer
}
pub async fn update(
&self,
uid: &i64,
obj: &ObjectType<'_>,
act: &ActionType,
) -> Result<bool, AppError> {
let (obj_id, action) = match (obj, act) {
(ObjectType::Workspace(_), ActionType::Role(role)) => {
Ok((obj.to_string(), i32::from(role.clone()).to_string()))
},
(ObjectType::Collab(_), ActionType::Level(level)) => {
Ok((obj.to_string(), i32::from(*level).to_string()))
},
_ => Err(AppError::Internal(anyhow!(
"invalid object type and action type combination: object={:?}, action={:?}",
obj,
act
))),
}?;
let mut enforcer = self.enforcer.write().await;
let current_policy = enforcer
.get_filtered_policy(POLICY_FIELD_INDEX_OBJECT, vec![obj_id.clone()])
.into_iter()
.find(|p| p[POLICY_FIELD_INDEX_USER] == uid.to_string());
if let Some(current_policy) = current_policy {
enforcer
.remove_policy(current_policy)
.await
.map_err(|e| AppError::Internal(anyhow!("casbin error removing policy: {e:?}")))?;
}
event!(
tracing::Level::TRACE,
"updating policy: object={}, user={},action={}",
obj_id,
uid,
action
);
enforcer
.add_policy(vec![uid.to_string(), obj_id, action])
.await
.map_err(|e| AppError::Internal(anyhow!("casbin error adding policy: {e:?}")))
enforcer_update(&self.enforcer, uid, obj, act).await
}
async fn remove(&self, uid: &i64, obj: &ObjectType<'_>) -> Result<bool, AppError> {
let obj_id = obj.to_string();
pub async fn remove(&self, uid: &i64, obj: &ObjectType<'_>) -> Result<bool, AppError> {
let mut enforcer = self.enforcer.write().await;
let policies = enforcer.get_filtered_policy(POLICY_FIELD_INDEX_OBJECT, vec![obj_id]);
let rem = policies
.into_iter()
.filter(|p| p[POLICY_FIELD_INDEX_USER] == uid.to_string())
.collect();
enforcer
.remove_policies(rem)
.await
.map_err(|e| AppError::Internal(anyhow!("casbin error enforce: {e:?}")))
enforcer_remove(&mut enforcer, uid, obj).await
}
/// Get uid which is used for all operations.
@ -172,6 +126,7 @@ impl CasbinAccessControl {
}
fn spawn_listen_on_collab_member_change(
pg_pool: PgPool,
mut listener: broadcast::Receiver<CollabMemberNotification>,
enforcer: Arc<RwLock<casbin::Enforcer>>,
) {
@ -179,23 +134,33 @@ fn spawn_listen_on_collab_member_change(
while let Ok(change) = listener.recv().await {
match change.action_type {
CollabMemberAction::INSERT | CollabMemberAction::UPDATE => {
if let (Some(_), Some(_)) = (change.new_oid(), change.new_uid()) {
if let Err(err) = enforcer.write().await.load_policy().await {
warn!(
"Failed to reload the collab member status from db: {:?}, error: {}",
change, err
);
if let Some(member_row) = change.new {
if let Ok(Some(row)) = select_permission(&pg_pool, &member_row.permission_id).await {
if let Err(err) = enforcer_update(
&enforcer,
&member_row.uid,
&ObjectType::Collab(&member_row.oid),
&ActionType::Level(row.access_level),
)
.await
{
warn!(
"Failed to update the user:{} collab{} access control, error: {}",
member_row.uid, member_row.oid, err
);
}
}
} else {
warn!("The oid or uid is None")
warn!("The new collab member is None")
}
},
CollabMemberAction::DELETE => {
if let (Some(_), Some(_)) = (change.old_oid(), change.old_uid()) {
if let Err(err) = enforcer.write().await.load_policy().await {
if let (Some(oid), Some(uid)) = (change.old_oid(), change.old_uid()) {
let mut enforcer = enforcer.write().await;
if let Err(err) = enforcer_remove(&mut enforcer, uid, &ObjectType::Collab(oid)).await {
warn!(
"Failed to reload the collab member status from db: {:?}, error: {}",
change, err
"Failed to remove the user:{} collab{} access control, error: {}",
uid, oid, err
);
}
} else {
@ -218,17 +183,37 @@ fn spawn_listen_on_workspace_member_change(
None => {
warn!("The workspace member change can't be None when the action is INSERT or UPDATE")
},
Some(_) => {
if let Err(err) = enforcer.write().await.load_policy().await {
warn!("Failed to reload workspace member status from db: {}", err);
Some(member_row) => {
if let Err(err) = enforcer_update(
&enforcer,
&member_row.uid,
&ObjectType::Workspace(&member_row.workspace_id.to_string()),
&ActionType::Role(AFRole::from(member_row.role_id)),
)
.await
{
warn!(
"Failed to update the user:{} workspace:{} access control, error: {}",
member_row.uid, member_row.workspace_id, err
);
}
},
},
WorkspaceMemberAction::DELETE => match change.old {
None => warn!("The workspace member change can't be None when the action is DELETE"),
Some(_) => {
if let Err(err) = enforcer.write().await.load_policy().await {
warn!("Failed to reload workspace member status from db: {}", err);
Some(member_row) => {
let mut enforcer = enforcer.write().await;
if let Err(err) = enforcer_remove(
&mut enforcer,
&member_row.uid,
&ObjectType::Workspace(&member_row.workspace_id.to_string()),
)
.await
{
warn!(
"Failed to remove the user:{} workspace: {} access control, error: {}",
member_row.uid, member_row.workspace_id, err
);
}
},
},
@ -243,6 +228,7 @@ pub struct CasbinCollabAccessControl {
}
impl CasbinCollabAccessControl {
#[instrument(level = "trace", skip_all)]
pub async fn update_member(&self, uid: &i64, oid: &str, access_level: AFAccessLevel) {
let _ = self
.casbin_access_control
@ -309,6 +295,7 @@ impl CollabAccessControl for CasbinCollabAccessControl {
)))
}
#[instrument(level = "trace", skip_all)]
async fn cache_collab_access_level(
&self,
user: CollabUserId<'_>,
@ -412,7 +399,7 @@ impl WorkspaceAccessControl for CasbinWorkspaceAccessControl {
self.get_role_from_uid(&uid, workspace_id).await
}
async fn get_role_from_uid(&self, uid: &i64, workspace_id: &Uuid) -> Result<AFRole, AppError> {
let policies_future = self
let policies = self
.casbin_access_control
.enforcer
.read()
@ -422,7 +409,7 @@ impl WorkspaceAccessControl for CasbinWorkspaceAccessControl {
vec![ObjectType::Workspace(&workspace_id.to_string()).to_string()],
);
let role = match policies_future
let role = match policies
.into_iter()
.find(|p| p[POLICY_FIELD_INDEX_USER] == uid.to_string())
{
@ -444,6 +431,7 @@ impl WorkspaceAccessControl for CasbinWorkspaceAccessControl {
})
}
#[instrument(level = "trace", skip_all)]
async fn update_member(
&self,
uid: &i64,
@ -469,591 +457,3 @@ impl WorkspaceAccessControl for CasbinWorkspaceAccessControl {
Ok(())
}
}
#[cfg(test)]
mod tests {
use anyhow::Context;
use casbin::{DefaultModel, Enforcer};
use shared_entity::dto::workspace_dto::{CreateWorkspaceMember, WorkspaceMemberChangeset};
use crate::biz::{
self,
casbin::{adapter::PgAdapter, tests},
pg_listener::PgListeners,
};
use super::*;
#[sqlx::test(migrations = false)]
async fn test_workspace_access_control_get_role(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_workspace_access_control();
let user = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
assert_eq!(
AFRole::Owner,
access_control
.get_role_from_uuid(&user.uuid, &workspace.workspace_id)
.await?
);
assert_eq!(
AFRole::Owner,
access_control
.get_role_from_uid(&user.uid, &workspace.workspace_id)
.await?
);
let member = tests::create_user(&pool).await?;
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&member.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: member.email.clone(),
role: AFRole::Member,
}],
)
.await
.context("adding users to workspace")?;
assert_eq!(
AFRole::Member,
access_control
.get_role_from_uuid(&member.uuid, &workspace.workspace_id)
.await?
);
assert_eq!(
AFRole::Member,
access_control
.get_role_from_uid(&member.uid, &workspace.workspace_id)
.await?
);
// wait for update message
let mut workspace_listener = listeners.subscribe_workspace_member_change();
biz::workspace::ops::update_workspace_member(
&pool,
&workspace.workspace_id,
&WorkspaceMemberChangeset {
email: member.email.clone(),
role: Some(AFRole::Guest),
name: None,
},
)
.await
.context("update user workspace role")?;
let _ = workspace_listener.recv().await;
assert_eq!(
AFRole::Guest,
access_control
.get_role_from_uid(&member.uid, &workspace.workspace_id)
.await?
);
// wait for delete message
let mut workspace_listener = listeners.subscribe_workspace_member_change();
biz::workspace::ops::remove_workspace_members(
&user.uuid,
&pool,
&workspace.workspace_id,
&[member.email.clone()],
)
.await
.context("removing users from workspace")?;
let _ = workspace_listener.recv().await;
assert!(access_control
.get_role_from_uid(&member.uid, &workspace.workspace_id)
.await
.expect_err("user should not be part of workspace")
.is_not_enough_permissions());
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_get_access_level(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let user = tests::create_user(&pool).await?;
let owner = tests::create_user(&pool).await?;
let member = tests::create_user(&pool).await?;
let guest = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let members = vec![
CreateWorkspaceMember {
email: owner.email.clone(),
role: AFRole::Owner,
},
CreateWorkspaceMember {
email: member.email.clone(),
role: AFRole::Member,
},
CreateWorkspaceMember {
email: guest.email.clone(),
role: AFRole::Guest,
},
];
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&user.uuid,
&workspace.workspace_id,
members,
)
.await
.context("adding users to workspace")?;
assert_eq!(
AFAccessLevel::FullAccess,
access_control
.get_collab_access_level(
CollabUserId::UserUuid(&user.uuid),
&workspace.workspace_id.to_string(),
)
.await?
);
assert_eq!(
AFAccessLevel::FullAccess,
access_control
.get_collab_access_level(
CollabUserId::UserId(&user.uid),
&workspace.workspace_id.to_string(),
)
.await?
);
assert_eq!(
AFAccessLevel::FullAccess,
access_control
.get_collab_access_level(
CollabUserId::UserId(&owner.uid),
&workspace.workspace_id.to_string(),
)
.await?
);
assert_eq!(
AFAccessLevel::ReadAndWrite,
access_control
.get_collab_access_level(
CollabUserId::UserId(&member.uid),
&workspace.workspace_id.to_string(),
)
.await?
);
assert_eq!(
AFAccessLevel::ReadOnly,
access_control
.get_collab_access_level(
CollabUserId::UserId(&guest.uid),
&workspace.workspace_id.to_string(),
)
.await?
);
// wait for update message
let mut collab_listener = listeners.subscribe_collab_member_change();
let mut txn = pool
.begin()
.await
.context("acquire transaction to update collab member")?;
database::collab::upsert_collab_member_with_txn(
guest.uid,
&workspace.workspace_id.to_string(),
&AFAccessLevel::ReadAndComment,
&mut txn,
)
.await?;
txn
.commit()
.await
.expect("commit transaction to update collab member");
let _ = collab_listener.recv().await;
assert_eq!(
AFAccessLevel::ReadAndComment,
access_control
.get_collab_access_level(
CollabUserId::UserId(&guest.uid),
&workspace.workspace_id.to_string(),
)
.await?
);
// wait for delete message
let mut collab_listener = listeners.subscribe_collab_member_change();
database::collab::delete_collab_member(guest.uid, &workspace.workspace_id.to_string(), &pool)
.await
.context("delete collab member")?;
let _ = collab_listener.recv().await;
assert!(access_control
.get_collab_access_level(
CollabUserId::UserId(&guest.uid),
&workspace.workspace_id.to_string()
)
.await
.expect_err("user should not be part of collab")
.is_record_not_found());
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_access_http_method(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let user = tests::create_user(&pool).await?;
let guest = tests::create_user(&pool).await?;
let stranger = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&guest.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: guest.email,
role: AFRole::Guest,
}],
)
.await
.context("adding users to workspace")?;
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&user.uid),
&workspace.workspace_id.to_string(),
&Method::GET
)
.await?
);
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&user.uid),
&workspace.workspace_id.to_string(),
&Method::POST
)
.await?
);
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&user.uid),
&workspace.workspace_id.to_string(),
&Method::PUT
)
.await?
);
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&user.uid),
&workspace.workspace_id.to_string(),
&Method::DELETE
)
.await?
);
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&user.uid),
"new collab oid",
&Method::POST
)
.await?,
"should have access to non-existent collab oid"
);
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&guest.uid),
&workspace.workspace_id.to_string(),
&Method::GET
)
.await?,
"guest should have read access"
);
assert!(
!access_control
.can_access_http_method(
CollabUserId::UserId(&guest.uid),
&workspace.workspace_id.to_string(),
&Method::POST
)
.await?,
"guest should not have write access"
);
assert!(
!access_control
.can_access_http_method(
CollabUserId::UserId(&stranger.uid),
&workspace.workspace_id.to_string(),
&Method::GET
)
.await?,
"stranger should not have read access"
);
assert!(
!access_control
.can_access_http_method(
CollabUserId::UserId(&stranger.uid),
&workspace.workspace_id.to_string(),
&Method::POST
)
.await?,
"stranger should not have write access"
);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_send_receive_collab_update(
pool: PgPool,
) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let user = tests::create_user(&pool).await?;
let guest = tests::create_user(&pool).await?;
let stranger = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&guest.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: guest.email,
role: AFRole::Guest,
}],
)
.await
.context("adding users to workspace")?;
assert!(
access_control
.can_send_collab_update(&user.uid, &workspace.workspace_id.to_string())
.await?
);
assert!(
access_control
.can_receive_collab_update(&user.uid, &workspace.workspace_id.to_string())
.await?
);
assert!(
!access_control
.can_send_collab_update(&guest.uid, &workspace.workspace_id.to_string())
.await?,
"guest cannot send collab update"
);
assert!(
access_control
.can_receive_collab_update(&guest.uid, &workspace.workspace_id.to_string())
.await?,
"guest can receive collab update"
);
assert!(
!access_control
.can_send_collab_update(&stranger.uid, &workspace.workspace_id.to_string())
.await?,
"stranger cannot send collab update"
);
assert!(
!access_control
.can_receive_collab_update(&stranger.uid, &workspace.workspace_id.to_string())
.await?,
"stranger cannot receive collab update"
);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_cache_collab_access_level(
pool: PgPool,
) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let uid = 123;
let oid = "collab::oid".to_owned();
access_control
.cache_collab_access_level(CollabUserId::UserId(&uid), &oid, AFAccessLevel::FullAccess)
.await?;
assert_eq!(
AFAccessLevel::FullAccess,
access_control
.get_collab_access_level(CollabUserId::UserId(&uid), &oid)
.await?
);
access_control
.cache_collab_access_level(CollabUserId::UserId(&uid), &oid, AFAccessLevel::ReadOnly)
.await?;
assert_eq!(
AFAccessLevel::ReadOnly,
access_control
.get_collab_access_level(CollabUserId::UserId(&uid), &oid)
.await?
);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_casbin_access_control_update_remove(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let uid = 123;
assert!(
access_control
.update(
&uid,
&ObjectType::Workspace("123"),
&ActionType::Role(AFRole::Owner)
)
.await?
);
assert!(access_control.enforcer.read().await.enforce((
uid.to_string(),
ObjectType::Workspace("123").to_string(),
i32::from(AFRole::Owner).to_string(),
))?);
assert!(
access_control
.remove(&uid, &ObjectType::Workspace("123"))
.await?
);
assert!(!access_control.enforcer.read().await.enforce((
uid.to_string(),
ObjectType::Workspace("123").to_string(),
i32::from(AFRole::Owner).to_string(),
))?);
Ok(())
}
}

View file

@ -179,277 +179,3 @@ impl Adapter for PgAdapter {
Ok(true)
}
}
#[cfg(test)]
mod tests {
use anyhow::{anyhow, Context};
use casbin::{CoreApi, DefaultModel, Enforcer};
use database_entity::dto::AFRole;
use shared_entity::dto::workspace_dto::CreateWorkspaceMember;
use crate::biz;
use crate::biz::casbin::tests;
use super::*;
#[sqlx::test(migrations = false)]
async fn test_create_user(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let user = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Workspace(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFRole::Owner).to_string(),
))
.context("user should be owner of its workspace")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::FullAccess).to_string(),
))
.context("user should have full access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should be able to read its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should be able to write its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should be able to delete its collab")?);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_add_users_to_workspace(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let user_main = tests::create_user(&pool).await?;
let user_owner = tests::create_user(&pool).await?;
let user_member = tests::create_user(&pool).await?;
let user_guest = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user_main.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let members = vec![
CreateWorkspaceMember {
email: user_owner.email.clone(),
role: AFRole::Owner,
},
CreateWorkspaceMember {
email: user_member.email.clone(),
role: AFRole::Member,
},
CreateWorkspaceMember {
email: user_guest.email.clone(),
role: AFRole::Guest,
},
];
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&user_main.uuid,
&workspace.workspace_id,
members,
)
.await
.context("adding users to workspace")?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
{
// Owner
let user = user_owner;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::FullAccess).to_string(),
))
.context("owner should have full access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should be able to read its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should be able to write its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should be able to delete its collab")?);
}
{
// Member
let user = user_member;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadAndWrite).to_string(),
))
.context("member should have read write access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should be able to read its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should be able to write its collab")?);
assert!(!enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should not be able to delete its collab")?);
}
{
// Guest
let user = user_guest;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadOnly).to_string(),
))
.context("guest should have read only access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should not be able to read its collab")?);
assert!(!enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should not be able to write its collab")?);
assert!(!enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should not be able to delete its collab")?);
}
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_reload_policy_after_adding_user_to_workspace(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let user_owner = tests::create_user(&pool).await?;
let user_member = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user_owner.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
// Create enforcer before adding user to workspace
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let mut enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let members = vec![CreateWorkspaceMember {
email: user_member.email.clone(),
role: AFRole::Member,
}];
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&user_owner.uuid,
&workspace.workspace_id,
members,
)
.await
.context("adding users to workspace")?;
assert!(!enforcer
.enforce((
user_member.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadAndWrite).to_string(),
))
.context("member should not have read write access to collab before reload")?);
enforcer.load_policy().await?;
assert!(enforcer
.enforce((
user_member.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadAndWrite).to_string(),
))
.context("member should have read write access to collab")?);
Ok(())
}
}

View file

@ -0,0 +1,81 @@
use crate::biz::casbin::{
ActionType, ObjectType, POLICY_FIELD_INDEX_OBJECT, POLICY_FIELD_INDEX_USER,
};
use anyhow::anyhow;
use app_error::AppError;
use casbin::{Enforcer, MgmtApi};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{event, instrument};
/// Update permission for a user.
///
/// [`ObjectType::Workspace`] has to be paired with [`ActionType::Role`],
/// [`ObjectType::Collab`] has to be paired with [`ActionType::Level`],
#[inline]
#[instrument(level = "trace", skip(enforcer, obj, act), err)]
pub(crate) async fn enforcer_update(
enforcer: &Arc<RwLock<casbin::Enforcer>>,
uid: &i64,
obj: &ObjectType<'_>,
act: &ActionType,
) -> Result<bool, AppError> {
let (obj_id, action) = match (obj, act) {
(ObjectType::Workspace(_), ActionType::Role(role)) => {
Ok((obj.to_string(), i32::from(role.clone()).to_string()))
},
(ObjectType::Collab(_), ActionType::Level(level)) => {
Ok((obj.to_string(), i32::from(*level).to_string()))
},
_ => Err(AppError::Internal(anyhow!(
"invalid object type and action type combination: object={:?}, action={:?}",
obj,
act
))),
}?;
let mut enforcer = enforcer.write().await;
enforcer_remove(&mut enforcer, uid, obj).await?;
event!(
tracing::Level::INFO,
"updating policy: object={}, user={},action={}",
obj_id,
uid,
action
);
enforcer
.add_policy(vec![uid.to_string(), obj_id, action])
.await
.map_err(|e| AppError::Internal(anyhow!("casbin error adding policy: {e:?}")))
}
#[inline]
#[instrument(level = "trace", skip(enforcer, uid, obj), err)]
pub(crate) async fn enforcer_remove(
enforcer: &mut Enforcer,
uid: &i64,
obj: &ObjectType<'_>,
) -> Result<bool, AppError> {
let obj_id = obj.to_string();
let policies = enforcer.get_filtered_policy(POLICY_FIELD_INDEX_OBJECT, vec![obj_id]);
let rem = policies
.into_iter()
.filter(|p| p[POLICY_FIELD_INDEX_USER] == uid.to_string())
.collect::<Vec<_>>();
if rem.is_empty() {
return Ok(false);
}
event!(
tracing::Level::INFO,
"removing policy: object={}, user={}, policies={:?}",
obj.to_string(),
uid,
rem
);
enforcer
.remove_policies(rem)
.await
.map_err(|e| AppError::Internal(anyhow!("casbin error enforce: {e:?}")))
}

View file

@ -2,7 +2,7 @@ use database_entity::dto::{AFAccessLevel, AFRole};
pub mod access_control;
pub mod adapter;
mod enforcer_ext;
pub const MODEL_CONF: &str = r###"
[request_definition]
r = sub, obj, act
@ -40,7 +40,7 @@ const GROUPING_FIELD_INDEX_ACTION: usize = 1;
/// Represents the object type that is stored in the access control policy.
#[derive(Debug)]
enum ObjectType<'id> {
pub enum ObjectType<'id> {
/// Stored as `workspace::<uuid>`
Workspace(&'id str),
/// Stored as `collab::<uuid>`
@ -58,14 +58,14 @@ impl ToString for ObjectType<'_> {
/// Represents the action type that is stored in the access control policy.
#[derive(Debug)]
enum ActionType {
pub enum ActionType {
Role(AFRole),
Level(AFAccessLevel),
}
/// Represents the actions that can be performed on objects.
#[derive(Debug)]
enum Action {
pub enum Action {
Read,
Write,
Delete,
@ -80,82 +80,3 @@ impl ToString for Action {
}
}
}
#[cfg(test)]
mod tests {
use anyhow::Context;
use lazy_static::lazy_static;
use snowflake::Snowflake;
use sqlx::PgPool;
use tokio::sync::RwLock;
use uuid::Uuid;
pub const MODEL_CONF: &str = r#"
[request_definition]
r = sub, obj, act
[policy_definition]
p = sub, obj, act
[role_definition]
g = _, _ # role to action
g2 = _, _ # worksheet to collab
[policy_effect]
e = some(where (p.eft == allow))
[matchers]
m = r.sub == p.sub && g2(p.obj, r.obj) && g(p.act, r.act)
"#;
lazy_static! {
pub static ref ID_GEN: RwLock<Snowflake> = RwLock::new(Snowflake::new(1));
}
pub async fn setup_db(pool: &PgPool) -> anyhow::Result<()> {
// Have to manually manage schema and tables managed by gotrue but referenced by our
// migration scripts.
// Create schema and tables
sqlx::query(r#"create schema auth"#).execute(pool).await?;
sqlx::query(
r#"create table auth.users(
id uuid NOT NULL UNIQUE,
deleted_at timestamptz null,
CONSTRAINT users_pkey PRIMARY KEY (id)
)"#,
)
.execute(pool)
.await?;
// Manually run migration after creating required objects above.
sqlx::migrate!().run(pool).await?;
// Remove foreign key constraint
sqlx::query(r#"alter table public.af_user drop constraint af_user_email_foreign_key"#)
.execute(pool)
.await?;
Ok(())
}
#[derive(Debug, Clone)]
pub struct User {
pub uid: i64,
pub uuid: Uuid,
pub email: String,
}
pub async fn create_user(pool: &PgPool) -> anyhow::Result<User> {
// Create user and workspace
let uid = ID_GEN.write().await.next_id();
let uuid = Uuid::new_v4();
let email = format!("{}@appflowy.io", uuid);
let name = uuid.to_string();
database::user::create_user(pool, uid, &uuid, &email, &name)
.await
.context("create user")?;
Ok(User { uid, uuid, email })
}
}

View file

@ -316,7 +316,6 @@ where
CollabAC: CollabAccessControl,
WorkspaceAC: WorkspaceAccessControl,
{
#[instrument(level = "debug", skip(self))]
async fn get_collab_access_level(&self, uid: &i64, oid: &str) -> Result<AFAccessLevel, AppError> {
self
.collab_access_control
@ -336,7 +335,11 @@ where
.await
}
async fn get_user_role(&self, uid: &i64, workspace_id: &str) -> Result<AFRole, AppError> {
async fn get_user_workspace_role(
&self,
uid: &i64,
workspace_id: &str,
) -> Result<AFRole, AppError> {
self
.workspace_access_control
.get_role_from_uid(uid, &workspace_id.parse()?)

View file

@ -31,8 +31,6 @@ where
params.object_id
)));
}
upsert_collab(pg_pool, user_uuid, params).await?;
collab_access_control
.cache_collab_access_level(
CollabUserId::UserUuid(user_uuid),
@ -40,6 +38,7 @@ where
AFAccessLevel::FullAccess,
)
.await?;
upsert_collab(pg_pool, user_uuid, params).await?;
Ok(())
}

View file

@ -116,7 +116,7 @@ where
// If the user is the owner or member of the workspace, the user can create collab.
let role = self
.access_control
.get_user_role(uid, &params.workspace_id)
.get_user_workspace_role(uid, &params.workspace_id)
.await?;
event!(
tracing::Level::TRACE,

View file

@ -3,4 +3,4 @@ pub mod collab;
pub mod pg_listener;
pub mod user;
pub mod utils;
pub(crate) mod workspace;
pub mod workspace;

View file

@ -1,6 +1,8 @@
use appflowy_cloud::application::{init_state, Application};
use appflowy_cloud::config::config::{get_configuration, Environment};
use appflowy_cloud::telemetry::init_subscriber;
use std::panic;
use tracing::error;
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
@ -23,6 +25,25 @@ async fn main() -> anyhow::Result<()> {
.expect("Failed to parse APP_ENVIRONMENT.");
init_subscriber(&app_env, filters);
// Set panic hook
panic::set_hook(Box::new(|panic_info| {
let panic_message = if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
*s
} else {
"No specific panic message available"
};
let location = if let Some(location) = panic_info.location() {
format!(
"Panic occurred in file '{}' at line {}",
location.file(),
location.line()
)
} else {
"Panic location unknown".to_string()
};
error!("panic hook: {}\n{}", panic_message, location);
}));
let configuration = get_configuration(&app_env).expect("The configuration should be configured.");
let state = init_state(&configuration)
.await

View file

@ -0,0 +1,401 @@
use crate::casbin::*;
use actix_http::Method;
use anyhow::{anyhow, Context};
use appflowy_cloud::biz;
use appflowy_cloud::biz::casbin::access_control::CasbinAccessControl;
use appflowy_cloud::biz::casbin::adapter::PgAdapter;
use appflowy_cloud::biz::casbin::{ActionType, ObjectType};
use appflowy_cloud::biz::pg_listener::PgListeners;
use casbin::{CoreApi, DefaultModel, Enforcer};
use database_entity::dto::{AFAccessLevel, AFRole};
use realtime::collaborate::{CollabAccessControl, CollabUserId};
use shared_entity::dto::workspace_dto::CreateWorkspaceMember;
use sqlx::PgPool;
use std::time::Duration;
use tokio::time::sleep;
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_get_access_level(pool: PgPool) -> anyhow::Result<()> {
setup_db(&pool).await?;
let model = DefaultModel::from_str(MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let user = create_user(&pool).await?;
let owner = create_user(&pool).await?;
let member = create_user(&pool).await?;
let guest = create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let members = vec![
CreateWorkspaceMember {
email: owner.email.clone(),
role: AFRole::Owner,
},
CreateWorkspaceMember {
email: member.email.clone(),
role: AFRole::Member,
},
CreateWorkspaceMember {
email: guest.email.clone(),
role: AFRole::Guest,
},
];
let _ =
biz::workspace::ops::add_workspace_members(&pool, &user.uuid, &workspace.workspace_id, members)
.await
.context("adding users to workspace")?;
assert_access_level(
&access_control,
&user.uid,
workspace.workspace_id.to_string(),
Some(AFAccessLevel::FullAccess),
)
.await;
assert_access_level(
&access_control,
&member.uid,
workspace.workspace_id.to_string(),
Some(AFAccessLevel::ReadAndWrite),
)
.await;
assert_access_level(
&access_control,
&guest.uid,
workspace.workspace_id.to_string(),
Some(AFAccessLevel::ReadOnly),
)
.await;
let mut txn = pool
.begin()
.await
.context("acquire transaction to update collab member")?;
database::collab::upsert_collab_member_with_txn(
guest.uid,
&workspace.workspace_id.to_string(),
&AFAccessLevel::ReadAndComment,
&mut txn,
)
.await?;
txn
.commit()
.await
.expect("commit transaction to update collab member");
assert_access_level(
&access_control,
&guest.uid,
workspace.workspace_id.to_string(),
Some(AFAccessLevel::ReadAndComment),
)
.await;
database::collab::delete_collab_member(guest.uid, &workspace.workspace_id.to_string(), &pool)
.await
.context("delete collab member")?;
assert_access_level(
&access_control,
&guest.uid,
workspace.workspace_id.to_string(),
None,
)
.await;
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_access_http_method(pool: PgPool) -> anyhow::Result<()> {
setup_db(&pool).await?;
let model = DefaultModel::from_str(MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let user = create_user(&pool).await?;
let guest = create_user(&pool).await?;
let stranger = create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&guest.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: guest.email,
role: AFRole::Guest,
}],
)
.await
.context("adding users to workspace")?;
for method in [Method::GET, Method::POST, Method::PUT, Method::DELETE] {
assert_can_access_http_method(
&access_control,
&user.uid,
&workspace.workspace_id.to_string(),
method,
true,
)
.await;
}
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&user.uid),
"new collab oid",
&Method::POST
)
.await?,
"should have access to non-existent collab oid"
);
// guest should have read access
assert_can_access_http_method(
&access_control,
&guest.uid,
&workspace.workspace_id.to_string(),
Method::GET,
true,
)
.await;
// guest should not have write access
assert_can_access_http_method(
&access_control,
&guest.uid,
&workspace.workspace_id.to_string(),
Method::POST,
false,
)
.await;
assert!(
!access_control
.can_access_http_method(
CollabUserId::UserId(&stranger.uid),
&workspace.workspace_id.to_string(),
&Method::GET
)
.await?,
"stranger should not have read access"
);
assert!(
!access_control
.can_access_http_method(
CollabUserId::UserId(&stranger.uid),
&workspace.workspace_id.to_string(),
&Method::POST
)
.await?,
"stranger should not have write access"
);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_send_receive_collab_update(pool: PgPool) -> anyhow::Result<()> {
setup_db(&pool).await?;
let model = DefaultModel::from_str(MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let user = create_user(&pool).await?;
let guest = create_user(&pool).await?;
let stranger = create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&guest.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: guest.email,
role: AFRole::Guest,
}],
)
.await
.context("adding users to workspace")?;
// Need to wait for the listener(spawn_listen_on_workspace_member_change) to receive the event
//
sleep(Duration::from_secs(1)).await;
assert!(
access_control
.can_send_collab_update(&user.uid, &workspace.workspace_id.to_string())
.await?
);
assert!(
access_control
.can_receive_collab_update(&user.uid, &workspace.workspace_id.to_string())
.await?
);
assert!(
!access_control
.can_send_collab_update(&guest.uid, &workspace.workspace_id.to_string())
.await?,
"guest cannot send collab update"
);
assert!(
access_control
.can_receive_collab_update(&guest.uid, &workspace.workspace_id.to_string())
.await?,
"guest can receive collab update"
);
assert!(
!access_control
.can_send_collab_update(&stranger.uid, &workspace.workspace_id.to_string())
.await?,
"stranger cannot send collab update"
);
assert!(
!access_control
.can_receive_collab_update(&stranger.uid, &workspace.workspace_id.to_string())
.await?,
"stranger cannot receive collab update"
);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_cache_collab_access_level(pool: PgPool) -> anyhow::Result<()> {
setup_db(&pool).await?;
let model = DefaultModel::from_str(MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let uid = 123;
let oid = "collab::oid".to_owned();
access_control
.cache_collab_access_level(CollabUserId::UserId(&uid), &oid, AFAccessLevel::FullAccess)
.await?;
assert_eq!(
AFAccessLevel::FullAccess,
access_control
.get_collab_access_level(CollabUserId::UserId(&uid), &oid)
.await?
);
access_control
.cache_collab_access_level(CollabUserId::UserId(&uid), &oid, AFAccessLevel::ReadOnly)
.await?;
assert_eq!(
AFAccessLevel::ReadOnly,
access_control
.get_collab_access_level(CollabUserId::UserId(&uid), &oid)
.await?
);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_casbin_access_control_update_remove(pool: PgPool) -> anyhow::Result<()> {
setup_db(&pool).await?;
let model = DefaultModel::from_str(MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let uid = 123;
assert!(
access_control
.update(
&uid,
&ObjectType::Workspace("123"),
&ActionType::Role(AFRole::Owner)
)
.await?
);
assert!(access_control.get_enforcer().read().await.enforce((
uid.to_string(),
ObjectType::Workspace("123").to_string(),
i32::from(AFRole::Owner).to_string(),
))?);
assert!(
access_control
.remove(&uid, &ObjectType::Workspace("123"))
.await?
);
assert!(!access_control.get_enforcer().read().await.enforce((
uid.to_string(),
ObjectType::Workspace("123").to_string(),
i32::from(AFRole::Owner).to_string(),
))?);
Ok(())
}

View file

@ -0,0 +1,107 @@
use crate::casbin::{
assert_workspace_role, assert_workspace_role_error, create_user, setup_db, MODEL_CONF,
};
use anyhow::{anyhow, Context};
use app_error::ErrorCode;
use appflowy_cloud::biz;
use appflowy_cloud::biz::casbin::access_control::CasbinAccessControl;
use appflowy_cloud::biz::casbin::adapter::PgAdapter;
use appflowy_cloud::biz::pg_listener::PgListeners;
use casbin::{CoreApi, DefaultModel, Enforcer};
use database_entity::dto::AFRole;
use shared_entity::dto::workspace_dto::{CreateWorkspaceMember, WorkspaceMemberChangeset};
use sqlx::PgPool;
#[sqlx::test(migrations = false)]
async fn test_workspace_access_control_get_role(pool: PgPool) -> anyhow::Result<()> {
setup_db(&pool).await?;
let model = DefaultModel::from_str(MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_workspace_access_control();
let user = create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
assert_workspace_role(
&access_control,
&user.uid,
&workspace.workspace_id,
Some(AFRole::Owner),
)
.await;
let member = create_user(&pool).await?;
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&member.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: member.email.clone(),
role: AFRole::Member,
}],
)
.await
.context("adding users to workspace")?;
assert_workspace_role(
&access_control,
&member.uid,
&workspace.workspace_id,
Some(AFRole::Member),
)
.await;
// wait for update message
biz::workspace::ops::update_workspace_member(
&pool,
&workspace.workspace_id,
&WorkspaceMemberChangeset {
email: member.email.clone(),
role: Some(AFRole::Guest),
name: None,
},
)
.await
.context("update user workspace role")?;
assert_workspace_role(
&access_control,
&member.uid,
&workspace.workspace_id,
Some(AFRole::Guest),
)
.await;
biz::workspace::ops::remove_workspace_members(
&user.uuid,
&pool,
&workspace.workspace_id,
&[member.email.clone()],
)
.await
.context("removing users from workspace")?;
assert_workspace_role_error(
&access_control,
&member.uid,
&workspace.workspace_id,
ErrorCode::NotEnoughPermissions,
)
.await;
Ok(())
}

285
tests/casbin/mod.rs Normal file
View file

@ -0,0 +1,285 @@
use actix_http::Method;
use anyhow::Context;
use app_error::ErrorCode;
use appflowy_cloud::biz::casbin::access_control::{
CasbinCollabAccessControl, CasbinWorkspaceAccessControl,
};
use appflowy_cloud::biz::workspace::access_control::WorkspaceAccessControl;
use database_entity::dto::{AFAccessLevel, AFRole};
use lazy_static::lazy_static;
use realtime::collaborate::{CollabAccessControl, CollabUserId};
use snowflake::Snowflake;
use sqlx::PgPool;
use std::time::Duration;
use tokio::sync::RwLock;
use uuid::Uuid;
mod collab_ac_test;
mod member_ac_test;
mod user_ac_test;
pub const MODEL_CONF: &str = r#"
[request_definition]
r = sub, obj, act
[policy_definition]
p = sub, obj, act
[role_definition]
g = _, _ # role to action
g2 = _, _ # worksheet to collab
[policy_effect]
e = some(where (p.eft == allow))
[matchers]
m = r.sub == p.sub && g2(p.obj, r.obj) && g(p.act, r.act)
"#;
lazy_static! {
pub static ref ID_GEN: RwLock<Snowflake> = RwLock::new(Snowflake::new(1));
}
pub async fn setup_db(pool: &PgPool) -> anyhow::Result<()> {
// Have to manually manage schema and tables managed by gotrue but referenced by our
// migration scripts.
// Create schema and tables
sqlx::query(r#"create schema auth"#).execute(pool).await?;
sqlx::query(
r#"create table auth.users(
id uuid NOT NULL UNIQUE,
deleted_at timestamptz null,
CONSTRAINT users_pkey PRIMARY KEY (id)
)"#,
)
.execute(pool)
.await?;
// Manually run migration after creating required objects above.
sqlx::migrate!().run(pool).await?;
// Remove foreign key constraint
sqlx::query(r#"alter table public.af_user drop constraint af_user_email_foreign_key"#)
.execute(pool)
.await?;
Ok(())
}
#[derive(Debug, Clone)]
pub struct User {
pub uid: i64,
pub uuid: Uuid,
pub email: String,
}
pub async fn create_user(pool: &PgPool) -> anyhow::Result<User> {
// Create user and workspace
let uid = ID_GEN.write().await.next_id();
let uuid = Uuid::new_v4();
let email = format!("{}@appflowy.io", uuid);
let name = uuid.to_string();
database::user::create_user(pool, uid, &uuid, &email, &name)
.await
.context("create user")?;
Ok(User { uid, uuid, email })
}
/// Asserts that the user has the specified access level within a workspace.
///
/// This function continuously checks the user's access level in a workspace and asserts that it
/// matches the expected level. The function retries the check a fixed number of times before timing out.
///
/// # Panics
/// Panics if the expected access level is not achieved before the timeout.
pub async fn assert_access_level<T: AsRef<str>>(
access_control: &CasbinCollabAccessControl,
uid: &i64,
workspace_id: T,
expected_level: Option<AFAccessLevel>,
) {
let mut retry_count = 0;
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => {
panic!("can't get the expected access level before timeout");
},
result = access_control
.get_collab_access_level(
CollabUserId::UserId(uid),
workspace_id.as_ref(),
)
=> {
retry_count += 1;
match result {
Ok(access_level) => {
if retry_count > 10 {
assert_eq!(access_level, expected_level.unwrap());
break;
}
if let Some(expected_level) = expected_level {
if access_level == expected_level {
break;
}
}
tokio::time::sleep(Duration::from_millis(300)).await;
},
Err(err) => {
if err.is_record_not_found() & expected_level.is_none() {
break;
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
},
}
}
}
/// Asserts that the user has the specified role within a workspace.
///
/// This function continuously checks the user's role in a workspace and asserts that it matches
/// the expected role. It retries the check a fixed number of times before timing out.
///
/// # Panics
/// Panics if the expected role is not achieved before the timeout.
pub async fn assert_workspace_role(
access_control: &CasbinWorkspaceAccessControl,
uid: &i64,
workspace_id: &Uuid,
expected_role: Option<AFRole>,
) {
let mut retry_count = 0;
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => {
panic!("can't get the expected role before timeout");
},
result = access_control
.get_role_from_uid(uid, workspace_id)
=> {
retry_count += 1;
match result {
Ok(role) => {
if retry_count > 10 {
assert_eq!(role, expected_role.unwrap());
break;
}
if let Some(expected_role) = &expected_role {
if &role == expected_role {
break;
}
}
tokio::time::sleep(Duration::from_millis(300)).await;
},
Err(err) => {
if err.is_record_not_found() & expected_role.is_none() {
break;
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
},
}
}
}
/// Asserts that retrieving the user's role within a workspace results in a specific error.
///
/// This function continuously attempts to fetch the user's role in a workspace, expecting a specific
/// error. If the expected error does not occur within a certain number of retries, it panics.
///
/// # Panics
/// Panics if the expected error is not encountered before the timeout or if an unexpected role is received.
pub async fn assert_workspace_role_error(
access_control: &CasbinWorkspaceAccessControl,
uid: &i64,
workspace_id: &Uuid,
expected_error: ErrorCode,
) {
let mut retry_count = 0;
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => {
panic!("can't get the expected role before timeout");
},
result = access_control
.get_role_from_uid(uid, workspace_id)
=> {
retry_count += 1;
match result {
Ok(role) => {
if retry_count > 10 {
panic!("expected error: {:?}, but got role: {:?}", expected_error, role);
}
tokio::time::sleep(Duration::from_millis(300)).await;
},
Err(err) => {
if err.code() == expected_error {
break;
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
},
}
}
}
/// Asserts whether the user has access to a specific HTTP method on a given object.
///
/// This function continuously checks if the user is allowed to access a particular HTTP method
/// on an object and asserts that the result matches the expected outcome. It retries the check
/// a fixed number of times before timing out.
///
/// # Arguments
/// * `access_control` - A reference to the `CasbinCollabAccessControl` instance.
/// * `uid` - The user ID for which to check the access.
/// * `object_id` - The ID of the object being accessed.
/// * `method` - The HTTP method (`Method`) to check.
/// * `expected` - The expected boolean result of the access check.
///
/// # Panics
/// Panics if the expected access result is not achieved before the timeout.
pub async fn assert_can_access_http_method(
access_control: &CasbinCollabAccessControl,
uid: &i64,
object_id: &str,
method: Method,
expected: bool,
) {
let mut retry_count = 0;
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => {
panic!("can't get the expected access level before timeout");
},
result = access_control
.can_access_http_method(
CollabUserId::UserId(uid),
object_id,
&method,
)
=> {
retry_count += 1;
match result {
Ok(access) => {
if retry_count > 10 {
assert_eq!(access, expected);
break;
}
if access == expected {
break;
}
tokio::time::sleep(Duration::from_millis(300)).await;
},
Err(_err) => {
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
},
}
}
}

View file

@ -0,0 +1,270 @@
use crate::casbin::*;
use anyhow::anyhow;
use appflowy_cloud::biz;
use appflowy_cloud::biz::casbin::adapter::PgAdapter;
use appflowy_cloud::biz::casbin::{Action, ObjectType};
use casbin::{CoreApi, DefaultModel, Enforcer};
use database_entity::dto::{AFAccessLevel, AFRole};
use shared_entity::dto::workspace_dto::CreateWorkspaceMember;
use sqlx::PgPool;
#[sqlx::test(migrations = false)]
async fn test_create_user(pool: PgPool) -> anyhow::Result<()> {
setup_db(&pool).await?;
let user = create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let model = DefaultModel::from_str(MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Workspace(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFRole::Owner).to_string(),
))
.context("user should be owner of its workspace")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::FullAccess).to_string(),
))
.context("user should have full access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should be able to read its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should be able to write its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should be able to delete its collab")?);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_add_users_to_workspace(pool: PgPool) -> anyhow::Result<()> {
setup_db(&pool).await?;
let user_main = create_user(&pool).await?;
let user_owner = create_user(&pool).await?;
let user_member = create_user(&pool).await?;
let user_guest = create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user_main.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let members = vec![
CreateWorkspaceMember {
email: user_owner.email.clone(),
role: AFRole::Owner,
},
CreateWorkspaceMember {
email: user_member.email.clone(),
role: AFRole::Member,
},
CreateWorkspaceMember {
email: user_guest.email.clone(),
role: AFRole::Guest,
},
];
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&user_main.uuid,
&workspace.workspace_id,
members,
)
.await
.context("adding users to workspace")?;
let model = DefaultModel::from_str(MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
{
// Owner
let user = user_owner;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::FullAccess).to_string(),
))
.context("owner should have full access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should be able to read its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should be able to write its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should be able to delete its collab")?);
}
{
// Member
let user = user_member;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadAndWrite).to_string(),
))
.context("member should have read write access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should be able to read its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should be able to write its collab")?);
assert!(!enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should not be able to delete its collab")?);
}
{
// Guest
let user = user_guest;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadOnly).to_string(),
))
.context("guest should have read only access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should not be able to read its collab")?);
assert!(!enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should not be able to write its collab")?);
assert!(!enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should not be able to delete its collab")?);
}
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_reload_policy_after_adding_user_to_workspace(pool: PgPool) -> anyhow::Result<()> {
setup_db(&pool).await?;
let user_owner = create_user(&pool).await?;
let user_member = create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user_owner.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
// Create enforcer before adding user to workspace
let model = DefaultModel::from_str(MODEL_CONF).await?;
let mut enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let members = vec![CreateWorkspaceMember {
email: user_member.email.clone(),
role: AFRole::Member,
}];
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&user_owner.uuid,
&workspace.workspace_id,
members,
)
.await
.context("adding users to workspace")?;
assert!(!enforcer
.enforce((
user_member.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadAndWrite).to_string(),
))
.context("member should not have read write access to collab before reload")?);
enforcer.load_policy().await?;
assert!(enforcer
.enforce((
user_member.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadAndWrite).to_string(),
))
.context("member should have read write access to collab")?);
Ok(())
}

View file

@ -1,4 +1,7 @@
extern crate core;
use client_api::Client;
mod casbin;
mod collab;
mod gotrue;
mod user;