mirror of
https://github.com/AppFlowy-IO/AppFlowy-Cloud.git
synced 2025-04-19 03:24:42 -04:00
feat: duplicate published doc (#700)
* chore: test collab sync * feat: folder view for user workspace * feat: add private indicator * chore: use collab cache instead * chore: initial impl * chore: cargo sqlx * fix: write back to collab cache * fix: assign page id * fix: text map * chore: connect api and refactor * chore: switch to using mem storage * fix: collab type * feat: use group manager to manage sync * feat: try add send command instead * chore: add client api code * feat: try use sink and stream from group collab * chore: disable sync first * fix: insert page before stopping group * feat: add extra for view * feat: add metadata to doc * fix: icon * fix: page circular dep * fix: page circular dep * fix: live update * feat: database initial impl * feat: workspace database * chore: link database * fix: database validation * fix: workspace database oid * fix: workspace database oid * fix: specify view layout * feat: add txn when inserting collab and rework database views * fix: parent view id for new database * fix: database view for folder * fix: database ids * fix: database row id * fix: main database layout and write database before folder * fix: metadata for non database main view * chore: database in doc wip * chore: filter visible views * chore: use structure value instead of json * chore: database in doc wip * fix: remove child item push * fix: main database view meta * chore: rework inline database in doc * fix: make duplicate more error forgiving * chore: use get instead of get key value * chore: merge with main * feat: set max depth for folder view * chore: update dependency counter * chore: remove unneeded arg * chore: simplify collab new from source * chore: remove unneeded param * chore: rename and doc * feat: rework inline database and use duplicated ref * chore: simplify params * chore: fmt * feat: deduplicate database wip * fix: compatible with newer appflowy version * fix: database views * feat: database in doc * chore: cargo clippy * chore: update dockerfile rust toolchain * fix: doc inline db parent id * fix: database id * fix: document data modification * chore: minor refactor and todo * chore: remove unneeded nesting * fix: enforce ordering for views to add * chore: add doc and test case for folder view * fix: remove the need for cloning published collab * fix: some review * chore: move folder conversion logic * fix: remove collab folder deps on shared entity * chore: cargo fmt * fix: compile * chore: remove group manager dependency * fix: dep count * fix: add messages sent by server * fix: dep count * chore: add test for doc ref and inline doc database * chore: cargo clippy * chore: add more test scenarios * chore: fix tests * chore: get database id * chore: update collab * chore: add more assert and database row id checks * fix: suggestions from review * chore: sqlx * fix: accumulate collab before insert * chore: add tokio spawn blocking for encoding * fix: reduce limit for publish collabs --------- Co-authored-by: Bartosz Sypytkowski <b.sypytkowski@gmail.com> Co-authored-by: nathan <nathan@appflowy.io>
This commit is contained in:
parent
cf1bae3ccb
commit
826546c5cb
43 changed files with 2894 additions and 88 deletions
23
.sqlx/query-0389af6b225125d09c5a75b443561dba4d97b786d040e5b8d5a76de36716beb2.json
generated
Normal file
23
.sqlx/query-0389af6b225125d09c5a75b443561dba4d97b786d040e5b8d5a76de36716beb2.json
generated
Normal file
|
@ -0,0 +1,23 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT oid\n FROM af_collab\n WHERE workspace_id = $1\n AND partition_key = $2\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "oid",
|
||||
"type_info": "Text"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid",
|
||||
"Int4"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "0389af6b225125d09c5a75b443561dba4d97b786d040e5b8d5a76de36716beb2"
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT EXISTS (SELECT 1 FROM af_collab_member WHERE oid = $1 AND uid = $2 LIMIT 1)\n ",
|
||||
"query": "\n SELECT EXISTS (SELECT 1 FROM af_collab_member WHERE oid = $1 AND uid = $2 LIMIT 1)\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
|
@ -19,5 +19,5 @@
|
|||
null
|
||||
]
|
||||
},
|
||||
"hash": "7af4023278eea11cf5db92040bd8947bd14df72a93eded96658187d3f9dc0e81"
|
||||
"hash": "4fffbac63c56402626b50f8f50e3ddbb1b566b6c064d8d397a4e073433a86c11"
|
||||
}
|
28
.sqlx/query-7aa6e41c80f0b2906d46e73ae05e8e70e133b7edd450b102715b8a487d6055ac.json
generated
Normal file
28
.sqlx/query-7aa6e41c80f0b2906d46e73ae05e8e70e133b7edd450b102715b8a487d6055ac.json
generated
Normal file
|
@ -0,0 +1,28 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT metadata, blob\n FROM af_published_collab\n WHERE view_id = $1\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "metadata",
|
||||
"type_info": "Jsonb"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "blob",
|
||||
"type_info": "Bytea"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "7aa6e41c80f0b2906d46e73ae05e8e70e133b7edd450b102715b8a487d6055ac"
|
||||
}
|
34
.sqlx/query-bc5df5a1fe64ed4f32654f09d0d62459d02f494912fb38b97f87c46b62a69b1f.json
generated
Normal file
34
.sqlx/query-bc5df5a1fe64ed4f32654f09d0d62459d02f494912fb38b97f87c46b62a69b1f.json
generated
Normal file
|
@ -0,0 +1,34 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n aw.publish_namespace AS namespace,\n apc.publish_name,\n apc.view_id\n FROM af_published_collab apc\n LEFT JOIN af_workspace aw\n ON apc.workspace_id = aw.workspace_id\n WHERE apc.view_id = $1;\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "namespace",
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "publish_name",
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "view_id",
|
||||
"type_info": "Uuid"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "bc5df5a1fe64ed4f32654f09d0d62459d02f494912fb38b97f87c46b62a69b1f"
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT EXISTS (SELECT 1 FROM af_collab WHERE oid = $1 LIMIT 1)\n ",
|
||||
"query": "\n SELECT EXISTS (SELECT 1 FROM af_collab WHERE oid = $1 LIMIT 1)\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
|
@ -18,5 +18,5 @@
|
|||
null
|
||||
]
|
||||
},
|
||||
"hash": "1c93a309d53f3c5fc976716fcbb7c84abe5dad39806e54ac9270ec8d6f7eac8d"
|
||||
"hash": "d388782f755f0b164ef36c168af142baeb9bbd3cc2b8b7cd736b346580be8790"
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n uid, oid, access_level\n FROM af_collab_member\n INNER JOIN af_permissions\n ON af_collab_member.permission_id = af_permissions.id\n ",
|
||||
"query": "\n SELECT uid, oid, access_level\n FROM af_collab_member\n INNER JOIN af_permissions\n ON af_collab_member.permission_id = af_permissions.id\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
|
@ -28,5 +28,5 @@
|
|||
false
|
||||
]
|
||||
},
|
||||
"hash": "9eba47895808e9968529d789a02758807486054028990fffef62fb89ac047750"
|
||||
"hash": "f1b56cf92eeb5f7ddda80876c1ecf5b6a5357a58d18b9bf6f14e6a2261bd1182"
|
||||
}
|
5
Cargo.lock
generated
5
Cargo.lock
generated
|
@ -632,6 +632,7 @@ dependencies = [
|
|||
"gotrue-entity",
|
||||
"governor",
|
||||
"handlebars",
|
||||
"hex",
|
||||
"image",
|
||||
"infra",
|
||||
"itertools 0.11.0",
|
||||
|
@ -678,6 +679,7 @@ dependencies = [
|
|||
"validator",
|
||||
"workspace-access",
|
||||
"workspace-template",
|
||||
"yrs",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -2013,6 +2015,7 @@ dependencies = [
|
|||
"client-api",
|
||||
"client-websocket",
|
||||
"collab",
|
||||
"collab-database",
|
||||
"collab-document",
|
||||
"collab-entity",
|
||||
"collab-folder",
|
||||
|
@ -2738,6 +2741,7 @@ dependencies = [
|
|||
"chrono",
|
||||
"collab",
|
||||
"collab-entity",
|
||||
"collab-rt-entity",
|
||||
"database-entity",
|
||||
"futures-util",
|
||||
"pgvector",
|
||||
|
@ -6295,6 +6299,7 @@ dependencies = [
|
|||
"serde_json",
|
||||
"serde_repr",
|
||||
"thiserror",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"validator",
|
||||
]
|
||||
|
|
|
@ -139,6 +139,7 @@ shared-entity = { path = "libs/shared-entity", features = ["cloud"] }
|
|||
workspace-template = { workspace = true }
|
||||
collab-rt-entity.workspace = true
|
||||
collab-stream.workspace = true
|
||||
yrs.workspace = true
|
||||
|
||||
tonic-build = "0.11.0"
|
||||
log = "0.4.20"
|
||||
|
@ -164,6 +165,7 @@ client-api = { path = "libs/client-api", features = [
|
|||
opener = "0.6.1"
|
||||
image = "0.23.14"
|
||||
collab-rt-entity.workspace = true
|
||||
hex = "0.4.3"
|
||||
|
||||
[[bin]]
|
||||
name = "appflowy_cloud"
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# Using cargo-chef to manage Rust build cache effectively
|
||||
FROM lukemathwalker/cargo-chef:latest-rust-1.77 as chef
|
||||
FROM lukemathwalker/cargo-chef:latest-rust-1.79 as chef
|
||||
|
||||
WORKDIR /app
|
||||
RUN apt update && apt install lld clang -y
|
||||
|
|
|
@ -26,6 +26,7 @@ image = "0.23.14"
|
|||
database-entity.workspace = true
|
||||
collab-entity.workspace = true
|
||||
shared-entity.workspace = true
|
||||
collab-database.workspace = true
|
||||
tracing-subscriber = { version = "0.3.18", features = ["registry", "env-filter", "ansi", "json"] }
|
||||
uuid = "1.6.1"
|
||||
lazy_static = "1.4.0"
|
||||
|
@ -43,4 +44,4 @@ web-sys = { version = "0.3", features = ["console"] }
|
|||
|
||||
[features]
|
||||
collab-sync = ["client-api/collab-sync"]
|
||||
ai-test-enabled = []
|
||||
ai-test-enabled = []
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
use std::borrow::BorrowMut;
|
||||
use std::collections::HashMap;
|
||||
use std::ops::Deref;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Error};
|
||||
|
@ -185,13 +183,38 @@ impl TestClient {
|
|||
Folder::from_collab_doc_state(
|
||||
uid,
|
||||
CollabOrigin::Empty,
|
||||
DataSource::DocStateV1(data.encode_collab.doc_state.to_vec()),
|
||||
data.encode_collab.into(),
|
||||
&workspace_id,
|
||||
vec![],
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub async fn get_workspace_database_collab(&mut self, workspace_id: &str) -> Collab {
|
||||
let db_storage_id = self.open_workspace(workspace_id).await.database_storage_id;
|
||||
let ws_db_doc_state = self
|
||||
.get_collab(QueryCollabParams {
|
||||
workspace_id: workspace_id.to_string(),
|
||||
inner: QueryCollab {
|
||||
object_id: db_storage_id.to_string(),
|
||||
collab_type: CollabType::WorkspaceDatabase,
|
||||
},
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
.encode_collab
|
||||
.doc_state
|
||||
.to_vec();
|
||||
Collab::new_with_source(
|
||||
CollabOrigin::Server,
|
||||
&db_storage_id.to_string(),
|
||||
DataSource::DocStateV1(ws_db_doc_state),
|
||||
vec![],
|
||||
false,
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub async fn get_user_awareness(&self) -> UserAwareness {
|
||||
let workspace_id = self.workspace_id().await;
|
||||
let profile = self.get_user_profile().await;
|
||||
|
@ -969,31 +992,3 @@ pub async fn get_collab_json_from_server(
|
|||
.unwrap()
|
||||
.to_json_value()
|
||||
}
|
||||
|
||||
pub struct TestTempFile(PathBuf);
|
||||
|
||||
impl TestTempFile {
|
||||
fn cleanup(dir: &PathBuf) {
|
||||
let _ = std::fs::remove_dir_all(dir);
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<Path> for TestTempFile {
|
||||
fn as_ref(&self) -> &Path {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for TestTempFile {
|
||||
type Target = PathBuf;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TestTempFile {
|
||||
fn drop(&mut self) {
|
||||
Self::cleanup(&self.0)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
use crate::notify::{ClientToken, TokenStateReceiver};
|
||||
use app_error::AppError;
|
||||
use client_api_entity::workspace_dto::FolderView;
|
||||
use client_api_entity::workspace_dto::QueryWorkspaceFolder;
|
||||
use client_api_entity::workspace_dto::QueryWorkspaceParam;
|
||||
use client_api_entity::AuthProvider;
|
||||
use client_api_entity::CollabType;
|
||||
|
@ -657,6 +659,37 @@ impl Client {
|
|||
.into_data()
|
||||
}
|
||||
|
||||
/// List out the views in the workspace recursively.
|
||||
/// The depth parameter specifies the depth of the folder view tree to return(default: 1).
|
||||
/// e.g., depth=1 will return only up to `Shared` and `PrivateSpace`
|
||||
/// depth=2 will return up to `mydoc1`, `mydoc2`, `mydoc3`, `mydoc4`
|
||||
///
|
||||
/// . MyWorkspace
|
||||
/// ├── Shared
|
||||
/// │ ├── mydoc1
|
||||
/// │ └── mydoc2
|
||||
/// └── PrivateSpace
|
||||
/// ├── mydoc3
|
||||
/// └── mydoc4
|
||||
#[instrument(level = "info", skip_all, err)]
|
||||
pub async fn get_workspace_folder(
|
||||
&self,
|
||||
workspace_id: &str,
|
||||
depth: Option<u32>,
|
||||
) -> Result<FolderView, AppResponseError> {
|
||||
let url = format!("{}/api/workspace/{}/folder", self.base_url, workspace_id);
|
||||
let resp = self
|
||||
.http_client_with_auth(Method::GET, &url)
|
||||
.await?
|
||||
.query(&QueryWorkspaceFolder { depth })
|
||||
.send()
|
||||
.await?;
|
||||
log_request_id(&resp);
|
||||
AppResponse::<FolderView>::from_response(resp)
|
||||
.await?
|
||||
.into_data()
|
||||
}
|
||||
|
||||
#[instrument(level = "info", skip_all, err)]
|
||||
pub async fn open_workspace(&self, workspace_id: &str) -> Result<AFWorkspace, AppResponseError> {
|
||||
let url = format!("{}/api/workspace/{}/open", self.base_url, workspace_id);
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use bytes::Bytes;
|
||||
use client_api_entity::{workspace_dto::PublishedDuplicate, PublishInfo, UpdatePublishNamespace};
|
||||
use client_api_entity::{
|
||||
CreateGlobalCommentParams, CreateReactionParams, DeleteGlobalCommentParams, DeleteReactionParams,
|
||||
GetReactionQueryParams, GlobalComments, PublishInfo, Reactions, UpdatePublishNamespace,
|
||||
GetReactionQueryParams, GlobalComments, Reactions,
|
||||
};
|
||||
use reqwest::Method;
|
||||
use shared_entity::response::{AppResponse, AppResponseError};
|
||||
|
@ -259,6 +260,24 @@ impl Client {
|
|||
Ok(bytes)
|
||||
}
|
||||
|
||||
pub async fn duplicate_published_to_workspace(
|
||||
&self,
|
||||
workspace_id: &str,
|
||||
publish_duplicate: &PublishedDuplicate,
|
||||
) -> Result<(), AppResponseError> {
|
||||
let url = format!(
|
||||
"{}/api/workspace/{}/published-duplicate",
|
||||
self.base_url, workspace_id
|
||||
);
|
||||
let resp = self
|
||||
.http_client_with_auth(Method::POST, &url)
|
||||
.await?
|
||||
.json(publish_duplicate)
|
||||
.send()
|
||||
.await?;
|
||||
AppResponse::<()>::from_response(resp).await?.into_error()
|
||||
}
|
||||
|
||||
pub async fn get_published_view_reactions(
|
||||
&self,
|
||||
view_id: &uuid::Uuid,
|
||||
|
|
|
@ -8,6 +8,7 @@ edition = "2021"
|
|||
[dependencies]
|
||||
collab = { workspace = true }
|
||||
collab-entity = { workspace = true }
|
||||
collab-rt-entity = { workspace = true }
|
||||
validator = { version = "0.16", features = ["validator_derive", "derive"] }
|
||||
database-entity.workspace = true
|
||||
app-error = { workspace = true, features = ["sqlx_error", "validation_error"] }
|
||||
|
|
|
@ -496,11 +496,10 @@ pub fn select_collab_member_access_level(
|
|||
sqlx::query_as!(
|
||||
AFCollabMemberAccessLevelRow,
|
||||
r#"
|
||||
SELECT
|
||||
uid, oid, access_level
|
||||
SELECT uid, oid, access_level
|
||||
FROM af_collab_member
|
||||
INNER JOIN af_permissions
|
||||
ON af_collab_member.permission_id = af_permissions.id
|
||||
ON af_collab_member.permission_id = af_permissions.id
|
||||
"#
|
||||
)
|
||||
.fetch(pg_pool)
|
||||
|
@ -513,17 +512,17 @@ pub async fn select_collab_members(
|
|||
) -> Result<Vec<AFCollabMember>, AppError> {
|
||||
let members = sqlx::query(
|
||||
r#"
|
||||
SELECT af_collab_member.uid,
|
||||
af_collab_member.oid,
|
||||
af_permissions.id,
|
||||
af_permissions.name,
|
||||
af_permissions.access_level,
|
||||
af_permissions.description
|
||||
SELECT af_collab_member.uid,
|
||||
af_collab_member.oid,
|
||||
af_permissions.id,
|
||||
af_permissions.name,
|
||||
af_permissions.access_level,
|
||||
af_permissions.description
|
||||
FROM af_collab_member
|
||||
JOIN af_permissions ON af_collab_member.permission_id = af_permissions.id
|
||||
WHERE af_collab_member.oid = $1
|
||||
ORDER BY af_collab_member.created_at ASC
|
||||
"#,
|
||||
"#,
|
||||
)
|
||||
.bind(oid)
|
||||
.try_map(collab_member_try_from_row)
|
||||
|
@ -541,11 +540,11 @@ pub async fn select_collab_member<'a, E: Executor<'a, Database = Postgres>>(
|
|||
) -> Result<AFCollabMember, AppError> {
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
SELECT af_collab_member.uid, af_collab_member.oid, af_permissions.id, af_permissions.name, af_permissions.access_level, af_permissions.description
|
||||
FROM af_collab_member
|
||||
JOIN af_permissions ON af_collab_member.permission_id = af_permissions.id
|
||||
WHERE af_collab_member.uid = $1 AND af_collab_member.oid = $2
|
||||
"#,
|
||||
SELECT af_collab_member.uid, af_collab_member.oid, af_permissions.id, af_permissions.name, af_permissions.access_level, af_permissions.description
|
||||
FROM af_collab_member
|
||||
JOIN af_permissions ON af_collab_member.permission_id = af_permissions.id
|
||||
WHERE af_collab_member.uid = $1 AND af_collab_member.oid = $2
|
||||
"#,
|
||||
)
|
||||
.bind(uid)
|
||||
.bind(oid)
|
||||
|
@ -580,8 +579,8 @@ pub async fn is_collab_member_exists<'a, E: Executor<'a, Database = Postgres>>(
|
|||
) -> Result<bool, sqlx::Error> {
|
||||
let result = sqlx::query_scalar!(
|
||||
r#"
|
||||
SELECT EXISTS (SELECT 1 FROM af_collab_member WHERE oid = $1 AND uid = $2 LIMIT 1)
|
||||
"#,
|
||||
SELECT EXISTS (SELECT 1 FROM af_collab_member WHERE oid = $1 AND uid = $2 LIMIT 1)
|
||||
"#,
|
||||
&oid,
|
||||
&uid,
|
||||
)
|
||||
|
@ -616,11 +615,30 @@ pub async fn is_collab_exists<'a, E: Executor<'a, Database = Postgres>>(
|
|||
) -> Result<bool, sqlx::Error> {
|
||||
let result = sqlx::query_scalar!(
|
||||
r#"
|
||||
SELECT EXISTS (SELECT 1 FROM af_collab WHERE oid = $1 LIMIT 1)
|
||||
"#,
|
||||
SELECT EXISTS (SELECT 1 FROM af_collab WHERE oid = $1 LIMIT 1)
|
||||
"#,
|
||||
&oid,
|
||||
)
|
||||
.fetch_one(executor)
|
||||
.await;
|
||||
transform_record_not_found_error(result)
|
||||
}
|
||||
|
||||
pub async fn select_workspace_database_oid<'a, E: Executor<'a, Database = Postgres>>(
|
||||
executor: E,
|
||||
workspace_id: &Uuid,
|
||||
) -> Result<String, sqlx::Error> {
|
||||
let partition_key = partition_key_from_collab_type(&CollabType::WorkspaceDatabase);
|
||||
sqlx::query_scalar!(
|
||||
r#"
|
||||
SELECT oid
|
||||
FROM af_collab
|
||||
WHERE workspace_id = $1
|
||||
AND partition_key = $2
|
||||
"#,
|
||||
&workspace_id,
|
||||
&partition_key,
|
||||
)
|
||||
.fetch_one(executor)
|
||||
.await
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ use database_entity::dto::{
|
|||
|
||||
use collab::entity::EncodedCollab;
|
||||
use collab_entity::CollabType;
|
||||
use collab_rt_entity::ClientCollabMessage;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::Transaction;
|
||||
use std::collections::HashMap;
|
||||
|
@ -123,6 +124,16 @@ pub trait CollabStorage: Send + Sync + 'static {
|
|||
from_editing_collab: bool,
|
||||
) -> AppResult<EncodedCollab>;
|
||||
|
||||
/// Sends a collab message to all connected clients.
|
||||
/// # Arguments
|
||||
/// * `object_id` - The ID of the collaboration object.
|
||||
/// * `collab_messages` - The list of collab messages to broadcast.
|
||||
async fn broadcast_encode_collab(
|
||||
&self,
|
||||
object_id: String,
|
||||
collab_messages: Vec<ClientCollabMessage>,
|
||||
) -> Result<(), AppError>;
|
||||
|
||||
async fn batch_get_collab(
|
||||
&self,
|
||||
uid: &i64,
|
||||
|
@ -223,6 +234,17 @@ where
|
|||
.await
|
||||
}
|
||||
|
||||
async fn broadcast_encode_collab(
|
||||
&self,
|
||||
object_id: String,
|
||||
collab_messages: Vec<ClientCollabMessage>,
|
||||
) -> Result<(), AppError> {
|
||||
self
|
||||
.as_ref()
|
||||
.broadcast_encode_collab(object_id, collab_messages)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn batch_get_collab(
|
||||
&self,
|
||||
uid: &i64,
|
||||
|
|
|
@ -5,6 +5,7 @@ pub mod history;
|
|||
pub mod index;
|
||||
pub mod listener;
|
||||
pub mod pg_row;
|
||||
pub mod publish;
|
||||
pub mod resource_usage;
|
||||
pub mod template;
|
||||
pub mod user;
|
||||
|
|
275
libs/database/src/publish.rs
Normal file
275
libs/database/src/publish.rs
Normal file
|
@ -0,0 +1,275 @@
|
|||
use app_error::AppError;
|
||||
use database_entity::dto::{PublishCollabItem, PublishInfo};
|
||||
use sqlx::{Executor, PgPool, Postgres};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub async fn select_user_is_collab_publisher_for_all_views(
|
||||
pg_pool: &PgPool,
|
||||
user_uuid: &Uuid,
|
||||
workspace_uuid: &Uuid,
|
||||
view_ids: &[Uuid],
|
||||
) -> Result<bool, AppError> {
|
||||
let count = sqlx::query_scalar!(
|
||||
r#"
|
||||
SELECT COUNT(*)
|
||||
FROM af_published_collab
|
||||
WHERE workspace_id = $1
|
||||
AND view_id = ANY($2)
|
||||
AND published_by = (SELECT uid FROM af_user WHERE uuid = $3)
|
||||
"#,
|
||||
workspace_uuid,
|
||||
view_ids,
|
||||
user_uuid,
|
||||
)
|
||||
.fetch_one(pg_pool)
|
||||
.await?;
|
||||
|
||||
match count {
|
||||
Some(c) => Ok(c == view_ids.len() as i64),
|
||||
None => Ok(false),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn select_workspace_publish_namespace_exists<'a, E: Executor<'a, Database = Postgres>>(
|
||||
executor: E,
|
||||
workspace_id: &Uuid,
|
||||
namespace: &str,
|
||||
) -> Result<bool, AppError> {
|
||||
let res = sqlx::query_scalar!(
|
||||
r#"
|
||||
SELECT EXISTS(
|
||||
SELECT 1
|
||||
FROM af_workspace
|
||||
WHERE workspace_id = $1
|
||||
AND publish_namespace = $2
|
||||
)
|
||||
"#,
|
||||
workspace_id,
|
||||
namespace,
|
||||
)
|
||||
.fetch_one(executor)
|
||||
.await?;
|
||||
|
||||
Ok(res.unwrap_or(false))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn update_workspace_publish_namespace<'a, E: Executor<'a, Database = Postgres>>(
|
||||
executor: E,
|
||||
workspace_id: &Uuid,
|
||||
new_namespace: &str,
|
||||
) -> Result<(), AppError> {
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
UPDATE af_workspace
|
||||
SET publish_namespace = $1
|
||||
WHERE workspace_id = $2
|
||||
"#,
|
||||
new_namespace,
|
||||
workspace_id,
|
||||
)
|
||||
.execute(executor)
|
||||
.await?;
|
||||
|
||||
if res.rows_affected() != 1 {
|
||||
tracing::error!(
|
||||
"Failed to update workspace publish namespace, workspace_id: {}, new_namespace: {}, rows_affected: {}",
|
||||
workspace_id, new_namespace, res.rows_affected()
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn select_workspace_publish_namespace<'a, E: Executor<'a, Database = Postgres>>(
|
||||
executor: E,
|
||||
workspace_id: &Uuid,
|
||||
) -> Result<String, AppError> {
|
||||
let res = sqlx::query_scalar!(
|
||||
r#"
|
||||
SELECT publish_namespace
|
||||
FROM af_workspace
|
||||
WHERE workspace_id = $1
|
||||
"#,
|
||||
workspace_id,
|
||||
)
|
||||
.fetch_one(executor)
|
||||
.await?;
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn insert_or_replace_publish_collabs<'a, E: Executor<'a, Database = Postgres>>(
|
||||
executor: E,
|
||||
workspace_id: &Uuid,
|
||||
publisher_uuid: &Uuid,
|
||||
publish_items: Vec<PublishCollabItem<serde_json::Value, Vec<u8>>>,
|
||||
) -> Result<(), AppError> {
|
||||
let item_count = publish_items.len();
|
||||
let mut view_ids: Vec<Uuid> = Vec::with_capacity(item_count);
|
||||
let mut publish_names: Vec<String> = Vec::with_capacity(item_count);
|
||||
let mut metadatas: Vec<serde_json::Value> = Vec::with_capacity(item_count);
|
||||
let mut blobs: Vec<Vec<u8>> = Vec::with_capacity(item_count);
|
||||
publish_items.into_iter().for_each(|item| {
|
||||
view_ids.push(item.meta.view_id);
|
||||
publish_names.push(item.meta.publish_name);
|
||||
metadatas.push(item.meta.metadata);
|
||||
blobs.push(item.data);
|
||||
});
|
||||
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO af_published_collab (workspace_id, view_id, publish_name, published_by, metadata, blob)
|
||||
SELECT * FROM UNNEST(
|
||||
(SELECT array_agg((SELECT $1::uuid)) FROM generate_series(1, $7))::uuid[],
|
||||
$2::uuid[],
|
||||
$3::text[],
|
||||
(SELECT array_agg((SELECT uid FROM af_user WHERE uuid = $4)) FROM generate_series(1, $7))::bigint[],
|
||||
$5::jsonb[],
|
||||
$6::bytea[]
|
||||
)
|
||||
ON CONFLICT (workspace_id, view_id) DO UPDATE
|
||||
SET metadata = EXCLUDED.metadata,
|
||||
blob = EXCLUDED.blob,
|
||||
published_by = EXCLUDED.published_by,
|
||||
publish_name = EXCLUDED.publish_name
|
||||
"#,
|
||||
workspace_id,
|
||||
&view_ids,
|
||||
&publish_names,
|
||||
publisher_uuid,
|
||||
&metadatas,
|
||||
&blobs,
|
||||
item_count as i32,
|
||||
)
|
||||
.execute(executor)
|
||||
.await?;
|
||||
|
||||
if res.rows_affected() != item_count as u64 {
|
||||
tracing::warn!(
|
||||
"Failed to insert or replace publish collab meta batch, workspace_id: {}, publisher_uuid: {}, rows_affected: {}, item_count: {}",
|
||||
workspace_id, publisher_uuid, res.rows_affected(), item_count
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn select_publish_collab_meta<'a, E: Executor<'a, Database = Postgres>>(
|
||||
executor: E,
|
||||
publish_namespace: &str,
|
||||
publish_name: &str,
|
||||
) -> Result<serde_json::Value, AppError> {
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
SELECT metadata
|
||||
FROM af_published_collab
|
||||
WHERE workspace_id = (SELECT workspace_id FROM af_workspace WHERE publish_namespace = $1)
|
||||
AND publish_name = $2
|
||||
"#,
|
||||
publish_namespace,
|
||||
publish_name,
|
||||
)
|
||||
.fetch_one(executor)
|
||||
.await?;
|
||||
let metadata: serde_json::Value = res.metadata;
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn delete_published_collabs<'a, E: Executor<'a, Database = Postgres>>(
|
||||
executor: E,
|
||||
workspace_id: &Uuid,
|
||||
view_ids: &[Uuid],
|
||||
) -> Result<(), AppError> {
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
DELETE FROM af_published_collab
|
||||
WHERE workspace_id = $1
|
||||
AND view_id = ANY($2)
|
||||
"#,
|
||||
workspace_id,
|
||||
view_ids,
|
||||
)
|
||||
.execute(executor)
|
||||
.await?;
|
||||
|
||||
if res.rows_affected() != view_ids.len() as u64 {
|
||||
tracing::error!(
|
||||
"Failed to delete published collabs, workspace_id: {}, view_ids: {:?}, rows_affected: {}",
|
||||
workspace_id,
|
||||
view_ids,
|
||||
res.rows_affected()
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn select_published_data_for_view_id(
|
||||
pg_pool: &PgPool,
|
||||
view_id: &Uuid,
|
||||
) -> Result<Option<(serde_json::Value, Vec<u8>)>, AppError> {
|
||||
let res = sqlx::query!(
|
||||
r#"
|
||||
SELECT metadata, blob
|
||||
FROM af_published_collab
|
||||
WHERE view_id = $1
|
||||
"#,
|
||||
view_id,
|
||||
)
|
||||
.fetch_optional(pg_pool)
|
||||
.await?;
|
||||
Ok(res.map(|res| (res.metadata, res.blob)))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub async fn select_published_collab_blob<'a, E: Executor<'a, Database = Postgres>>(
|
||||
executor: E,
|
||||
publish_namespace: &str,
|
||||
publish_name: &str,
|
||||
) -> Result<Vec<u8>, AppError> {
|
||||
let res = sqlx::query_scalar!(
|
||||
r#"
|
||||
SELECT blob
|
||||
FROM af_published_collab
|
||||
WHERE workspace_id = (SELECT workspace_id FROM af_workspace WHERE publish_namespace = $1)
|
||||
AND publish_name = $2
|
||||
"#,
|
||||
publish_namespace,
|
||||
publish_name,
|
||||
)
|
||||
.fetch_one(executor)
|
||||
.await?;
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub async fn select_published_collab_info<'a, E: Executor<'a, Database = Postgres>>(
|
||||
executor: E,
|
||||
view_id: &Uuid,
|
||||
) -> Result<PublishInfo, AppError> {
|
||||
let res = sqlx::query_as!(
|
||||
PublishInfo,
|
||||
r#"
|
||||
SELECT
|
||||
aw.publish_namespace AS namespace,
|
||||
apc.publish_name,
|
||||
apc.view_id
|
||||
FROM af_published_collab apc
|
||||
LEFT JOIN af_workspace aw
|
||||
ON apc.workspace_id = aw.workspace_id
|
||||
WHERE apc.view_id = $1;
|
||||
"#,
|
||||
view_id,
|
||||
)
|
||||
.fetch_one(executor)
|
||||
.await?;
|
||||
|
||||
Ok(res)
|
||||
}
|
|
@ -28,6 +28,7 @@ validator = { version = "0.16", features = ["validator_derive", "derive"], optio
|
|||
futures = "0.3.30"
|
||||
bytes = "1.6.0"
|
||||
log = "0.4.21"
|
||||
tracing = { workspace = true }
|
||||
|
||||
|
||||
[features]
|
||||
|
|
|
@ -2,5 +2,6 @@ pub mod ai_dto;
|
|||
pub mod auth_dto;
|
||||
pub mod billing_dto;
|
||||
pub mod history_dto;
|
||||
pub mod publish_dto;
|
||||
pub mod search_dto;
|
||||
pub mod workspace_dto;
|
||||
|
|
61
libs/shared-entity/src/dto/publish_dto.rs
Normal file
61
libs/shared-entity/src/dto/publish_dto.rs
Normal file
|
@ -0,0 +1,61 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::workspace_dto::{ViewIcon, ViewLayout};
|
||||
|
||||
/// Copied from AppFlowy-IO/AppFlowy/frontend/rust-lib/flowy-folder-pub/src/entities.rs
|
||||
/// TODO(zack): make AppFlowy use from this crate instead
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct PublishViewMeta {
|
||||
pub metadata: PublishViewMetaData,
|
||||
pub view_id: String,
|
||||
pub publish_name: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Eq, PartialEq)]
|
||||
pub struct PublishViewMetaData {
|
||||
pub view: PublishViewInfo,
|
||||
pub child_views: Vec<PublishViewInfo>,
|
||||
pub ancestor_views: Vec<PublishViewInfo>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Eq, PartialEq)]
|
||||
pub struct PublishViewInfo {
|
||||
pub view_id: String,
|
||||
pub name: String,
|
||||
pub icon: Option<ViewIcon>,
|
||||
pub layout: ViewLayout,
|
||||
pub extra: Option<String>,
|
||||
pub created_by: Option<i64>,
|
||||
pub last_edited_by: Option<i64>,
|
||||
pub last_edited_time: i64,
|
||||
pub created_at: i64,
|
||||
pub child_views: Option<Vec<PublishViewInfo>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct PublishDatabasePayload {
|
||||
pub meta: PublishViewMeta,
|
||||
pub data: PublishDatabaseData,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default, Eq, PartialEq)]
|
||||
pub struct PublishDatabaseData {
|
||||
/// The encoded collab data for the database itself
|
||||
pub database_collab: Vec<u8>,
|
||||
|
||||
/// The encoded collab data for the database rows
|
||||
/// Use the row_id as the key
|
||||
pub database_row_collabs: HashMap<String, Vec<u8>>,
|
||||
|
||||
/// The encoded collab data for the documents inside the database rows
|
||||
/// It's not used for now
|
||||
pub database_row_document_collabs: HashMap<String, Vec<u8>>,
|
||||
|
||||
/// Visible view ids
|
||||
pub visible_database_view_ids: Vec<String>,
|
||||
|
||||
/// Relation view id map
|
||||
pub database_relations: HashMap<String, String>,
|
||||
}
|
|
@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
|
|||
use collab_entity::{CollabType, EncodedCollab};
|
||||
use database_entity::dto::{AFRole, AFWorkspaceInvitationStatus};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||
use std::ops::Deref;
|
||||
use uuid::Uuid;
|
||||
|
||||
|
@ -121,7 +122,54 @@ pub struct CollabResponse {
|
|||
pub object_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PublishedDuplicate {
|
||||
pub published_view_id: String,
|
||||
pub dest_view_id: String,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct FolderView {
|
||||
pub view_id: String,
|
||||
pub name: String,
|
||||
pub icon: Option<ViewIcon>,
|
||||
pub is_space: bool,
|
||||
pub is_private: bool,
|
||||
/// contains fields like `is_space`, and font information
|
||||
pub extra: Option<serde_json::Value>,
|
||||
pub children: Vec<FolderView>,
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug, Hash, Clone, Serialize_repr, Deserialize_repr)]
|
||||
#[repr(u8)]
|
||||
pub enum IconType {
|
||||
Emoji = 0,
|
||||
Url = 1,
|
||||
Icon = 2,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
|
||||
pub struct ViewIcon {
|
||||
pub ty: IconType,
|
||||
pub value: String,
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug, Hash, Clone, Serialize_repr, Deserialize_repr)]
|
||||
#[repr(u8)]
|
||||
pub enum ViewLayout {
|
||||
Document = 0,
|
||||
Grid = 1,
|
||||
Board = 2,
|
||||
Calendar = 3,
|
||||
Chat = 4,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Deserialize, Serialize)]
|
||||
pub struct QueryWorkspaceParam {
|
||||
pub include_member_count: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Deserialize, Serialize)]
|
||||
pub struct QueryWorkspaceFolder {
|
||||
pub depth: Option<u32>,
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
# Generate the current dependency list
|
||||
cargo tree > current_deps.txt
|
||||
|
||||
BASELINE_COUNT=609
|
||||
BASELINE_COUNT=610
|
||||
CURRENT_COUNT=$(cat current_deps.txt | wc -l)
|
||||
|
||||
echo "Expected dependency count (baseline): $BASELINE_COUNT"
|
||||
|
|
|
@ -203,7 +203,7 @@ impl CollabMemCache {
|
|||
// Perform update only if the new timestamp is greater than the existing one
|
||||
if current_value
|
||||
.as_ref()
|
||||
.map_or(true, |(ts, _)| timestamp > *ts)
|
||||
.map_or(true, |(ts, _)| timestamp >= *ts)
|
||||
{
|
||||
let mut pipeline = pipe();
|
||||
let data = [timestamp.to_be_bytes().as_ref(), data].concat();
|
||||
|
|
|
@ -5,10 +5,11 @@ use std::time::Duration;
|
|||
use async_trait::async_trait;
|
||||
use collab::entity::EncodedCollab;
|
||||
use collab_entity::CollabType;
|
||||
use collab_rt_entity::ClientCollabMessage;
|
||||
use itertools::{Either, Itertools};
|
||||
use sqlx::Transaction;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
use tracing::warn;
|
||||
use tracing::{error, instrument, trace};
|
||||
use validator::Validate;
|
||||
|
||||
|
@ -123,7 +124,7 @@ where
|
|||
|
||||
async fn get_encode_collab_from_editing(&self, object_id: &str) -> Option<EncodedCollab> {
|
||||
let object_id = object_id.to_string();
|
||||
let (ret, rx) = oneshot::channel();
|
||||
let (ret, rx) = tokio::sync::oneshot::channel();
|
||||
let timeout_duration = Duration::from_secs(5);
|
||||
|
||||
// Attempt to send the command to the realtime server
|
||||
|
@ -410,4 +411,39 @@ where
|
|||
error!("Failed to remove connected user: {}", err);
|
||||
}
|
||||
}
|
||||
|
||||
async fn broadcast_encode_collab(
|
||||
&self,
|
||||
object_id: String,
|
||||
collab_messages: Vec<ClientCollabMessage>,
|
||||
) -> Result<(), AppError> {
|
||||
let (sender, recv) = tokio::sync::oneshot::channel();
|
||||
|
||||
self
|
||||
.rt_cmd_sender
|
||||
.send(CollaborationCommand::ServerSendCollabMessage {
|
||||
object_id,
|
||||
collab_messages,
|
||||
ret: sender,
|
||||
})
|
||||
.await
|
||||
.map_err(|err| {
|
||||
AppError::Unhandled(format!(
|
||||
"Failed to send encode collab command to realtime server: {}",
|
||||
err
|
||||
))
|
||||
})?;
|
||||
|
||||
match recv.await {
|
||||
Ok(res) =>
|
||||
if let Err(err) = res {
|
||||
error!("Failed to broadcast encode collab: {}", err);
|
||||
}
|
||||
,
|
||||
// caller may have dropped the receiver
|
||||
Err(err) => warn!("Failed to receive response from realtime server: {}", err),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
use crate::group::cmd::{GroupCommand, GroupCommandSender};
|
||||
use crate::{
|
||||
error::RealtimeError,
|
||||
group::cmd::{GroupCommand, GroupCommandSender},
|
||||
};
|
||||
use collab::entity::EncodedCollab;
|
||||
use collab_rt_entity::ClientCollabMessage;
|
||||
use dashmap::DashMap;
|
||||
use std::sync::Arc;
|
||||
use tracing::error;
|
||||
|
@ -13,6 +17,11 @@ pub enum CollaborationCommand {
|
|||
object_id: String,
|
||||
ret: EncodeCollabSender,
|
||||
},
|
||||
ServerSendCollabMessage {
|
||||
object_id: String,
|
||||
collab_messages: Vec<ClientCollabMessage>,
|
||||
ret: tokio::sync::oneshot::Sender<Result<(), RealtimeError>>,
|
||||
},
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_collaboration_command(
|
||||
|
@ -41,6 +50,24 @@ pub(crate) fn spawn_collaboration_command(
|
|||
},
|
||||
}
|
||||
},
|
||||
CollaborationCommand::ServerSendCollabMessage {
|
||||
object_id,
|
||||
collab_messages,
|
||||
ret,
|
||||
} => {
|
||||
if let Some(sender) = group_sender_by_object_id.get(&object_id) {
|
||||
if let Err(err) = sender
|
||||
.send(GroupCommand::HandleServerCollabMessage {
|
||||
object_id,
|
||||
collab_messages,
|
||||
ret,
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::error!("Send group command error: {}", err);
|
||||
};
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -218,7 +218,10 @@ impl CollabBroadcast {
|
|||
error!("fail to broadcast message:{}", err);
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
Err(e) => {
|
||||
error!("fail to receive message:{}", e);
|
||||
break;
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
|
@ -313,7 +316,7 @@ async fn handle_client_messages<Sink>(
|
|||
Ok(response) => {
|
||||
trace!("[realtime]: sending response: {}", response);
|
||||
match sink.send(response.into()).await {
|
||||
Ok(_) => {},
|
||||
Ok(()) => {},
|
||||
Err(err) => {
|
||||
trace!("[realtime]: send failed: {}", err);
|
||||
break;
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_stream::stream;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::entity::EncodedCollab;
|
||||
use dashmap::DashMap;
|
||||
use futures_util::StreamExt;
|
||||
|
@ -19,6 +21,7 @@ use crate::group::manager::GroupManager;
|
|||
/// Using [GroupCommand] to interact with the group
|
||||
/// - HandleClientCollabMessage: Handle the client message
|
||||
/// - EncodeCollab: Encode the collab
|
||||
/// - HandleServerCollabMessage: Handle the server message
|
||||
pub enum GroupCommand {
|
||||
HandleClientCollabMessage {
|
||||
user: RealtimeUser,
|
||||
|
@ -30,6 +33,11 @@ pub enum GroupCommand {
|
|||
object_id: String,
|
||||
ret: tokio::sync::oneshot::Sender<Option<EncodedCollab>>,
|
||||
},
|
||||
HandleServerCollabMessage {
|
||||
object_id: String,
|
||||
collab_messages: Vec<ClientCollabMessage>,
|
||||
ret: tokio::sync::oneshot::Sender<Result<(), RealtimeError>>,
|
||||
},
|
||||
}
|
||||
|
||||
pub type GroupCommandSender = tokio::sync::mpsc::Sender<GroupCommand>;
|
||||
|
@ -88,6 +96,18 @@ where
|
|||
warn!("Send encode collab fail");
|
||||
}
|
||||
},
|
||||
GroupCommand::HandleServerCollabMessage {
|
||||
object_id,
|
||||
collab_messages,
|
||||
ret,
|
||||
} => {
|
||||
let res = self
|
||||
.handle_server_collab_messages(object_id, collab_messages)
|
||||
.await;
|
||||
if let Err(err) = ret.send(res) {
|
||||
warn!("Send handle server collab message result fail: {:?}", err);
|
||||
}
|
||||
},
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
@ -169,6 +189,50 @@ where
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// similar to `handle_client_collab_message`, but the messages are sent from the server instead.
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
async fn handle_server_collab_messages(
|
||||
&self,
|
||||
object_id: String,
|
||||
messages: Vec<ClientCollabMessage>,
|
||||
) -> Result<(), RealtimeError> {
|
||||
if messages.is_empty() {
|
||||
warn!("Unexpected empty collab messages sent from server");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let server_rt_user = RealtimeUser {
|
||||
uid: 0,
|
||||
device_id: "server".to_string(),
|
||||
connect_at: chrono::Utc::now().timestamp_millis(),
|
||||
session_id: uuid::Uuid::new_v4().to_string(),
|
||||
app_version: "".to_string(),
|
||||
};
|
||||
|
||||
if let Some(group) = self.group_manager.get_group(&object_id).await {
|
||||
let (collab_message_sender, _collab_message_receiver) = futures::channel::mpsc::channel(1);
|
||||
let (mut message_by_oid_sender, message_by_oid_receiver) = futures::channel::mpsc::channel(1);
|
||||
group
|
||||
.subscribe(
|
||||
&server_rt_user,
|
||||
CollabOrigin::Server,
|
||||
collab_message_sender,
|
||||
message_by_oid_receiver,
|
||||
)
|
||||
.await;
|
||||
let message = HashMap::from([(object_id.clone(), messages)]);
|
||||
if let Err(err) = message_by_oid_sender.try_send(message) {
|
||||
tracing::error!(
|
||||
"failed to send message to group: {}, object_id: {}",
|
||||
err,
|
||||
object_id
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn subscribe_group(
|
||||
&self,
|
||||
user: &RealtimeUser,
|
||||
|
|
|
@ -164,6 +164,7 @@ impl CollabGroup {
|
|||
);
|
||||
|
||||
if let Some(mut old) = self.subscribers.insert((*user).clone(), sub) {
|
||||
tracing::warn!("{}: remove old subscriber: {}", &self.object_id, user);
|
||||
old.stop().await;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,8 @@ use collab_rt_entity::RealtimeMessage;
|
|||
use collab_rt_protocol::validate_encode_collab;
|
||||
use database::collab::{CollabStorage, GetCollabOrigin};
|
||||
use database::user::select_uid_from_email;
|
||||
use database_entity::dto::PublishCollabItem;
|
||||
use database_entity::dto::PublishInfo;
|
||||
use database_entity::dto::*;
|
||||
use shared_entity::dto::workspace_dto::*;
|
||||
use shared_entity::response::AppResponseError;
|
||||
|
@ -143,6 +145,10 @@ pub fn workspace_scope() -> Scope {
|
|||
web::resource("/published/{publish_namespace}/{publish_name}/blob")
|
||||
.route(web::get().to(get_published_collab_blob_handler))
|
||||
)
|
||||
.service(
|
||||
web::resource("{workspace_id}/published-duplicate")
|
||||
.route(web::post().to(post_published_duplicate_handler))
|
||||
)
|
||||
.service(
|
||||
web::resource("/published-info/{view_id}")
|
||||
.route(web::get().to(get_published_collab_info_handler))
|
||||
|
@ -169,6 +175,10 @@ pub fn workspace_scope() -> Scope {
|
|||
.route(web::post().to(post_publish_collabs_handler))
|
||||
.route(web::delete().to(delete_published_collabs_handler))
|
||||
)
|
||||
.service(
|
||||
web::resource("/{workspace_id}/folder")
|
||||
.route(web::get().to(get_workspace_folder_handler))
|
||||
)
|
||||
.service(
|
||||
web::resource("/{workspace_id}/collab/{object_id}/member/list")
|
||||
.route(web::get().to(get_collab_member_list_handler)),
|
||||
|
@ -1049,7 +1059,7 @@ async fn put_publish_namespace_handler(
|
|||
) -> Result<Json<AppResponse<()>>> {
|
||||
let workspace_id = workspace_id.into_inner();
|
||||
let new_namespace = payload.into_inner().new_namespace;
|
||||
biz::workspace::ops::set_workspace_namespace(
|
||||
biz::workspace::publish::set_workspace_namespace(
|
||||
&state.pg_pool,
|
||||
&user_uuid,
|
||||
&workspace_id,
|
||||
|
@ -1065,7 +1075,7 @@ async fn get_publish_namespace_handler(
|
|||
) -> Result<Json<AppResponse<String>>> {
|
||||
let workspace_id = workspace_id.into_inner();
|
||||
let namespace =
|
||||
biz::workspace::ops::get_workspace_publish_namespace(&state.pg_pool, &workspace_id).await?;
|
||||
biz::workspace::publish::get_workspace_publish_namespace(&state.pg_pool, &workspace_id).await?;
|
||||
Ok(Json(AppResponse::Ok().with_data(namespace)))
|
||||
}
|
||||
|
||||
|
@ -1074,9 +1084,12 @@ async fn get_published_collab_handler(
|
|||
state: Data<AppState>,
|
||||
) -> Result<Json<serde_json::Value>> {
|
||||
let (workspace_namespace, publish_name) = path_param.into_inner();
|
||||
let metadata =
|
||||
biz::workspace::ops::get_published_collab(&state.pg_pool, &workspace_namespace, &publish_name)
|
||||
.await?;
|
||||
let metadata = biz::workspace::publish::get_published_collab(
|
||||
&state.pg_pool,
|
||||
&workspace_namespace,
|
||||
&publish_name,
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(metadata))
|
||||
}
|
||||
|
||||
|
@ -1085,7 +1098,7 @@ async fn get_published_collab_blob_handler(
|
|||
state: Data<AppState>,
|
||||
) -> Result<Vec<u8>> {
|
||||
let (publish_namespace, publish_name) = path_param.into_inner();
|
||||
let collab_data = biz::workspace::ops::get_published_collab_blob(
|
||||
let collab_data = biz::workspace::publish::get_published_collab_blob(
|
||||
&state.pg_pool,
|
||||
&publish_namespace,
|
||||
&publish_name,
|
||||
|
@ -1094,13 +1107,33 @@ async fn get_published_collab_blob_handler(
|
|||
Ok(collab_data)
|
||||
}
|
||||
|
||||
async fn post_published_duplicate_handler(
|
||||
user_uuid: UserUuid,
|
||||
workspace_id: web::Path<String>,
|
||||
state: Data<AppState>,
|
||||
params: Json<PublishedDuplicate>,
|
||||
) -> Result<Json<AppResponse<()>>> {
|
||||
let uid = state.user_cache.get_user_uid(&user_uuid).await?;
|
||||
let params = params.into_inner();
|
||||
biz::workspace::publish_dup::duplicate_published_collab_to_workspace(
|
||||
&state.pg_pool,
|
||||
state.collab_access_control_storage.clone(),
|
||||
uid,
|
||||
params.published_view_id,
|
||||
workspace_id.into_inner(),
|
||||
params.dest_view_id,
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(AppResponse::Ok()))
|
||||
}
|
||||
|
||||
async fn get_published_collab_info_handler(
|
||||
view_id: web::Path<Uuid>,
|
||||
state: Data<AppState>,
|
||||
) -> Result<Json<AppResponse<PublishInfo>>> {
|
||||
let view_id = view_id.into_inner();
|
||||
let collab_data =
|
||||
biz::workspace::ops::get_published_collab_info(&state.pg_pool, &view_id).await?;
|
||||
biz::workspace::publish::get_published_collab_info(&state.pg_pool, &view_id).await?;
|
||||
Ok(Json(AppResponse::Ok().with_data(collab_data)))
|
||||
}
|
||||
|
||||
|
@ -1205,7 +1238,7 @@ async fn post_publish_collabs_handler(
|
|||
let meta: PublishCollabMetadata<serde_json::Value> = {
|
||||
let meta_len = payload_reader.read_u32_little_endian().await?;
|
||||
if meta_len > 4 * 1024 * 1024 {
|
||||
// 4MB Limit for metadata
|
||||
// 4MiB Limit for metadata
|
||||
return Err(AppError::InvalidRequest(String::from("metadata too large")).into());
|
||||
}
|
||||
if meta_len == 0 {
|
||||
|
@ -1219,8 +1252,8 @@ async fn post_publish_collabs_handler(
|
|||
|
||||
let data = {
|
||||
let data_len = payload_reader.read_u32_little_endian().await?;
|
||||
if data_len > 128 * 1024 * 1024 {
|
||||
// 128MB Limit for data
|
||||
if data_len > 32 * 1024 * 1024 {
|
||||
// 32MiB Limit for data
|
||||
return Err(AppError::InvalidRequest(String::from("data too large")).into());
|
||||
}
|
||||
let mut data_buffer = vec![0; data_len as usize];
|
||||
|
@ -1236,7 +1269,7 @@ async fn post_publish_collabs_handler(
|
|||
AppError::InvalidRequest(String::from("did not receive any data to publish")).into(),
|
||||
);
|
||||
}
|
||||
biz::workspace::ops::publish_collabs(&state.pg_pool, &workspace_id, &user_uuid, &accumulator)
|
||||
biz::workspace::publish::publish_collabs(&state.pg_pool, &workspace_id, &user_uuid, accumulator)
|
||||
.await?;
|
||||
Ok(Json(AppResponse::Ok()))
|
||||
}
|
||||
|
@ -1252,7 +1285,7 @@ async fn delete_published_collabs_handler(
|
|||
if view_ids.is_empty() {
|
||||
return Ok(Json(AppResponse::Ok()));
|
||||
}
|
||||
biz::workspace::ops::delete_published_workspace_collab(
|
||||
biz::workspace::publish::delete_published_workspace_collab(
|
||||
&state.pg_pool,
|
||||
&workspace_id,
|
||||
&view_ids,
|
||||
|
@ -1340,6 +1373,24 @@ async fn get_workspace_usage_handler(
|
|||
Ok(Json(AppResponse::Ok().with_data(res)))
|
||||
}
|
||||
|
||||
async fn get_workspace_folder_handler(
|
||||
user_uuid: UserUuid,
|
||||
workspace_id: web::Path<String>,
|
||||
state: Data<AppState>,
|
||||
query: web::Query<QueryWorkspaceFolder>,
|
||||
) -> Result<Json<AppResponse<FolderView>>> {
|
||||
let depth = query.depth.unwrap_or(1);
|
||||
let uid = state.user_cache.get_user_uid(&user_uuid).await?;
|
||||
let folder_view = biz::collab::ops::get_user_workspace_structure(
|
||||
state.collab_access_control_storage.clone(),
|
||||
uid,
|
||||
workspace_id.into_inner(),
|
||||
depth,
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(AppResponse::Ok().with_data(folder_view)))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn parser_realtime_msg(
|
||||
payload: Bytes,
|
||||
|
|
162
src/biz/collab/folder_view.rs
Normal file
162
src/biz/collab/folder_view.rs
Normal file
|
@ -0,0 +1,162 @@
|
|||
use std::collections::HashSet;
|
||||
|
||||
use collab_folder::Folder;
|
||||
use shared_entity::dto::workspace_dto::FolderView;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub fn collab_folder_to_folder_view(folder: &Folder, depth: u32) -> FolderView {
|
||||
let mut unviewable = HashSet::new();
|
||||
for private_section in folder.get_all_private_sections() {
|
||||
unviewable.insert(private_section.id);
|
||||
}
|
||||
for trash_view in folder.get_all_trash_sections() {
|
||||
unviewable.insert(trash_view.id);
|
||||
}
|
||||
|
||||
let mut private_views = HashSet::new();
|
||||
for private_section in folder.get_my_private_sections() {
|
||||
unviewable.remove(&private_section.id);
|
||||
private_views.insert(private_section.id);
|
||||
}
|
||||
|
||||
let workspace_id = folder.get_workspace_id().unwrap_or_else(|| {
|
||||
tracing::error!("failed to get workspace_id");
|
||||
Uuid::nil().to_string()
|
||||
});
|
||||
let root = match folder.get_view(&workspace_id) {
|
||||
Some(root) => root,
|
||||
None => {
|
||||
tracing::error!("failed to get root view, workspace_id: {}", workspace_id);
|
||||
return FolderView::default();
|
||||
},
|
||||
};
|
||||
|
||||
let extra = root.extra.as_deref().map(|extra| {
|
||||
serde_json::from_str::<serde_json::Value>(extra).unwrap_or_else(|e| {
|
||||
tracing::error!("failed to parse extra field({}): {}", extra, e);
|
||||
serde_json::Value::Null
|
||||
})
|
||||
});
|
||||
|
||||
FolderView {
|
||||
view_id: root.id.clone(),
|
||||
name: root.name.clone(),
|
||||
icon: root
|
||||
.icon
|
||||
.as_ref()
|
||||
.map(|icon| to_dto_view_icon(icon.clone())),
|
||||
is_space: false,
|
||||
is_private: false,
|
||||
extra,
|
||||
children: if depth == 0 {
|
||||
vec![]
|
||||
} else {
|
||||
root
|
||||
.children
|
||||
.iter()
|
||||
.filter(|v| !unviewable.contains(&v.id))
|
||||
.map(|v| {
|
||||
let intermediate = FolderViewIntermediate {
|
||||
folder,
|
||||
view_id: &v.id,
|
||||
unviewable: &unviewable,
|
||||
private_views: &private_views,
|
||||
depth,
|
||||
};
|
||||
FolderView::from(intermediate)
|
||||
})
|
||||
.collect()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
struct FolderViewIntermediate<'a> {
|
||||
folder: &'a Folder,
|
||||
view_id: &'a str,
|
||||
unviewable: &'a HashSet<String>,
|
||||
private_views: &'a HashSet<String>,
|
||||
depth: u32,
|
||||
}
|
||||
|
||||
impl<'a> From<FolderViewIntermediate<'a>> for FolderView {
|
||||
fn from(fv: FolderViewIntermediate) -> Self {
|
||||
let view = match fv.folder.get_view(fv.view_id) {
|
||||
Some(view) => view,
|
||||
None => {
|
||||
tracing::error!("failed to get view, view_id: {}", fv.view_id);
|
||||
return Self::default();
|
||||
},
|
||||
};
|
||||
let extra = view.extra.as_deref().map(|extra| {
|
||||
serde_json::from_str::<serde_json::Value>(extra).unwrap_or_else(|e| {
|
||||
tracing::error!("failed to parse extra field({}): {}", extra, e);
|
||||
serde_json::Value::Null
|
||||
})
|
||||
});
|
||||
|
||||
Self {
|
||||
view_id: view.id.clone(),
|
||||
name: view.name.clone(),
|
||||
icon: view
|
||||
.icon
|
||||
.as_ref()
|
||||
.map(|icon| to_dto_view_icon(icon.clone())),
|
||||
is_space: view_is_space(&view),
|
||||
is_private: fv.private_views.contains(&view.id),
|
||||
extra,
|
||||
children: if fv.depth == 1 {
|
||||
vec![]
|
||||
} else {
|
||||
view
|
||||
.children
|
||||
.iter()
|
||||
.filter(|v| !fv.unviewable.contains(&v.id))
|
||||
.map(|v| {
|
||||
FolderView::from(FolderViewIntermediate {
|
||||
folder: fv.folder,
|
||||
view_id: &v.id,
|
||||
unviewable: fv.unviewable,
|
||||
private_views: fv.private_views,
|
||||
depth: fv.depth - 1,
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn view_is_space(view: &collab_folder::View) -> bool {
|
||||
let extra = match view.extra.as_ref() {
|
||||
Some(extra) => extra,
|
||||
None => return false,
|
||||
};
|
||||
let value = match serde_json::from_str::<serde_json::Value>(extra) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
tracing::error!("failed to parse extra field({}): {}", extra, e);
|
||||
return false;
|
||||
},
|
||||
};
|
||||
match value.get("is_space") {
|
||||
Some(is_space_str) => is_space_str.as_bool().unwrap_or(false),
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn to_dto_view_icon(icon: collab_folder::ViewIcon) -> shared_entity::dto::workspace_dto::ViewIcon {
|
||||
shared_entity::dto::workspace_dto::ViewIcon {
|
||||
ty: to_dto_view_icon_type(icon.ty),
|
||||
value: icon.value,
|
||||
}
|
||||
}
|
||||
|
||||
fn to_dto_view_icon_type(
|
||||
icon: collab_folder::IconType,
|
||||
) -> shared_entity::dto::workspace_dto::IconType {
|
||||
match icon {
|
||||
collab_folder::IconType::Emoji => shared_entity::dto::workspace_dto::IconType::Emoji,
|
||||
collab_folder::IconType::Url => shared_entity::dto::workspace_dto::IconType::Url,
|
||||
collab_folder::IconType::Icon => shared_entity::dto::workspace_dto::IconType::Icon,
|
||||
}
|
||||
}
|
|
@ -1,2 +1,3 @@
|
|||
pub mod access_control;
|
||||
pub mod folder_view;
|
||||
pub mod ops;
|
||||
|
|
|
@ -1,17 +1,30 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use app_error::AppError;
|
||||
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
|
||||
use collab_entity::CollabType;
|
||||
use collab_entity::EncodedCollab;
|
||||
use collab_folder::{CollabOrigin, Folder};
|
||||
use database::collab::{CollabStorage, GetCollabOrigin};
|
||||
use database_entity::dto::QueryCollab;
|
||||
use database_entity::dto::QueryCollabParams;
|
||||
use sqlx::PgPool;
|
||||
use std::ops::DerefMut;
|
||||
|
||||
use anyhow::Context;
|
||||
use sqlx::{types::Uuid, PgPool};
|
||||
use shared_entity::dto::workspace_dto::FolderView;
|
||||
use sqlx::types::Uuid;
|
||||
use tracing::{event, trace};
|
||||
use validator::Validate;
|
||||
|
||||
use access_control::collab::CollabAccessControl;
|
||||
use app_error::AppError;
|
||||
use database_entity::dto::{
|
||||
AFCollabMember, CollabMemberIdentify, InsertCollabMemberParams, QueryCollabMembers,
|
||||
UpdateCollabMemberParams,
|
||||
};
|
||||
|
||||
use super::folder_view::collab_folder_to_folder_view;
|
||||
|
||||
/// Create a new collab member
|
||||
/// If the collab member already exists, return [AppError::RecordAlreadyExists]
|
||||
/// If the collab member does not exist, create a new one
|
||||
|
@ -129,6 +142,7 @@ pub async fn delete_collab_member(
|
|||
.context("fail to commit the transaction to remove collab member")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_collab_member_list(
|
||||
pg_pool: &PgPool,
|
||||
params: &QueryCollabMembers,
|
||||
|
@ -137,3 +151,67 @@ pub async fn get_collab_member_list(
|
|||
let collab_member = database::collab::select_collab_members(¶ms.object_id, pg_pool).await?;
|
||||
Ok(collab_member)
|
||||
}
|
||||
|
||||
pub async fn get_user_workspace_structure(
|
||||
collab_storage: Arc<CollabAccessControlStorage>,
|
||||
uid: i64,
|
||||
workspace_id: String,
|
||||
depth: u32,
|
||||
) -> Result<FolderView, AppError> {
|
||||
let depth_limit = 10;
|
||||
if depth > depth_limit {
|
||||
return Err(AppError::InvalidRequest(format!(
|
||||
"Depth {} is too large (limit: {})",
|
||||
depth, depth_limit
|
||||
)));
|
||||
}
|
||||
let folder = get_latest_collab_folder(collab_storage, &uid, &workspace_id).await?;
|
||||
let folder_view: FolderView = collab_folder_to_folder_view(&folder, depth);
|
||||
Ok(folder_view)
|
||||
}
|
||||
|
||||
pub async fn get_latest_collab_folder(
|
||||
collab_storage: Arc<CollabAccessControlStorage>,
|
||||
uid: &i64,
|
||||
workspace_id: &str,
|
||||
) -> Result<Folder, AppError> {
|
||||
let encoded_collab = get_latest_collab_encoded(
|
||||
collab_storage,
|
||||
uid,
|
||||
workspace_id,
|
||||
workspace_id,
|
||||
CollabType::Folder,
|
||||
)
|
||||
.await?;
|
||||
let folder = Folder::from_collab_doc_state(
|
||||
uid,
|
||||
CollabOrigin::Server,
|
||||
encoded_collab.into(),
|
||||
workspace_id,
|
||||
vec![],
|
||||
)
|
||||
.map_err(|e| AppError::Unhandled(e.to_string()))?;
|
||||
Ok(folder)
|
||||
}
|
||||
|
||||
pub async fn get_latest_collab_encoded(
|
||||
collab_storage: Arc<CollabAccessControlStorage>,
|
||||
uid: &i64,
|
||||
workspace_id: &str,
|
||||
oid: &str,
|
||||
collab_type: CollabType,
|
||||
) -> Result<EncodedCollab, AppError> {
|
||||
collab_storage
|
||||
.get_encode_collab(
|
||||
GetCollabOrigin::User { uid: *uid },
|
||||
QueryCollabParams {
|
||||
workspace_id: workspace_id.to_string(),
|
||||
inner: QueryCollab {
|
||||
object_id: oid.to_string(),
|
||||
collab_type,
|
||||
},
|
||||
},
|
||||
true,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
|
@ -1,2 +1,4 @@
|
|||
pub mod access_control;
|
||||
pub mod ops;
|
||||
pub mod publish;
|
||||
pub mod publish_dup;
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
use authentication::jwt::OptionalUserUuid;
|
||||
use database_entity::dto::{AFWorkspaceSettingsChange, PublishCollabItem};
|
||||
use database_entity::dto::AFWorkspaceSettingsChange;
|
||||
use database_entity::dto::PublishCollabItem;
|
||||
use database_entity::dto::PublishInfo;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use database_entity::dto::PublishInfo;
|
||||
use std::ops::DerefMut;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
@ -639,7 +640,7 @@ pub async fn update_workspace_settings(
|
|||
Ok(setting)
|
||||
}
|
||||
|
||||
async fn check_workspace_owner(
|
||||
pub async fn check_workspace_owner(
|
||||
pg_pool: &PgPool,
|
||||
user_uuid: &Uuid,
|
||||
workspace_id: &Uuid,
|
||||
|
|
156
src/biz/workspace/publish.rs
Normal file
156
src/biz/workspace/publish.rs
Normal file
|
@ -0,0 +1,156 @@
|
|||
use app_error::AppError;
|
||||
use database_entity::dto::{PublishCollabItem, PublishInfo};
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use database::{
|
||||
publish::{
|
||||
delete_published_collabs, insert_or_replace_publish_collabs, select_publish_collab_meta,
|
||||
select_published_collab_blob, select_published_collab_info,
|
||||
select_user_is_collab_publisher_for_all_views, select_workspace_publish_namespace,
|
||||
select_workspace_publish_namespace_exists, update_workspace_publish_namespace,
|
||||
},
|
||||
workspace::select_user_is_workspace_owner,
|
||||
};
|
||||
|
||||
use super::ops::check_workspace_owner;
|
||||
|
||||
pub async fn publish_collabs(
|
||||
pg_pool: &PgPool,
|
||||
workspace_id: &Uuid,
|
||||
publisher_uuid: &Uuid,
|
||||
publish_items: Vec<PublishCollabItem<serde_json::Value, Vec<u8>>>,
|
||||
) -> Result<(), AppError> {
|
||||
for publish_item in &publish_items {
|
||||
check_collab_publish_name(publish_item.meta.publish_name.as_str())?;
|
||||
}
|
||||
insert_or_replace_publish_collabs(pg_pool, workspace_id, publisher_uuid, publish_items).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_published_collab(
|
||||
pg_pool: &PgPool,
|
||||
publish_namespace: &str,
|
||||
publish_name: &str,
|
||||
) -> Result<serde_json::Value, AppError> {
|
||||
let metadata = select_publish_collab_meta(pg_pool, publish_namespace, publish_name).await?;
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
pub async fn get_published_collab_blob(
|
||||
pg_pool: &PgPool,
|
||||
publish_namespace: &str,
|
||||
publish_name: &str,
|
||||
) -> Result<Vec<u8>, AppError> {
|
||||
select_published_collab_blob(pg_pool, publish_namespace, publish_name).await
|
||||
}
|
||||
|
||||
pub async fn get_published_collab_info(
|
||||
pg_pool: &PgPool,
|
||||
view_id: &Uuid,
|
||||
) -> Result<PublishInfo, AppError> {
|
||||
select_published_collab_info(pg_pool, view_id).await
|
||||
}
|
||||
|
||||
pub async fn delete_published_workspace_collab(
|
||||
pg_pool: &PgPool,
|
||||
workspace_id: &Uuid,
|
||||
view_ids: &[Uuid],
|
||||
user_uuid: &Uuid,
|
||||
) -> Result<(), AppError> {
|
||||
check_workspace_owner_or_publisher(pg_pool, user_uuid, workspace_id, view_ids).await?;
|
||||
delete_published_collabs(pg_pool, workspace_id, view_ids).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn check_workspace_owner_or_publisher(
|
||||
pg_pool: &PgPool,
|
||||
user_uuid: &Uuid,
|
||||
workspace_id: &Uuid,
|
||||
view_id: &[Uuid],
|
||||
) -> Result<(), AppError> {
|
||||
let is_owner = select_user_is_workspace_owner(pg_pool, user_uuid, workspace_id).await?;
|
||||
if !is_owner {
|
||||
let is_publisher =
|
||||
select_user_is_collab_publisher_for_all_views(pg_pool, user_uuid, workspace_id, view_id)
|
||||
.await?;
|
||||
if !is_publisher {
|
||||
return Err(AppError::UserUnAuthorized(
|
||||
"User is not the owner of the workspace or the publisher of the document".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn check_collab_publish_name(publish_name: &str) -> Result<(), AppError> {
|
||||
// Check len
|
||||
if publish_name.len() > 128 {
|
||||
return Err(AppError::InvalidRequest(
|
||||
"Publish name must be at most 128 characters long".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
// Only contain alphanumeric characters and hyphens
|
||||
for c in publish_name.chars() {
|
||||
if !c.is_alphanumeric() && c != '-' {
|
||||
return Err(AppError::InvalidRequest(
|
||||
"Publish name must only contain alphanumeric characters and hyphens".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn set_workspace_namespace(
|
||||
pg_pool: &PgPool,
|
||||
user_uuid: &Uuid,
|
||||
workspace_id: &Uuid,
|
||||
new_namespace: &str,
|
||||
) -> Result<(), AppError> {
|
||||
check_workspace_owner(pg_pool, user_uuid, workspace_id).await?;
|
||||
check_workspace_namespace(new_namespace).await?;
|
||||
if select_workspace_publish_namespace_exists(pg_pool, workspace_id, new_namespace).await? {
|
||||
return Err(AppError::PublishNamespaceAlreadyTaken(
|
||||
"publish namespace is already taken".to_string(),
|
||||
));
|
||||
};
|
||||
update_workspace_publish_namespace(pg_pool, workspace_id, new_namespace).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_workspace_publish_namespace(
|
||||
pg_pool: &PgPool,
|
||||
workspace_id: &Uuid,
|
||||
) -> Result<String, AppError> {
|
||||
select_workspace_publish_namespace(pg_pool, workspace_id).await
|
||||
}
|
||||
|
||||
async fn check_workspace_namespace(new_namespace: &str) -> Result<(), AppError> {
|
||||
// Check len
|
||||
if new_namespace.len() < 8 {
|
||||
return Err(AppError::InvalidRequest(
|
||||
"Namespace must be at least 8 characters long".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if new_namespace.len() > 64 {
|
||||
return Err(AppError::InvalidRequest(
|
||||
"Namespace must be at most 32 characters long".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
// Only contain alphanumeric characters and hyphens
|
||||
for c in new_namespace.chars() {
|
||||
if !c.is_alphanumeric() && c != '-' {
|
||||
return Err(AppError::InvalidRequest(
|
||||
"Namespace must only contain alphanumeric characters and hyphens".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: add more checks for reserved words
|
||||
|
||||
Ok(())
|
||||
}
|
959
src/biz/workspace/publish_dup.rs
Normal file
959
src/biz/workspace/publish_dup.rs
Normal file
|
@ -0,0 +1,959 @@
|
|||
use app_error::AppError;
|
||||
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
|
||||
use collab::core::collab::DataSource;
|
||||
use collab::preclude::Collab;
|
||||
|
||||
use collab::preclude::MapExt;
|
||||
use collab_database::views::ViewMap;
|
||||
use collab_database::workspace_database::WorkspaceDatabaseBody;
|
||||
use collab_document::blocks::DocumentData;
|
||||
use collab_document::document::Document;
|
||||
use collab_entity::CollabType;
|
||||
use collab_folder::{CollabOrigin, Folder, RepeatedViewIdentifier, View};
|
||||
use collab_rt_entity::{ClientCollabMessage, UpdateSync};
|
||||
use collab_rt_protocol::{Message, SyncMessage};
|
||||
use database::collab::{select_workspace_database_oid, CollabStorage};
|
||||
use database::publish::select_published_data_for_view_id;
|
||||
use database_entity::dto::CollabParams;
|
||||
use shared_entity::dto::publish_dto::{PublishDatabaseData, PublishViewInfo, PublishViewMetaData};
|
||||
use shared_entity::dto::workspace_dto;
|
||||
use shared_entity::dto::workspace_dto::ViewLayout;
|
||||
use sqlx::PgPool;
|
||||
use std::collections::HashSet;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use workspace_template::gen_view_id;
|
||||
use yrs::updates::encoder::Encode;
|
||||
use yrs::{Map, MapRef};
|
||||
|
||||
use crate::biz::collab::ops::get_latest_collab_encoded;
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn duplicate_published_collab_to_workspace(
|
||||
pg_pool: &PgPool,
|
||||
collab_storage: Arc<CollabAccessControlStorage>,
|
||||
dest_uid: i64,
|
||||
publish_view_id: String,
|
||||
dest_workspace_id: String,
|
||||
dest_view_id: String,
|
||||
) -> Result<(), AppError> {
|
||||
let copier = PublishCollabDuplicator::new(
|
||||
pg_pool.clone(),
|
||||
collab_storage.clone(),
|
||||
dest_uid,
|
||||
dest_workspace_id,
|
||||
dest_view_id,
|
||||
);
|
||||
|
||||
let time_now = chrono::Utc::now().timestamp_millis();
|
||||
copier.duplicate(&publish_view_id).await?;
|
||||
let elapsed = chrono::Utc::now().timestamp_millis() - time_now;
|
||||
tracing::info!(
|
||||
"duplicate_published_collab_to_workspace: elapsed time: {}ms",
|
||||
elapsed
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct PublishCollabDuplicator {
|
||||
/// for fetching and writing folder data
|
||||
/// of dest workspace
|
||||
collab_storage: Arc<CollabAccessControlStorage>,
|
||||
/// A map to store the old view_id that was duplicated and new view_id assigned.
|
||||
/// If value is none, it means the view_id is not published.
|
||||
duplicated_refs: HashMap<String, Option<String>>,
|
||||
/// published_database_id -> view_id
|
||||
duplicated_db_main_view: HashMap<String, String>,
|
||||
/// published_database_view_id -> new_view_id
|
||||
duplicated_db_view: HashMap<String, String>,
|
||||
/// new views to be added to the folder
|
||||
/// view_id -> view
|
||||
views_to_add: HashMap<String, View>,
|
||||
/// A list of database linked views to be added to workspace database
|
||||
workspace_databases: HashMap<String, Vec<String>>,
|
||||
/// A list of collab objects to added to the workspace (oid -> collab)
|
||||
collabs_to_insert: HashMap<String, (CollabType, Vec<u8>)>,
|
||||
/// time of duplication
|
||||
ts_now: i64,
|
||||
/// for fetching published data
|
||||
/// and writing them to dest workspace
|
||||
pg_pool: PgPool,
|
||||
/// user initiating the duplication
|
||||
duplicator_uid: i64,
|
||||
/// workspace to duplicate into
|
||||
dest_workspace_id: String,
|
||||
/// view of workspace to duplicate into
|
||||
dest_view_id: String,
|
||||
}
|
||||
|
||||
impl PublishCollabDuplicator {
|
||||
pub fn new(
|
||||
pg_pool: PgPool,
|
||||
collab_storage: Arc<CollabAccessControlStorage>,
|
||||
dest_uid: i64,
|
||||
dest_workspace_id: String,
|
||||
dest_view_id: String,
|
||||
) -> Self {
|
||||
let ts_now = chrono::Utc::now().timestamp();
|
||||
Self {
|
||||
ts_now,
|
||||
duplicated_refs: HashMap::new(),
|
||||
views_to_add: HashMap::new(),
|
||||
workspace_databases: HashMap::new(),
|
||||
collabs_to_insert: HashMap::new(),
|
||||
duplicated_db_main_view: HashMap::new(),
|
||||
duplicated_db_view: HashMap::new(),
|
||||
|
||||
pg_pool,
|
||||
collab_storage,
|
||||
duplicator_uid: dest_uid,
|
||||
dest_workspace_id,
|
||||
dest_view_id,
|
||||
}
|
||||
}
|
||||
|
||||
async fn duplicate(mut self, publish_view_id: &str) -> Result<(), AppError> {
|
||||
// new view after deep copy
|
||||
// this is the root of the document/database duplicated
|
||||
let mut root_view = match self.deep_copy(gen_view_id(), publish_view_id).await? {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
return Err(AppError::RecordNotFound(
|
||||
"view not found, it might be unpublished".to_string(),
|
||||
))
|
||||
},
|
||||
};
|
||||
root_view.parent_view_id.clone_from(&self.dest_view_id);
|
||||
|
||||
// destructuring self to own inner values, avoids cloning
|
||||
let PublishCollabDuplicator {
|
||||
collab_storage,
|
||||
duplicated_refs: _,
|
||||
duplicated_db_main_view: _,
|
||||
duplicated_db_view: _,
|
||||
mut views_to_add,
|
||||
workspace_databases,
|
||||
collabs_to_insert,
|
||||
ts_now: _,
|
||||
pg_pool,
|
||||
duplicator_uid,
|
||||
dest_workspace_id,
|
||||
dest_view_id: _,
|
||||
} = self;
|
||||
|
||||
// insert all collab object accumulated
|
||||
// for self.collabs_to_insert
|
||||
let mut txn = pg_pool.begin().await?;
|
||||
for (oid, (collab_type, encoded_collab)) in collabs_to_insert.into_iter() {
|
||||
collab_storage
|
||||
.insert_new_collab_with_transaction(
|
||||
&dest_workspace_id,
|
||||
&duplicator_uid,
|
||||
CollabParams {
|
||||
object_id: oid.clone(),
|
||||
encoded_collab_v1: encoded_collab,
|
||||
collab_type,
|
||||
embeddings: None,
|
||||
},
|
||||
&mut txn,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// update database if any
|
||||
if !workspace_databases.is_empty() {
|
||||
let ws_db_oid = select_workspace_database_oid(&pg_pool, &dest_workspace_id.parse()?).await?;
|
||||
let mut ws_db_collab = {
|
||||
let ws_database_ec = get_latest_collab_encoded(
|
||||
collab_storage.clone(),
|
||||
&duplicator_uid,
|
||||
&dest_workspace_id,
|
||||
&ws_db_oid,
|
||||
CollabType::WorkspaceDatabase,
|
||||
)
|
||||
.await?;
|
||||
collab_from_doc_state(ws_database_ec.doc_state.to_vec(), &ws_db_oid)?
|
||||
};
|
||||
|
||||
let ws_db_body = WorkspaceDatabaseBody::new(&mut ws_db_collab);
|
||||
|
||||
let (ws_db_updates, updated_ws_w_db_collab) = tokio::task::spawn_blocking(move || {
|
||||
let ws_db_updates = {
|
||||
let mut txn_wrapper = ws_db_collab.transact_mut();
|
||||
for (db_collab_id, linked_views) in &workspace_databases {
|
||||
ws_db_body.add_database(&mut txn_wrapper, db_collab_id, linked_views.clone());
|
||||
}
|
||||
txn_wrapper.encode_update_v1()
|
||||
};
|
||||
let updated_ws_w_db_collab = collab_to_bin(&ws_db_collab, CollabType::WorkspaceDatabase);
|
||||
(ws_db_updates, updated_ws_w_db_collab)
|
||||
})
|
||||
.await?;
|
||||
|
||||
collab_storage
|
||||
.insert_new_collab_with_transaction(
|
||||
&dest_workspace_id,
|
||||
&duplicator_uid,
|
||||
CollabParams {
|
||||
object_id: ws_db_oid.clone(),
|
||||
encoded_collab_v1: updated_ws_w_db_collab?,
|
||||
collab_type: CollabType::WorkspaceDatabase,
|
||||
embeddings: None,
|
||||
},
|
||||
&mut txn,
|
||||
)
|
||||
.await?;
|
||||
broadcast_update(&collab_storage, &ws_db_oid, ws_db_updates).await?;
|
||||
}
|
||||
|
||||
let collab_folder_encoded = get_latest_collab_encoded(
|
||||
collab_storage.clone(),
|
||||
&duplicator_uid,
|
||||
&dest_workspace_id,
|
||||
&dest_workspace_id,
|
||||
CollabType::Folder,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let cloned_dest_workspace_id = dest_workspace_id.clone();
|
||||
let mut folder = tokio::task::spawn_blocking(move || {
|
||||
Folder::from_collab_doc_state(
|
||||
duplicator_uid,
|
||||
CollabOrigin::Server,
|
||||
collab_folder_encoded.into(),
|
||||
&cloned_dest_workspace_id,
|
||||
vec![],
|
||||
)
|
||||
.map_err(|e| AppError::Unhandled(e.to_string()))
|
||||
})
|
||||
.await??;
|
||||
|
||||
let (encoded_update, updated_encoded_collab) = tokio::task::spawn_blocking(move || {
|
||||
let encoded_update = {
|
||||
let mut folder_txn = folder.collab.transact_mut();
|
||||
|
||||
let mut duplicated_view_ids = HashSet::new();
|
||||
duplicated_view_ids.insert(root_view.id.clone());
|
||||
folder.body.views.insert(&mut folder_txn, root_view, None);
|
||||
|
||||
// when child views are added, it must have a parent view that is previously added
|
||||
// TODO: if there are too many child views, consider using topological sort
|
||||
loop {
|
||||
if views_to_add.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut inserted = vec![];
|
||||
for (view_id, view) in views_to_add.iter() {
|
||||
if duplicated_view_ids.contains(&view.parent_view_id) {
|
||||
folder
|
||||
.body
|
||||
.views
|
||||
.insert(&mut folder_txn, view.clone(), None);
|
||||
duplicated_view_ids.insert(view_id.clone());
|
||||
inserted.push(view_id.clone());
|
||||
}
|
||||
}
|
||||
if inserted.is_empty() {
|
||||
tracing::error!(
|
||||
"views not inserted because parent_id does not exists: {:?}",
|
||||
views_to_add.keys()
|
||||
);
|
||||
break;
|
||||
}
|
||||
for view_id in inserted {
|
||||
views_to_add.remove(&view_id);
|
||||
}
|
||||
}
|
||||
|
||||
folder_txn.encode_update_v1()
|
||||
};
|
||||
|
||||
// update folder collab
|
||||
let updated_encoded_collab = collab_to_bin(&folder.collab, CollabType::Folder);
|
||||
(encoded_update, updated_encoded_collab)
|
||||
})
|
||||
.await?;
|
||||
|
||||
collab_storage
|
||||
.insert_new_collab_with_transaction(
|
||||
&dest_workspace_id,
|
||||
&duplicator_uid,
|
||||
CollabParams {
|
||||
object_id: dest_workspace_id.clone(),
|
||||
encoded_collab_v1: updated_encoded_collab?,
|
||||
collab_type: CollabType::Folder,
|
||||
embeddings: None,
|
||||
},
|
||||
&mut txn,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// broadcast folder changes
|
||||
broadcast_update(&collab_storage, &dest_workspace_id, encoded_update).await?;
|
||||
|
||||
txn.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deep copy a published collab to the destination workspace.
|
||||
/// If None is returned, it means the view is not published.
|
||||
/// If Some is returned, a new view is created but without parent_view_id set.
|
||||
/// Caller should set the parent_view_id to the parent view.
|
||||
async fn deep_copy(
|
||||
&mut self,
|
||||
new_view_id: String,
|
||||
publish_view_id: &str,
|
||||
) -> Result<Option<View>, AppError> {
|
||||
tracing::info!(
|
||||
"deep_copy: new_view_id: {}, publish_view_id: {}",
|
||||
new_view_id,
|
||||
publish_view_id,
|
||||
);
|
||||
|
||||
// attempt to get metadata and doc_state for published view
|
||||
let (metadata, published_blob) = match self
|
||||
.get_published_data_for_view_id(&publish_view_id.parse()?)
|
||||
.await?
|
||||
{
|
||||
Some(published_data) => published_data,
|
||||
None => {
|
||||
tracing::warn!(
|
||||
"No published collab data found for view_id: {}",
|
||||
publish_view_id
|
||||
);
|
||||
return Ok(None);
|
||||
},
|
||||
};
|
||||
|
||||
// at this stage, we know that the view is published,
|
||||
// so we insert this knowledge into the duplicated_refs
|
||||
self
|
||||
.duplicated_refs
|
||||
.insert(publish_view_id.to_string(), new_view_id.clone().into());
|
||||
|
||||
match metadata.view.layout {
|
||||
ViewLayout::Document => {
|
||||
let doc_collab = collab_from_doc_state(published_blob, "")?;
|
||||
let doc = Document::open(doc_collab).map_err(|e| AppError::Unhandled(e.to_string()))?;
|
||||
let new_doc_view = self.deep_copy_doc(new_view_id, doc, metadata).await?;
|
||||
Ok(Some(new_doc_view))
|
||||
},
|
||||
ViewLayout::Grid | ViewLayout::Board | ViewLayout::Calendar => {
|
||||
let pub_view_id = metadata.view.view_id.clone();
|
||||
let db_payload = serde_json::from_slice::<PublishDatabaseData>(&published_blob)?;
|
||||
let new_db_view = self
|
||||
.deep_copy_database_view(new_view_id, db_payload, &metadata, &pub_view_id)
|
||||
.await?;
|
||||
Ok(Some(new_db_view))
|
||||
},
|
||||
t => {
|
||||
tracing::warn!("collab type not supported: {:?}", t);
|
||||
Ok(None)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn deep_copy_doc<'a>(
|
||||
&mut self,
|
||||
new_view_id: String,
|
||||
doc: Document,
|
||||
metadata: PublishViewMetaData,
|
||||
) -> Result<View, AppError> {
|
||||
let mut ret_view =
|
||||
self.new_folder_view(new_view_id.clone(), &metadata.view, ViewLayout::Document);
|
||||
|
||||
let mut doc_data = doc
|
||||
.get_document_data()
|
||||
.map_err(|e| AppError::Unhandled(e.to_string()))?;
|
||||
|
||||
if let Err(err) = self.deep_copy_doc_pages(&mut doc_data, &mut ret_view).await {
|
||||
tracing::error!("failed to deep copy doc pages: {}", err);
|
||||
}
|
||||
|
||||
if let Err(err) = self
|
||||
.deep_copy_doc_databases(&mut doc_data, &mut ret_view)
|
||||
.await
|
||||
{
|
||||
tracing::error!("failed to deep copy doc databases: {}", err);
|
||||
};
|
||||
|
||||
{
|
||||
// write modified doc_data back to storage
|
||||
let empty_collab = collab_from_doc_state(vec![], &new_view_id)?;
|
||||
let new_doc = tokio::task::spawn_blocking(move || {
|
||||
Document::open_with(empty_collab, Some(doc_data))
|
||||
.map_err(|e| AppError::Unhandled(e.to_string()))
|
||||
})
|
||||
.await??;
|
||||
let new_doc_bin = tokio::task::spawn_blocking(move || {
|
||||
new_doc
|
||||
.encode_collab()
|
||||
.map_err(|e| AppError::Unhandled(e.to_string()))
|
||||
.map(|ec| ec.encode_to_bytes())
|
||||
})
|
||||
.await?;
|
||||
|
||||
self
|
||||
.collabs_to_insert
|
||||
.insert(ret_view.id.clone(), (CollabType::Document, new_doc_bin??));
|
||||
}
|
||||
Ok(ret_view)
|
||||
}
|
||||
|
||||
async fn deep_copy_doc_pages(
|
||||
&mut self,
|
||||
doc_data: &mut DocumentData,
|
||||
ret_view: &mut View,
|
||||
) -> Result<(), AppError> {
|
||||
if let Some(text_map) = doc_data.meta.text_map.as_mut() {
|
||||
for (_key, value) in text_map.iter_mut() {
|
||||
let mut js_val = match serde_json::from_str::<serde_json::Value>(value) {
|
||||
Ok(js_val) => js_val,
|
||||
Err(e) => {
|
||||
tracing::error!("failed to parse text_map value({}): {}", value, e);
|
||||
continue;
|
||||
},
|
||||
};
|
||||
let js_array = match js_val.as_array_mut() {
|
||||
Some(js_array) => js_array,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let page_ids = js_array
|
||||
.iter_mut()
|
||||
.flat_map(|js_val| js_val.get_mut("attributes"))
|
||||
.flat_map(|attributes| attributes.get_mut("mention"))
|
||||
.filter(|mention| {
|
||||
mention.get("type").map_or(false, |type_| {
|
||||
type_.as_str().map_or(false, |type_| type_ == "page")
|
||||
})
|
||||
})
|
||||
.flat_map(|mention| mention.get_mut("page_id"));
|
||||
|
||||
for page_id in page_ids {
|
||||
let page_id_str = match page_id.as_str() {
|
||||
Some(page_id_str) => page_id_str,
|
||||
None => continue,
|
||||
};
|
||||
if let Some(new_page_id) = self.deep_copy_view(page_id_str, &ret_view.id).await? {
|
||||
*page_id = serde_json::json!(new_page_id);
|
||||
} else {
|
||||
tracing::warn!("deep_copy_doc_pages: view not found: {}", page_id_str);
|
||||
};
|
||||
}
|
||||
|
||||
*value = js_val.to_string();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// attempts to deep copy a view using `view_id`. returns a new_view_id of the duplicated view.
|
||||
/// if view is already duplicated, returns duplicated view's view_id (parent_view_id is not set
|
||||
/// from param `parent_view_id`)
|
||||
async fn deep_copy_view(
|
||||
&mut self,
|
||||
view_id: &str,
|
||||
parent_view_id: &String,
|
||||
) -> Result<Option<String>, AppError> {
|
||||
match self.duplicated_refs.get(view_id) {
|
||||
Some(new_view_id) => {
|
||||
if let Some(vid) = new_view_id {
|
||||
Ok(Some(vid.clone()))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
},
|
||||
None => {
|
||||
// Call deep_copy and await the result
|
||||
if let Some(mut new_view) = Box::pin(self.deep_copy(gen_view_id(), view_id)).await? {
|
||||
if new_view.parent_view_id.is_empty() {
|
||||
new_view.parent_view_id.clone_from(parent_view_id);
|
||||
}
|
||||
self
|
||||
.duplicated_refs
|
||||
.insert(view_id.to_string(), Some(new_view.id.clone()));
|
||||
let ret_view_id = new_view.id.clone();
|
||||
self.views_to_add.insert(new_view.id.clone(), new_view);
|
||||
Ok(Some(ret_view_id))
|
||||
} else {
|
||||
tracing::warn!("view not found in deep_copy: {}", view_id);
|
||||
self.duplicated_refs.insert(view_id.to_string(), None);
|
||||
Ok(None)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn deep_copy_doc_databases(
|
||||
&mut self,
|
||||
doc_data: &mut DocumentData,
|
||||
ret_view: &mut View,
|
||||
) -> Result<(), AppError> {
|
||||
let db_blocks = doc_data
|
||||
.blocks
|
||||
.iter_mut()
|
||||
.filter(|(_, b)| b.ty == "grid" || b.ty == "board" || b.ty == "calendar");
|
||||
|
||||
for (block_id, block) in db_blocks {
|
||||
tracing::info!("deep_copy_doc_databases: block_id: {}", block_id);
|
||||
let block_view_id = block
|
||||
.data
|
||||
.get("view_id")
|
||||
.ok_or_else(|| AppError::RecordNotFound("view_id not found in block data".to_string()))?
|
||||
.as_str()
|
||||
.ok_or_else(|| AppError::RecordNotFound("view_id not a string".to_string()))?;
|
||||
|
||||
let block_parent_id = block
|
||||
.data
|
||||
.get("parent_id")
|
||||
.ok_or_else(|| AppError::RecordNotFound("view_id not found in block data".to_string()))?
|
||||
.as_str()
|
||||
.ok_or_else(|| AppError::RecordNotFound("view_id not a string".to_string()))?;
|
||||
|
||||
if let Some((new_view_id, new_parent_id)) = self
|
||||
.deep_copy_database_inline_doc(block_view_id, block_parent_id, &ret_view.id)
|
||||
.await?
|
||||
{
|
||||
block.data.insert(
|
||||
"view_id".to_string(),
|
||||
serde_json::Value::String(new_view_id),
|
||||
);
|
||||
block.data.insert(
|
||||
"parent_id".to_string(),
|
||||
serde_json::Value::String(new_parent_id),
|
||||
);
|
||||
} else {
|
||||
tracing::warn!("deep_copy_doc_databases: view not found: {}", block_view_id);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// deep copy database for doc
|
||||
/// returns new (view_id, parent_id)
|
||||
async fn deep_copy_database_inline_doc<'a>(
|
||||
&mut self,
|
||||
view_id: &str,
|
||||
parent_id: &str,
|
||||
doc_view_id: &String,
|
||||
) -> Result<Option<(String, String)>, AppError> {
|
||||
let (metadata, published_blob) = match self
|
||||
.get_published_data_for_view_id(&view_id.parse()?)
|
||||
.await?
|
||||
{
|
||||
Some(published_data) => published_data,
|
||||
None => {
|
||||
tracing::warn!("No published collab data found for view_id: {}", view_id);
|
||||
return Ok(None);
|
||||
},
|
||||
};
|
||||
|
||||
let published_db = serde_json::from_slice::<PublishDatabaseData>(&published_blob)?;
|
||||
let mut parent_view = self
|
||||
.deep_copy_database_view(gen_view_id(), published_db, &metadata, parent_id)
|
||||
.await?;
|
||||
let parent_view_id = parent_view.id.clone();
|
||||
if parent_view.parent_view_id.is_empty() {
|
||||
parent_view.parent_view_id.clone_from(doc_view_id);
|
||||
self
|
||||
.views_to_add
|
||||
.insert(parent_view.id.clone(), parent_view);
|
||||
}
|
||||
let duplicated_view_id = match self.duplicated_db_view.get(view_id) {
|
||||
Some(v) => v.clone(),
|
||||
None => {
|
||||
let view_info_by_id = view_info_by_view_id(&metadata);
|
||||
let view_info = view_info_by_id.get(view_id).ok_or_else(|| {
|
||||
AppError::RecordNotFound(format!("metadata not found for view: {}", view_id))
|
||||
})?;
|
||||
let mut new_folder_db_view =
|
||||
self.new_folder_view(view_id.to_string(), view_info, view_info.layout.clone());
|
||||
new_folder_db_view.parent_view_id = parent_view_id.clone();
|
||||
let new_folder_db_view_id = new_folder_db_view.id.clone();
|
||||
self
|
||||
.views_to_add
|
||||
.insert(new_folder_db_view.id.clone(), new_folder_db_view);
|
||||
new_folder_db_view_id
|
||||
},
|
||||
};
|
||||
Ok(Some((duplicated_view_id, parent_view_id)))
|
||||
}
|
||||
|
||||
/// Deep copy a published database (does not create folder views)
|
||||
/// checks if database is already published
|
||||
/// attempts to use `new_view_id` for `published_view_id` if not already published
|
||||
/// stores all view_id references in `duplicated_refs`
|
||||
/// returns (published_db_id, new_db_id, is_already_duplicated)
|
||||
async fn deep_copy_database<'a>(
|
||||
&mut self,
|
||||
published_db: &PublishDatabaseData,
|
||||
publish_view_id: &str,
|
||||
new_view_id: String,
|
||||
) -> Result<(String, String, bool), AppError> {
|
||||
// collab of database
|
||||
let mut db_collab = collab_from_doc_state(published_db.database_collab.clone(), "")?;
|
||||
let pub_db_id = get_database_id_from_collab(&db_collab)?;
|
||||
|
||||
// check if the database is already duplicated
|
||||
if let Some(db_id) = self.duplicated_refs.get(&pub_db_id).cloned().flatten() {
|
||||
return Ok((pub_db_id, db_id, true));
|
||||
}
|
||||
|
||||
let new_db_id = gen_view_id();
|
||||
self
|
||||
.duplicated_refs
|
||||
.insert(pub_db_id.clone(), Some(new_db_id.clone()));
|
||||
|
||||
// duplicate db collab rows
|
||||
for (old_id, row_bin_data) in &published_db.database_row_collabs {
|
||||
// assign a new id for the row
|
||||
let new_row_id = gen_view_id();
|
||||
let mut db_row_collab = collab_from_doc_state(row_bin_data.clone(), &new_row_id)?;
|
||||
|
||||
{
|
||||
// update database_id and row_id in data
|
||||
let mut txn = db_row_collab.context.transact_mut();
|
||||
let data = db_row_collab
|
||||
.data
|
||||
.get(&txn, "data")
|
||||
.ok_or_else(|| {
|
||||
AppError::RecordNotFound("no data found in database row collab".to_string())
|
||||
})?
|
||||
.cast::<MapRef>()
|
||||
.map_err(|err| AppError::Unhandled(format!("data not map: {:?}", err)))?;
|
||||
data.insert(&mut txn, "id", new_row_id.clone());
|
||||
data.insert(&mut txn, "database_id", new_db_id.clone());
|
||||
}
|
||||
|
||||
// write new row collab to storage
|
||||
let db_row_ec_bytes =
|
||||
tokio::task::spawn_blocking(move || collab_to_bin(&db_row_collab, CollabType::DatabaseRow))
|
||||
.await?;
|
||||
self.collabs_to_insert.insert(
|
||||
new_row_id.clone(),
|
||||
(CollabType::DatabaseRow, db_row_ec_bytes?),
|
||||
);
|
||||
self
|
||||
.duplicated_refs
|
||||
.insert(old_id.clone(), Some(new_row_id));
|
||||
}
|
||||
|
||||
// accumulate list of database views (Board, Cal, ...) to be linked to the database
|
||||
let mut new_db_view_ids: Vec<String> = vec![];
|
||||
{
|
||||
let mut txn = db_collab.context.transact_mut();
|
||||
let container = db_collab
|
||||
.data
|
||||
.get(&txn, "database")
|
||||
.ok_or_else(|| AppError::RecordNotFound("no database found in collab".to_string()))?
|
||||
.cast::<MapRef>()
|
||||
.map_err(|err| AppError::Unhandled(format!("not a map: {:?}", err)))?;
|
||||
container.insert(&mut txn, "id", new_db_id.clone());
|
||||
|
||||
let view_map = {
|
||||
let map_ref = db_collab
|
||||
.data
|
||||
.get_with_path(&txn, ["database", "views"])
|
||||
.ok_or_else(|| AppError::RecordNotFound("no views found in database".to_string()))?;
|
||||
ViewMap::new(map_ref, tokio::sync::broadcast::channel(1).0)
|
||||
};
|
||||
|
||||
// create new database views based on published views
|
||||
let mut db_views = view_map.get_all_views(&txn);
|
||||
|
||||
for db_view in db_views.iter_mut() {
|
||||
let new_view_id = if db_view.id == publish_view_id {
|
||||
self
|
||||
.duplicated_db_main_view
|
||||
.insert(pub_db_id.clone(), new_view_id.clone());
|
||||
new_view_id.clone()
|
||||
} else {
|
||||
gen_view_id()
|
||||
};
|
||||
self
|
||||
.duplicated_db_view
|
||||
.insert(db_view.id.clone(), new_view_id.clone());
|
||||
|
||||
db_view.id.clone_from(&new_view_id);
|
||||
db_view.database_id.clone_from(&new_db_id);
|
||||
new_db_view_ids.push(db_view.id.clone());
|
||||
|
||||
// update all views's row's id
|
||||
for row_order in db_view.row_orders.iter_mut() {
|
||||
if let Some(new_id) = self
|
||||
.duplicated_refs
|
||||
.get(row_order.id.as_str())
|
||||
.cloned()
|
||||
.flatten()
|
||||
{
|
||||
row_order.id = new_id.into();
|
||||
} else {
|
||||
// skip if row not found
|
||||
tracing::warn!("row not found: {}", row_order.id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// insert updated views back to db
|
||||
view_map.clear(&mut txn);
|
||||
for view in db_views {
|
||||
view_map.insert_view(&mut txn, view);
|
||||
}
|
||||
}
|
||||
|
||||
// write database collab to storage
|
||||
let db_encoded_collab =
|
||||
tokio::task::spawn_blocking(move || collab_to_bin(&db_collab, CollabType::Database)).await?;
|
||||
self.collabs_to_insert.insert(
|
||||
new_db_id.clone(),
|
||||
(CollabType::Database, db_encoded_collab?),
|
||||
);
|
||||
|
||||
// Add this database as linked view
|
||||
self
|
||||
.workspace_databases
|
||||
.insert(new_db_id.clone(), new_db_view_ids);
|
||||
|
||||
Ok((pub_db_id, new_db_id, false))
|
||||
}
|
||||
|
||||
/// Deep copy a published database to the destination workspace.
|
||||
/// Returns the Folder view for main view (`new_view_id`) and map from old to new view_id.
|
||||
/// If the database is already duplicated before, does not return the view with `new_view_id`
|
||||
async fn deep_copy_database_view<'a>(
|
||||
&mut self,
|
||||
new_view_id: String,
|
||||
published_db: PublishDatabaseData,
|
||||
metadata: &PublishViewMetaData,
|
||||
pub_view_id: &str,
|
||||
) -> Result<View, AppError> {
|
||||
// flatten nested view info into a map
|
||||
let view_info_by_id = view_info_by_view_id(metadata);
|
||||
|
||||
let (pub_db_id, _dup_db_id, db_alr_duplicated) = self
|
||||
.deep_copy_database(&published_db, pub_view_id, new_view_id)
|
||||
.await?;
|
||||
|
||||
if db_alr_duplicated {
|
||||
let duplicated_view_id = self
|
||||
.duplicated_db_view
|
||||
.get(pub_view_id)
|
||||
.cloned()
|
||||
.ok_or_else(|| AppError::RecordNotFound(format!("view not found: {}", pub_view_id)))?;
|
||||
|
||||
// db_view_id found but may not have been created due to visibility
|
||||
match self.views_to_add.get(&duplicated_view_id) {
|
||||
Some(v) => return Ok(v.clone()),
|
||||
None => {
|
||||
let main_view_id = self
|
||||
.duplicated_db_main_view
|
||||
.get(pub_db_id.as_str())
|
||||
.ok_or_else(|| {
|
||||
AppError::RecordNotFound(format!("main view not found: {}", pub_view_id))
|
||||
})?;
|
||||
|
||||
let view_info = view_info_by_id.get(main_view_id).ok_or_else(|| {
|
||||
AppError::RecordNotFound(format!("metadata not found for view: {}", main_view_id))
|
||||
})?;
|
||||
|
||||
let mut view =
|
||||
self.new_folder_view(duplicated_view_id, view_info, view_info.layout.clone());
|
||||
view.parent_view_id.clone_from(main_view_id);
|
||||
return Ok(view);
|
||||
},
|
||||
};
|
||||
} else {
|
||||
tracing::warn!("database not duplicated: {}", pub_view_id);
|
||||
}
|
||||
|
||||
// create a new view to be returned to the caller
|
||||
// view_id is the main view of the database
|
||||
// create the main view
|
||||
let main_view_id = self
|
||||
.duplicated_db_main_view
|
||||
.get(pub_db_id.as_str())
|
||||
.ok_or_else(|| AppError::RecordNotFound(format!("main view not found: {}", pub_view_id)))?;
|
||||
|
||||
let main_view_info = view_info_by_id.get(pub_view_id).ok_or_else(|| {
|
||||
AppError::RecordNotFound(format!("metadata not found for view: {}", pub_view_id))
|
||||
})?;
|
||||
let main_folder_view = self.new_folder_view(
|
||||
main_view_id.clone(),
|
||||
main_view_info,
|
||||
main_view_info.layout.clone(),
|
||||
);
|
||||
|
||||
// create other visible view which are child to the main view
|
||||
for vis_view_id in published_db.visible_database_view_ids {
|
||||
if vis_view_id == pub_view_id {
|
||||
// skip main view
|
||||
continue;
|
||||
}
|
||||
|
||||
let child_view_id = self
|
||||
.duplicated_db_view
|
||||
.get(&vis_view_id)
|
||||
.ok_or_else(|| AppError::RecordNotFound(format!("view not found: {}", vis_view_id)))?;
|
||||
|
||||
let child_view_info = view_info_by_id.get(&vis_view_id).ok_or_else(|| {
|
||||
AppError::RecordNotFound(format!("metadata not found for view: {}", vis_view_id))
|
||||
})?;
|
||||
|
||||
let mut child_folder_view = self.new_folder_view(
|
||||
child_view_id.clone(),
|
||||
view_info_by_id.get(&vis_view_id).ok_or_else(|| {
|
||||
AppError::RecordNotFound(format!("metadata not found for view: {}", vis_view_id))
|
||||
})?,
|
||||
child_view_info.layout.clone(),
|
||||
);
|
||||
child_folder_view.parent_view_id.clone_from(main_view_id);
|
||||
self
|
||||
.views_to_add
|
||||
.insert(child_folder_view.id.clone(), child_folder_view);
|
||||
}
|
||||
|
||||
Ok(main_folder_view)
|
||||
}
|
||||
|
||||
/// ceates a new folder view without parent_view_id set
|
||||
fn new_folder_view(
|
||||
&self,
|
||||
new_view_id: String,
|
||||
view_info: &PublishViewInfo,
|
||||
layout: ViewLayout,
|
||||
) -> View {
|
||||
View {
|
||||
id: new_view_id,
|
||||
parent_view_id: "".to_string(), // to be filled by caller
|
||||
name: view_info.name.clone(),
|
||||
desc: "".to_string(), // unable to get from metadata
|
||||
children: RepeatedViewIdentifier { items: vec![] }, // fill in while iterating children
|
||||
created_at: self.ts_now,
|
||||
is_favorite: false,
|
||||
layout: to_folder_view_layout(layout),
|
||||
icon: view_info.icon.clone().map(to_folder_view_icon),
|
||||
created_by: Some(self.duplicator_uid),
|
||||
last_edited_time: self.ts_now,
|
||||
last_edited_by: Some(self.duplicator_uid),
|
||||
extra: view_info.extra.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_published_data_for_view_id(
|
||||
&self,
|
||||
view_id: &uuid::Uuid,
|
||||
) -> Result<Option<(PublishViewMetaData, Vec<u8>)>, AppError> {
|
||||
match select_published_data_for_view_id(&self.pg_pool, view_id).await? {
|
||||
Some((js_val, blob)) => {
|
||||
let metadata = serde_json::from_value(js_val)?;
|
||||
Ok(Some((metadata, blob)))
|
||||
},
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// broadcast updates to collab group if exists
|
||||
async fn broadcast_update(
|
||||
collab_storage: &Arc<CollabAccessControlStorage>,
|
||||
oid: &str,
|
||||
encoded_update: Vec<u8>,
|
||||
) -> Result<(), AppError> {
|
||||
tracing::info!("broadcasting update to group: {}", oid);
|
||||
|
||||
let payload = Message::Sync(SyncMessage::Update(encoded_update)).encode_v1();
|
||||
let msg = ClientCollabMessage::ClientUpdateSync {
|
||||
data: UpdateSync {
|
||||
origin: CollabOrigin::Server,
|
||||
object_id: oid.to_string(),
|
||||
msg_id: chrono::Utc::now().timestamp_millis() as u64,
|
||||
payload: payload.into(),
|
||||
},
|
||||
};
|
||||
|
||||
collab_storage
|
||||
.broadcast_encode_collab(oid.to_string(), vec![msg])
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn view_info_by_view_id(meta: &PublishViewMetaData) -> HashMap<String, PublishViewInfo> {
|
||||
let mut acc = HashMap::new();
|
||||
acc.insert(meta.view.view_id.clone(), meta.view.clone());
|
||||
add_to_view_info(&mut acc, &meta.child_views);
|
||||
add_to_view_info(&mut acc, &meta.ancestor_views);
|
||||
acc
|
||||
}
|
||||
|
||||
fn add_to_view_info(acc: &mut HashMap<String, PublishViewInfo>, view_infos: &[PublishViewInfo]) {
|
||||
for view_info in view_infos {
|
||||
acc.insert(view_info.view_id.clone(), view_info.clone());
|
||||
if let Some(child_views) = &view_info.child_views {
|
||||
add_to_view_info(acc, child_views);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn collab_from_doc_state(doc_state: Vec<u8>, object_id: &str) -> Result<Collab, AppError> {
|
||||
let collab = Collab::new_with_source(
|
||||
CollabOrigin::Server,
|
||||
object_id,
|
||||
DataSource::DocStateV1(doc_state),
|
||||
vec![],
|
||||
false,
|
||||
)
|
||||
.map_err(|e| AppError::Unhandled(e.to_string()))?;
|
||||
Ok(collab)
|
||||
}
|
||||
|
||||
pub fn get_database_id_from_collab(db_collab: &Collab) -> Result<String, AppError> {
|
||||
let txn = db_collab.context.transact();
|
||||
let db_map = db_collab
|
||||
.get_with_txn(&txn, "database")
|
||||
.ok_or_else(|| AppError::RecordNotFound("no database found in database collab".to_string()))?
|
||||
.cast::<MapRef>()
|
||||
.map_err(|err| AppError::RecordNotFound(format!("database not a map: {:?}", err)))?;
|
||||
let db_id = db_map
|
||||
.get(&txn, "id")
|
||||
.ok_or_else(|| AppError::RecordNotFound("no id found in database".to_string()))?
|
||||
.to_string(&txn);
|
||||
Ok(db_id)
|
||||
}
|
||||
|
||||
fn to_folder_view_icon(icon: workspace_dto::ViewIcon) -> collab_folder::ViewIcon {
|
||||
collab_folder::ViewIcon {
|
||||
ty: to_folder_view_icon_type(icon.ty),
|
||||
value: icon.value,
|
||||
}
|
||||
}
|
||||
|
||||
fn to_folder_view_icon_type(icon: workspace_dto::IconType) -> collab_folder::IconType {
|
||||
match icon {
|
||||
workspace_dto::IconType::Emoji => collab_folder::IconType::Emoji,
|
||||
workspace_dto::IconType::Url => collab_folder::IconType::Url,
|
||||
workspace_dto::IconType::Icon => collab_folder::IconType::Icon,
|
||||
}
|
||||
}
|
||||
|
||||
fn to_folder_view_layout(layout: workspace_dto::ViewLayout) -> collab_folder::ViewLayout {
|
||||
match layout {
|
||||
ViewLayout::Document => collab_folder::ViewLayout::Document,
|
||||
ViewLayout::Grid => collab_folder::ViewLayout::Grid,
|
||||
ViewLayout::Board => collab_folder::ViewLayout::Board,
|
||||
ViewLayout::Calendar => collab_folder::ViewLayout::Calendar,
|
||||
ViewLayout::Chat => collab_folder::ViewLayout::Chat,
|
||||
}
|
||||
}
|
||||
|
||||
fn collab_to_bin(collab: &Collab, collab_type: CollabType) -> Result<Vec<u8>, AppError> {
|
||||
let bin = collab
|
||||
.encode_collab_v1(|collab| collab_type.validate_require_data(collab))
|
||||
.map_err(|e| AppError::Unhandled(e.to_string()))?
|
||||
.encode_to_bytes()?;
|
||||
Ok(bin)
|
||||
}
|
|
@ -15,7 +15,8 @@ pub fn init_subscriber(app_env: &Environment, filters: Vec<String>) {
|
|||
.with_target(true)
|
||||
.with_max_level(tracing::Level::TRACE)
|
||||
.with_thread_ids(false)
|
||||
.with_file(false);
|
||||
.with_file(true)
|
||||
.with_line_number(true);
|
||||
|
||||
match app_env {
|
||||
Environment::Local => {
|
||||
|
|
|
@ -3,6 +3,8 @@ mod edit_workspace;
|
|||
mod invitation_crud;
|
||||
mod member_crud;
|
||||
mod publish;
|
||||
mod published_data;
|
||||
mod template;
|
||||
mod workspace_crud;
|
||||
mod workspace_folder;
|
||||
mod workspace_settings;
|
||||
|
|
|
@ -1,13 +1,29 @@
|
|||
use std::collections::HashMap;
|
||||
use appflowy_cloud::biz::collab::folder_view::collab_folder_to_folder_view;
|
||||
use appflowy_cloud::biz::workspace::publish_dup::{
|
||||
collab_from_doc_state, get_database_id_from_collab,
|
||||
};
|
||||
use collab::util::MapExt;
|
||||
use collab_database::views::ViewMap;
|
||||
use collab_database::workspace_database::WorkspaceDatabaseBody;
|
||||
use collab_entity::CollabType;
|
||||
use collab_folder::{CollabOrigin, Folder};
|
||||
use shared_entity::dto::publish_dto::PublishDatabaseData;
|
||||
use shared_entity::dto::publish_dto::PublishViewMetaData;
|
||||
use shared_entity::dto::workspace_dto::PublishedDuplicate;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
use app_error::ErrorCode;
|
||||
use client_api::entity::{AFRole, GlobalComment, PublishCollabItem, PublishCollabMetadata};
|
||||
use client_api::entity::{
|
||||
AFRole, GlobalComment, PublishCollabItem, PublishCollabMetadata, QueryCollab, QueryCollabParams,
|
||||
};
|
||||
use client_api_test::TestClient;
|
||||
use client_api_test::{generate_unique_registered_user_client, localhost_client};
|
||||
use itertools::Itertools;
|
||||
|
||||
use crate::workspace::published_data::{self};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_publish_namespace_set() {
|
||||
let (c, _user) = generate_unique_registered_user_client().await;
|
||||
|
@ -661,3 +677,318 @@ async fn workspace_member_publish_unpublish() {
|
|||
struct MyCustomMetadata {
|
||||
title: String,
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn duplicate_to_workspace_references() {
|
||||
let client_1 = TestClient::new_user().await;
|
||||
let workspace_id = client_1.workspace_id().await;
|
||||
|
||||
// doc2 contains a reference to doc1
|
||||
let doc_2_view_id = uuid::Uuid::new_v4();
|
||||
let doc_2_metadata: PublishViewMetaData =
|
||||
serde_json::from_str(published_data::DOC_2_META).unwrap();
|
||||
let doc_2_doc_state = hex::decode(published_data::DOC_2_DOC_STATE_HEX).unwrap();
|
||||
|
||||
// doc_1_view_id needs to be fixed because doc_2 references it
|
||||
let doc_1_view_id: uuid::Uuid = "e8c4f99a-50ea-4758-bca0-afa7df5c2434".parse().unwrap();
|
||||
let doc_1_metadata: PublishViewMetaData =
|
||||
serde_json::from_str(published_data::DOC_1_META).unwrap();
|
||||
let doc_1_doc_state = hex::decode(published_data::DOC_1_DOC_STATE_HEX).unwrap();
|
||||
|
||||
// doc1 contains @reference database to grid1 (not inline)
|
||||
let grid_1_view_id: uuid::Uuid = "8e062f61-d7ae-4f4b-869c-f44c43149399".parse().unwrap();
|
||||
let grid_1_metadata: PublishViewMetaData =
|
||||
serde_json::from_str(published_data::GRID_1_META).unwrap();
|
||||
let grid_1_db_data = hex::decode(published_data::GRID_1_DB_DATA).unwrap();
|
||||
|
||||
client_1
|
||||
.api_client
|
||||
.publish_collabs(
|
||||
&workspace_id,
|
||||
vec![
|
||||
PublishCollabItem {
|
||||
meta: PublishCollabMetadata {
|
||||
view_id: doc_1_view_id,
|
||||
publish_name: doc_1_metadata.view.name.clone(),
|
||||
metadata: doc_1_metadata.clone(),
|
||||
},
|
||||
data: doc_1_doc_state,
|
||||
},
|
||||
PublishCollabItem {
|
||||
meta: PublishCollabMetadata {
|
||||
view_id: doc_2_view_id,
|
||||
publish_name: doc_2_metadata.view.name.clone(),
|
||||
metadata: doc_2_metadata.clone(),
|
||||
},
|
||||
data: doc_2_doc_state,
|
||||
},
|
||||
PublishCollabItem {
|
||||
meta: PublishCollabMetadata {
|
||||
view_id: grid_1_view_id,
|
||||
publish_name: grid_1_metadata.view.name.clone(),
|
||||
metadata: grid_1_metadata.clone(),
|
||||
},
|
||||
data: grid_1_db_data,
|
||||
},
|
||||
],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let client_2 = TestClient::new_user().await;
|
||||
let workspace_id_2 = client_2.workspace_id().await;
|
||||
let fv = client_2
|
||||
.api_client
|
||||
.get_workspace_folder(&workspace_id_2, Some(5))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// duplicate doc2 to workspace2
|
||||
// Result fv should be:
|
||||
// .
|
||||
// ├── Getting Started (existing)
|
||||
// └── doc2
|
||||
// └── doc1
|
||||
// └── grid1
|
||||
client_2
|
||||
.api_client
|
||||
.duplicate_published_to_workspace(
|
||||
&workspace_id_2,
|
||||
&PublishedDuplicate {
|
||||
published_view_id: doc_2_view_id.to_string(),
|
||||
dest_view_id: fv.view_id, // use the root view
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let fv = client_2
|
||||
.api_client
|
||||
.get_workspace_folder(&workspace_id_2, Some(5))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let doc_2_fv = fv
|
||||
.children
|
||||
.into_iter()
|
||||
.find(|v| v.name == doc_2_metadata.view.name)
|
||||
.unwrap();
|
||||
assert_ne!(doc_2_fv.view_id, doc_1_view_id.to_string());
|
||||
|
||||
let doc_1_fv = doc_2_fv
|
||||
.children
|
||||
.into_iter()
|
||||
.find(|v| v.name == doc_1_metadata.view.name)
|
||||
.unwrap();
|
||||
assert_ne!(doc_1_fv.view_id, doc_1_view_id.to_string());
|
||||
|
||||
let grid_1_fv = doc_1_fv
|
||||
.children
|
||||
.into_iter()
|
||||
.find(|v| v.name == grid_1_metadata.view.name)
|
||||
.unwrap();
|
||||
assert_ne!(grid_1_fv.view_id, grid_1_view_id.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn duplicate_to_workspace_doc_inline_database() {
|
||||
let client_1 = TestClient::new_user().await;
|
||||
let workspace_id = client_1.workspace_id().await;
|
||||
|
||||
// doc3 contains inline database to a view in grid1 (view of grid1)
|
||||
let doc_3_view_id = uuid::Uuid::new_v4();
|
||||
let doc_3_metadata: PublishViewMetaData =
|
||||
serde_json::from_str(published_data::DOC_3_META).unwrap();
|
||||
let doc_3_doc_state = hex::decode(published_data::DOC_3_DOC_STATE_HEX).unwrap();
|
||||
|
||||
// view of grid1
|
||||
let view_of_grid_1_view_id: uuid::Uuid = "d8589e98-88fc-42e4-888c-b03338bf22bb".parse().unwrap();
|
||||
let view_of_grid_1_metadata: PublishViewMetaData =
|
||||
serde_json::from_str(published_data::VIEW_OF_GRID1_META).unwrap();
|
||||
let view_of_grid_1_db_data = hex::decode(published_data::VIEW_OF_GRID_1_DB_DATA).unwrap();
|
||||
let (pub_db_id, pub_row_ids) = get_database_id_and_row_ids(&view_of_grid_1_db_data);
|
||||
|
||||
client_1
|
||||
.api_client
|
||||
.publish_collabs(
|
||||
&workspace_id,
|
||||
vec![
|
||||
PublishCollabItem {
|
||||
meta: PublishCollabMetadata {
|
||||
view_id: doc_3_view_id,
|
||||
publish_name: doc_3_metadata.view.name.clone(),
|
||||
metadata: doc_3_metadata.clone(),
|
||||
},
|
||||
data: doc_3_doc_state,
|
||||
},
|
||||
PublishCollabItem {
|
||||
meta: PublishCollabMetadata {
|
||||
view_id: view_of_grid_1_view_id,
|
||||
publish_name: view_of_grid_1_metadata.view.name.replace(' ', "-"),
|
||||
metadata: view_of_grid_1_metadata.clone(),
|
||||
},
|
||||
data: view_of_grid_1_db_data,
|
||||
},
|
||||
],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let mut client_2 = TestClient::new_user().await;
|
||||
let workspace_id_2 = client_2.workspace_id().await;
|
||||
|
||||
// Open workspace to trigger group creation
|
||||
client_2
|
||||
.open_collab(&workspace_id_2, &workspace_id_2, CollabType::Folder)
|
||||
.await;
|
||||
|
||||
let fv = client_2
|
||||
.api_client
|
||||
.get_workspace_folder(&workspace_id_2, Some(5))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// duplicate doc3 to workspace2
|
||||
// Result fv should be:
|
||||
// .
|
||||
// ├── Getting Started (existing)
|
||||
// └── doc3
|
||||
// └── grid1
|
||||
// └── View of grid1
|
||||
client_2
|
||||
.api_client
|
||||
.duplicate_published_to_workspace(
|
||||
&workspace_id_2,
|
||||
&PublishedDuplicate {
|
||||
published_view_id: doc_3_view_id.to_string(),
|
||||
dest_view_id: fv.view_id, // use the root view
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let fv = client_2
|
||||
.api_client
|
||||
.get_workspace_folder(&workspace_id_2, Some(5))
|
||||
.await
|
||||
.unwrap();
|
||||
let doc_3_fv = fv
|
||||
.children
|
||||
.into_iter()
|
||||
.find(|v| v.name == doc_3_metadata.view.name)
|
||||
.unwrap();
|
||||
let grid1_fv = doc_3_fv
|
||||
.children
|
||||
.into_iter()
|
||||
.find(|v| v.name == "grid1")
|
||||
.unwrap();
|
||||
let _view_of_grid1_fv = grid1_fv
|
||||
.children
|
||||
.into_iter()
|
||||
.find(|v| v.name == "View of grid1")
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let collab_resp = client_2
|
||||
.get_collab(QueryCollabParams {
|
||||
workspace_id: workspace_id_2.clone(),
|
||||
inner: QueryCollab {
|
||||
object_id: workspace_id_2.clone(),
|
||||
collab_type: CollabType::Folder,
|
||||
},
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let folder = Folder::from_collab_doc_state(
|
||||
client_2.uid().await,
|
||||
CollabOrigin::Server,
|
||||
collab_resp.encode_collab.into(),
|
||||
&workspace_id_2,
|
||||
vec![],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let folder_view = collab_folder_to_folder_view(&folder, 5);
|
||||
let doc_3_fv = folder_view
|
||||
.children
|
||||
.into_iter()
|
||||
.find(|v| v.name == doc_3_metadata.view.name)
|
||||
.unwrap();
|
||||
assert_ne!(doc_3_fv.view_id, doc_3_view_id.to_string());
|
||||
|
||||
let grid1_fv = doc_3_fv
|
||||
.children
|
||||
.into_iter()
|
||||
.find(|v| v.name == "grid1")
|
||||
.unwrap();
|
||||
assert_ne!(grid1_fv.view_id, view_of_grid_1_view_id.to_string());
|
||||
|
||||
let view_of_grid1_fv = grid1_fv
|
||||
.children
|
||||
.into_iter()
|
||||
.find(|v| v.name == "View of grid1")
|
||||
.unwrap();
|
||||
println!("{:#?}", view_of_grid1_fv);
|
||||
assert_ne!(view_of_grid1_fv.view_id, view_of_grid_1_view_id.to_string());
|
||||
|
||||
{
|
||||
// check that database_id is different
|
||||
let mut ws_db_collab = client_2
|
||||
.get_workspace_database_collab(&workspace_id_2)
|
||||
.await;
|
||||
let ws_db_body = WorkspaceDatabaseBody::new(&mut ws_db_collab);
|
||||
let txn = ws_db_collab.transact();
|
||||
let dup_grid1_db_id = ws_db_body
|
||||
.get_all_database_meta(&txn)
|
||||
.into_iter()
|
||||
.find(|db_meta| db_meta.linked_views.contains(&view_of_grid1_fv.view_id))
|
||||
.unwrap()
|
||||
.database_id;
|
||||
let db_collab_collab_resp = client_2
|
||||
.get_collab(QueryCollabParams {
|
||||
workspace_id: workspace_id_2,
|
||||
inner: QueryCollab {
|
||||
object_id: dup_grid1_db_id,
|
||||
collab_type: CollabType::Database,
|
||||
},
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let db_doc_state = db_collab_collab_resp.encode_collab.doc_state;
|
||||
let db_collab = collab_from_doc_state(db_doc_state.to_vec(), "").unwrap();
|
||||
let dup_db_id = get_database_id_from_collab(&db_collab).unwrap();
|
||||
assert_ne!(dup_db_id, pub_db_id);
|
||||
|
||||
let view_map = {
|
||||
let map_ref = db_collab
|
||||
.data
|
||||
.get_with_path(&txn, ["database", "views"])
|
||||
.unwrap();
|
||||
ViewMap::new(map_ref, tokio::sync::broadcast::channel(1).0)
|
||||
};
|
||||
|
||||
for db_view in view_map.get_all_views(&txn) {
|
||||
assert_eq!(db_view.database_id, dup_db_id);
|
||||
for row_order in db_view.row_orders {
|
||||
assert!(
|
||||
!pub_row_ids.contains(row_order.id.as_str()),
|
||||
"published row id is same as duplicated row id"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_database_id_and_row_ids(published_db_blob: &[u8]) -> (String, HashSet<String>) {
|
||||
let pub_db_data = serde_json::from_slice::<PublishDatabaseData>(published_db_blob).unwrap();
|
||||
let db_collab = collab_from_doc_state(pub_db_data.database_collab, "").unwrap();
|
||||
let pub_db_id = get_database_id_from_collab(&db_collab).unwrap();
|
||||
let row_ids: HashSet<String> = pub_db_data.database_row_collabs.into_keys().collect();
|
||||
(pub_db_id, row_ids)
|
||||
}
|
||||
|
|
350
tests/workspace/published_data.rs
Normal file
350
tests/workspace/published_data.rs
Normal file
File diff suppressed because one or more lines are too long
14
tests/workspace/workspace_folder.rs
Normal file
14
tests/workspace/workspace_folder.rs
Normal file
|
@ -0,0 +1,14 @@
|
|||
use client_api_test::generate_unique_registered_user_client;
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_workpace_folder() {
|
||||
let (c, _user) = generate_unique_registered_user_client().await;
|
||||
let workspaces = c.get_workspaces().await.unwrap();
|
||||
assert_eq!(workspaces.len(), 1);
|
||||
let workspace_id = workspaces[0].workspace_id.to_string();
|
||||
|
||||
let folder_view = c.get_workspace_folder(&workspace_id, None).await.unwrap();
|
||||
assert_eq!(folder_view.name, "Workspace");
|
||||
assert_eq!(folder_view.children[0].name, "General");
|
||||
assert_eq!(folder_view.children[0].children.len(), 0);
|
||||
}
|
|
@ -1,4 +1,3 @@
|
|||
use collab::core::collab::DataSource;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::entity::EncodedCollab;
|
||||
use collab_folder::Folder;
|
||||
|
@ -25,7 +24,7 @@ fn load_yrs_0172_version_folder_using_current_yrs_version() {
|
|||
let folder = Folder::from_collab_doc_state(
|
||||
322319512080748544,
|
||||
CollabOrigin::Empty,
|
||||
DataSource::DocStateV1(encode_collab.doc_state.to_vec()),
|
||||
encode_collab.into(),
|
||||
"fake_id", // just use fake id
|
||||
vec![],
|
||||
)
|
||||
|
|
Loading…
Add table
Reference in a new issue