feat: casbin access control (#178)

* feat: casbin for access control

* fix: method to generate database url instead of ToString

* fix: hold write lock when modifying policies

* chore: fix compile

* fix: remove db constraint for lib integration tests

---------

Co-authored-by: nathan <nathan@appflowy.io>
This commit is contained in:
Jiraffe7 2023-12-11 11:08:15 +08:00 committed by GitHub
parent c0f5c2ce7e
commit 36ef0f13b0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 1933 additions and 74 deletions

View file

@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "SELECT DISTINCT af_workspace_member.workspace_id FROM af_workspace_member",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "workspace_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "1141f880140df9d91b1464fde0dcad2e39a10e57e828b0ad44a00621d063d18c"
}

View file

@ -0,0 +1,40 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT af_user.uid, af_user.name, af_user.email,\n af_workspace_member.role_id AS role\n FROM public.af_workspace_member\n JOIN public.af_user ON af_workspace_member.uid = af_user.uid\n WHERE af_workspace_member.workspace_id = $1\n ORDER BY af_workspace_member.created_at ASC;\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "uid",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "name",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "email",
"type_info": "Text"
},
{
"ordinal": 3,
"name": "role",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false,
false,
false
]
},
"hash": "6f23cc00918d6c85f4b7cf71d9620a7428ebe59f0a383878b32e1abfe10dcbca"
}

View file

@ -1,34 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT af_user.name, af_user.email,\n af_workspace_member.role_id AS role\n FROM public.af_workspace_member\n JOIN public.af_user ON af_workspace_member.uid = af_user.uid\n WHERE af_workspace_member.workspace_id = $1\n ORDER BY af_workspace_member.created_at ASC;\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "name",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "email",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "role",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "72771f3633b18b161d579f00cdd72eb5b5643796f6d36c0934f72880f973a37b"
}

View file

@ -1,20 +1,25 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT af_user.name, af_user.email, af_workspace_member.role_id AS role\n FROM public.af_workspace_member\n JOIN public.af_user ON af_workspace_member.uid = af_user.uid\n WHERE af_workspace_member.workspace_id = $1 \n AND af_workspace_member.uid = $2 \n ",
"query": "\n SELECT af_user.uid, af_user.name, af_user.email, af_workspace_member.role_id AS role\n FROM public.af_workspace_member\n JOIN public.af_user ON af_workspace_member.uid = af_user.uid\n WHERE af_workspace_member.workspace_id = $1 \n AND af_workspace_member.uid = $2 \n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "uid",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "name",
"type_info": "Text"
},
{
"ordinal": 1,
"ordinal": 2,
"name": "email",
"type_info": "Text"
},
{
"ordinal": 2,
"ordinal": 3,
"name": "role",
"type_info": "Int4"
}
@ -26,10 +31,11 @@
]
},
"nullable": [
false,
false,
false,
false
]
},
"hash": "cd705afeabea205154c4e00d456b36d168de1b2c9720a3f76bf4525cfa87aa46"
"hash": "75f37fa6492019be868ea7f8ffea64a3809c1232c548608c375b084440f672f1"
}

View file

@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "SELECT DISTINCT af_collab_member.oid FROM af_collab_member",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "oid",
"type_info": "Text"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "8135c6a16c27a0ccbde65473a8041007ae9ef7f546fadb656a2948ffb0db8e53"
}

119
Cargo.lock generated
View file

@ -408,6 +408,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a"
dependencies = [
"cfg-if",
"const-random",
"getrandom 0.2.10",
"once_cell",
"version_check",
@ -516,6 +517,7 @@ dependencies = [
"async-trait",
"axum_session",
"bytes",
"casbin",
"chrono",
"client-api",
"collab",
@ -1115,6 +1117,26 @@ dependencies = [
"bytes",
]
[[package]]
name = "casbin"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53fd06051bc34aa7ee753647dbd36506fa9f84026993d9871e65e0ee71ac3632"
dependencies = [
"async-trait",
"fixedbitset",
"getrandom 0.2.10",
"once_cell",
"parking_lot",
"petgraph",
"regex",
"rhai",
"ritelinked",
"serde",
"thiserror",
"tokio",
]
[[package]]
name = "cast"
version = "0.3.0"
@ -1396,6 +1418,26 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f"
[[package]]
name = "const-random"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aaf16c9c2c612020bcfd042e170f6e32de9b9d75adb5277cdbbd2e2c8c8299a"
dependencies = [
"const-random-macro",
]
[[package]]
name = "const-random-macro"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
dependencies = [
"getrandom 0.2.10",
"once_cell",
"tiny-keccak",
]
[[package]]
name = "convert_case"
version = "0.4.0"
@ -1590,6 +1632,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crunchy"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
[[package]]
name = "crypto-common"
version = "0.1.6"
@ -2202,8 +2250,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasm-bindgen",
]
[[package]]
@ -2285,6 +2335,15 @@ version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash 0.7.6",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@ -2599,6 +2658,15 @@ dependencies = [
"generic-array",
]
[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
]
[[package]]
name = "ipnet"
version = "2.8.0"
@ -4053,6 +4121,34 @@ dependencies = [
"winreg",
]
[[package]]
name = "rhai"
version = "1.16.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3625f343d89990133d013e39c46e350915178cf94f1bec9f49b0cbef98a3e3c"
dependencies = [
"ahash 0.8.6",
"bitflags 2.4.0",
"instant",
"num-traits",
"once_cell",
"rhai_codegen",
"serde",
"smallvec",
"smartstring",
]
[[package]]
name = "rhai_codegen"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "853977598f084a492323fe2f7896b4100a86284ee8473612de60021ea341310f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.32",
]
[[package]]
name = "ring"
version = "0.16.20"
@ -4068,6 +4164,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "ritelinked"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98f2771d255fd99f0294f13249fecd0cae6e074f86b4197ec1f1689d537b44d3"
dependencies = [
"ahash 0.7.6",
"hashbrown 0.11.2",
]
[[package]]
name = "rkyv"
version = "0.7.42"
@ -4606,6 +4712,9 @@ name = "smallvec"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
dependencies = [
"serde",
]
[[package]]
name = "smartstring"
@ -4614,6 +4723,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29"
dependencies = [
"autocfg",
"serde",
"static_assertions",
"version_check",
]
@ -5103,6 +5213,15 @@ dependencies = [
"time-core",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
dependencies = [
"crunchy",
]
[[package]]
name = "tinytemplate"
version = "1.2.1"

View file

@ -68,6 +68,7 @@ axum_session = "0.7.0"
uuid = "1.4.1"
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }
prost = "0.12.1"
casbin = "2.0.9"
# collab
collab = { version = "0.1.0", features = ["async-plugin"] }

View file

@ -27,3 +27,5 @@ s3:
secret_key: minioadmin
bucket: appflowy
region: us-east-1
casbin:
pool_size: 8

View file

