feat: Collab access control (#120)

* chore: check collab message with access permission

* chore: implement collab permission service

* refactor: migrations

* chore: collab member ops

* chore: collab permission

* chore: update can edit workspace collab

* chore: fix test

* feat: fetch collab members

* chore: fix test

* chore: fix client api

* chore: check permission for collab storage proxy
Nathan.fooo 2023-10-17 14:00:04 +08:00 committed by GitHub
parent ebc2e7ebf6
commit 9dc7bbeee0
60 changed files with 1647 additions and 242 deletions

View file

@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT rp.permission_id \n FROM af_role_permissions rp\n JOIN af_roles ON rp.role_id = af_roles.id\n WHERE af_roles.name = 'Owner';\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "permission_id",
"type_info": "Int4"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "42d345fbdc25ad7b36d35515079e00e018e4f58a9a4ac4efbfa4768cadcc6c85"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT id FROM af_roles WHERE name = 'Owner'",
"query": "\n SELECT id\n FROM af_permissions\n WHERE access_level = $1\n ",
"describe": {
"columns": [
{
@ -10,11 +10,13 @@
}
],
"parameters": {
"Left": []
"Left": [
"Int4"
]
},
"nullable": [
false
]
},
"hash": "0ddce840c29eb21dcc803f45e9c4d0719ae8969cbd309bf478f43e6be340a933"
"hash": "54346ddd488f0c8fde7125f17827bcc89ecbf285c4e2270b38bb4d7082b40db2"
}

View file

@ -0,0 +1,23 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT EXISTS (SELECT 1 FROM af_collab_member WHERE oid = $1 AND uid = $2 LIMIT 1)\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "exists",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Text",
"Int8"
]
},
"nullable": [
null
]
},
"hash": "7af4023278eea11cf5db92040bd8947bd14df72a93eded96658187d3f9dc0e81"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n WITH workspace_check AS (\n SELECT EXISTS(\n SELECT 1\n FROM af_workspace_member\n WHERE af_workspace_member.uid = (SELECT uid FROM af_user WHERE uuid = $1) AND\n af_workspace_member.workspace_id = $3\n ) AS \"workspace_exists\"\n ),\n collab_check AS (\n SELECT EXISTS(\n SELECT 1\n FROM af_collab_member\n WHERE oid = $2\n ) AS \"collab_exists\"\n )\n SELECT \n NOT collab_check.collab_exists OR (\n workspace_check.workspace_exists AND \n EXISTS(\n SELECT 1\n FROM af_collab_member\n JOIN af_roles ON af_collab_member.role_id = af_roles.id\n WHERE \n af_collab_member.uid = (SELECT uid FROM af_user WHERE uuid = $1) AND \n af_collab_member.oid = $2 AND \n (af_roles.id = 1 OR af_roles.id = 2)\n )\n ) AS \"permission_check\"\n FROM workspace_check, collab_check;\n ",
"query": "\n WITH workspace_check AS (\n SELECT EXISTS(\n SELECT 1\n FROM af_workspace_member\n WHERE af_workspace_member.uid = (SELECT uid FROM af_user WHERE uuid = $1) AND\n af_workspace_member.workspace_id = $3\n ) AS \"workspace_exists\"\n ),\n collab_check AS (\n SELECT EXISTS(\n SELECT 1\n FROM af_collab_member\n WHERE oid = $2\n ) AS \"collab_exists\"\n )\n SELECT \n NOT collab_check.collab_exists OR (\n workspace_check.workspace_exists AND \n EXISTS(\n SELECT 1\n FROM af_collab_member\n JOIN af_permissions ON af_collab_member.permission_id = af_permissions.id\n WHERE \n af_collab_member.uid = (SELECT uid FROM af_user WHERE uuid = $1) AND \n af_collab_member.oid = $2 AND \n af_permissions.access_level > 20\n )\n ) AS \"permission_check\"\n FROM workspace_check, collab_check;\n ",
"describe": {
"columns": [
{
@ -20,5 +20,5 @@
null
]
},
"hash": "1f391e04f66a30f599a16eda8b6a222ded871013cac0a1e87fdc3c52c1b541a0"
"hash": "83b14b1f8483785ddfeab7752f698b663e78936c40a028492aab63e9dda9dc30"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO af_collab_member (oid, uid, role_id) VALUES ($1, $2, $3)",
"query": "INSERT INTO af_collab_member (oid, uid, permission_id) VALUES ($1, $2, $3)",
"describe": {
"columns": [],
"parameters": {
@ -12,5 +12,5 @@
},
"nullable": []
},
"hash": "4fecbf9cd278961ab959521064627c1f5b0977ef9d3e28871ccc7e912b617c5a"
"hash": "a5246bb745e198a16c7ad85442ea85e0d3bb9a55d5be7dabb6076ac3748b1457"
}

View file

@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": " \n INSERT INTO af_collab_member (uid, oid, permission_id)\n VALUES ($1, $2, $3)\n ON CONFLICT (uid, oid)\n DO UPDATE\n SET permission_id = excluded.permission_id;\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Text",
"Int4"
]
},
"nullable": []
},
"hash": "c5ba2a6febad1655b75eecad9d088694a6f0f93a84c32a0228a7cfd1a6f063f1"
}

Cargo.lock generated
View file

@ -466,6 +466,7 @@ dependencies = [
"actix-cors",
"actix-http",
"actix-identity",
"actix-router",
"actix-rt",
"actix-service",
"actix-session",
@ -1427,8 +1428,10 @@ dependencies = [
"collab-entity",
"serde",
"serde_json",
"serde_repr",
"sqlx",
"thiserror",
"tracing",
"uuid",
"validator",
]

View file

@ -14,6 +14,7 @@ actix-web-actors = { version = "4.2.0" }
actix-service = "2.0.2"
actix-identity = "0.6.0"
actix-cors = "0.6.4"
actix-router = "0.5.1"
actix-session = { version = "0.8", features = ["redis-rs-tls-session"] }
openssl = "0.10.45"

View file

@ -2,8 +2,9 @@ use crate::notify::{ClientToken, TokenStateReceiver};
use anyhow::{anyhow, Context};
use bytes::Bytes;
use database_entity::{
AFBlobRecord, AFUserProfileView, AFWorkspaceMember, BatchQueryCollabParams,
BatchQueryCollabResult, InsertCollabParams,
AFBlobRecord, AFCollabMember, AFCollabMembers, AFUserProfileView, AFWorkspaceMember,
BatchQueryCollabParams, BatchQueryCollabResult, CollabMemberIdentify, InsertCollabMemberParams,
InsertCollabParams, QueryCollabMembers, UpdateCollabMemberParams,
};
use database_entity::{AFWorkspaces, QueryCollabParams};
use database_entity::{DeleteCollabParams, RawData};
@ -321,7 +322,7 @@ impl Client {
}
#[instrument(level = "debug", skip_all, err)]
pub async fn profile(&self) -> Result<AFUserProfileView, AppError> {
pub async fn get_profile(&self) -> Result<AFUserProfileView, AppError> {
let url = format!("{}/api/user/profile", self.base_url);
let resp = self
.http_client_with_auth(Method::GET, &url)
@ -334,7 +335,7 @@ impl Client {
}
#[instrument(level = "debug", skip_all, err)]
pub async fn workspaces(&self) -> Result<AFWorkspaces, AppError> {
pub async fn get_workspaces(&self) -> Result<AFWorkspaces, AppError> {
let url = format!("{}/api/workspace/list", self.base_url);
let resp = self
.http_client_with_auth(Method::GET, &url)
@ -597,6 +598,94 @@ impl Client {
AppResponse::<()>::from_response(resp).await?.into_error()
}
#[instrument(level = "debug", skip_all, err)]
pub async fn add_collab_member(&self, params: InsertCollabMemberParams) -> Result<(), AppError> {
let url = format!(
"{}/api/workspace/{}/collab/{}/member",
self.base_url, params.workspace_id, &params.object_id
);
let resp = self
.http_client_with_auth(Method::POST, &url)
.await?
.json(&params)
.send()
.await?;
AppResponse::<()>::from_response(resp).await?.into_error()
}
#[instrument(level = "debug", skip_all, err)]
pub async fn get_collab_member(
&self,
params: CollabMemberIdentify,
) -> Result<AFCollabMember, AppError> {
let url = format!(
"{}/api/workspace/{}/collab/{}/member",
self.base_url, params.workspace_id, &params.object_id
);
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.json(&params)
.send()
.await?;
AppResponse::<AFCollabMember>::from_response(resp)
.await?
.into_data()
}
#[instrument(level = "debug", skip_all, err)]
pub async fn update_collab_member(
&self,
params: UpdateCollabMemberParams,
) -> Result<(), AppError> {
let url = format!(
"{}/api/workspace/{}/collab/{}/member",
self.base_url, params.workspace_id, &params.object_id
);
let resp = self
.http_client_with_auth(Method::PUT, &url)
.await?
.json(&params)
.send()
.await?;
AppResponse::<()>::from_response(resp).await?.into_error()
}
#[instrument(level = "debug", skip_all, err)]
pub async fn remove_collab_member(&self, params: CollabMemberIdentify) -> Result<(), AppError> {
let url = format!(
"{}/api/workspace/{}/collab/{}/member",
self.base_url, params.workspace_id, &params.object_id
);
let resp = self
.http_client_with_auth(Method::DELETE, &url)
.await?
.json(&params)
.send()
.await?;
AppResponse::<()>::from_response(resp).await?.into_error()
}
#[instrument(level = "debug", skip_all, err)]
pub async fn get_collab_members(
&self,
params: QueryCollabMembers,
) -> Result<AFCollabMembers, AppError> {
let url = format!(
"{}/api/workspace/{}/collab/{}/member/list",
self.base_url, params.workspace_id, &params.object_id
);
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.json(&params)
.send()
.await?;
AppResponse::<AFCollabMembers>::from_response(resp)
.await?
.into_data()
}
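
For reference, the new member endpoints compose roughly as in the sketch below. This is illustrative only, not code from this commit: it assumes an already-authenticated `Client`, and the uid, workspace id, and object id are placeholders.

async fn grant_read_write(client: &Client) -> Result<(), AppError> {
  // Grant read/write access to user 2 on a collab object.
  let params = InsertCollabMemberParams {
    uid: 2,
    workspace_id: "<workspace-uuid>".to_string(),
    object_id: "<object-id>".to_string(),
    access_level: AFAccessLevel::ReadAndWrite,
  };
  client.add_collab_member(params).await?;

  // Read the membership back and confirm the stored access level.
  let member = client
    .get_collab_member(CollabMemberIdentify {
      uid: 2,
      workspace_id: "<workspace-uuid>".to_string(),
      object_id: "<object-id>".to_string(),
    })
    .await?;
  assert_eq!(member.permission.access_level, AFAccessLevel::ReadAndWrite);
  Ok(())
}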
pub fn ws_url(&self, device_id: &str) -> Result<String, AppError> {
let access_token = self.access_token()?;
Ok(format!("{}/{}/{}", self.ws_addr, access_token, device_id))

View file

@ -15,3 +15,5 @@ chrono = {version="0.4",features = ["serde"]}
uuid = { version = "1.4.1", features = ["serde", "v4"] }
thiserror = "1.0.47"
anyhow = "1.0.75"
tracing = "0.1"
serde_repr = "0.1.16"

View file

@ -3,8 +3,8 @@ use std::borrow::Cow;
#[derive(Debug, thiserror::Error)]
pub enum DatabaseError {
#[error("Record not found")]
RecordNotFound,
#[error("Record not found:{0}")]
RecordNotFound(String),
#[error(transparent)]
UnexpectedData(#[from] validator::ValidationErrors),
@ -33,14 +33,16 @@ pub enum DatabaseError {
impl DatabaseError {
pub fn is_not_found(&self) -> bool {
matches!(self, Self::RecordNotFound)
matches!(self, Self::RecordNotFound(_))
}
}
impl From<sqlx::Error> for DatabaseError {
fn from(value: sqlx::Error) -> Self {
match value {
Error::RowNotFound => DatabaseError::RecordNotFound,
Error::RowNotFound => {
DatabaseError::RecordNotFound("Can't find the row in the database".to_string())
},
_ => DatabaseError::SqlxError(value),
}
}

View file

@ -1,9 +1,11 @@
use chrono::{DateTime, Utc};
use collab_entity::CollabType;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use sqlx::FromRow;
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use tracing::error;
use uuid::Uuid;
use validator::{Validate, ValidationError};
@ -205,7 +207,22 @@ impl From<i32> for AFRole {
1 => AFRole::Owner,
2 => AFRole::Member,
3 => AFRole::Guest,
_ => panic!("Invalid value for AFRole"),
_ => {
error!("Invalid role id: {}", value);
AFRole::Guest
},
}
}
}
impl From<Option<i32>> for AFRole {
fn from(value: Option<i32>) -> Self {
match value {
None => {
error!("Invalid role id: None");
AFRole::Guest
},
Some(value) => value.into(),
}
}
}
@ -220,12 +237,80 @@ impl From<AFRole> for i32 {
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AFPermission {
/// The permission id
pub id: i32,
pub name: String,
pub access_level: AFAccessLevel,
pub description: String,
}
#[derive(Deserialize_repr, Serialize_repr, Eq, PartialEq, Debug, Clone)]
#[repr(i32)]
pub enum AFAccessLevel {
// These values are persisted (they map to af_permissions.access_level); do not change them.
ReadOnly = 10,
ReadAndComment = 20,
ReadAndWrite = 30,
FullAccess = 50,
}
impl AFAccessLevel {
pub fn can_write(&self) -> bool {
match self {
AFAccessLevel::ReadOnly | AFAccessLevel::ReadAndComment => false,
AFAccessLevel::ReadAndWrite | AFAccessLevel::FullAccess => true,
}
}
}
impl From<i32> for AFAccessLevel {
fn from(value: i32) -> Self {
// Must stay in sync with the discriminant values defined on the enum.
match value {
10 => AFAccessLevel::ReadOnly,
20 => AFAccessLevel::ReadAndComment,
30 => AFAccessLevel::ReadAndWrite,
50 => AFAccessLevel::FullAccess,
_ => {
error!("Invalid role id: {}", value);
AFAccessLevel::ReadOnly
},
}
}
}
impl From<AFAccessLevel> for i32 {
fn from(level: AFAccessLevel) -> Self {
level as i32
}
}
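
A quick illustration of how these levels behave (a sketch, not part of this commit): the repr values map directly onto the af_permissions.access_level column, which is why the SQL in this change can use a plain `access_level > 20` threshold for write access.

fn access_level_examples() {
  // 30 converts back to ReadAndWrite; anything above ReadAndComment (20) may write.
  let level = AFAccessLevel::from(30);
  assert_eq!(level, AFAccessLevel::ReadAndWrite);
  assert!(level.can_write());
  assert!(!AFAccessLevel::ReadOnly.can_write());

  // The i32 conversion is the value that ends up in the database.
  let raw: i32 = AFAccessLevel::FullAccess.into();
  assert_eq!(raw, 50);
}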
#[derive(FromRow, Serialize, Deserialize)]
pub struct AFWorkspaceMember {
pub email: String,
pub role: AFRole,
}
#[derive(Serialize, Deserialize)]
pub struct AFCollabMember {
pub uid: i64,
pub oid: String,
pub permission: AFPermission,
}
#[derive(Serialize, Deserialize)]
pub struct AFCollabMembers(pub Vec<AFCollabMember>);
#[derive(FromRow, Clone, Debug, Serialize, Deserialize)]
pub struct AFCollabMemberRow {
pub uid: i64,
pub oid: String,
pub permission_id: i64,
}
#[derive(FromRow, Serialize, Deserialize)]
pub struct AFBlobMetadata {
pub workspace_id: Uuid,
@ -260,3 +345,32 @@ pub enum QueryCollabResult {
#[derive(Serialize, Deserialize)]
pub struct BatchQueryCollabResult(pub HashMap<String, QueryCollabResult>);
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct InsertCollabMemberParams {
pub uid: i64,
#[validate(custom = "validate_not_empty_str")]
pub workspace_id: String,
#[validate(custom = "validate_not_empty_str")]
pub object_id: String,
pub access_level: AFAccessLevel,
}
pub type UpdateCollabMemberParams = InsertCollabMemberParams;
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct CollabMemberIdentify {
pub uid: i64,
#[validate(custom = "validate_not_empty_str")]
pub workspace_id: String,
#[validate(custom = "validate_not_empty_str")]
pub object_id: String,
}
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct QueryCollabMembers {
#[validate(custom = "validate_not_empty_str")]
pub workspace_id: String,
#[validate(custom = "validate_not_empty_str")]
pub object_id: String,
}
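
Because these params derive `Validate`, callers can reject malformed requests before touching the database. A small sketch, assuming `validate_not_empty_str` rejects empty strings as its name suggests:

fn reject_empty_ids() {
  let params = QueryCollabMembers {
    workspace_id: "".to_string(), // empty id should fail validation
    object_id: "object-id".to_string(),
  };
  assert!(params.validate().is_err());
}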

View file

@ -1,26 +1,28 @@
use anyhow::Context;
use collab_entity::CollabType;
use database_entity::{
database_error::DatabaseError, AFCollabSnapshot, AFCollabSnapshots, BatchQueryCollab,
InsertCollabParams, QueryCollabResult, RawData,
database_error::DatabaseError, AFAccessLevel, AFCollabMember, AFCollabSnapshot,
AFCollabSnapshots, AFPermission, BatchQueryCollab, InsertCollabParams, QueryCollabResult,
RawData,
};
use sqlx::{PgPool, Transaction};
use sqlx::postgres::PgRow;
use sqlx::{Error, PgPool, Row, Transaction};
use std::collections::HashMap;
use std::{ops::DerefMut, str::FromStr};
use tracing::{error, trace};
use tracing::{error, instrument, trace};
use uuid::Uuid;
pub async fn collab_exists(pg_pool: &PgPool, oid: &str) -> Result<bool, sqlx::Error> {
sqlx::query_scalar!(
let result = sqlx::query_scalar!(
r#"
SELECT EXISTS (SELECT 1 FROM af_collab WHERE oid = $1 LIMIT 1)
"#,
&oid,
)
.fetch_one(pg_pool)
.await?
.ok_or(sqlx::Error::RowNotFound)
.await;
transform_record_not_found_error(result)
}
/// Inserts a new row into the `af_collab` table or updates an existing row if it matches the
@ -47,7 +49,7 @@ pub async fn collab_exists(pg_pool: &PgPool, oid: &str) -> Result<bool, sqlx::Er
///
pub async fn insert_af_collab(
tx: &mut Transaction<'_, sqlx::Postgres>,
owner_uid: i64,
owner_uid: &i64,
params: &InsertCollabParams,
) -> Result<(), DatabaseError> {
let encrypt = 0;
@ -88,10 +90,17 @@ pub async fn insert_af_collab(
}
},
None => {
// Get the 'Owner' role_id from af_roles
let role_id: i32 = sqlx::query_scalar!("SELECT id FROM af_roles WHERE name = 'Owner'")
.fetch_one(tx.deref_mut())
.await?;
// Get the permission_id of the Owner
let permission_id: i32 = sqlx::query_scalar!(
r#"
SELECT rp.permission_id
FROM af_role_permissions rp
JOIN af_roles ON rp.role_id = af_roles.id
WHERE af_roles.name = 'Owner';
"#
)
.fetch_one(tx.deref_mut())
.await?;
trace!(
"Insert new af_collab row: {}:{}:{}",
@ -102,16 +111,16 @@ pub async fn insert_af_collab(
// Insert into af_collab_member
sqlx::query!(
"INSERT INTO af_collab_member (oid, uid, role_id) VALUES ($1, $2, $3)",
"INSERT INTO af_collab_member (oid, uid, permission_id) VALUES ($1, $2, $3)",
params.object_id,
owner_uid,
role_id
permission_id
)
.execute(tx.deref_mut())
.await
.context(format!(
"Insert af_collab_member failed: {}:{}:{}",
owner_uid, params.object_id, role_id
owner_uid, params.object_id, permission_id
))?;
sqlx::query!(
@ -289,3 +298,156 @@ pub async fn get_all_snapshots(
.await?;
Ok(AFCollabSnapshots(snapshots))
}
#[instrument(skip(pg_pool), err)]
pub async fn insert_collab_member(
uid: i64,
oid: &str,
access_level: &AFAccessLevel,
pg_pool: &PgPool,
) -> Result<(), DatabaseError> {
let access_level: i32 = access_level.clone().into();
let mut txn = pg_pool
.begin()
.await
.context("failed to acquire a transaction to insert collab member")?;
let permission_id = sqlx::query_scalar!(
r#"
SELECT id
FROM af_permissions
WHERE access_level = $1
"#,
access_level
)
.fetch_one(txn.deref_mut())
.await
.context("Get permission id from access level fail")?;
sqlx::query!(
r#"
INSERT INTO af_collab_member (uid, oid, permission_id)
VALUES ($1, $2, $3)
ON CONFLICT (uid, oid)
DO UPDATE
SET permission_id = excluded.permission_id;
"#,
uid,
oid,
permission_id
)
.execute(txn.deref_mut())
.await
.context("oops fuck")?;
txn
.commit()
.await
.context("failed to commit the transaction to insert collab member")?;
Ok(())
}
pub async fn delete_collab_member(
uid: i64,
oid: &str,
pg_pool: &PgPool,
) -> Result<(), DatabaseError> {
sqlx::query("DELETE FROM af_collab_member WHERE uid = $1 AND oid = $2")
.bind(uid)
.bind(oid)
.execute(pg_pool)
.await?;
Ok(())
}
pub async fn select_collab_members(
oid: &str,
pg_pool: &PgPool,
) -> Result<Vec<AFCollabMember>, DatabaseError> {
let members = sqlx::query(
r#"
SELECT af_collab_member.uid, af_collab_member.oid, af_permissions.id, af_permissions.name, af_permissions.access_level, af_permissions.description
FROM af_collab_member
JOIN af_permissions ON af_collab_member.permission_id = af_permissions.id
WHERE af_collab_member.oid = $1
"#,
)
.bind(oid)
.try_map(collab_member_try_from_row)
.fetch_all(pg_pool)
.await?;
Ok(members)
}
pub async fn select_collab_member(
uid: &i64,
oid: &str,
pg_pool: &PgPool,
) -> Result<AFCollabMember, DatabaseError> {
let member = sqlx::query(
r#"
SELECT af_collab_member.uid, af_collab_member.oid, af_permissions.id, af_permissions.name, af_permissions.access_level, af_permissions.description
FROM af_collab_member
JOIN af_permissions ON af_collab_member.permission_id = af_permissions.id
WHERE af_collab_member.uid = $1 AND af_collab_member.oid = $2
"#,
)
.bind(uid)
.bind(oid)
.try_map(collab_member_try_from_row)
.fetch_one(pg_pool)
.await?;
Ok(member)
}
fn collab_member_try_from_row(row: PgRow) -> Result<AFCollabMember, sqlx::Error> {
let access_level = AFAccessLevel::from(row.try_get::<i32, _>(4)?);
let permission = AFPermission {
id: row.try_get(2)?,
name: row.try_get(3)?,
access_level,
description: row.try_get(5)?,
};
Ok(AFCollabMember {
uid: row.try_get(0)?,
oid: row.try_get(1)?,
permission,
})
}
#[inline]
pub async fn is_collab_member_exists(
uid: i64,
oid: &str,
pg_pool: &PgPool,
) -> Result<bool, sqlx::Error> {
let result = sqlx::query_scalar!(
r#"
SELECT EXISTS (SELECT 1 FROM af_collab_member WHERE oid = $1 AND uid = $2 LIMIT 1)
"#,
&oid,
&uid,
)
.fetch_one(pg_pool)
.await;
transform_record_not_found_error(result)
}
#[inline]
fn transform_record_not_found_error(
result: Result<Option<bool>, sqlx::Error>,
) -> Result<bool, sqlx::Error> {
match result {
Ok(value) => Ok(value.unwrap_or(false)),
Err(err) => {
if let Error::RowNotFound = err {
Ok(false)
} else {
Err(err)
}
},
}
}
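
Taken together, the new member operations are meant to be used roughly as follows. This is an illustrative sketch only, assuming a connected PgPool plus existing af_user and af_collab rows; the uid and object id are placeholders.

async fn collab_member_roundtrip(pg_pool: &sqlx::PgPool) -> Result<(), DatabaseError> {
  // Grant comment access; the ON CONFLICT (uid, oid) clause makes this an upsert.
  insert_collab_member(2, "object-id", &AFAccessLevel::ReadAndComment, pg_pool).await?;

  // Read the membership back, joined with its af_permissions row.
  let member = select_collab_member(&2, "object-id", pg_pool).await?;
  assert_eq!(member.permission.access_level, AFAccessLevel::ReadAndComment);

  // Remove the member and confirm it is gone.
  delete_collab_member(2, "object-id", pg_pool).await?;
  assert!(!is_collab_member_exists(2, "object-id", pg_pool).await?);
  Ok(())
}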

View file

@ -10,6 +10,7 @@ use database_entity::{
use sqlx::types::Uuid;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Weak;
use crate::collab::collab_db_ops;
@ -46,7 +47,7 @@ pub trait CollabStorage: Clone + Send + Sync + 'static {
/// # Returns
///
/// * `Result<()>` - Returns `Ok(())` if the collaboration was created successfully, `Err` otherwise.
async fn insert_collab(&self, owner_uid: i64, params: InsertCollabParams) -> Result<()>;
async fn insert_collab(&self, uid: &i64, params: InsertCollabParams) -> Result<()>;
/// Retrieves a collaboration from the storage.
///
@ -57,10 +58,11 @@ pub trait CollabStorage: Clone + Send + Sync + 'static {
/// # Returns
///
/// * `Result<RawData>` - Returns the data of the collaboration if found, `Err` otherwise.
async fn get_collab(&self, params: QueryCollabParams) -> Result<RawData>;
async fn get_collab(&self, uid: &i64, params: QueryCollabParams) -> Result<RawData>;
async fn batch_get_collab(
&self,
uid: &i64,
queries: Vec<BatchQueryCollab>,
) -> HashMap<String, QueryCollabResult>;
@ -73,7 +75,7 @@ pub trait CollabStorage: Clone + Send + Sync + 'static {
/// # Returns
///
/// * `Result<()>` - Returns `Ok(())` if the collaboration was deleted successfully, `Err` otherwise.
async fn delete_collab(&self, object_id: &str) -> Result<()>;
async fn delete_collab(&self, uid: &i64, object_id: &str) -> Result<()>;
async fn create_snapshot(&self, params: InsertSnapshotParams) -> Result<()>;
@ -125,7 +127,7 @@ impl CollabStorage for CollabPostgresDBStorageImpl {
.unwrap_or(false)
}
async fn insert_collab(&self, owner_uid: i64, params: InsertCollabParams) -> Result<()> {
async fn insert_collab(&self, uid: &i64, params: InsertCollabParams) -> Result<()> {
params.validate()?;
let mut transaction = self
@ -133,7 +135,7 @@ impl CollabStorage for CollabPostgresDBStorageImpl {
.begin()
.await
.context("Failed to acquire a Postgres transaction to insert collab")?;
collab_db_ops::insert_af_collab(&mut transaction, owner_uid, &params).await?;
collab_db_ops::insert_af_collab(&mut transaction, uid, &params).await?;
transaction
.commit()
.await
@ -141,7 +143,7 @@ impl CollabStorage for CollabPostgresDBStorageImpl {
Ok(())
}
async fn get_collab(&self, params: QueryCollabParams) -> Result<RawData> {
async fn get_collab(&self, _uid: &i64, params: QueryCollabParams) -> Result<RawData> {
params.validate()?;
match collab_db_ops::get_collab_blob(&self.pg_pool, &params.collab_type, &params.object_id)
.await
@ -151,7 +153,10 @@ impl CollabStorage for CollabPostgresDBStorageImpl {
Ok(data)
},
Err(e) => match e {
sqlx::Error::RowNotFound => Err(DatabaseError::RecordNotFound),
sqlx::Error::RowNotFound => Err(DatabaseError::RecordNotFound(format!(
"Can't find the row for query: {:?}",
params
))),
_ => Err(e.into()),
},
}
@ -159,12 +164,13 @@ impl CollabStorage for CollabPostgresDBStorageImpl {
async fn batch_get_collab(
&self,
_uid: &i64,
queries: Vec<BatchQueryCollab>,
) -> HashMap<String, QueryCollabResult> {
collab_db_ops::batch_get_collab_blob(&self.pg_pool, queries).await
}
async fn delete_collab(&self, object_id: &str) -> Result<()> {
async fn delete_collab(&self, _uid: &i64, object_id: &str) -> Result<()> {
collab_db_ops::delete_collab(&self.pg_pool, object_id).await?;
Ok(())
}

View file

@ -2,6 +2,7 @@ use anyhow::Context;
use database_entity::database_error::DatabaseError;
use sqlx::PgPool;
use tracing::instrument;
use uuid::Uuid;
pub async fn update_user_name(
pool: &PgPool,
@ -50,12 +51,12 @@ pub async fn create_user_if_not_exists(
Ok(affected_rows > 0)
}
pub async fn uid_from_uuid(pool: &PgPool, gotrue_uuid: &uuid::Uuid) -> Result<i64, DatabaseError> {
pub async fn select_uid_from_uuid(pool: &PgPool, user_uuid: &Uuid) -> Result<i64, DatabaseError> {
let uid = sqlx::query!(
r#"
SELECT uid FROM af_user WHERE uuid = $1
"#,
gotrue_uuid
user_uuid
)
.fetch_one(pool)
.await?

View file

@ -32,9 +32,6 @@ pub async fn select_user_is_workspace_owner(
user_uuid: &Uuid,
workspace_uuid: &Uuid,
) -> Result<bool, DatabaseError> {
// 1. Identifies the user's UID in the 'af_user' table using the provided user UUID ($2).
// 2. Then, it checks the 'af_workspace_member' table to find a record that matches the provided workspace_id ($1) and the identified UID.
// 3. It joins with 'af_roles' to ensure that the role associated with the workspace member is 'Owner'.
let exists = sqlx::query_scalar!(
r#"
SELECT EXISTS(
@ -62,6 +59,7 @@ pub async fn select_user_is_workspace_owner(
/// 1. the user is a member of the workspace
/// 2. the collab object does not exist
/// 3. the collab object exists and the user is a member of the collab with an access level that allows editing
#[allow(dead_code)]
pub async fn select_user_can_edit_collab(
pg_pool: &PgPool,
user_uuid: &Uuid,
@ -91,11 +89,11 @@ pub async fn select_user_can_edit_collab(
EXISTS(
SELECT 1
FROM af_collab_member
JOIN af_roles ON af_collab_member.role_id = af_roles.id
JOIN af_permissions ON af_collab_member.permission_id = af_permissions.id
WHERE
af_collab_member.uid = (SELECT uid FROM af_user WHERE uuid = $1) AND
af_collab_member.oid = $2 AND
(af_roles.id = 1 OR af_roles.id = 2)
af_permissions.access_level > 20
)
) AS "permission_check"
FROM workspace_check, collab_check;

View file

@ -131,6 +131,10 @@ impl CollabMessage {
}
}
pub fn uid(&self) -> Option<i64> {
self.origin().and_then(|origin| origin.client_user_id())
}
pub fn object_id(&self) -> &str {
match self {
CollabMessage::ClientInit(value) => &value.object_id,

View file

@ -22,6 +22,7 @@ async-trait = "0.1.73"
anyhow = "1.0.75"
serde_repr = "0.1.6"
tokio-retry = "0.3.0"
reqwest = "0.11.18"
collab = { version = "0.1.0"}
collab-entity = { version = "0.1.0" }
@ -31,6 +32,8 @@ yrs = "0.16.5"
lib0 = "0.16.3"
chrono = "0.4.30"
realtime-entity = { workspace = true }
uuid = { version = "1", features = ["v4"] }
[dev-dependencies]
actix = "0.13"
actix-web = { version = "4.3.1" }
@ -45,4 +48,3 @@ tracing-log = "0.1.1"
serde-aux = "4.2.0"
tempfile = "3.8.0"
assert-json-diff = "2.0.2"
uuid = { version = "1", features = ["v4"] }

View file

@ -9,7 +9,7 @@ use actix_web_actors::ws;
use bytes::Bytes;
use std::ops::Deref;
use crate::collaborate::CollabServer;
use crate::collaborate::{CollabPermission, CollabServer};
use crate::error::RealtimeError;
use actix_web_actors::ws::ProtocolError;
@ -18,22 +18,23 @@ use realtime_entity::collab_msg::CollabMessage;
use std::time::{Duration, Instant};
use tracing::error;
pub struct ClientWSSession<U: Unpin + RealtimeUser, S: Unpin + 'static> {
pub struct ClientSession<U: Unpin + RealtimeUser, S: Unpin + 'static, P: Unpin + CollabPermission> {
user: U,
hb: Instant,
pub server: Addr<CollabServer<S, U>>,
pub server: Addr<CollabServer<S, U, P>>,
heartbeat_interval: Duration,
client_timeout: Duration,
}
impl<U, S> ClientWSSession<U, S>
impl<U, S, P> ClientSession<U, S, P>
where
U: Unpin + RealtimeUser + Clone,
S: CollabStorage + Unpin,
P: CollabPermission + Unpin,
{
pub fn new(
user: U,
server: Addr<CollabServer<S, U>>,
server: Addr<CollabServer<S, U, P>>,
heartbeat_interval: Duration,
client_timeout: Duration,
) -> Self {
@ -80,10 +81,11 @@ where
}
}
impl<U, S> Actor for ClientWSSession<U, S>
impl<U, S, P> Actor for ClientSession<U, S, P>
where
U: Unpin + RealtimeUser,
S: Unpin + CollabStorage,
P: CollabPermission + Unpin,
{
type Context = ws::WebsocketContext<Self>;
@ -121,10 +123,11 @@ where
}
}
impl<U, S> Handler<RealtimeMessage> for ClientWSSession<U, S>
impl<U, S, P> Handler<RealtimeMessage> for ClientSession<U, S, P>
where
U: Unpin + RealtimeUser,
S: Unpin + CollabStorage,
P: CollabPermission + Unpin,
{
type Result = ();
@ -134,10 +137,11 @@ where
}
/// WebSocket message handler
impl<U, S> StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientWSSession<U, S>
impl<U, S, P> StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientSession<U, S, P>
where
U: Unpin + RealtimeUser + Clone,
S: Unpin + CollabStorage,
P: CollabPermission + Unpin,
{
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
let msg = match msg {
@ -175,7 +179,6 @@ where
pub struct ClientWSSink(pub Recipient<RealtimeMessage>);
impl Deref for ClientWSSink {
type Target = Recipient<RealtimeMessage>;
fn deref(&self) -> &Self::Target {
&self.0
}
@ -183,13 +186,18 @@ impl Deref for ClientWSSink {
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct RealtimeUserImpl {
pub uid: i64,
pub uuid: String,
pub device_id: String,
}
impl RealtimeUserImpl {
pub fn new(uuid: String, device_id: String) -> Self {
Self { uuid, device_id }
pub fn new(uid: i64, uuid: String, device_id: String) -> Self {
Self {
uid,
uuid,
device_id,
}
}
}
@ -202,4 +210,8 @@ impl Display for RealtimeUserImpl {
}
}
impl RealtimeUser for RealtimeUserImpl {}
impl RealtimeUser for RealtimeUserImpl {
fn uid(&self) -> i64 {
self.uid
}
}

View file

@ -1,9 +1,11 @@
mod broadcast;
mod group;
mod permission;
mod plugin;
mod retry;
mod server;
pub use broadcast::*;
pub use permission::*;
pub use plugin::*;
pub use server::*;

View file

@ -0,0 +1,47 @@
use async_trait::async_trait;
use database_entity::AFAccessLevel;
use reqwest::Method;
use std::fmt::Display;
#[derive(Debug)]
pub enum CollabUserId<'a> {
UserId(&'a i64),
UserUuid(&'a uuid::Uuid),
}
#[async_trait]
pub trait CollabPermission: Sync + Send + 'static {
type Error: Display;
/// Return the access level of the user in the collab.
/// If the collab object is not found, return None; otherwise, return the user's access level.
async fn get_access_level(
&self,
user: CollabUserId<'_>,
oid: &str,
) -> Result<Option<AFAccessLevel>, Self::Error>;
/// Return true if the user from the HTTP request is allowed to access the collab object.
/// This function will be called very frequently, so it should be very fast.
///
async fn can_access_http_method(
&self,
user: CollabUserId<'_>,
oid: &str,
method: &Method,
) -> Result<bool, Self::Error>;
/// Return true if the user is allowed to send the message.
/// This function will be called very frequently, so it should be very fast.
///
/// The user can send the message if:
/// 1. the user is a member of the collab object
/// 2. the user's access level is `ReadAndWrite` or `FullAccess`
async fn can_send_message(&self, uid: &i64, oid: &str) -> Result<bool, Self::Error>;
/// Return true if the user is allowed to receive the message.
/// This function will be called very frequently, so it should be very fast.
///
/// The user can receive the message if they are a member of the collab object
async fn can_receive_message(&self, uid: &i64, oid: &str) -> Result<bool, Self::Error>;
}
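
To make the contract concrete, here is a toy implementation sketch. It is not the CollabPermissionImpl shipped in this commit (which is backed by Postgres and a member-change listener); it simply answers every query from one fixed access level.

// Hypothetical, for illustration only.
struct StaticPermission(AFAccessLevel);

#[async_trait]
impl CollabPermission for StaticPermission {
  type Error = anyhow::Error;

  async fn get_access_level(
    &self,
    _user: CollabUserId<'_>,
    _oid: &str,
  ) -> Result<Option<AFAccessLevel>, Self::Error> {
    Ok(Some(self.0.clone()))
  }

  async fn can_access_http_method(
    &self,
    _user: CollabUserId<'_>,
    _oid: &str,
    method: &Method,
  ) -> Result<bool, Self::Error> {
    // Reads are always allowed; mutating methods require write access.
    Ok(*method == Method::GET || self.0.can_write())
  }

  async fn can_send_message(&self, _uid: &i64, _oid: &str) -> Result<bool, Self::Error> {
    Ok(self.0.can_write())
  }

  async fn can_receive_message(&self, _uid: &i64, _oid: &str) -> Result<bool, Self::Error> {
    Ok(true)
  }
}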

View file

@ -75,7 +75,7 @@ where
collab_type: self.collab_type.clone(),
};
match self.storage.get_collab(params).await {
match self.storage.get_collab(&self.uid, params).await {
Ok(raw_data) => match init_collab_with_raw_data(raw_data, doc) {
Ok(_) => {},
Err(e) => {
@ -85,7 +85,7 @@ where
},
Err(err) => {
match &err {
DatabaseError::RecordNotFound => {
DatabaseError::RecordNotFound(_) => {
let raw_data = {
let txn = doc.transact();
txn.encode_state_as_update_v1(&StateVector::default())
@ -96,7 +96,7 @@ where
raw_data,
&self.workspace_id,
);
match self.storage.insert_collab(self.uid, params).await {
match self.storage.insert_collab(&self.uid, params).await {
Ok(_) => {},
Err(err) => tracing::error!("{:?}", err),
}
@ -149,7 +149,7 @@ where
let uid = self.uid;
tokio::spawn(async move {
let object_id = params.object_id.clone();
match storage.insert_collab(uid, params).await {
match storage.insert_collab(&uid, params).await {
Ok(_) => tracing::debug!("[💭Server] end flushing collab: {}", object_id),
Err(err) => tracing::error!("save collab failed: {:?}", err),
}

View file

@ -1,13 +1,13 @@
use crate::collaborate::CollabClientStream;
use anyhow::{anyhow, Error};
use collab::core::origin::CollabOrigin;
use database::collab::CollabStorage;
use futures_util::SinkExt;
use parking_lot::Mutex;
use realtime_entity::collab_msg::CollabMessage;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::future;
use std::future::Future;
use std::iter::Take;
use std::pin::Pin;
@ -20,24 +20,28 @@ use tokio_retry::strategy::FixedInterval;
use tokio_retry::{Action, Condition, Retry, RetryIf};
use crate::collaborate::group::CollabGroupCache;
use crate::collaborate::permission::CollabPermission;
use crate::error::RealtimeError;
use tracing::{error, trace, warn};
pub(crate) struct SubscribeGroupIfNeed<'a, U, S> {
pub(crate) struct SubscribeGroupIfNeed<'a, U, S, P> {
pub(crate) client_msg: &'a ClientMessage<U>,
pub(crate) groups: &'a Arc<CollabGroupCache<S, U>>,
pub(crate) edit_collab_by_user: &'a Arc<Mutex<HashMap<U, HashSet<Editing>>>>,
pub(crate) client_stream_by_user: &'a Arc<RwLock<HashMap<U, CollabClientStream>>>,
pub(crate) permission_service: &'a Arc<P>,
}
impl<'a, U, S> SubscribeGroupIfNeed<'a, U, S>
impl<'a, U, S, P> SubscribeGroupIfNeed<'a, U, S, P>
where
U: RealtimeUser,
S: CollabStorage,
P: CollabPermission,
{
pub(crate) fn run(
self,
) -> RetryIf<Take<FixedInterval>, SubscribeGroupIfNeed<'a, U, S>, SubscribeGroupCondition<U>> {
) -> RetryIf<Take<FixedInterval>, SubscribeGroupIfNeed<'a, U, S, P>, SubscribeGroupCondition<U>>
{
let weak_client_stream = Arc::downgrade(self.client_stream_by_user);
let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(5);
RetryIf::spawn(
@ -48,10 +52,11 @@ where
}
}
impl<'a, U, S> Action for SubscribeGroupIfNeed<'a, U, S>
impl<'a, U, S, P> Action for SubscribeGroupIfNeed<'a, U, S, P>
where
U: RealtimeUser,
S: CollabStorage,
P: CollabPermission,
{
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + 'a>>;
type Item = ();
@ -68,6 +73,24 @@ where
.origin
.client_user_id()
.ok_or(RealtimeError::UnexpectedData("The client user id is empty"))?;
// Before creating a group, check that the user is allowed to create one.
match self
.permission_service
.can_send_message(&uid, object_id)
.await
{
Ok(is_allowed) => {
if !is_allowed {
return Ok(());
}
},
Err(err) => {
warn!("fail to create group. error: {}", err);
return Ok(());
},
}
self
.groups
.create_group(
@ -112,38 +135,88 @@ where
None => warn!("The client stream is not found"),
Some(client_stream) => {
if let Some(collab_group) = self.groups.get_group(object_id).await {
collab_group
if let Entry::Vacant(entry) = collab_group
.subscribers
.write()
.await
.entry(self.client_msg.user.clone())
.or_insert_with(|| {
trace!(
"[💭Server]: {} subscribe group:{}",
self.client_msg.user,
self.client_msg.content.object_id()
);
{
trace!(
"[💭Server]: {} subscribe group:{}",
self.client_msg.user,
self.client_msg.content.object_id()
);
self
.edit_collab_by_user
.lock()
.entry(self.client_msg.user.clone())
.or_default()
.insert(Editing {
object_id: object_id.to_string(),
origin: origin.clone(),
});
self
.edit_collab_by_user
.lock()
.entry(self.client_msg.user.clone())
.or_default()
.insert(Editing {
object_id: object_id.to_string(),
origin: origin.clone(),
});
let (sink, stream) = client_stream
.client_channel::<CollabMessage, _>(object_id, move |object_id, msg| {
msg.object_id() == object_id
let sink_permission_service = self.permission_service.clone();
let stream_permission_service = self.permission_service.clone();
let (sink, stream) = client_stream.client_channel::<CollabMessage, _, _>(
object_id,
move |object_id, msg| {
if msg.object_id() != object_id {
return Box::pin(future::ready(false));
}
let msg_uid = msg.uid();
let object_id = object_id.to_string();
let cloned_sink_permission_service = sink_permission_service.clone();
Box::pin(async move {
if let Some(uid) = msg_uid {
if let Err(err) = cloned_sink_permission_service
.can_receive_message(&uid, &object_id)
.await
{
trace!("client:{} fail to receive message. error: {}", uid, err);
return false;
}
}
true
})
.unwrap();
},
move |object_id, msg| {
if msg.object_id != object_id {
return Box::pin(future::ready(false));
}
let object_id = object_id.to_string();
let msg_uid = msg.uid;
let cloned_stream_permission_service = stream_permission_service.clone();
Box::pin(async move {
match msg_uid {
None => true,
Some(uid) => {
match cloned_stream_permission_service
.can_send_message(&uid, &object_id)
.await
{
Ok(is_allowed) => is_allowed,
Err(err) => {
trace!("client:{} fail to send message. error: {}", uid, err);
false
},
}
},
}
})
},
);
entry.insert(
collab_group
.broadcast
.subscribe(origin.clone(), sink, stream)
});
.subscribe(origin.clone(), sink, stream),
);
}
}
},
}

View file

@ -3,10 +3,11 @@ use crate::error::{RealtimeError, StreamError};
use anyhow::Result;
use actix::{Actor, Context, Handler, ResponseFuture};
use futures_util::future::BoxFuture;
use parking_lot::Mutex;
use realtime_entity::collab_msg::CollabMessage;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
@ -17,12 +18,13 @@ use tracing::{info, trace};
use crate::client::ClientWSSink;
use crate::collaborate::group::CollabGroupCache;
use crate::collaborate::permission::CollabPermission;
use crate::collaborate::retry::SubscribeGroupIfNeed;
use crate::util::channel_ext::UnboundedSenderSink;
use database::collab::CollabStorage;
#[derive(Clone)]
pub struct CollabServer<S, U> {
pub struct CollabServer<S, U, P> {
#[allow(dead_code)]
storage: S,
/// Keep track of all collab groups
@ -31,14 +33,16 @@ pub struct CollabServer<S, U> {
editing_collab_by_user: Arc<Mutex<HashMap<U, HashSet<Editing>>>>,
/// Keep track of all client streams
client_stream_by_user: Arc<RwLock<HashMap<U, CollabClientStream>>>,
permission_service: Arc<P>,
}
impl<S, U> CollabServer<S, U>
impl<S, U, P> CollabServer<S, U, P>
where
S: CollabStorage + Clone,
U: RealtimeUser,
P: CollabPermission,
{
pub fn new(storage: S) -> Result<Self, RealtimeError> {
pub fn new(storage: S, permission_service: Arc<P>) -> Result<Self, RealtimeError> {
let groups = Arc::new(CollabGroupCache::new(storage.clone()));
let edit_collab_by_user = Arc::new(Mutex::new(HashMap::new()));
Ok(Self {
@ -46,6 +50,7 @@ where
groups,
editing_collab_by_user: edit_collab_by_user,
client_stream_by_user: Default::default(),
permission_service,
})
}
}
@ -72,18 +77,20 @@ async fn remove_user<S, U>(
}
}
impl<S, U> Actor for CollabServer<S, U>
impl<S, U, P> Actor for CollabServer<S, U, P>
where
S: 'static + Unpin,
U: RealtimeUser + Unpin,
P: CollabPermission + Unpin,
{
type Context = Context<Self>;
}
impl<S, U> Handler<Connect<U>> for CollabServer<S, U>
impl<S, U, P> Handler<Connect<U>> for CollabServer<S, U, P>
where
U: RealtimeUser + Unpin,
S: CollabStorage + Unpin,
P: CollabPermission + Unpin,
{
type Result = ResponseFuture<Result<(), RealtimeError>>;
@ -112,10 +119,11 @@ where
}
}
impl<S, U> Handler<Disconnect<U>> for CollabServer<S, U>
impl<S, U, P> Handler<Disconnect<U>> for CollabServer<S, U, P>
where
U: RealtimeUser + Unpin,
S: CollabStorage + Unpin,
P: CollabPermission + Unpin,
{
type Result = ResponseFuture<Result<(), RealtimeError>>;
fn handle(&mut self, msg: Disconnect<U>, _: &mut Context<Self>) -> Self::Result {
@ -136,10 +144,11 @@ where
}
}
impl<S, U> Handler<ClientMessage<U>> for CollabServer<S, U>
impl<S, U, P> Handler<ClientMessage<U>> for CollabServer<S, U, P>
where
U: RealtimeUser + Unpin,
S: CollabStorage + Unpin,
P: CollabPermission + Unpin,
{
type Result = ResponseFuture<Result<(), RealtimeError>>;
@ -147,6 +156,7 @@ where
let client_stream_by_user = self.client_stream_by_user.clone();
let groups = self.groups.clone();
let edit_collab_by_user = self.editing_collab_by_user.clone();
let permission_service = self.permission_service.clone();
Box::pin(async move {
SubscribeGroupIfNeed {
@ -154,6 +164,7 @@ where
groups: &groups,
edit_collab_by_user: &edit_collab_by_user,
client_stream_by_user: &client_stream_by_user,
permission_service: &permission_service,
}
.run()
.await?;
@ -210,12 +221,13 @@ async fn remove_user_from_group<S, U>(
}
}
impl<S, U> actix::Supervised for CollabServer<S, U>
impl<S, U, P> actix::Supervised for CollabServer<S, U, P>
where
S: 'static + Unpin,
U: RealtimeUser + Unpin,
P: CollabPermission + Unpin,
{
fn restarting(&mut self, _ctx: &mut Context<CollabServer<S, U>>) {
fn restarting(&mut self, _ctx: &mut Context<CollabServer<S, U, P>>) {
tracing::warn!("restarting");
}
}
@ -230,7 +242,11 @@ impl TryFrom<RealtimeMessage> for CollabMessage {
pub struct CollabClientStream {
ws_sink: ClientWSSink,
/// Used to receive messages from the collab server
/// Used to receive messages from the collab server. The message will be forwarded to the [CollabBroadcast],
/// which will broadcast it to all connected clients.
///
/// The message flow:
/// ClientSession(websocket) -> [CollabServer] -> [CollabClientStream] -> [CollabBroadcast] 1->* websocket(client)
pub(crate) stream_tx: tokio::sync::broadcast::Sender<Result<RealtimeMessage, StreamError>>,
}
@ -246,18 +262,20 @@ impl CollabClientStream {
/// Returns a [UnboundedSenderSink] and a [ReceiverStream] for the object_id.
#[allow(clippy::type_complexity)]
pub fn client_channel<T, F1>(
pub fn client_channel<T, SinkFilter, StreamFilter>(
&mut self,
object_id: &str,
sink_filter: F1,
) -> Option<(
sink_filter: SinkFilter,
stream_filter: StreamFilter,
) -> (
UnboundedSenderSink<T>,
ReceiverStream<Result<T, StreamError>>,
)>
)
where
T:
TryFrom<RealtimeMessage, Error = StreamError> + Into<RealtimeMessage> + Send + Sync + 'static,
F1: Fn(&str, &T) -> bool + Send + Sync + 'static,
SinkFilter: Fn(&str, &T) -> BoxFuture<'static, bool> + Sync + Send + 'static,
StreamFilter: Fn(&str, &RealtimeMessage) -> BoxFuture<'static, bool> + Sync + Send + 'static,
{
let client_ws_sink = self.ws_sink.clone();
let mut stream_rx = BroadcastStream::new(self.stream_tx.subscribe());
@ -265,9 +283,10 @@ impl CollabClientStream {
// Send the message to the connected websocket client
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<T>();
tokio::spawn(async move {
tokio::task::spawn(async move {
while let Some(msg) = rx.recv().await {
if sink_filter(&cloned_object_id, &msg) {
let can_sink = sink_filter(&cloned_object_id, &msg).await;
if can_sink {
client_ws_sink.do_send(msg.into());
}
}
@ -280,7 +299,7 @@ impl CollabClientStream {
let (tx, rx) = tokio::sync::mpsc::channel(100);
tokio::spawn(async move {
while let Some(Ok(Ok(msg))) = stream_rx.next().await {
if cloned_object_id == msg.object_id {
if stream_filter(&cloned_object_id, &msg).await {
let _ = tx.send(T::try_from(msg)).await;
}
}
@ -292,6 +311,6 @@ impl CollabClientStream {
//
// When receiving a message from the client_forward_stream, it will send the message to the broadcast
// group. The message will be broadcast to all connected clients.
Some((client_forward_sink, client_forward_stream))
(client_forward_sink, client_forward_stream)
}
}

View file

@ -13,9 +13,17 @@ use std::sync::Arc;
pub trait RealtimeUser:
Clone + Debug + Send + Sync + 'static + Display + Hash + Eq + PartialEq
{
fn uid(&self) -> i64;
}
impl<T> RealtimeUser for Arc<T> where T: RealtimeUser {}
impl<T> RealtimeUser for Arc<T>
where
T: RealtimeUser,
{
fn uid(&self) -> i64 {
self.as_ref().uid()
}
}
#[derive(Debug, Message, Clone)]
#[rtype(result = "Result<(), RealtimeError>")]
@ -48,6 +56,7 @@ pub struct ClientMessage<U> {
#[rtype(result = "()")]
pub struct RealtimeMessage {
pub business_id: BusinessID,
pub uid: Option<i64>,
pub object_id: String,
pub payload: Bytes,
}
@ -69,6 +78,7 @@ impl From<CollabMessage> for RealtimeMessage {
fn from(msg: CollabMessage) -> Self {
Self {
business_id: BusinessID::CollabId,
uid: msg.uid(),
object_id: msg.object_id().to_string(),
payload: Bytes::from(msg.to_vec()),
}
@ -82,6 +92,7 @@ where
fn from(client_msg: ClientMessage<U>) -> Self {
Self {
business_id: client_msg.business_id,
uid: Some(client_msg.user.uid()),
object_id: client_msg.content.object_id().to_string(),
payload: Bytes::from(client_msg.content.to_vec()),
}

View file

@ -30,6 +30,12 @@ pub enum RealtimeError {
#[error(transparent)]
StorageError(#[from] DatabaseError),
#[error("Received message from client:{0}, but the client does not have sufficient permissions to write")]
NotEnoughPermissionToWrite(i64),
#[error("Client:{0} does not have enough permission to read")]
NotEnoughPermissionToRead(i64),
#[error("Internal failure: {0}")]
Internal(#[from] anyhow::Error),
}

View file

@ -22,6 +22,10 @@ impl AppError {
message: message.into(),
}
}
pub fn is_record_not_found(&self) -> bool {
self.code == ErrorCode::RecordNotFound
}
}
impl Display for AppError {
@ -54,11 +58,13 @@ impl From<anyhow::Error> for AppError {
impl From<DatabaseError> for AppError {
fn from(value: DatabaseError) -> Self {
match &value {
DatabaseError::RecordNotFound => AppError::new(ErrorCode::RecordNotFound, value),
DatabaseError::UnexpectedData(_) => AppError::new(ErrorCode::InvalidRequestParams, value),
match value {
DatabaseError::RecordNotFound(msg) => AppError::new(ErrorCode::RecordNotFound, msg),
DatabaseError::UnexpectedData(msg) => {
AppError::new(ErrorCode::InvalidRequestParams, msg.to_string())
},
DatabaseError::NotEnoughPermissions(msg) => {
AppError::new(ErrorCode::NotEnoughPermissions, msg.clone())
AppError::new(ErrorCode::NotEnoughPermissions, msg)
},
DatabaseError::StorageSpaceNotEnough => {
AppError::new(ErrorCode::StorageSpaceNotEnough, value)

View file

@ -28,3 +28,18 @@ END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trigger_prevent_reset_encryption_sign BEFORE
UPDATE ON af_user FOR EACH ROW EXECUTE FUNCTION prevent_reset_encryption_sign_func();
-- Enable RLS on the af_user table
-- Policy for INSERT
ALTER TABLE af_user ENABLE ROW LEVEL SECURITY;
CREATE POLICY af_user_insert_policy ON public.af_user FOR
INSERT TO anon,
authenticated WITH CHECK (true);
-- Policy for UPDATE
CREATE POLICY af_user_update_policy ON public.af_user FOR
UPDATE USING (true) WITH CHECK (true);
-- Policy for SELECT
CREATE POLICY af_user_select_policy ON public.af_user FOR
SELECT TO anon,
authenticated USING (true);
ALTER TABLE af_user FORCE ROW LEVEL SECURITY;

View file

@ -11,7 +11,7 @@ VALUES ('Owner'),
CREATE TABLE af_permissions (
id SERIAL PRIMARY KEY,
name VARCHAR(255) UNIQUE NOT NULL,
access_level INTEGER,
access_level INTEGER NOT NULL,
description TEXT
);
-- Insert default permissions

View file

@ -9,6 +9,13 @@ CREATE TABLE IF NOT EXISTS af_workspace (
deleted_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
workspace_name TEXT DEFAULT 'My Workspace'
);
-- Enable RLS on the af_workspace table
ALTER TABLE af_workspace ENABLE ROW LEVEL SECURITY;
CREATE POLICY af_workspace_policy ON af_workspace FOR ALL TO anon,
authenticated USING (true);
ALTER TABLE af_workspace FORCE ROW LEVEL SECURITY;
-- This trigger is fired after an insert operation on the af_user table. It automatically creates a workspace
-- in the af_workspace table with the uid of the new user profile as the owner_uid
CREATE OR REPLACE FUNCTION create_af_workspace_func() RETURNS TRIGGER AS $$BEGIN
@ -19,6 +26,7 @@ END $$LANGUAGE plpgsql;
CREATE TRIGGER create_af_workspace_trigger
AFTER
INSERT ON af_user FOR EACH ROW EXECUTE FUNCTION create_af_workspace_func();
-- af_workspace_member contains all the members associated with a workspace and their roles.
CREATE TABLE IF NOT EXISTS af_workspace_member (
uid BIGINT NOT NULL,
@ -28,6 +36,41 @@ CREATE TABLE IF NOT EXISTS af_workspace_member (
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (uid, workspace_id)
);
-- Enable RLS on the af_workspace_member table
ALTER TABLE af_workspace_member ENABLE ROW LEVEL SECURITY;
CREATE POLICY af_workspace_member_policy ON af_workspace_member FOR ALL TO anon,
authenticated USING (true);
ALTER TABLE af_workspace_member FORCE ROW LEVEL SECURITY;
-- Listener for af_workspace_member table
DROP TRIGGER IF EXISTS af_workspace_member_change_trigger ON af_workspace_member;
CREATE OR REPLACE FUNCTION notify_af_workspace_member_change() RETURNS trigger AS $$
DECLARE
payload TEXT;
BEGIN
payload := json_build_object(
'old', row_to_json(OLD),
'new', row_to_json(NEW),
'action_type', TG_OP
)::text;
PERFORM pg_notify('af_workspace_member_channel', payload);
-- Return the new row state for INSERT/UPDATE, and the old state for DELETE.
IF TG_OP = 'DELETE' THEN
RETURN OLD;
ELSE
RETURN NEW;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER af_workspace_member_change_trigger
AFTER INSERT OR UPDATE OR DELETE ON af_workspace_member
FOR EACH ROW EXECUTE FUNCTION notify_af_workspace_member_change();
-- Index
CREATE UNIQUE INDEX idx_af_workspace_member ON af_workspace_member (uid, workspace_id, role_id);
-- This trigger is fired after an insert operation on the af_workspace table. It automatically creates a workspace
-- member in the af_workspace_member table. If the user is the owner of the workspace, they are given the role 'Owner'.

View file

@ -26,11 +26,38 @@ VALUES IN (5);
CREATE TABLE af_collab_member (
uid BIGINT REFERENCES af_user(uid) ON DELETE CASCADE,
oid TEXT NOT NULL ,
role_id INTEGER REFERENCES af_roles(id),
oid TEXT NOT NULL,
permission_id INTEGER REFERENCES af_permissions(id) NOT NULL,
PRIMARY KEY(uid, oid)
);
-- Listener
DROP TRIGGER IF EXISTS af_collab_member_change_trigger ON af_collab_member;
CREATE OR REPLACE FUNCTION notify_af_collab_member_change() RETURNS trigger AS $$
DECLARE
payload TEXT;
BEGIN
payload := json_build_object(
'old', row_to_json(OLD),
'new', row_to_json(NEW),
'action_type', TG_OP
)::text;
PERFORM pg_notify('af_collab_member_channel', payload);
-- Return the new row state for INSERT/UPDATE, and the old state for DELETE.
IF TG_OP = 'DELETE' THEN
RETURN OLD;
ELSE
RETURN NEW;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER af_collab_member_change_trigger
AFTER INSERT OR UPDATE OR DELETE ON af_collab_member
FOR EACH ROW EXECUTE FUNCTION notify_af_collab_member_change();
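
On the consuming side, these notifications are read through the new PgListeners / member_listener plumbing; below is a minimal standalone sketch of listening on the channel with sqlx (illustrative only; it just prints the raw JSON payload built by the trigger above, whereas the server deserializes it into a CollabMemberChange):

async fn watch_collab_members(pg_pool: &sqlx::PgPool) -> Result<(), sqlx::Error> {
  let mut listener = sqlx::postgres::PgListener::connect_with(pg_pool).await?;
  listener.listen("af_collab_member_channel").await?;
  loop {
    // Each payload carries the old/new row plus the action_type (INSERT/UPDATE/DELETE).
    let notification = listener.recv().await?;
    println!("collab member change: {}", notification.payload());
  }
}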
-- collab snapshot. It will be used to store the snapshots of the collab.
CREATE TABLE IF NOT EXISTS af_collab_snapshot (
sid BIGSERIAL PRIMARY KEY,-- snapshot id
@ -43,5 +70,8 @@ CREATE TABLE IF NOT EXISTS af_collab_snapshot (
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL
);
CREATE INDEX idx_af_collab_snapshot_oid ON af_collab_snapshot(oid);
-- Enable RLS on the af_collab table
ALTER TABLE af_collab ENABLE ROW LEVEL SECURITY;
CREATE POLICY af_collab_policy ON af_collab FOR ALL TO anon,
authenticated USING (true);
ALTER TABLE af_collab FORCE ROW LEVEL SECURITY;

View file

@ -1,28 +0,0 @@
-- Enable RLS on the af_collab table
ALTER TABLE af_collab ENABLE ROW LEVEL SECURITY;
CREATE POLICY af_collab_policy ON af_collab FOR ALL TO anon,
authenticated USING (true);
-- Enable RLS on the af_user table
-- Policy for INSERT
ALTER TABLE af_user ENABLE ROW LEVEL SECURITY;
CREATE POLICY af_user_insert_policy ON public.af_user FOR
INSERT TO anon,
authenticated WITH CHECK (true);
-- Policy for UPDATE
CREATE POLICY af_user_update_policy ON public.af_user FOR
UPDATE USING (true) WITH CHECK (true);
-- Policy for SELECT
CREATE POLICY af_user_select_policy ON public.af_user FOR
SELECT TO anon,
authenticated USING (true);
ALTER TABLE af_user FORCE ROW LEVEL SECURITY;
-- Enable RLS on the af_workspace_member table
ALTER TABLE af_workspace_member ENABLE ROW LEVEL SECURITY;
CREATE POLICY af_workspace_member_policy ON af_workspace_member FOR ALL TO anon,
authenticated USING (true);
ALTER TABLE af_workspace_member FORCE ROW LEVEL SECURITY;
-- Enable RLS on the af_workspace table
ALTER TABLE af_workspace ENABLE ROW LEVEL SECURITY;
CREATE POLICY af_workspace_policy ON af_workspace FOR ALL TO anon,
authenticated USING (true);
ALTER TABLE af_workspace FORCE ROW LEVEL SECURITY;

View file

@ -15,6 +15,7 @@ use tracing_actix_web::RequestId;
use crate::biz;
use crate::component::storage_proxy::CollabStorageProxy;
use database::collab::CollabStorage;
use database::user::select_uid_from_uuid;
use database_entity::database_error::DatabaseError;
use shared_entity::app_error::AppError;
use shared_entity::error_code::ErrorCode;
@ -40,6 +41,17 @@ pub fn workspace_scope() -> Scope {
.route(web::put().to(update_collab_handler))
.route(web::delete().to(delete_collab_handler)),
)
.service(
web::resource("{workspace_id}/collab/{object_id}/member")
.route(web::post().to(create_collab_member_handler))
.route(web::get().to(get_collab_member_handler))
.route(web::put().to(update_collab_member_handler))
.route(web::delete().to(delete_collab_member_handler)),
)
.service(
web::resource("{workspace_id}/collab/{object_id}/member/list")
.route(web::get().to(get_collab_member_list_handler)),
)
.service(
web::resource("{workspace_id}/collab_list").route(web::get().to(batch_get_collab_handler)),
)
@ -131,19 +143,23 @@ async fn create_collab_handler(
Ok(Json(AppResponse::Ok()))
}
#[instrument(skip(storage, payload), err)]
#[instrument(skip(storage, payload, state), err)]
async fn get_collab_handler(
user_uuid: UserUuid,
required_id: RequestId,
payload: Json<QueryCollabParams>,
state: Data<AppState>,
storage: Data<Storage<CollabStorageProxy>>,
) -> Result<Json<AppResponse<RawData>>> {
let uid = select_uid_from_uuid(&state.pg_pool, &user_uuid)
.await
.map_err(AppError::from)?;
let data = storage
.collab_storage
.get_collab(payload.into_inner())
.get_collab(&uid, payload.into_inner())
.await
.map_err(|err| match &err {
DatabaseError::RecordNotFound => AppError::new(ErrorCode::RecordNotFound, err.to_string()),
.map_err(|err| match err {
DatabaseError::RecordNotFound(msg) => AppError::new(ErrorCode::RecordNotFound, msg),
_ => AppError::new(ErrorCode::DatabaseError, err.to_string()),
})?;
@ -151,17 +167,21 @@ async fn get_collab_handler(
Ok(Json(AppResponse::Ok().with_data(data)))
}
#[instrument(skip(storage, payload), err)]
#[instrument(skip(storage, payload, state), err)]
async fn batch_get_collab_handler(
user_uuid: UserUuid,
required_id: RequestId,
state: Data<AppState>,
payload: Json<BatchQueryCollabParams>,
storage: Data<Storage<CollabStorageProxy>>,
) -> Result<Json<AppResponse<BatchQueryCollabResult>>> {
let uid = select_uid_from_uuid(&state.pg_pool, &user_uuid)
.await
.map_err(AppError::from)?;
let result = BatchQueryCollabResult(
storage
.collab_storage
.batch_get_collab(payload.into_inner().0)
.batch_get_collab(&uid, payload.into_inner().0)
.await,
);
Ok(Json(AppResponse::Ok().with_data(result)))
@ -211,3 +231,57 @@ async fn retrieve_snapshots_handler(
.await?;
Ok(Json(AppResponse::Ok().with_data(data)))
}
#[instrument(skip(state, payload), err)]
async fn create_collab_member_handler(
required_id: RequestId,
payload: Json<InsertCollabMemberParams>,
state: Data<AppState>,
) -> Result<Json<AppResponse<()>>> {
biz::collab::ops::create_collab_member(&state.pg_pool, &payload.into_inner()).await?;
Ok(Json(AppResponse::Ok()))
}
#[instrument(skip(state, payload), err)]
async fn update_collab_member_handler(
user_uuid: UserUuid,
required_id: RequestId,
payload: Json<UpdateCollabMemberParams>,
state: Data<AppState>,
) -> Result<Json<AppResponse<()>>> {
biz::collab::ops::upsert_collab_member(&state.pg_pool, &user_uuid, &payload.into_inner()).await?;
Ok(Json(AppResponse::Ok()))
}
#[instrument(skip(state, payload), err)]
async fn get_collab_member_handler(
user_uuid: UserUuid,
required_id: RequestId,
payload: Json<CollabMemberIdentify>,
state: Data<AppState>,
) -> Result<Json<AppResponse<AFCollabMember>>> {
let member = biz::collab::ops::get_collab_member(&state.pg_pool, &payload.into_inner()).await?;
Ok(Json(AppResponse::Ok().with_data(member)))
}
#[instrument(skip(state, payload), err)]
async fn delete_collab_member_handler(
user_uuid: UserUuid,
required_id: RequestId,
payload: Json<CollabMemberIdentify>,
state: Data<AppState>,
) -> Result<Json<AppResponse<()>>> {
biz::collab::ops::delete_collab_member(&state.pg_pool, &payload.into_inner()).await?;
Ok(Json(AppResponse::Ok()))
}
#[instrument(skip(state, payload), err)]
async fn get_collab_member_list_handler(
user_uuid: UserUuid,
required_id: RequestId,
payload: Json<QueryCollabMembers>,
state: Data<AppState>,
) -> Result<Json<AppResponse<AFCollabMembers>>> {
let members =
biz::collab::ops::get_collab_member_list(&state.pg_pool, &payload.into_inner()).await?;
Ok(Json(AppResponse::Ok().with_data(AFCollabMembers(members))))
}

View file

@ -5,9 +5,12 @@ use actix_web::{get, web, HttpRequest, HttpResponse, Result, Scope};
use actix_web_actors::ws;
use std::sync::Arc;
use realtime::client::{ClientWSSession, RealtimeUserImpl};
use realtime::client::{ClientSession, RealtimeUserImpl};
use realtime::collaborate::CollabServer;
use crate::biz::collab::access_control::CollabPermissionImpl;
use database::user::select_uid_from_uuid;
use shared_entity::app_error::AppError;
use std::time::Duration;
use crate::component::auth::jwt::{authorization_from_token, UserUuid};
@ -26,14 +29,17 @@ pub async fn establish_ws_connection(
payload: Payload,
path: Path<(String, String)>,
state: Data<AppState>,
server: Data<Addr<CollabServer<CollabStorageProxy, Arc<RealtimeUserImpl>>>>,
server: Data<Addr<CollabServer<CollabStorageProxy, Arc<RealtimeUserImpl>, CollabPermissionImpl>>>,
) -> Result<HttpResponse> {
tracing::info!("receive ws connect: {:?}", request);
let (token, device_id) = path.into_inner();
let auth = authorization_from_token(token.as_str(), &state)?;
let user_uuid = UserUuid::from_auth(auth)?;
let realtime_user = Arc::new(RealtimeUserImpl::new(user_uuid.to_string(), device_id));
let client = ClientWSSession::new(
let uid = select_uid_from_uuid(&state.pg_pool, &user_uuid)
.await
.map_err(AppError::from)?;
let realtime_user = Arc::new(RealtimeUserImpl::new(uid, user_uuid.to_string(), device_id));
let client = ClientSession::new(
realtime_user,
server.get_ref().clone(),
Duration::from_secs(state.config.websocket.heartbeat_interval as u64),

View file

@ -26,7 +26,8 @@ use crate::api::file_storage::file_storage_scope;
use crate::api::user::user_scope;
use crate::api::workspace::workspace_scope;
use crate::api::ws::ws_scope;
use crate::biz::collab::access_control::CollabAccessControl;
use crate::biz::collab::access_control::{CollabAccessControl, CollabPermissionImpl};
use crate::biz::pg_listener::PgListeners;
use crate::biz::workspace::access_control::WorkspaceOwnerAccessControl;
use crate::component::storage_proxy::CollabStorageProxy;
use crate::middleware::access_control_mw::WorkspaceAccessControl;
@ -81,13 +82,16 @@ pub async fn run(
.unwrap_or_else(Key::generate);
let storage = state.collab_storage.clone();
let collab_server = CollabServer::<_, Arc<RealtimeUserImpl>>::new(storage.collab_storage.clone())
.unwrap()
.start();
let collab_server = CollabServer::<_, Arc<RealtimeUserImpl>, _>::new(
storage.collab_storage.clone(),
state.collab_permission.clone(),
)
.unwrap()
.start();
let access_control = WorkspaceAccessControl::new(state.pg_pool.clone())
.with_acs(WorkspaceOwnerAccessControl)
.with_acs(CollabAccessControl);
.with_acs(CollabAccessControl(state.collab_permission.clone()));
let mut server = HttpServer::new(move || {
App::new()
@ -138,6 +142,15 @@ pub async fn init_state(config: &Config) -> Result<AppState, Error> {
let gotrue_client = get_gotrue_client(&config.gotrue).await?;
setup_admin_account(&gotrue_client, &pg_pool, &config.gotrue).await?;
let redis_client = get_redis_client(config.redis_uri.expose_secret()).await?;
// TODO(nathan): Maybe PgListeners shouldn't return error
let pg_listeners = Arc::new(PgListeners::new(&pg_pool).await?);
let collab_member_listener = pg_listeners.subscribe_collab_member_change();
let collab_permission = Arc::new(CollabPermissionImpl::new(
pg_pool.clone(),
collab_member_listener,
));
let collab_storage = init_storage(config, pg_pool.clone()).await?;
Ok(AppState {
@ -148,7 +161,9 @@ pub async fn init_state(config: &Config) -> Result<AppState, Error> {
gotrue_client,
redis_client,
collab_storage,
collab_permission,
bucket_storage,
pg_listeners,
})
}

View file

@ -1,33 +1,235 @@
use crate::biz::collab::ops::require_user_can_edit;
use crate::component::auth::jwt::UserUuid;
use crate::middleware::access_control_mw::{AccessControlService, AccessResource};
use crate::biz::collab::member_listener::{CollabMemberAction, CollabMemberChange};
use crate::middleware::access_control_mw::{AccessResource, HttpAccessControlService};
use async_trait::async_trait;
use database_entity::AFAccessLevel;
use realtime::collaborate::{CollabPermission, CollabUserId};
use shared_entity::app_error::AppError;
use shared_entity::error_code::ErrorCode;
use sqlx::PgPool;
use tracing::trace;
use actix_router::{Path, Url};
use actix_web::http::Method;
use database::user::select_uid_from_uuid;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::{trace, warn};
use uuid::Uuid;
#[derive(Clone)]
pub struct CollabAccessControl;
pub struct CollabAccessControl<P: CollabPermission>(pub Arc<P>);
#[async_trait]
impl AccessControlService for CollabAccessControl {
impl<P> HttpAccessControlService for CollabAccessControl<P>
where
P: CollabPermission,
AppError: From<<P as CollabPermission>::Error>,
{
fn resource(&self) -> AccessResource {
AccessResource::Collab
}
async fn check_collab_permission(
&self,
workspace_id: &Uuid,
_workspace_id: &Uuid,
oid: &str,
user_uuid: &UserUuid,
pg_pool: &PgPool,
user_uuid: &Uuid,
_pg_pool: &PgPool,
method: Method,
_path: Path<Url>,
) -> Result<(), AppError> {
trace!(
"Collab access control: oid: {:?}, user_uuid: {:?}",
oid,
user_uuid
);
require_user_can_edit(pg_pool, workspace_id, user_uuid, oid).await
trace!("oid: {:?}, user_uuid: {:?}", oid, user_uuid);
let can_access = self
.0
.can_access_http_method(CollabUserId::UserUuid(user_uuid), oid, &method)
.await
.map_err(AppError::from)?;
if !can_access {
return Err(AppError::new(
ErrorCode::NotEnoughPermissions,
format!(
"Not enough permissions to access the collab: {} with http method: {}",
oid, method
),
));
}
Ok(())
}
}
type MemberStatusByUid = HashMap<i64, CollabMemberStatusByOid>;
type CollabMemberStatusByOid = HashMap<String, MemberStatus>;
/// Used to check whether a user is allowed to send or receive a [CollabMessage]
pub struct CollabPermissionImpl {
pg_pool: PgPool,
member_status_by_uid: Arc<RwLock<MemberStatusByUid>>,
}
#[derive(Clone, Debug)]
enum MemberStatus {
Deleted,
Valid(AFAccessLevel),
}
impl CollabPermissionImpl {
pub fn new(pg_pool: PgPool, mut listener: broadcast::Receiver<CollabMemberChange>) -> Self {
let member_status_by_uid = Arc::new(RwLock::new(HashMap::new()));
// Refresh the cached access level of the user when a collab member row is inserted, updated, or deleted
let cloned_pg_pool = pg_pool.clone();
let cloned_member_status_by_uid = member_status_by_uid.clone();
tokio::spawn(async move {
while let Ok(change) = listener.recv().await {
match change.action_type {
CollabMemberAction::INSERT | CollabMemberAction::UPDATE => {
if let (Some(oid), Some(uid)) = (change.new_oid(), change.new_uid()) {
let _ =
refresh_from_db(uid, oid, &cloned_pg_pool, &cloned_member_status_by_uid).await;
} else {
warn!("The oid or uid is None")
}
},
CollabMemberAction::DELETE => {
if let (Some(oid), Some(uid)) = (change.old_oid(), change.old_uid()) {
if let Some(inner_map) = cloned_member_status_by_uid.write().await.get_mut(uid) {
inner_map.insert(oid.to_string(), MemberStatus::Deleted);
}
} else {
warn!("The oid or uid is None")
}
},
}
}
});
Self {
pg_pool,
member_status_by_uid,
}
}
/// Returns the cached member status of the user in the collab, if any
async fn get_role_state(&self, uid: &i64, oid: &str) -> Option<MemberStatus> {
self
.member_status_by_uid
.read()
.await
.get(uid)
.map(|map| map.get(oid).cloned())?
}
#[inline]
async fn get_user_collab_access_level(
&self,
uid: &i64,
oid: &str,
) -> Result<AFAccessLevel, AppError> {
let member_status = match self.get_role_state(uid, oid).await {
None => refresh_from_db(uid, oid, &self.pg_pool, &self.member_status_by_uid).await?,
Some(status) => status,
};
match member_status {
MemberStatus::Deleted => Err(AppError::new(
ErrorCode::NotEnoughPermissions,
"The user is not the member of the collab",
)),
MemberStatus::Valid(access_level) => Ok(access_level),
}
}
}
#[inline]
async fn refresh_from_db(
uid: &i64,
oid: &str,
pg_pool: &PgPool,
member_status_by_uid: &Arc<RwLock<MemberStatusByUid>>,
) -> Result<MemberStatus, AppError> {
let member = database::collab::select_collab_member(uid, oid, pg_pool).await?;
let mut outer_map = member_status_by_uid.write().await;
let inner_map = outer_map.entry(*uid).or_insert_with(HashMap::new);
inner_map.insert(
member.oid,
MemberStatus::Valid(member.permission.access_level.clone()),
);
Ok(MemberStatus::Valid(member.permission.access_level))
}
#[async_trait]
impl CollabPermission for CollabPermissionImpl {
type Error = AppError;
async fn get_access_level(
&self,
user: CollabUserId<'_>,
oid: &str,
) -> Result<Option<AFAccessLevel>, Self::Error> {
let result = match user {
CollabUserId::UserId(uid) => self.get_user_collab_access_level(uid, oid).await,
CollabUserId::UserUuid(uuid) => {
let uid = select_uid_from_uuid(&self.pg_pool, uuid).await?;
self.get_user_collab_access_level(&uid, oid).await
},
};
match result {
Ok(level) => Ok(Some(level)),
Err(err) => {
if err.is_record_not_found() {
Ok(None)
} else {
Err(err)
}
},
}
}
async fn can_access_http_method(
&self,
user: CollabUserId<'_>,
oid: &str,
method: &Method,
) -> Result<bool, Self::Error> {
let level = self.get_access_level(user, oid).await?;
trace!("access level: {:?}", level);
match level {
None => Ok(true),
Some(level) => {
if Method::POST == method || Method::PUT == method || Method::DELETE == method {
Ok(level.can_write())
} else {
Ok(true)
}
},
}
}
#[inline]
async fn can_send_message(&self, uid: &i64, oid: &str) -> Result<bool, Self::Error> {
match self
.get_access_level(CollabUserId::UserId(uid), oid)
.await?
{
None => Ok(true),
Some(level) => match level {
AFAccessLevel::ReadOnly | AFAccessLevel::ReadAndComment => Ok(false),
AFAccessLevel::ReadAndWrite | AFAccessLevel::FullAccess => Ok(true),
},
}
}
#[inline]
async fn can_receive_message(&self, uid: &i64, oid: &str) -> Result<bool, Self::Error> {
Ok(
self
.get_access_level(CollabUserId::UserId(uid), oid)
.await
.is_ok(),
)
}
}
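As a hedged illustration (not part of this commit) of how a realtime call site might consult this service before applying an incoming update; the function name and surrounding plumbing are assumptions:
// Assumes the CollabPermission trait is in scope.
async fn guard_incoming_update(
  permission: &CollabPermissionImpl,
  uid: i64,
  oid: &str,
) -> Result<(), AppError> {
  if !permission.can_send_message(&uid, oid).await? {
    return Err(AppError::new(
      ErrorCode::NotEnoughPermissions,
      format!("uid {} cannot write to collab {}", uid, oid),
    ));
  }
  // The update would be applied here.
  Ok(())
}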

View file

@ -0,0 +1,39 @@
use crate::biz::pg_listener::PostgresDBListener;
use database_entity::AFCollabMemberRow;
use serde::Deserialize;
#[allow(clippy::upper_case_acronyms)]
#[derive(Deserialize, Clone, Debug)]
pub enum CollabMemberAction {
INSERT,
UPDATE,
DELETE,
}
#[derive(Deserialize, Debug, Clone)]
pub struct CollabMemberChange {
/// `old` is None if the row did not exist before the change
pub old: Option<AFCollabMemberRow>,
/// `new` is None if the row was deleted
pub new: Option<AFCollabMemberRow>,
/// The database action that produced this change: INSERT, UPDATE, or DELETE
pub action_type: CollabMemberAction,
}
impl CollabMemberChange {
pub fn old_uid(&self) -> Option<&i64> {
self.old.as_ref().map(|o| &o.uid)
}
pub fn old_oid(&self) -> Option<&str> {
self.old.as_ref().map(|o| o.oid.as_str())
}
pub fn new_uid(&self) -> Option<&i64> {
self.new.as_ref().map(|n| &n.uid)
}
pub fn new_oid(&self) -> Option<&str> {
self.new.as_ref().map(|n| n.oid.as_str())
}
}
pub type CollabMemberListener = PostgresDBListener<CollabMemberChange>;
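A small illustrative helper (not part of this commit) that consumes a `CollabMemberChange` using only the accessors defined above:
fn describe_change(change: &CollabMemberChange) -> String {
  match change.action_type {
    CollabMemberAction::INSERT | CollabMemberAction::UPDATE => format!(
      "collab member upserted: uid={:?}, oid={:?}",
      change.new_uid(),
      change.new_oid()
    ),
    CollabMemberAction::DELETE => format!(
      "collab member removed: uid={:?}, oid={:?}",
      change.old_uid(),
      change.old_oid()
    ),
  }
}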

View file

@ -1,2 +1,3 @@
pub mod access_control;
pub mod member_listener;
pub mod ops;

View file

@ -1,8 +1,9 @@
use database::user;
use database::workspace::select_user_can_edit_collab;
use database_entity::{
AFCollabSnapshots, DeleteCollabParams, InsertCollabParams, QueryObjectSnapshotParams,
QuerySnapshotParams,
AFCollabMember, AFCollabSnapshots, CollabMemberIdentify, DeleteCollabParams,
InsertCollabMemberParams, InsertCollabParams, QueryCollabMembers, QueryObjectSnapshotParams,
QuerySnapshotParams, UpdateCollabMemberParams,
};
use shared_entity::{app_error::AppError, error_code::ErrorCode};
use sqlx::{types::Uuid, PgPool};
@ -27,9 +28,9 @@ pub async fn upsert_collab(
) -> Result<(), AppError> {
params.validate()?;
let owner_uid = user::uid_from_uuid(pg_pool, user_uuid).await?;
let owner_uid = user::select_uid_from_uuid(pg_pool, user_uuid).await?;
let mut tx = pg_pool.begin().await?;
database::collab::insert_af_collab(&mut tx, owner_uid, params).await?;
database::collab::insert_af_collab(&mut tx, &owner_uid, params).await?;
tx.commit().await?;
Ok(())
}
@ -61,14 +62,68 @@ pub async fn delete_collab(
Ok(())
}
pub async fn require_user_can_edit(
/// Create a new collab member.
/// Returns [ErrorCode::RecordAlreadyExists] if the collab member already exists.
pub async fn create_collab_member(
pg_pool: &PgPool,
workspace_id: &Uuid,
user_uuid: &Uuid,
oid: &str,
params: &InsertCollabMemberParams,
) -> Result<(), AppError> {
match select_user_can_edit_collab(pg_pool, user_uuid, workspace_id, oid).await? {
true => Ok(()),
false => Err(ErrorCode::NotEnoughPermissions.into()),
params.validate()?;
if database::collab::is_collab_member_exists(params.uid, &params.object_id, pg_pool).await? {
return Err(ErrorCode::RecordAlreadyExists.into());
}
database::collab::insert_collab_member(
params.uid,
&params.object_id,
&params.access_level,
pg_pool,
)
.await?;
Ok(())
}
pub async fn upsert_collab_member(
pg_pool: &PgPool,
_user_uuid: &Uuid,
params: &UpdateCollabMemberParams,
) -> Result<(), AppError> {
params.validate()?;
database::collab::insert_collab_member(
params.uid,
&params.object_id,
&params.access_level,
pg_pool,
)
.await?;
Ok(())
}
pub async fn get_collab_member(
pg_pool: &PgPool,
params: &CollabMemberIdentify,
) -> Result<AFCollabMember, AppError> {
params.validate()?;
let collab_member =
database::collab::select_collab_member(&params.uid, &params.object_id, pg_pool).await?;
Ok(collab_member)
}
pub async fn delete_collab_member(
pg_pool: &PgPool,
params: &CollabMemberIdentify,
) -> Result<(), AppError> {
params.validate()?;
database::collab::delete_collab_member(params.uid, &params.object_id, pg_pool).await?;
Ok(())
}
pub async fn get_collab_member_list(
pg_pool: &PgPool,
params: &QueryCollabMembers,
) -> Result<Vec<AFCollabMember>, AppError> {
params.validate()?;
let collab_member = database::collab::select_collab_members(&params.object_id, pg_pool).await?;
Ok(collab_member)
}
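A hedged sketch of granting read-only access through this ops layer; it assumes an async context returning `Result<(), AppError>`, that `AFAccessLevel` is imported, and that `pg_pool`, `uid`, `workspace_id`, and `object_id` are in scope:
let params = InsertCollabMemberParams {
  uid,
  workspace_id: workspace_id.clone(),
  object_id: object_id.clone(),
  access_level: AFAccessLevel::ReadOnly,
};
// Fails with ErrorCode::RecordAlreadyExists if the member is already present.
create_collab_member(&pg_pool, &params).await?;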

View file

@ -1,4 +1,5 @@
pub mod collab;
pub mod pg_listener;
pub mod user;
pub mod utils;
pub(crate) mod workspace;

70  src/biz/pg_listener.rs  Normal file
View file

@ -0,0 +1,70 @@
use crate::biz::collab::member_listener::{CollabMemberChange, CollabMemberListener};
use crate::biz::workspace::member_listener::{WorkspaceMemberChange, WorkspaceMemberListener};
use anyhow::Error;
use serde::de::DeserializeOwned;
use sqlx::postgres::PgListener;
use sqlx::PgPool;
use tokio::sync::broadcast;
use tracing::error;
pub struct PgListeners {
workspace_member_listener: WorkspaceMemberListener,
collab_member_listener: CollabMemberListener,
}
impl PgListeners {
pub async fn new(pg_pool: &PgPool) -> Result<Self, Error> {
let workspace_member_listener =
WorkspaceMemberListener::new(pg_pool, "af_workspace_member_channel").await?;
let collab_member_listener =
CollabMemberListener::new(pg_pool, "af_collab_member_channel").await?;
Ok(Self {
workspace_member_listener,
collab_member_listener,
})
}
pub fn subscribe_workspace_member_change(&self) -> broadcast::Receiver<WorkspaceMemberChange> {
self.workspace_member_listener.notify.subscribe()
}
pub fn subscribe_collab_member_change(&self) -> broadcast::Receiver<CollabMemberChange> {
self.collab_member_listener.notify.subscribe()
}
}
pub struct PostgresDBListener<T: Clone> {
notify: broadcast::Sender<T>,
}
impl<T> PostgresDBListener<T>
where
T: Clone + DeserializeOwned + Send + 'static,
{
pub async fn new(pg_pool: &PgPool, channel: &str) -> Result<Self, Error> {
let mut listener = PgListener::connect_with(pg_pool).await?;
listener.listen(channel).await?;
let (tx, _) = broadcast::channel(1000);
let notify = tx.clone();
tokio::spawn(async move {
while let Ok(notification) = listener.recv().await {
match serde_json::from_str::<T>(notification.payload()) {
Ok(change) => {
let _ = tx.send(change);
},
Err(err) => {
error!(
"Failed to deserialize change: {:?}, payload: {}",
err,
notification.payload()
);
},
}
}
});
Ok(Self { notify })
}
}
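A minimal usage sketch, assuming an existing `sqlx::PgPool` named `pg_pool` and an anyhow-compatible error context:
let listeners = PgListeners::new(&pg_pool).await?;
let mut collab_rx = listeners.subscribe_collab_member_change();
tokio::spawn(async move {
  while let Ok(change) = collab_rx.recv().await {
    tracing::trace!("collab member changed: {:?}", change);
  }
});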

View file

@ -1,6 +1,6 @@
use crate::biz::workspace::ops::require_user_is_workspace_owner;
use crate::component::auth::jwt::UserUuid;
use crate::middleware::access_control_mw::{AccessControlService, AccessResource};
use crate::middleware::access_control_mw::{AccessResource, HttpAccessControlService};
use async_trait::async_trait;
use shared_entity::app_error::AppError;
use sqlx::PgPool;
@ -9,9 +9,8 @@ use uuid::Uuid;
#[derive(Clone)]
pub struct WorkspaceOwnerAccessControl;
#[async_trait]
impl AccessControlService for WorkspaceOwnerAccessControl {
impl HttpAccessControlService for WorkspaceOwnerAccessControl {
fn resource(&self) -> AccessResource {
AccessResource::Workspace
}
@ -23,7 +22,7 @@ impl AccessControlService for WorkspaceOwnerAccessControl {
pg_pool: &PgPool,
) -> Result<(), AppError> {
trace!(
"workspace access control: workspace_id: {:?}, user_uuid: {:?}",
"workspace_id: {:?}, user_uuid: {:?}",
workspace_id,
user_uuid
);

View file

@ -0,0 +1,27 @@
use crate::biz::pg_listener::PostgresDBListener;
use collab::preclude::Uuid;
use serde::Deserialize;
#[allow(clippy::upper_case_acronyms)]
#[derive(Deserialize, Clone, Debug)]
pub enum WorkspaceMemberAction {
INSERT,
UPDATE,
DELETE,
}
#[derive(Deserialize, Debug, Clone)]
pub struct WorkspaceMemberChange {
pub old: Option<WorkspaceMemberRow>,
pub new: Option<WorkspaceMemberRow>,
pub action_type: WorkspaceMemberAction,
}
#[derive(Deserialize, Debug, Clone)]
pub struct WorkspaceMemberRow {
pub uid: i64,
pub role_id: i64,
pub workspace_id: Uuid,
}
pub type WorkspaceMemberListener = PostgresDBListener<WorkspaceMemberChange>;

View file

@ -1,2 +1,3 @@
pub mod access_control;
pub mod member_listener;
pub mod ops;

View file

@ -1,6 +1,6 @@
use async_trait::async_trait;
use collab::core::collab::MutexCollab;
use database::collab::{CollabPostgresDBStorageImpl, CollabStorage, StorageConfig};
use database::collab::{CollabPostgresDBStorageImpl, CollabStorage, Result, StorageConfig};
use database_entity::{
AFCollabSnapshots, BatchQueryCollab, InsertCollabParams, InsertSnapshotParams, QueryCollabParams,
QueryCollabResult, QueryObjectSnapshotParams, QuerySnapshotParams, RawData,
@ -48,15 +48,11 @@ impl CollabStorage for CollabStorageProxy {
.insert(object_id.to_string(), collab);
}
async fn insert_collab(
&self,
owner_uid: i64,
params: InsertCollabParams,
) -> database::collab::Result<()> {
self.inner.insert_collab(owner_uid, params).await
async fn insert_collab(&self, uid: &i64, params: InsertCollabParams) -> Result<()> {
self.inner.insert_collab(uid, params).await
}
async fn get_collab(&self, params: QueryCollabParams) -> database::collab::Result<RawData> {
async fn get_collab(&self, uid: &i64, params: QueryCollabParams) -> Result<RawData> {
let collab = self
.collab_by_object_id
.read()
@ -65,7 +61,7 @@ impl CollabStorage for CollabStorageProxy {
.and_then(|collab| collab.upgrade());
match collab {
None => self.inner.get_collab(params).await,
None => self.inner.get_collab(uid, params).await,
Some(collab) => {
info!("Get collab data:{} from memory", params.object_id);
let data = collab.encode_as_update_v1().0;
@ -76,6 +72,7 @@ impl CollabStorage for CollabStorageProxy {
async fn batch_get_collab(
&self,
uid: &i64,
queries: Vec<BatchQueryCollab>,
) -> HashMap<String, QueryCollabResult> {
let (valid_queries, mut results): (Vec<_>, HashMap<_, _>) =
@ -109,12 +106,12 @@ impl CollabStorage for CollabStorageProxy {
});
results.extend(results_from_memory);
results.extend(self.inner.batch_get_collab(queries).await);
results.extend(self.inner.batch_get_collab(uid, queries).await);
results
}
async fn delete_collab(&self, object_id: &str) -> database::collab::Result<()> {
self.inner.delete_collab(object_id).await
async fn delete_collab(&self, uid: &i64, object_id: &str) -> Result<()> {
self.inner.delete_collab(uid, object_id).await
}
async fn create_snapshot(&self, params: InsertSnapshotParams) -> database::collab::Result<()> {

View file

@ -1,8 +1,10 @@
use crate::component::auth::jwt::UserUuid;
use crate::api::workspace::{COLLAB_OBJECT_ID_PATH, WORKSPACE_ID_PATH};
use actix_router::{Path, Url};
use actix_service::{forward_ready, Service, Transform};
use actix_web::dev::{ResourceDef, ServiceRequest, ServiceResponse};
use actix_web::http::Method;
use actix_web::Error;
use async_trait::async_trait;
use futures_util::future::LocalBoxFuture;
@ -23,33 +25,37 @@ pub enum AccessResource {
}
#[async_trait]
pub trait AccessControlService: Send + Sync {
pub trait HttpAccessControlService: Send + Sync {
fn resource(&self) -> AccessResource;
#[allow(unused_variables)]
async fn check_workspace_permission(
&self,
_workspace_id: &Uuid,
_user_uuid: &UserUuid,
_pg_pool: &PgPool,
workspace_id: &Uuid,
user_uuid: &UserUuid,
pg_pool: &PgPool,
) -> Result<(), AppError> {
Ok(())
}
#[allow(unused_variables)]
async fn check_collab_permission(
&self,
_workspace_id: &Uuid,
_oid: &str,
_user_uuid: &UserUuid,
_pg_pool: &PgPool,
workspace_id: &Uuid,
oid: &str,
user_uuid: &Uuid,
pg_pool: &PgPool,
method: Method,
path: Path<Url>,
) -> Result<(), AppError> {
Ok(())
}
}
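For orientation, a hedged sketch of the smallest possible implementor of this trait; the struct name is hypothetical and both `check_*` methods fall back to the defaults above, which return `Ok(())`:
#[derive(Clone)]
pub struct AllowAllAccessControl;
#[async_trait]
impl HttpAccessControlService for AllowAllAccessControl {
  fn resource(&self) -> AccessResource {
    AccessResource::Workspace
  }
}
// Registered like the real services:
// WorkspaceAccessControl::new(pg_pool.clone()).with_acs(AllowAllAccessControl);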
#[async_trait]
impl<T> AccessControlService for Arc<T>
impl<T> HttpAccessControlService for Arc<T>
where
T: AccessControlService,
T: HttpAccessControlService,
{
fn resource(&self) -> AccessResource {
self.as_ref().resource()
@ -69,19 +75,21 @@ where
async fn check_collab_permission(
&self,
_workspace_id: &Uuid,
_oid: &str,
_user_uuid: &UserUuid,
_pg_pool: &PgPool,
workspace_id: &Uuid,
oid: &str,
user_uuid: &Uuid,
pg_pool: &PgPool,
method: Method,
path: Path<Url>,
) -> Result<(), AppError> {
self
.as_ref()
.check_collab_permission(_workspace_id, _oid, _user_uuid, _pg_pool)
.check_collab_permission(workspace_id, oid, user_uuid, pg_pool, method, path)
.await
}
}
pub type AccessControlServices = Arc<HashMap<AccessResource, Arc<dyn AccessControlService>>>;
pub type AccessControlServices = Arc<HashMap<AccessResource, Arc<dyn HttpAccessControlService>>>;
#[derive(Clone)]
pub struct WorkspaceAccessControl {
@ -97,7 +105,10 @@ impl WorkspaceAccessControl {
}
}
pub fn with_acs<T: AccessControlService + 'static>(mut self, access_control_service: T) -> Self {
pub fn with_acs<T: HttpAccessControlService + 'static>(
mut self,
access_control_service: T,
) -> Self {
let resource = access_control_service.resource();
Arc::make_mut(&mut self.access_control_services)
.insert(resource, Arc::new(access_control_service));
@ -177,6 +188,7 @@ where
.and_then(|id| Uuid::parse_str(id).ok());
let collab_object_id = path.get(COLLAB_OBJECT_ID_PATH).map(|id| id.to_string());
let method = req.method().clone();
let fut = self.service.call(req);
let pg_pool = self.pg_pool.clone();
let services = self.access_control_service.clone();
@ -208,6 +220,8 @@ where
&collab_object_id,
&user_uuid,
&pg_pool,
method,
path,
)
.await
{

View file

@ -8,6 +8,9 @@ use sqlx::PgPool;
use std::collections::BTreeMap;
use std::sync::Arc;
use crate::biz::pg_listener::PgListeners;
use crate::biz::collab::access_control::CollabPermissionImpl;
use database::file::bucket_s3_impl::S3BucketStorage;
use tokio::sync::RwLock;
@ -20,7 +23,9 @@ pub struct AppState {
pub gotrue_client: gotrue::api::Client,
pub redis_client: redis::aio::ConnectionManager,
pub collab_storage: Storage<CollabStorageProxy>,
pub collab_permission: Arc<CollabPermissionImpl>,
pub bucket_storage: Arc<S3BucketStorage>,
pub pg_listeners: Arc<PgListeners>,
}
impl AppState {

179  tests/collab/member_test.rs  Normal file
View file

@ -0,0 +1,179 @@
use crate::collab::workspace_id_from_client;
use crate::user::utils::generate_unique_registered_user_client;
use collab_entity::CollabType;
use database_entity::{
AFAccessLevel, CollabMemberIdentify, InsertCollabMemberParams, InsertCollabParams,
QueryCollabMembers, UpdateCollabMemberParams,
};
use uuid::Uuid;
#[tokio::test]
async fn collab_owner_permission_test() {
let (c, _user) = generate_unique_registered_user_client().await;
let raw_data = "hello world".to_string().as_bytes().to_vec();
let workspace_id = workspace_id_from_client(&c).await;
let object_id = Uuid::new_v4().to_string();
let uid = c.get_profile().await.unwrap().uid.unwrap();
c.create_collab(InsertCollabParams::new(
&object_id,
CollabType::Document,
raw_data.clone(),
workspace_id.clone(),
))
.await
.unwrap();
let member = c
.get_collab_member(CollabMemberIdentify {
uid,
object_id,
workspace_id,
})
.await
.unwrap();
assert_eq!(member.permission.access_level, AFAccessLevel::FullAccess);
}
#[tokio::test]
async fn update_collab_member_permission_test() {
let (c, _user) = generate_unique_registered_user_client().await;
let raw_data = "hello world".to_string().as_bytes().to_vec();
let workspace_id = workspace_id_from_client(&c).await;
let object_id = Uuid::new_v4().to_string();
let uid = c.get_profile().await.unwrap().uid.unwrap();
c.create_collab(InsertCollabParams::new(
&object_id,
CollabType::Document,
raw_data.clone(),
workspace_id.clone(),
))
.await
.unwrap();
c.update_collab_member(UpdateCollabMemberParams {
uid,
workspace_id: workspace_id.clone(),
object_id: object_id.clone(),
access_level: AFAccessLevel::ReadOnly,
})
.await
.unwrap();
let member = c
.get_collab_member(CollabMemberIdentify {
uid,
object_id,
workspace_id,
})
.await
.unwrap();
assert_eq!(member.permission.access_level, AFAccessLevel::ReadOnly);
}
#[tokio::test]
async fn add_collab_member_test() {
let (c_1, _user) = generate_unique_registered_user_client().await;
let workspace_id = workspace_id_from_client(&c_1).await;
let object_id = Uuid::new_v4().to_string();
c_1
.create_collab(InsertCollabParams::new(
&object_id,
CollabType::Document,
vec![0; 10],
workspace_id.clone(),
))
.await
.unwrap();
// create new client
let (c_2, _user) = generate_unique_registered_user_client().await;
let uid_2 = c_2.get_profile().await.unwrap().uid.unwrap();
// add new member
c_1
.add_collab_member(InsertCollabMemberParams {
uid: uid_2,
workspace_id: workspace_id.clone(),
object_id: object_id.clone(),
access_level: AFAccessLevel::ReadAndComment,
})
.await
.unwrap();
// check that the member was added and its access level is correct
let member = c_1
.get_collab_member(CollabMemberIdentify {
uid: uid_2,
object_id,
workspace_id,
})
.await
.unwrap();
assert_eq!(
member.permission.access_level,
AFAccessLevel::ReadAndComment
);
}
#[tokio::test]
async fn add_collab_member_then_remove_test() {
let (c_1, _user) = generate_unique_registered_user_client().await;
let workspace_id = workspace_id_from_client(&c_1).await;
let object_id = Uuid::new_v4().to_string();
c_1
.create_collab(InsertCollabParams::new(
&object_id,
CollabType::Document,
vec![0; 10],
workspace_id.clone(),
))
.await
.unwrap();
// Create new client
let (c_2, _user) = generate_unique_registered_user_client().await;
let uid_2 = c_2.get_profile().await.unwrap().uid.unwrap();
// Add new member
c_1
.add_collab_member(InsertCollabMemberParams {
uid: uid_2,
workspace_id: workspace_id.clone(),
object_id: object_id.clone(),
access_level: AFAccessLevel::ReadAndComment,
})
.await
.unwrap();
let members = c_1
.get_collab_members(QueryCollabMembers {
workspace_id: workspace_id.clone(),
object_id: object_id.clone(),
})
.await
.unwrap()
.0;
assert_eq!(members.len(), 2);
// Delete the member
c_1
.remove_collab_member(CollabMemberIdentify {
uid: uid_2,
object_id: object_id.clone(),
workspace_id: workspace_id.clone(),
})
.await
.unwrap();
let members = c_1
.get_collab_members(QueryCollabMembers {
workspace_id,
object_id,
})
.await
.unwrap()
.0;
assert_eq!(members.len(), 1);
}

View file

@ -1,9 +1,10 @@
use client_api::Client;
mod member_test;
mod storage_test;
pub(crate) async fn workspace_id_from_client(c: &Client) -> String {
c.workspaces()
c.get_workspaces()
.await
.unwrap()
.first()

View file

@ -152,6 +152,6 @@ async fn admin_generate_link_and_user_sign_in() {
.unwrap();
assert!(is_new);
let workspaces = client.workspaces().await.unwrap();
let workspaces = client.get_workspaces().await.unwrap();
assert_eq!(workspaces.len(), 1);
}

View file

@ -0,0 +1 @@

View file

@ -1,4 +1,5 @@
mod connect_test;
mod edit_collab_test;
mod offline_edit_collab_test;
mod collab_permisson;
mod edit_collab;
mod edit_offline_collab;
mod test_client;
mod ws_connect;

View file

@ -103,7 +103,7 @@ impl TestClient {
pub(crate) async fn current_workspace_id(&self) -> String {
self
.api_client
.workspaces()
.get_workspaces()
.await
.unwrap()
.first()
@ -119,7 +119,7 @@ impl TestClient {
object_id: &str,
collab_type: CollabType,
) {
let uid = self.api_client.profile().await.unwrap().uid.unwrap();
let uid = self.api_client.get_profile().await.unwrap().uid.unwrap();
// Subscribe to object
let handler = self

View file

@ -25,7 +25,7 @@ async fn refresh_trigger() {
.as_secs() as i64;
// a query that requires auth should trigger a refresh
let _workspaces = c.workspaces().await.unwrap();
let _workspaces = c.get_workspaces().await.unwrap();
let new_token = c.access_token().unwrap();
assert_ne!(old_access_token, new_token);

View file

@ -64,9 +64,9 @@ async fn sign_in_success() {
.confirmed_at
.is_some());
let workspaces = c.workspaces().await.unwrap();
let workspaces = c.get_workspaces().await.unwrap();
assert_eq!(workspaces.0.len(), 1);
let profile = c.profile().await.unwrap();
let profile = c.get_profile().await.unwrap();
let latest_workspace = workspaces.get_latest(&profile);
assert!(latest_workspace.is_some());
}
@ -81,7 +81,7 @@ async fn sign_in_success() {
assert!(!is_new);
// workspaces should be the same
let workspaces = c.workspaces().await.unwrap();
let workspaces = c.get_workspaces().await.unwrap();
assert_eq!(workspaces.0.len(), 1);
}
}

View file

@ -9,7 +9,13 @@ use crate::user::utils::generate_unique_registered_user_client;
async fn add_workspace_members_not_enough_permission() {
let (c1, user1) = generate_unique_registered_user_client().await;
let (c2, _user2) = generate_unique_registered_user_client().await;
let workspace_id_2 = c2.workspaces().await.unwrap().first().unwrap().workspace_id;
let workspace_id_2 = c2
.get_workspaces()
.await
.unwrap()
.first()
.unwrap()
.workspace_id;
// attempt to add user2 to user1's workspace
let err = c1
@ -31,7 +37,7 @@ async fn add_workspace_members_then_delete() {
let (c2, _user2) = generate_unique_registered_user_client().await;
let c2_email = c2.token().read().as_ref().unwrap().user.email.clone();
let c1_workspace = c1.workspaces().await.unwrap();
let c1_workspace = c1.get_workspaces().await.unwrap();
let c1_workspace_id = c1_workspace.first().unwrap().workspace_id;
let email = c2.token().read().as_ref().unwrap().user.email.to_owned();
@ -77,7 +83,13 @@ async fn workspace_member_add_new_member() {
let (c2, user2) = generate_unique_registered_user_client().await;
let (_c3, user3) = generate_unique_registered_user_client().await;
let workspace_id = c1.workspaces().await.unwrap().first().unwrap().workspace_id;
let workspace_id = c1
.get_workspaces()
.await
.unwrap()
.first()
.unwrap()
.workspace_id;
c1.add_workspace_members(
workspace_id,
@ -108,7 +120,13 @@ async fn workspace_owner_add_new_owner() {
let (c1, user1) = generate_unique_registered_user_client().await;
let (_c2, user2) = generate_unique_registered_user_client().await;
let workspace_id = c1.workspaces().await.unwrap().first().unwrap().workspace_id;
let workspace_id = c1
.get_workspaces()
.await
.unwrap()
.first()
.unwrap()
.workspace_id;
c1.add_workspace_members(
workspace_id,
vec![CreateWorkspaceMember {
@ -133,7 +151,13 @@ async fn workspace_second_owner_add_new_member() {
let (c2, user2) = generate_unique_registered_user_client().await;
let (_c3, user3) = generate_unique_registered_user_client().await;
let workspace_id = c1.workspaces().await.unwrap().first().unwrap().workspace_id;
let workspace_id = c1
.get_workspaces()
.await
.unwrap()
.first()
.unwrap()
.workspace_id;
c1.add_workspace_members(
workspace_id,
vec![CreateWorkspaceMember {
@ -170,7 +194,13 @@ async fn workspace_second_owner_can_not_delete_origin_owner() {
let (c1, user1) = generate_unique_registered_user_client().await;
let (c2, user2) = generate_unique_registered_user_client().await;
let workspace_id = c1.workspaces().await.unwrap().first().unwrap().workspace_id;
let workspace_id = c1
.get_workspaces()
.await
.unwrap()
.first()
.unwrap()
.workspace_id;
c1.add_workspace_members(
workspace_id,
vec![CreateWorkspaceMember {
@ -194,7 +224,13 @@ async fn workspace_owner_update_member_role() {
let (c1, _user1) = generate_unique_registered_user_client().await;
let (_c2, user2) = generate_unique_registered_user_client().await;
let workspace_id = c1.workspaces().await.unwrap().first().unwrap().workspace_id;
let workspace_id = c1
.get_workspaces()
.await
.unwrap()
.first()
.unwrap()
.workspace_id;
c1.add_workspace_members(
workspace_id,
vec![CreateWorkspaceMember {