@ -95,6 +95,10 @@ pub enum AppError {
}
impl AppError {
pub fn is_not_enough_permissions(&self) -> bool {
matches!(self, AppError::NotEnoughPermissions(_))
}
pub fn is_record_not_found(&self) -> bool {
matches!(self, AppError::RecordNotFound(_))
}

View file

@ -255,7 +255,7 @@ pub struct AFPermission {
pub description: String,
}
#[derive(Deserialize_repr, Serialize_repr, Eq, PartialEq, Debug, Clone)]
#[derive(Deserialize_repr, Serialize_repr, Eq, PartialEq, Debug, Clone, Copy)]
#[repr(i32)]
pub enum AFAccessLevel {
// Can't modify the value of the enum

View file

@ -49,6 +49,7 @@ pub struct AFUserProfileRow {
#[derive(FromRow, Serialize, Deserialize)]
pub struct AFWorkspaceMemberRow {
pub uid: i64,
pub name: String,
pub email: String,
pub role: AFRole,

View file

@ -326,7 +326,7 @@ pub async fn upsert_collab_member_with_txn<T: AsRef<str> + Debug>(
txn: &mut Transaction<'_, sqlx::Postgres>,
) -> Result<(), AppError> {
let oid = oid.as_ref();
let access_level: i32 = access_level.clone().into();
let access_level: i32 = (*access_level).into();
let permission_id = sqlx::query_scalar!(
r#"
SELECT id
@ -382,6 +382,26 @@ pub async fn delete_collab_member(uid: i64, oid: &str, pg_pool: &PgPool) -> Resu
Ok(())
}
#[inline]
pub async fn select_all_collab_members(
pg_pool: &PgPool,
) -> Result<Vec<(String, Vec<AFCollabMember>)>, AppError> {
let collabs: Vec<_> = sqlx::query!("SELECT DISTINCT af_collab_member.oid FROM af_collab_member")
.fetch_all(pg_pool)
.await?
.into_iter()
.map(|r| r.oid)
.collect();
let mut collab_members = Vec::with_capacity(collabs.len());
for oid in collabs {
let members = select_collab_members(&oid, pg_pool).await?;
collab_members.push((oid, members));
}
Ok(collab_members)
}
#[inline]
pub async fn select_collab_members(
oid: &str,

View file

@ -237,6 +237,26 @@ pub async fn delete_workspace_members(
Ok(())
}
pub async fn select_all_workspace_members(
pg_pool: &PgPool,
) -> Result<Vec<(String, Vec<AFWorkspaceMemberRow>)>, AppError> {
let workspaces: Vec<_> =
sqlx::query!("SELECT DISTINCT af_workspace_member.workspace_id FROM af_workspace_member")
.fetch_all(pg_pool)
.await?
.into_iter()
.map(|r| r.workspace_id)
.collect();
let mut workspace_members = Vec::with_capacity(workspaces.len());
for id in workspaces {
let members = select_workspace_member_list(pg_pool, &id).await?;
workspace_members.push((id.to_string(), members));
}
Ok(workspace_members)
}
/// returns a list of workspace members, sorted by their creation time.
#[inline]
pub async fn select_workspace_member_list(
@ -246,7 +266,7 @@ pub async fn select_workspace_member_list(
let members = sqlx::query_as!(
AFWorkspaceMemberRow,
r#"
SELECT af_user.name, af_user.email,
SELECT af_user.uid, af_user.name, af_user.email,
af_workspace_member.role_id AS role
FROM public.af_workspace_member
JOIN public.af_user ON af_workspace_member.uid = af_user.uid
@ -269,7 +289,7 @@ pub async fn select_workspace_member(
let member = sqlx::query_as!(
AFWorkspaceMemberRow,
r#"
SELECT af_user.name, af_user.email, af_workspace_member.role_id AS role
SELECT af_user.uid, af_user.name, af_user.email, af_workspace_member.role_id AS role
FROM public.af_workspace_member
JOIN public.af_user ON af_workspace_member.uid = af_user.uid
WHERE af_workspace_member.workspace_id = $1

View file

@ -1,3 +1,4 @@
use crate::biz::casbin::access_control::CasbinCollabAccessControl;
use crate::state::AppState;
use actix::Addr;
use actix_web::web::{Data, Path, Payload};
@ -8,7 +9,6 @@ use std::sync::Arc;
use realtime::client::ClientSession;
use realtime::collaborate::CollabServer;
use crate::biz::collab::access_control::CollabAccessControlImpl;
use crate::biz::collab::storage::CollabPostgresDBStorage;
use crate::biz::user::RealtimeUserImpl;
use crate::component::auth::jwt::{authorization_from_token, UserUuid};
@ -23,7 +23,7 @@ pub fn ws_scope() -> Scope {
const MAX_FRAME_SIZE: usize = 65_536; // 64 KiB
pub type CollabServerImpl =
Addr<CollabServer<CollabPostgresDBStorage, Arc<RealtimeUserImpl>, Arc<CollabAccessControlImpl>>>;
Addr<CollabServer<CollabPostgresDBStorage, Arc<RealtimeUserImpl>, CasbinCollabAccessControl>>;
#[instrument(skip_all, err)]
#[get("/{token}/{device_id}")]

View file

@ -1,4 +1,6 @@
use crate::api::metrics::{metrics_registry, metrics_scope};
use crate::biz::casbin::adapter::PgAdapter;
use crate::biz::casbin::MODEL_CONF;
use crate::component::auth::HEADER_TOKEN;
use crate::config::config::{Config, DatabaseSetting, GoTrueSetting, S3Setting, TlsConfig};
use crate::middleware::cors_mw::default_cors;
@ -29,16 +31,16 @@ use crate::api::file_storage::file_storage_scope;
use crate::api::user::user_scope;
use crate::api::workspace::{collab_scope, workspace_scope};
use crate::api::ws::ws_scope;
use crate::biz::collab::access_control::{CollabAccessControlImpl, CollabHttpAccessControl};
use crate::biz::casbin::access_control::CasbinAccessControl;
use crate::biz::collab::access_control::CollabHttpAccessControl;
use crate::biz::collab::storage::init_collab_storage;
use crate::biz::pg_listener::PgListeners;
use crate::biz::user::RealtimeUserImpl;
use crate::biz::workspace::access_control::{
WorkspaceAccessControlImpl, WorkspaceHttpAccessControl,
};
use crate::biz::workspace::access_control::WorkspaceHttpAccessControl;
use crate::middleware::access_control_mw::WorkspaceAccessControl;
use crate::middleware::metrics_mw::MetricsMiddleware;
use casbin::CoreApi;
use database::file::bucket_s3_impl::S3BucketStorage;
use realtime::collaborate::CollabServer;
@ -96,9 +98,11 @@ pub async fn run(
let access_control = WorkspaceAccessControl::new()
.with_acs(WorkspaceHttpAccessControl(
state.workspace_access_control.clone(),
state.workspace_access_control.clone().into(),
))
.with_acs(CollabHttpAccessControl(state.collab_access_control.clone()));
.with_acs(CollabHttpAccessControl(
state.collab_access_control.clone().into(),
));
// Initialize metrics that which are registered in the registry.
let (metrics, registry) = metrics_registry();
@ -169,20 +173,21 @@ pub async fn init_state(config: &Config) -> Result<AppState, Error> {
// Pg listeners
let pg_listeners = Arc::new(PgListeners::new(&pg_pool).await?);
// Collab access control
let collab_member_listener = pg_listeners.subscribe_collab_member_change();
let collab_access_control = Arc::new(CollabAccessControlImpl::new(
let workspace_member_listener = pg_listeners.subscribe_workspace_member_change();
let access_control_model = casbin::DefaultModel::from_str(MODEL_CONF).await?;
let access_control_adapter = PgAdapter::new(pg_pool.clone());
let enforcer = casbin::Enforcer::new(access_control_model, access_control_adapter).await?;
let casbin_access_control = CasbinAccessControl::new(
pg_pool.clone(),
collab_member_listener,
));
// Workspace access control
let workspace_member_listener = pg_listeners.subscribe_workspace_member_change();
let workspace_access_control = Arc::new(WorkspaceAccessControlImpl::new(
pg_pool.clone(),
workspace_member_listener,
));
enforcer,
);
let collab_access_control = casbin_access_control.new_collab_access_control();
let workspace_access_control = casbin_access_control.new_workspace_access_control();
let collab_storage = Arc::new(
init_collab_storage(
@ -205,6 +210,7 @@ pub async fn init_state(config: &Config) -> Result<AppState, Error> {
workspace_access_control,
bucket_storage,
pg_listeners,
casbin_access_control,
})
}

View file

@ -0,0 +1,997 @@
use std::{str::FromStr, sync::Arc};
use actix_web::http::Method;
use anyhow::anyhow;
use async_trait::async_trait;
use casbin::{CoreApi, MgmtApi};
use database::user::select_uid_from_uuid;
use sqlx::PgPool;
use tokio::sync::{broadcast, RwLock};
use tracing::log::warn;
use uuid::Uuid;
use crate::biz::{
collab::member_listener::{CollabMemberAction, CollabMemberNotification},
workspace::{
access_control::WorkspaceAccessControl,
member_listener::{WorkspaceMemberAction, WorkspaceMemberNotification},
},
};
use app_error::AppError;
use database_entity::dto::{AFAccessLevel, AFRole};
use realtime::collaborate::{CollabAccessControl, CollabUserId};
use super::{
Action, ActionType, ObjectType, POLICY_FIELD_INDEX_ACTION, POLICY_FIELD_INDEX_OBJECT,
POLICY_FIELD_INDEX_USER,
};
/// Manages access control using Casbin.
///
/// Stores access control policies in the form `subject, object, role`
/// where `subject` is `uid`, `object` is `oid`, and `role` is [AFAccessLevel] or [AFRole].
///
/// Roles are mapped to the corresponding actions that they are allowed to perform.
/// `FullAccess` has write
/// `FullAccess` has read
///
/// Access control requests are made in the form `subject, object, action`
/// and will be evaluated against the policies and mappings stored,
/// according to the model defined.
pub struct CasbinAccessControl {
pg_pool: PgPool,
enforcer: Arc<RwLock<casbin::Enforcer>>,
}
impl Clone for CasbinAccessControl {
fn clone(&self) -> Self {
Self {
pg_pool: self.pg_pool.clone(),
enforcer: Arc::clone(&self.enforcer),
}
}
}
impl CasbinAccessControl {
pub fn new(
pg_pool: PgPool,
collab_listener: broadcast::Receiver<CollabMemberNotification>,
workspace_listener: broadcast::Receiver<WorkspaceMemberNotification>,
enforcer: casbin::Enforcer,
) -> Self {
let enforcer = Arc::new(RwLock::new(enforcer));
spawn_listen_on_workspace_member_change(workspace_listener, enforcer.clone());
spawn_listen_on_collab_member_change(collab_listener, enforcer.clone());
Self { pg_pool, enforcer }
}
pub fn new_collab_access_control(&self) -> CasbinCollabAccessControl {
CasbinCollabAccessControl {
casbin_access_control: self.clone(),
}
}
pub fn new_workspace_access_control(&self) -> CasbinWorkspaceAccessControl {
CasbinWorkspaceAccessControl {
casbin_access_control: self.clone(),
}
}
/// Update permission for a user.
///
/// [`ObjectType::Workspace`] has to be paired with [`ActionType::Role`],
/// [`ObjectType::Collab`] has to be paired with [`ActionType::Level`],
async fn update(
&self,
uid: &i64,
obj: &ObjectType<'_>,
act: &ActionType,
) -> Result<bool, AppError> {
let (obj_id, action) = match (obj, act) {
(ObjectType::Workspace(_), ActionType::Role(role)) => {
Ok((obj.to_string(), i32::from(role.clone()).to_string()))
},
(ObjectType::Collab(_), ActionType::Level(level)) => {
Ok((obj.to_string(), i32::from(*level).to_string()))
},
_ => Err(AppError::Internal(anyhow!(
"invalid object type and action type combination: object={:?}, action={:?}",
obj,
act
))),
}?;
let mut enforcer = self.enforcer.write().await;
let current_policy = enforcer
.get_filtered_policy(POLICY_FIELD_INDEX_OBJECT, vec![obj_id.clone()])
.into_iter()
.find(|p| p[POLICY_FIELD_INDEX_USER] == uid.to_string());
if let Some(current_policy) = current_policy {
enforcer
.remove_policy(current_policy)
.await
.map_err(|e| AppError::Internal(anyhow!("casbin error removing policy: {e:?}")))?;
}
enforcer
.add_policy(vec![uid.to_string(), obj_id, action])
.await
.map_err(|e| AppError::Internal(anyhow!("casbin error adding policy: {e:?}")))
}
async fn remove(&self, uid: &i64, obj: &ObjectType<'_>) -> Result<bool, AppError> {
let obj_id = obj.to_string();
let mut enforcer = self.enforcer.write().await;
let policies = enforcer.get_filtered_policy(POLICY_FIELD_INDEX_OBJECT, vec![obj_id]);
let rem = policies
.into_iter()
.filter(|p| p[POLICY_FIELD_INDEX_USER] == uid.to_string())
.collect();
enforcer
.remove_policies(rem)
.await
.map_err(|e| AppError::Internal(anyhow!("casbin error enforce: {e:?}")))
}
/// Get uid which is used for all operations.
async fn get_uid(&self, user: &CollabUserId<'_>) -> Result<i64, AppError> {
let uid = match user {
CollabUserId::UserId(uid) => **uid,
CollabUserId::UserUuid(uuid) => select_uid_from_uuid(&self.pg_pool, uuid).await?,
};
Ok(uid)
}
}
fn spawn_listen_on_collab_member_change(
mut listener: broadcast::Receiver<CollabMemberNotification>,
enforcer: Arc<RwLock<casbin::Enforcer>>,
) {
tokio::spawn(async move {
while let Ok(change) = listener.recv().await {
match change.action_type {
CollabMemberAction::INSERT | CollabMemberAction::UPDATE => {
if let (Some(_), Some(_)) = (change.new_oid(), change.new_uid()) {
if let Err(err) = enforcer.write().await.load_policy().await {
warn!(
"Failed to reload the collab member status from db: {:?}, error: {}",
change, err
);
}
} else {
warn!("The oid or uid is None")
}
},
CollabMemberAction::DELETE => {
if let (Some(_), Some(_)) = (change.old_oid(), change.old_uid()) {
if let Err(err) = enforcer.write().await.load_policy().await {
warn!(
"Failed to reload the collab member status from db: {:?}, error: {}",
change, err
);
}
} else {
warn!("The oid or uid is None")
}
},
}
}
});
}
fn spawn_listen_on_workspace_member_change(
mut listener: broadcast::Receiver<WorkspaceMemberNotification>,
enforcer: Arc<RwLock<casbin::Enforcer>>,
) {
tokio::spawn(async move {
while let Ok(change) = listener.recv().await {
match change.action_type {
WorkspaceMemberAction::INSERT | WorkspaceMemberAction::UPDATE => match change.new {
None => {
warn!("The workspace member change can't be None when the action is INSERT or UPDATE")
},
Some(_) => {
if let Err(err) = enforcer.write().await.load_policy().await {
warn!("Failed to reload workspace member status from db: {}", err);
}
},
},
WorkspaceMemberAction::DELETE => match change.old {
None => warn!("The workspace member change can't be None when the action is DELETE"),
Some(_) => {
if let Err(err) = enforcer.write().await.load_policy().await {
warn!("Failed to reload workspace member status from db: {}", err);
}
},
},
}
}
});
}
#[derive(Clone)]
pub struct CasbinCollabAccessControl {
casbin_access_control: CasbinAccessControl,
}
impl CasbinCollabAccessControl {
pub async fn update_member(&self, uid: &i64, oid: &str, access_level: AFAccessLevel) {
let _ = self
.casbin_access_control
.update(
uid,
&ObjectType::Collab(oid),
&ActionType::Level(access_level),
)
.await;
}
pub async fn remove_member(&self, uid: &i64, oid: &str) {
let _ = self
.casbin_access_control
.remove(uid, &ObjectType::Collab(oid))
.await;
}
}
#[async_trait]
impl CollabAccessControl for CasbinCollabAccessControl {
async fn get_collab_access_level(
&self,
user: CollabUserId<'_>,
oid: &str,
) -> Result<AFAccessLevel, AppError> {
let uid = self.casbin_access_control.get_uid(&user).await?;
let enforcer = self.casbin_access_control.enforcer.read().await;
let collab_id = ObjectType::Collab(oid).to_string();
let policies = enforcer.get_filtered_policy(POLICY_FIELD_INDEX_OBJECT, vec![collab_id]);
// There should only be one entry per user per object, which is enforced in [CasbinAccessControl], so just take one using next.
let access_level = policies
.into_iter()
.find(|p| p[POLICY_FIELD_INDEX_USER] == uid.to_string())
.map(|p| p[POLICY_FIELD_INDEX_ACTION].clone())
.and_then(|s| i32::from_str(s.as_str()).ok())
.map(AFAccessLevel::from)
.ok_or(AppError::RecordNotFound(format!(
"user:{} is not a member of collab:{}",
uid, oid
)));
access_level
}
async fn cache_collab_access_level(
&self,
user: CollabUserId<'_>,
oid: &str,
level: AFAccessLevel,
) -> Result<(), AppError> {
let uid = self.casbin_access_control.get_uid(&user).await?;
self
.casbin_access_control
.update(&uid, &ObjectType::Collab(oid), &ActionType::Level(level))
.await?;
Ok(())
}
async fn can_access_http_method(
&self,
user: CollabUserId<'_>,
oid: &str,
method: &Method,
) -> Result<bool, AppError> {
let uid = self.casbin_access_control.get_uid(&user).await?;
let action = if Method::POST == method || Method::PUT == method || Method::DELETE == method {
Action::Write
} else {
Action::Read
};
// If collab does not exist, allow access.
// Workspace access control will still check it.
let collab_exists = self
.casbin_access_control
.enforcer
.read()
.await
.get_all_objects()
.contains(&ObjectType::Collab(oid).to_string());
if !collab_exists {
return Ok(true);
}
self
.casbin_access_control
.enforcer
.read()
.await
.enforce((
uid.to_string(),
ObjectType::Collab(oid).to_string(),
action.to_string(),
))
.map_err(|e| AppError::Internal(anyhow!("casbin error enforce: {e:?}")))
}
async fn can_send_collab_update(&self, uid: &i64, oid: &str) -> Result<bool, AppError> {
self
.casbin_access_control
.enforcer
.read()
.await
.enforce((
uid.to_string(),
ObjectType::Collab(oid).to_string(),
Action::Write.to_string(),
))
.map_err(|e| AppError::Internal(anyhow!("casbin error enforce: {e:?}")))
}
async fn can_receive_collab_update(&self, uid: &i64, oid: &str) -> Result<bool, AppError> {
self
.casbin_access_control
.enforcer
.read()
.await
.enforce((
uid.to_string(),
ObjectType::Collab(oid).to_string(),
Action::Read.to_string(),
))
.map_err(|e| AppError::Internal(anyhow!("casbin error enforce: {e:?}")))
}
}
#[derive(Clone)]
pub struct CasbinWorkspaceAccessControl {
casbin_access_control: CasbinAccessControl,
}
impl CasbinWorkspaceAccessControl {
pub async fn update_member(&self, uid: &i64, workspace_id: &Uuid, role: AFRole) {
let _ = self
.casbin_access_control
.update(
uid,
&ObjectType::Workspace(&workspace_id.to_string()),
&ActionType::Role(role),
)
.await;
}
pub async fn remove_member(&self, uid: &i64, workspace_id: &Uuid) {
let _ = self
.casbin_access_control
.remove(uid, &ObjectType::Workspace(&workspace_id.to_string()))
.await;
}
}
#[async_trait]
impl WorkspaceAccessControl for CasbinWorkspaceAccessControl {
async fn get_role_from_uuid(
&self,
user_uuid: &Uuid,
workspace_id: &Uuid,
) -> Result<AFRole, AppError> {
let uid = self
.casbin_access_control
.get_uid(&CollabUserId::UserUuid(user_uuid))
.await?;
self.get_role_from_uid(&uid, workspace_id).await
}
async fn get_role_from_uid(&self, uid: &i64, workspace_id: &Uuid) -> Result<AFRole, AppError> {
let enforcer = self.casbin_access_control.enforcer.read().await;
let workspace_id = workspace_id.to_string();
let policies = enforcer.get_filtered_policy(
POLICY_FIELD_INDEX_OBJECT,
vec![ObjectType::Workspace(&workspace_id).to_string()],
);
// There should only be one entry per user per object, which is enforced in [CasbinAccessControl], so just take one using next.
let role = policies
.into_iter()
.find(|p| p[POLICY_FIELD_INDEX_USER] == uid.to_string())
.map(|p| p[POLICY_FIELD_INDEX_ACTION].clone())
.and_then(|s| i32::from_str(s.as_str()).ok())
.map(AFRole::from)
.ok_or(AppError::NotEnoughPermissions(format!(
"user:{} is not a member of workspace:{}",
uid, workspace_id
)));
role
}
}
#[cfg(test)]
mod tests {
use anyhow::Context;
use casbin::{DefaultModel, Enforcer};
use shared_entity::dto::workspace_dto::{CreateWorkspaceMember, WorkspaceMemberChangeset};
use crate::biz::{
self,
casbin::{adapter::PgAdapter, tests},
pg_listener::PgListeners,
};
use super::*;
#[sqlx::test(migrations = false)]
async fn test_workspace_access_control_get_role(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_workspace_access_control();
let user = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
assert_eq!(
AFRole::Owner,
access_control
.get_role_from_uuid(&user.uuid, &workspace.workspace_id)
.await?
);
assert_eq!(
AFRole::Owner,
access_control
.get_role_from_uid(&user.uid, &workspace.workspace_id)
.await?
);
let member = tests::create_user(&pool).await?;
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&member.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: member.email.clone(),
role: AFRole::Member,
}],
)
.await
.context("adding users to workspace")?;
assert_eq!(
AFRole::Member,
access_control
.get_role_from_uuid(&member.uuid, &workspace.workspace_id)
.await?
);
assert_eq!(
AFRole::Member,
access_control
.get_role_from_uid(&member.uid, &workspace.workspace_id)
.await?
);
// wait for update message
let mut workspace_listener = listeners.subscribe_workspace_member_change();
biz::workspace::ops::update_workspace_member(
&pool,
&workspace.workspace_id,
&WorkspaceMemberChangeset {
email: member.email.clone(),
role: Some(AFRole::Guest),
name: None,
},
)
.await
.context("update user workspace role")?;
let _ = workspace_listener.recv().await;
assert_eq!(
AFRole::Guest,
access_control
.get_role_from_uid(&member.uid, &workspace.workspace_id)
.await?
);
// wait for delete message
let mut workspace_listener = listeners.subscribe_workspace_member_change();
biz::workspace::ops::remove_workspace_members(
&user.uuid,
&pool,
&workspace.workspace_id,
&[member.email.clone()],
)
.await
.context("removing users from workspace")?;
let _ = workspace_listener.recv().await;
assert!(access_control
.get_role_from_uid(&member.uid, &workspace.workspace_id)
.await
.expect_err("user should not be part of workspace")
.is_not_enough_permissions());
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_get_access_level(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let user = tests::create_user(&pool).await?;
let owner = tests::create_user(&pool).await?;
let member = tests::create_user(&pool).await?;
let guest = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let members = vec![
CreateWorkspaceMember {
email: owner.email.clone(),
role: AFRole::Owner,
},
CreateWorkspaceMember {
email: member.email.clone(),
role: AFRole::Member,
},
CreateWorkspaceMember {
email: guest.email.clone(),
role: AFRole::Guest,
},
];
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&user.uuid,
&workspace.workspace_id,
members,
)
.await
.context("adding users to workspace")?;
assert_eq!(
AFAccessLevel::FullAccess,
access_control
.get_collab_access_level(
CollabUserId::UserUuid(&user.uuid),
&workspace.workspace_id.to_string(),
)
.await?
);
assert_eq!(
AFAccessLevel::FullAccess,
access_control
.get_collab_access_level(
CollabUserId::UserId(&user.uid),
&workspace.workspace_id.to_string(),
)
.await?
);
assert_eq!(
AFAccessLevel::FullAccess,
access_control
.get_collab_access_level(
CollabUserId::UserId(&owner.uid),
&workspace.workspace_id.to_string(),
)
.await?
);
assert_eq!(
AFAccessLevel::ReadAndWrite,
access_control
.get_collab_access_level(
CollabUserId::UserId(&member.uid),
&workspace.workspace_id.to_string(),
)
.await?
);
assert_eq!(
AFAccessLevel::ReadOnly,
access_control
.get_collab_access_level(
CollabUserId::UserId(&guest.uid),
&workspace.workspace_id.to_string(),
)
.await?
);
// wait for update message
let mut collab_listener = listeners.subscribe_collab_member_change();
let mut txn = pool
.begin()
.await
.context("acquire transaction to update collab member")?;
database::collab::upsert_collab_member_with_txn(
guest.uid,
&workspace.workspace_id.to_string(),
&AFAccessLevel::ReadAndComment,
&mut txn,
)
.await?;
txn
.commit()
.await
.expect("commit transaction to update collab member");
let _ = collab_listener.recv().await;
assert_eq!(
AFAccessLevel::ReadAndComment,
access_control
.get_collab_access_level(
CollabUserId::UserId(&guest.uid),
&workspace.workspace_id.to_string(),
)
.await?
);
// wait for delete message
let mut collab_listener = listeners.subscribe_collab_member_change();
database::collab::delete_collab_member(guest.uid, &workspace.workspace_id.to_string(), &pool)
.await
.context("delete collab member")?;
let _ = collab_listener.recv().await;
assert!(access_control
.get_collab_access_level(
CollabUserId::UserId(&guest.uid),
&workspace.workspace_id.to_string()
)
.await
.expect_err("user should not be part of collab")
.is_record_not_found());
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_access_http_method(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let user = tests::create_user(&pool).await?;
let guest = tests::create_user(&pool).await?;
let stranger = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&guest.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: guest.email,
role: AFRole::Guest,
}],
)
.await
.context("adding users to workspace")?;
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&user.uid),
&workspace.workspace_id.to_string(),
&Method::GET
)
.await?
);
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&user.uid),
&workspace.workspace_id.to_string(),
&Method::POST
)
.await?
);
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&user.uid),
&workspace.workspace_id.to_string(),
&Method::PUT
)
.await?
);
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&user.uid),
&workspace.workspace_id.to_string(),
&Method::DELETE
)
.await?
);
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&user.uid),
"new collab oid",
&Method::POST
)
.await?,
"should have access to non-existent collab oid"
);
assert!(
access_control
.can_access_http_method(
CollabUserId::UserId(&guest.uid),
&workspace.workspace_id.to_string(),
&Method::GET
)
.await?,
"guest should have read access"
);
assert!(
!access_control
.can_access_http_method(
CollabUserId::UserId(&guest.uid),
&workspace.workspace_id.to_string(),
&Method::POST
)
.await?,
"guest should not have write access"
);
assert!(
!access_control
.can_access_http_method(
CollabUserId::UserId(&stranger.uid),
&workspace.workspace_id.to_string(),
&Method::GET
)
.await?,
"stranger should not have read access"
);
assert!(
!access_control
.can_access_http_method(
CollabUserId::UserId(&stranger.uid),
&workspace.workspace_id.to_string(),
&Method::POST
)
.await?,
"stranger should not have write access"
);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_send_receive_collab_update(
pool: PgPool,
) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let user = tests::create_user(&pool).await?;
let guest = tests::create_user(&pool).await?;
let stranger = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&guest.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: guest.email,
role: AFRole::Guest,
}],
)
.await
.context("adding users to workspace")?;
assert!(
access_control
.can_send_collab_update(&user.uid, &workspace.workspace_id.to_string())
.await?
);
assert!(
access_control
.can_receive_collab_update(&user.uid, &workspace.workspace_id.to_string())
.await?
);
assert!(
!access_control
.can_send_collab_update(&guest.uid, &workspace.workspace_id.to_string())
.await?,
"guest cannot send collab update"
);
assert!(
access_control
.can_receive_collab_update(&guest.uid, &workspace.workspace_id.to_string())
.await?,
"guest can receive collab update"
);
assert!(
!access_control
.can_send_collab_update(&stranger.uid, &workspace.workspace_id.to_string())
.await?,
"stranger cannot send collab update"
);
assert!(
!access_control
.can_receive_collab_update(&stranger.uid, &workspace.workspace_id.to_string())
.await?,
"stranger cannot receive collab update"
);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_collab_access_control_cache_collab_access_level(
pool: PgPool,
) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let access_control = access_control.new_collab_access_control();
let uid = 123;
let oid = "collab::oid".to_owned();
access_control
.cache_collab_access_level(CollabUserId::UserId(&uid), &oid, AFAccessLevel::FullAccess)
.await?;
assert_eq!(
AFAccessLevel::FullAccess,
access_control
.get_collab_access_level(CollabUserId::UserId(&uid), &oid)
.await?
);
access_control
.cache_collab_access_level(CollabUserId::UserId(&uid), &oid, AFAccessLevel::ReadOnly)
.await?;
assert_eq!(
AFAccessLevel::ReadOnly,
access_control
.get_collab_access_level(CollabUserId::UserId(&uid), &oid)
.await?
);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_casbin_access_control_update_remove(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let listeners = PgListeners::new(&pool).await?;
let access_control = CasbinAccessControl::new(
pool.clone(),
listeners.subscribe_collab_member_change(),
listeners.subscribe_workspace_member_change(),
enforcer,
);
let uid = 123;
assert!(
access_control
.update(
&uid,
&ObjectType::Workspace("123"),
&ActionType::Role(AFRole::Owner)
)
.await?
);
assert!(access_control.enforcer.read().await.enforce((
uid.to_string(),
ObjectType::Workspace("123").to_string(),
i32::from(AFRole::Owner).to_string(),
))?);
assert!(
access_control
.remove(&uid, &ObjectType::Workspace("123"))
.await?
);
assert!(!access_control.enforcer.read().await.enforce((
uid.to_string(),
ObjectType::Workspace("123").to_string(),
i32::from(AFRole::Owner).to_string(),
))?);
Ok(())
}
}

454
src/biz/casbin/adapter.rs Normal file
View file

@ -0,0 +1,454 @@
use super::{Action, ObjectType};
use async_trait::async_trait;
use casbin::error::AdapterError;
use casbin::Adapter;
use casbin::Filter;
use casbin::Model;
use casbin::Result;
use database::collab::select_all_collab_members;
use database::workspace::select_all_workspace_members;
use database_entity::dto::AFAccessLevel;
use database_entity::dto::AFCollabMember;
use database_entity::pg_row::AFWorkspaceMemberRow;
use sqlx::PgPool;
/// Implmentation of [`casbin::Adapter`] for access control authorisation.
/// Access control policies that are managed by workspace and collab CRUD.
pub struct PgAdapter {
pg_pool: PgPool,
}
impl PgAdapter {
pub fn new(pg_pool: PgPool) -> Self {
Self { pg_pool }
}
}
fn create_collab_policies(collab_members: Vec<(String, Vec<AFCollabMember>)>) -> Vec<Vec<String>> {
let mut policies: Vec<Vec<String>> = Vec::new();
for (oid, members) in collab_members {
for m in members {
let p = [
m.uid.to_string(),
ObjectType::Collab(&oid).to_string(),
i32::from(m.permission.access_level).to_string(),
]
.to_vec();
policies.push(p);
}
}
policies
}
fn create_workspace_policies(
workspace_members: Vec<(String, Vec<AFWorkspaceMemberRow>)>,
) -> Vec<Vec<String>> {
let mut policies: Vec<Vec<String>> = Vec::new();
for (oid, members) in workspace_members {
for m in members {
let p = [
m.uid.to_string(),
ObjectType::Workspace(&oid).to_string(),
i32::from(m.role).to_string(),
]
.to_vec();
policies.push(p);
}
}
policies
}
#[async_trait]
impl Adapter for PgAdapter {
async fn load_policy(&mut self, model: &mut dyn Model) -> Result<()> {
let workspace_members = select_all_workspace_members(&self.pg_pool)
.await
.map_err(|err| AdapterError(Box::new(err)))?;
let workspace_policies = create_workspace_policies(workspace_members);
// Policy definition `p` of type `p`. See `model.conf`
model.add_policies("p", "p", workspace_policies);
let collab_members = select_all_collab_members(&self.pg_pool)
.await
.map_err(|err| AdapterError(Box::new(err)))?;
let collab_policies = create_collab_policies(collab_members);
// Policy definition `p` of type `p`. See `model.conf`
model.add_policies("p", "p", collab_policies);
// Grouping definition of role to action.
let af_access_levels = [
AFAccessLevel::ReadOnly,
AFAccessLevel::ReadAndComment,
AFAccessLevel::ReadAndWrite,
AFAccessLevel::FullAccess,
];
let mut grouping_policies = Vec::new();
for level in af_access_levels {
// All levels can read
grouping_policies.push([i32::from(level).to_string(), Action::Read.to_string()].to_vec());
if level.can_write() {
grouping_policies.push([i32::from(level).to_string(), Action::Write.to_string()].to_vec());
}
if level.can_delete() {
grouping_policies.push([i32::from(level).to_string(), Action::Delete.to_string()].to_vec());
}
}
// Grouping definition `g` of type `g`. See `model.conf`
model.add_policies("g", "g", grouping_policies);
Ok(())
}
async fn load_filtered_policy<'a>(&mut self, m: &mut dyn Model, _f: Filter<'a>) -> Result<()> {
// No support for filtered.
self.load_policy(m).await
}
async fn save_policy(&mut self, _m: &mut dyn Model) -> Result<()> {
// unimplemented!()
//
// Adapter is used only for loading policies from database
// since policies are managed by workspace and collab CRUD.
Ok(())
}
async fn clear_policy(&mut self) -> Result<()> {
// unimplemented!()
//
// Adapter is used only for loading policies from database
// since policies are managed by workspace and collab CRUD.
Ok(())
}
fn is_filtered(&self) -> bool {
// No support for filtered.
false
}
async fn add_policy(&mut self, _sec: &str, _ptype: &str, _rule: Vec<String>) -> Result<bool> {
// unimplemented!()
//
// Adapter is used only for loading policies from database
// since policies are managed by workspace and collab CRUD.
Ok(true)
}
async fn add_policies(
&mut self,
_sec: &str,
_ptype: &str,
_rules: Vec<Vec<String>>,
) -> Result<bool> {
// unimplemented!()
//
// Adapter is used only for loading policies from database
// since policies are managed by workspace and collab CRUD.
Ok(true)
}
async fn remove_policy(&mut self, _sec: &str, _ptype: &str, _rule: Vec<String>) -> Result<bool> {
// unimplemented!()
//
// Adapter is used only for loading policies from database
// since policies are managed by workspace and collab CRUD.
Ok(true)
}
async fn remove_policies(
&mut self,
_sec: &str,
_ptype: &str,
_rules: Vec<Vec<String>>,
) -> Result<bool> {
// unimplemented!()
//
// Adapter is used only for loading policies from database
// since policies are managed by workspace and collab CRUD.
Ok(true)
}
async fn remove_filtered_policy(
&mut self,
_sec: &str,
_ptype: &str,
_field_index: usize,
_field_values: Vec<String>,
) -> Result<bool> {
// unimplemented!()
//
// Adapter is used only for loading policies from database
// since policies are managed by workspace and collab CRUD.
Ok(true)
}
}
#[cfg(test)]
mod tests {
use anyhow::{anyhow, Context};
use casbin::{CoreApi, DefaultModel, Enforcer};
use database_entity::dto::AFRole;
use shared_entity::dto::workspace_dto::CreateWorkspaceMember;
use crate::biz;
use crate::biz::casbin::tests;
use super::*;
#[sqlx::test(migrations = false)]
async fn test_create_user(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let user = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Workspace(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFRole::Owner).to_string(),
))
.context("user should be owner of its workspace")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::FullAccess).to_string(),
))
.context("user should have full access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should be able to read its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should be able to write its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should be able to delete its collab")?);
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_add_users_to_workspace(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let user_main = tests::create_user(&pool).await?;
let user_owner = tests::create_user(&pool).await?;
let user_member = tests::create_user(&pool).await?;
let user_guest = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user_main.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
let members = vec![
CreateWorkspaceMember {
email: user_owner.email.clone(),
role: AFRole::Owner,
},
CreateWorkspaceMember {
email: user_member.email.clone(),
role: AFRole::Member,
},
CreateWorkspaceMember {
email: user_guest.email.clone(),
role: AFRole::Guest,
},
];
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&user_main.uuid,
&workspace.workspace_id,
members,
)
.await
.context("adding users to workspace")?;
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
{
// Owner
let user = user_owner;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::FullAccess).to_string(),
))
.context("owner should have full access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should be able to read its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should be able to write its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should be able to delete its collab")?);
}
{
// Member
let user = user_member;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadAndWrite).to_string(),
))
.context("member should have read write access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should be able to read its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should be able to write its collab")?);
assert!(!enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should not be able to delete its collab")?);
}
{
// Guest
let user = user_guest;
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadOnly).to_string(),
))
.context("guest should have read only access of its collab")?);
assert!(enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Read.to_string(),
))
.context("user should not be able to read its collab")?);
assert!(!enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Write.to_string(),
))
.context("user should not be able to write its collab")?);
assert!(!enforcer
.enforce((
user.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
Action::Delete.to_string(),
))
.context("user should not be able to delete its collab")?);
}
Ok(())
}
#[sqlx::test(migrations = false)]
async fn test_reload_policy_after_adding_user_to_workspace(pool: PgPool) -> anyhow::Result<()> {
tests::setup_db(&pool).await?;
let user_owner = tests::create_user(&pool).await?;
let user_member = tests::create_user(&pool).await?;
// Get workspace details
let workspace = database::workspace::select_user_workspace(&pool, &user_owner.uuid)
.await?
.into_iter()
.next()
.ok_or(anyhow!("workspace should be created"))?;
// Create enforcer before adding user to workspace
let model = DefaultModel::from_str(tests::MODEL_CONF).await?;
let mut enforcer = Enforcer::new(model, PgAdapter::new(pool.clone())).await?;
let members = vec![CreateWorkspaceMember {
email: user_member.email.clone(),
role: AFRole::Member,
}];
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&user_owner.uuid,
&workspace.workspace_id,
members,
)
.await
.context("adding users to workspace")?;
assert!(!enforcer
.enforce((
user_member.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadAndWrite).to_string(),
))
.context("member should not have read write access to collab before reload")?);
enforcer.load_policy().await?;
assert!(enforcer
.enforce((
user_member.uid.to_string(),
ObjectType::Collab(&workspace.workspace_id.to_string()).to_string(),
i32::from(AFAccessLevel::ReadAndWrite).to_string(),
))
.context("member should have read write access to collab")?);
Ok(())
}
}

161
src/biz/casbin/mod.rs Normal file
View file

@ -0,0 +1,161 @@
use database_entity::dto::{AFAccessLevel, AFRole};
pub mod access_control;
pub mod adapter;
pub const MODEL_CONF: &str = r###"
[request_definition]
r = sub, obj, act
[policy_definition]
p = sub, obj, act
[role_definition]
g = _, _ # role to action
g2 = _, _ # worksheet to collab
[policy_effect]
e = some(where (p.eft == allow))
[matchers]
m = r.sub == p.sub && g2(p.obj, r.obj) && g(p.act, r.act)
"###;
/// Represents the entity stored at the index of the access control policy.
/// `user_id, object_id, role/action`
///
/// E.g. user1, collab::123, Owner
const POLICY_FIELD_INDEX_USER: usize = 0;
const POLICY_FIELD_INDEX_OBJECT: usize = 1;
const POLICY_FIELD_INDEX_ACTION: usize = 2;
/// Represents the entity stored at the index of the grouping.
/// `role, action`
///
/// E.g. Owner, Write
#[allow(dead_code)]
const GROUPING_FIELD_INDEX_ROLE: usize = 0;
#[allow(dead_code)]
const GROUPING_FIELD_INDEX_ACTION: usize = 1;
/// Represents the object type that is stored in the access control policy.
#[derive(Debug)]
enum ObjectType<'id> {
/// Stored as `workspace::<uuid>`
Workspace(&'id str),
/// Stored as `collab::<uuid>`
Collab(&'id str),
}
impl ToString for ObjectType<'_> {
fn to_string(&self) -> String {
match self {
ObjectType::Collab(s) => format!("collab::{}", s),
ObjectType::Workspace(s) => format!("workspace::{}", s),
}
}
}
/// Represents the action type that is stored in the access control policy.
#[derive(Debug)]
enum ActionType {
Role(AFRole),
Level(AFAccessLevel),
}
/// Represents the actions that can be performed on objects.
#[derive(Debug)]
enum Action {
Read,
Write,
Delete,
}
impl ToString for Action {
fn to_string(&self) -> String {
match self {
Action::Read => "read".to_owned(),
Action::Write => "write".to_owned(),
Action::Delete => "delete".to_owned(),
}
}
}
#[cfg(test)]
mod tests {
use anyhow::Context;
use lazy_static::lazy_static;
use snowflake::Snowflake;
use sqlx::PgPool;
use tokio::sync::RwLock;
use uuid::Uuid;
pub const MODEL_CONF: &str = r#"
[request_definition]
r = sub, obj, act
[policy_definition]
p = sub, obj, act
[role_definition]
g = _, _ # role to action
g2 = _, _ # worksheet to collab
[policy_effect]
e = some(where (p.eft == allow))
[matchers]
m = r.sub == p.sub && g2(p.obj, r.obj) && g(p.act, r.act)
"#;
lazy_static! {
pub static ref ID_GEN: RwLock<Snowflake> = RwLock::new(Snowflake::new(1));
}
pub async fn setup_db(pool: &PgPool) -> anyhow::Result<()> {
// Have to manually manage schema and tables managed by gotrue but referenced by our
// migration scripts.
// Create schema and tables
sqlx::query(r#"create schema auth"#).execute(pool).await?;
sqlx::query(
r#"create table auth.users(
id uuid NOT NULL UNIQUE,
deleted_at timestamptz null,
CONSTRAINT users_pkey PRIMARY KEY (id)
)"#,
)
.execute(pool)
.await?;
// Manually run migration after creating required objects above.
sqlx::migrate!().run(pool).await?;
// Remove foreign key constraint
sqlx::query(r#"alter table public.af_user drop constraint af_user_email_foreign_key"#)
.execute(pool)
.await?;
Ok(())
}
#[derive(Debug, Clone)]
pub struct User {
pub uid: i64,
pub uuid: Uuid,
pub email: String,
}
pub async fn create_user(pool: &PgPool) -> anyhow::Result<User> {
// Create user and workspace
let uid = ID_GEN.write().await.next_id();
let uuid = Uuid::new_v4();
let email = format!("{}@appflowy.io", uuid);
let name = uuid.to_string();
database::user::create_user(pool, uid, &uuid, &email, &name)
.await
.context("create user")?;
Ok(User { uid, uuid, email })
}
}

View file

@ -166,7 +166,7 @@ async fn reload_collab_member_status_from_db(
member_status_by_uid: &Arc<RwLock<MemberStatusByUid>>,
) -> Result<MemberStatus, AppError> {
let member = database::collab::select_collab_member(uid, oid, pg_pool).await?;
let status = MemberStatus::Valid(member.permission.access_level.clone());
let status = MemberStatus::Valid(member.permission.access_level);
cache_collab_member_status(
uid,
oid,

View file

@ -9,8 +9,8 @@ use database_entity::dto::{
};
use itertools::{Either, Itertools};
use crate::biz::collab::access_control::{CollabAccessControlImpl, CollabStorageAccessControlImpl};
use crate::biz::workspace::access_control::WorkspaceAccessControlImpl;
use crate::biz::casbin::access_control::{CasbinCollabAccessControl, CasbinWorkspaceAccessControl};
use crate::biz::collab::access_control::CollabStorageAccessControlImpl;
use anyhow::Context;
use app_error::AppError;
use collab::core::collab_plugin::EncodedCollabV1;
@ -24,17 +24,17 @@ use tracing::{event, info, instrument};
use validator::Validate;
pub type CollabPostgresDBStorage = CollabStorageWrapper<
CollabStorageAccessControlImpl<CollabAccessControlImpl, WorkspaceAccessControlImpl>,
CollabStorageAccessControlImpl<CasbinCollabAccessControl, CasbinWorkspaceAccessControl>,
>;
pub async fn init_collab_storage(
pg_pool: PgPool,
collab_access_control: Arc<CollabAccessControlImpl>,
workspace_access_control: Arc<WorkspaceAccessControlImpl>,
collab_access_control: CasbinCollabAccessControl,
workspace_access_control: CasbinWorkspaceAccessControl,
) -> CollabPostgresDBStorage {
let access_control = CollabStorageAccessControlImpl {
collab_access_control,
workspace_access_control,
collab_access_control: collab_access_control.into(),
workspace_access_control: workspace_access_control.into(),
};
let collab_storage_impl = CollabStoragePgImpl::new(pg_pool);
CollabStorageWrapper::new(collab_storage_impl, access_control)

View file

@ -1,3 +1,4 @@
pub mod casbin;
pub mod collab;
pub mod pg_listener;
pub mod user;

View file

@ -1,3 +1,4 @@
#![allow(unused)]
use crate::biz::workspace::member_listener::{WorkspaceMemberAction, WorkspaceMemberNotification};
use crate::component::auth::jwt::UserUuid;
use crate::middleware::access_control_mw::{AccessResource, HttpAccessControlService};

View file

@ -1,4 +1,3 @@
use crate::component::auth::jwt::UserUuid;
use anyhow::Context;
use app_error::AppError;
use database::collab::upsert_collab_member_with_txn;
@ -103,7 +102,7 @@ pub async fn add_workspace_members(
}
pub async fn remove_workspace_members(
user_uuid: &UserUuid,
user_uuid: &Uuid,
pg_pool: &PgPool,
workspace_id: &Uuid,
member_emails: &[String],

View file

@ -14,6 +14,12 @@ pub struct Config {
pub websocket: WebsocketSetting,
pub redis_uri: Secret<String>,
pub s3: S3Setting,
pub casbin: CasbinSetting,
}
#[derive(serde::Deserialize, Clone, Debug)]
pub struct CasbinSetting {
pub pool_size: u32,
}
#[derive(serde::Deserialize, Clone, Debug)]
@ -106,6 +112,19 @@ impl DatabaseSetting {
pub fn with_db(&self) -> PgConnectOptions {
self.without_db().database(&self.database_name)
}
/// Generate a postgresql connection string from the database settings.
pub fn to_pg_url(&self) -> String {
let ssl_mode = if self.require_ssl {
"require"
} else {
"prefer"
};
format!(
"postgres://{}:{}@{}:{}/{}?sslmode={}",
self.username, self.password, self.host, self.port, self.database_name, ssl_mode
)
}
}
pub fn get_configuration(app_env: &Environment) -> Result<Config, config::ConfigError> {

View file

@ -1,7 +1,8 @@
use crate::biz::collab::access_control::CollabAccessControlImpl;
use crate::biz::casbin::access_control::{
CasbinAccessControl, CasbinCollabAccessControl, CasbinWorkspaceAccessControl,
};
use crate::biz::collab::storage::CollabPostgresDBStorage;
use crate::biz::pg_listener::PgListeners;
use crate::biz::workspace::access_control::WorkspaceAccessControlImpl;
use crate::component::auth::LoggedUser;
use crate::config::config::Config;
use chrono::{DateTime, Utc};
@ -21,10 +22,11 @@ pub struct AppState {
pub gotrue_client: gotrue::api::Client,
pub redis_client: redis::aio::ConnectionManager,
pub collab_storage: Arc<CollabPostgresDBStorage>,
pub collab_access_control: Arc<CollabAccessControlImpl>,
pub workspace_access_control: Arc<WorkspaceAccessControlImpl>,
pub collab_access_control: CasbinCollabAccessControl,
pub workspace_access_control: CasbinWorkspaceAccessControl,
pub bucket_storage: Arc<S3BucketStorage>,
pub pg_listeners: Arc<PgListeners>,
pub casbin_access_control: CasbinAccessControl,
}
impl AppState {