feat: Support s3 as published collab storage (#798)

* chore: remove unused published collab methods

* feat: support s3 as published collab storage
This commit is contained in:
Khor Shu Heng 2024-09-11 20:52:33 +08:00 committed by GitHub
parent fff93e6083
commit abae8d2d1b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 661 additions and 174 deletions

View file

@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT workspace_id, metadata\n FROM af_published_collab\n WHERE view_id = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "workspace_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "metadata",
"type_info": "Jsonb"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false
]
},
"hash": "2dda0bc4d9486a49c0af00d8ee4408c970a2ba3533217c130281e7db5a4e3d6b"
}

View file

@ -0,0 +1,29 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT workspace_id, view_id\n FROM af_published_collab\n WHERE workspace_id = (SELECT workspace_id FROM af_workspace WHERE publish_namespace = $1)\n AND publish_name = $2\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "workspace_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "view_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Text",
"Text"
]
},
"nullable": [
false,
false
]
},
"hash": "d205df7e6a71335bc457f560fa5a941c738cd1f8e7c3369b0b24bb34fbb1c6eb"
}

View file

@ -952,6 +952,12 @@ pub struct PublishCollabMetadata<Metadata> {
pub metadata: Metadata,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct PublishCollabKey {
pub workspace_id: uuid::Uuid,
pub view_id: uuid::Uuid,
}
#[derive(Debug)]
pub struct PublishCollabItem<Meta, Data> {
pub meta: PublishCollabMetadata<Meta>,

View file

@ -32,6 +32,8 @@ pub trait BucketClient {
async fn delete_blob(&self, object_key: &str) -> Result<Self::ResponseData, AppError>;
async fn delete_blobs(&self, object_key: Vec<String>) -> Result<Self::ResponseData, AppError>;
async fn get_blob(&self, object_key: &str) -> Result<Self::ResponseData, AppError>;
async fn create_upload(

View file

@ -142,7 +142,38 @@ impl BucketClient for AwsS3BucketClientImpl {
.await
.map_err(|err| anyhow!("Failed to delete object to S3: {}", err))?;
Ok(S3ResponseData::new(output))
Ok(S3ResponseData::from(output))
}
  /// Deletes a batch of objects from the bucket with a single S3
  /// `DeleteObjects` request instead of one round-trip per key.
  ///
  /// Builder failures (invalid key / request) surface as `AppError::Internal`;
  /// the S3 call itself is mapped through `anyhow` into `AppError`.
  async fn delete_blobs(&self, object_keys: Vec<String>) -> Result<Self::ResponseData, AppError> {
    // Wrap each key in the ObjectIdentifier type the batch-delete API expects.
    let mut delete_object_ids: Vec<aws_sdk_s3::types::ObjectIdentifier> = vec![];
    for obj in object_keys {
      let obj_id = aws_sdk_s3::types::ObjectIdentifier::builder()
        .key(obj)
        .build()
        .map_err(|err| {
          AppError::Internal(anyhow!("Failed to create object identifier: {}", err))
        })?;
      delete_object_ids.push(obj_id);
    }
    let output = self
      .client
      .delete_objects()
      .bucket(&self.bucket)
      .delete(
        Delete::builder()
          .set_objects(Some(delete_object_ids))
          .build()
          .map_err(|err| {
            AppError::Internal(anyhow!("Failed to create delete object request: {}", err))
          })?,
      )
      .send()
      .await
      .map_err(|err| anyhow!("Failed to delete objects from S3: {}", err))?;
    // Callers do not inspect the delete response; it is converted to an
    // empty-payload S3ResponseData.
    Ok(S3ResponseData::from(output))
  }
async fn get_blob(&self, object_key: &str) -> Result<Self::ResponseData, AppError> {
@ -398,14 +429,25 @@ impl ResponseBlob for S3ResponseData {
}
}
impl S3ResponseData {
pub fn new(_output: DeleteObjectOutput) -> Self {
impl From<DeleteObjectOutput> for S3ResponseData {
fn from(_: DeleteObjectOutput) -> Self {
S3ResponseData {
data: Vec::new(),
content_type: None,
}
}
}
impl From<DeleteObjectsOutput> for S3ResponseData {
fn from(_: DeleteObjectsOutput) -> Self {
S3ResponseData {
data: Vec::new(),
content_type: None,
}
}
}
impl S3ResponseData {
pub fn new_with_data(data: Vec<u8>, content_type: Option<String>) -> Self {
S3ResponseData { data, content_type }
}

View file

@ -1,5 +1,5 @@
use app_error::AppError;
use database_entity::dto::{PublishCollabItem, PublishInfo};
use database_entity::dto::{PublishCollabItem, PublishCollabKey, PublishInfo};
use sqlx::{Executor, PgPool, Postgres};
use uuid::Uuid;
@ -210,6 +210,24 @@ pub async fn delete_published_collabs<'a, E: Executor<'a, Database = Postgres>>(
Ok(())
}
/// Returns the owning workspace id together with the stored publish metadata
/// for a published view, or `None` when the view has never been published.
#[inline]
pub async fn select_published_metadata_for_view_id(
  pg_pool: &PgPool,
  view_id: &Uuid,
) -> Result<Option<(Uuid, serde_json::Value)>, AppError> {
  let row = sqlx::query!(
    r#"
SELECT workspace_id, metadata
FROM af_published_collab
WHERE view_id = $1
"#,
    view_id,
  )
  .fetch_optional(pg_pool)
  .await?;
  match row {
    Some(row) => Ok(Some((row.workspace_id, row.metadata))),
    None => Ok(None),
  }
}
#[inline]
pub async fn select_published_data_for_view_id(
pg_pool: &PgPool,
@ -228,6 +246,28 @@ pub async fn select_published_data_for_view_id(
Ok(res.map(|res| (res.metadata, res.blob)))
}
#[inline]
pub async fn select_published_collab_workspace_view_id<'a, E: Executor<'a, Database = Postgres>>(
executor: E,
publish_namespace: &str,
publish_name: &str,
) -> Result<PublishCollabKey, AppError> {
let key = sqlx::query_as!(
PublishCollabKey,
r#"
SELECT workspace_id, view_id
FROM af_published_collab
WHERE workspace_id = (SELECT workspace_id FROM af_workspace WHERE publish_namespace = $1)
AND publish_name = $2
"#,
publish_namespace,
publish_name,
)
.fetch_one(executor)
.await?;
Ok(key)
}
#[inline]
pub async fn select_published_collab_blob<'a, E: Executor<'a, Database = Postgres>>(
executor: E,

View file

@ -1,6 +1,6 @@
use database_entity::dto::{
AFRole, AFWorkspaceInvitation, AFWorkspaceInvitationStatus, AFWorkspaceSettings, GlobalComment,
PublishCollabItem, PublishInfo, Reaction,
PublishInfo, Reaction,
};
use futures_util::stream::BoxStream;
use sqlx::{types::uuid, Executor, PgPool, Postgres, Transaction};
@ -1007,62 +1007,6 @@ pub async fn select_workspace_publish_namespace<'a, E: Executor<'a, Database = P
Ok(res)
}
/// Batch-upserts published collab rows (metadata + blob) for a workspace in a
/// single statement.
///
/// The per-item columns are passed as parallel arrays and expanded with
/// `UNNEST`; the scalar `workspace_id` ($1) and the publisher's uid (resolved
/// from $4) are replicated once per item via `generate_series(1, $7)` so every
/// UNNEST array has the same length.
#[inline]
pub async fn insert_or_replace_publish_collab_metas<'a, E: Executor<'a, Database = Postgres>>(
  executor: E,
  workspace_id: &Uuid,
  publisher_uuid: &Uuid,
  publish_item: &[PublishCollabItem<serde_json::Value, Vec<u8>>],
) -> Result<(), AppError> {
  // Decompose the items into parallel column arrays for the UNNEST insert.
  let view_ids: Vec<Uuid> = publish_item.iter().map(|item| item.meta.view_id).collect();
  let publish_names: Vec<String> = publish_item
    .iter()
    .map(|item| item.meta.publish_name.clone())
    .collect();
  let metadatas: Vec<serde_json::Value> = publish_item
    .iter()
    .map(|item| item.meta.metadata.clone())
    .collect();
  let blobs: Vec<Vec<u8>> = publish_item.iter().map(|item| item.data.clone()).collect();
  let res = sqlx::query!(
    r#"
INSERT INTO af_published_collab (workspace_id, view_id, publish_name, published_by, metadata, blob)
SELECT * FROM UNNEST(
(SELECT array_agg((SELECT $1::uuid)) FROM generate_series(1, $7))::uuid[],
$2::uuid[],
$3::text[],
(SELECT array_agg((SELECT uid FROM af_user WHERE uuid = $4)) FROM generate_series(1, $7))::bigint[],
$5::jsonb[],
$6::bytea[]
)
ON CONFLICT (workspace_id, view_id) DO UPDATE
SET metadata = EXCLUDED.metadata,
blob = EXCLUDED.blob,
published_by = EXCLUDED.published_by,
publish_name = EXCLUDED.publish_name
"#,
    workspace_id,
    &view_ids,
    &publish_names,
    publisher_uuid,
    &metadatas,
    &blobs,
    publish_item.len() as i32,
  )
  .execute(executor)
  .await?;
  // NOTE(review): with ON CONFLICT DO UPDATE the affected-row count should
  // normally equal the batch size; a mismatch is logged but deliberately not
  // treated as a hard failure.
  if res.rows_affected() != publish_item.len() as u64 {
    tracing::warn!(
      "Failed to insert or replace publish collab meta batch, workspace_id: {}, publisher_uuid: {}, rows_affected: {}",
      workspace_id, publisher_uuid, res.rows_affected()
    );
  }
  Ok(())
}
#[inline]
pub async fn select_publish_collab_meta<'a, E: Executor<'a, Database = Postgres>>(
executor: E,

View file

@ -7,6 +7,7 @@ use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::exemplar::CounterWithExemplar;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use std::sync::Arc;
use uuid::Uuid;
@ -122,3 +123,87 @@ impl RequestMetrics {
.inc_by(1, trace_id.clone().map(|s| TraceLabel { trace_id: s }));
}
}
/// Metrics (exposed as Prometheus gauges) for published-collab storage
/// operations, split by read/write and by outcome.
#[derive(Clone)]
pub struct PublishedCollabMetrics {
  // writes that landed in the primary store
  success_write_published_collab_count: Gauge,
  // writes recorded against the fallback store (e.g. Postgres behind S3)
  fallback_write_published_collab_count: Gauge,
  // writes that failed entirely
  failure_write_published_collab_count: Gauge,
  // reads served by the primary store
  success_read_published_collab_count: Gauge,
  // reads served by the fallback store after a primary-store miss/error
  fallback_read_published_collab_count: Gauge,
  // reads that failed entirely
  failure_read_published_collab_count: Gauge,
}
impl PublishedCollabMetrics {
  /// Creates the metric set with every gauge at its default (zero) value.
  fn init() -> Self {
    Self {
      success_write_published_collab_count: Default::default(),
      fallback_write_published_collab_count: Default::default(),
      failure_write_published_collab_count: Default::default(),
      success_read_published_collab_count: Default::default(),
      fallback_read_published_collab_count: Default::default(),
      failure_read_published_collab_count: Default::default(),
    }
  }

  /// Registers all gauges under the `published_collab` prefix and returns the
  /// handle used to update them.
  pub fn register(registry: &mut Registry) -> Self {
    let metrics = Self::init();
    let published_collab_registry = registry.sub_registry_with_prefix("published_collab");
    published_collab_registry.register(
      "write_success_count",
      "successfully published collab",
      metrics.success_write_published_collab_count.clone(),
    );
    published_collab_registry.register(
      "write_fallback_count",
      "successfully published collab to fallback store",
      metrics.fallback_write_published_collab_count.clone(),
    );
    published_collab_registry.register(
      "read_success_count",
      "successfully read published collab",
      metrics.success_read_published_collab_count.clone(),
    );
    published_collab_registry.register(
      "write_failure_count",
      "failed to publish collab",
      metrics.failure_write_published_collab_count.clone(),
    );
    published_collab_registry.register(
      "read_failure_count",
      "failed to read published collab",
      metrics.failure_read_published_collab_count.clone(),
    );
    published_collab_registry.register(
      "read_fallback_count",
      "failed to read published collab from primary store",
      metrics.fallback_read_published_collab_count.clone(),
    );
    metrics
  }

  // Increment helpers. `count` is the batch size for write paths and 1 for
  // the per-request read paths (see the PublishedCollabStore impls).
  pub fn incr_success_write_count(&self, count: i64) {
    self.success_write_published_collab_count.inc_by(count);
  }

  pub fn incr_fallback_write_count(&self, count: i64) {
    self.fallback_write_published_collab_count.inc_by(count);
  }

  pub fn incr_failure_write_count(&self, count: i64) {
    self.failure_write_published_collab_count.inc_by(count);
  }

  pub fn incr_success_read_count(&self, count: i64) {
    self.success_read_published_collab_count.inc_by(count);
  }

  pub fn incr_fallback_read_count(&self, count: i64) {
    self.fallback_read_published_collab_count.inc_by(count);
  }

  pub fn incr_failure_read_count(&self, count: i64) {
    self.failure_read_published_collab_count.inc_by(count);
  }
}

View file

@ -1009,12 +1009,10 @@ async fn get_published_collab_handler(
state: Data<AppState>,
) -> Result<Json<serde_json::Value>> {
let (workspace_namespace, publish_name) = path_param.into_inner();
let metadata = biz::workspace::publish::get_published_collab(
&state.pg_pool,
&workspace_namespace,
&publish_name,
)
.await?;
let metadata = state
.published_collab_store
.get_collab_metadata(&workspace_namespace, &publish_name)
.await?;
Ok(Json(metadata))
}
@ -1023,12 +1021,10 @@ async fn get_published_collab_blob_handler(
state: Data<AppState>,
) -> Result<Vec<u8>> {
let (publish_namespace, publish_name) = path_param.into_inner();
let collab_data = biz::workspace::publish::get_published_collab_blob(
&state.pg_pool,
&publish_namespace,
&publish_name,
)
.await?;
let collab_data = state
.published_collab_store
.get_collab_blob_by_publish_namespace(&publish_namespace, &publish_name)
.await?;
Ok(collab_data)
}
@ -1042,6 +1038,7 @@ async fn post_published_duplicate_handler(
let params = params.into_inner();
biz::workspace::publish_dup::duplicate_published_collab_to_workspace(
&state.pg_pool,
state.bucket_client.clone(),
state.collab_access_control_storage.clone(),
uid,
params.published_view_id,
@ -1057,8 +1054,10 @@ async fn get_published_collab_info_handler(
state: Data<AppState>,
) -> Result<Json<AppResponse<PublishInfo>>> {
let view_id = view_id.into_inner();
let collab_data =
biz::workspace::publish::get_published_collab_info(&state.pg_pool, &view_id).await?;
let collab_data = state
.published_collab_store
.get_collab_publish_info(&view_id)
.await?;
Ok(Json(AppResponse::Ok().with_data(collab_data)))
}
@ -1194,7 +1193,9 @@ async fn post_publish_collabs_handler(
AppError::InvalidRequest(String::from("did not receive any data to publish")).into(),
);
}
biz::workspace::publish::publish_collabs(&state.pg_pool, &workspace_id, &user_uuid, accumulator)
state
.published_collab_store
.publish_collabs(accumulator, &workspace_id, &user_uuid)
.await?;
Ok(Json(AppResponse::Ok()))
}
@ -1210,13 +1211,10 @@ async fn delete_published_collabs_handler(
if view_ids.is_empty() {
return Ok(Json(AppResponse::Ok()));
}
biz::workspace::publish::delete_published_workspace_collab(
&state.pg_pool,
&workspace_id,
&view_ids,
&user_uuid,
)
.await?;
state
.published_collab_store
.delete_collab(&workspace_id, &view_ids, &user_uuid)
.await?;
Ok(Json(AppResponse::Ok()))
}

View file

@ -55,7 +55,12 @@ use crate::api::ws::ws_scope;
use crate::biz::collab::access_control::CollabMiddlewareAccessControl;
use crate::biz::pg_listener::PgListeners;
use crate::biz::workspace::access_control::WorkspaceMiddlewareAccessControl;
use crate::config::config::{Config, DatabaseSetting, GoTrueSetting, S3Setting};
use crate::biz::workspace::publish::{
PublishedCollabPostgresStore, PublishedCollabS3StoreWithPostgresFallback, PublishedCollabStore,
};
use crate::config::config::{
Config, DatabaseSetting, GoTrueSetting, PublishedCollabStorageBackend, S3Setting,
};
use crate::mailer::Mailer;
use crate::middleware::access_control_mw::MiddlewareAccessControlTransform;
use crate::middleware::metrics_mw::MetricsMiddleware;
@ -172,6 +177,7 @@ pub async fn run_actix_server(
.app_data(Data::new(state.config.gotrue.jwt_secret.clone()))
.app_data(Data::new(state.clone()))
.app_data(Data::new(storage.clone()))
.app_data(Data::new(state.published_collab_store.clone()))
});
server = match pair {
@ -213,6 +219,27 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
pg_pool.clone(),
));
// Published Collab Storage
info!("Setting up Published Collab storage...");
let published_collab_store: Arc<dyn PublishedCollabStore> =
match config.published_collab.storage_backend {
PublishedCollabStorageBackend::Postgres => {
info!("Using Postgres as the Published Collab storage backend ...");
Arc::new(PublishedCollabPostgresStore::new(
metrics.published_collab_metrics.clone(),
pg_pool.clone(),
))
},
PublishedCollabStorageBackend::S3WithPostgresBackup => {
info!("Using S3 as the Published Collab storage backend with Postgres as the backup ...");
Arc::new(PublishedCollabS3StoreWithPostgresFallback::new(
metrics.published_collab_metrics.clone(),
pg_pool.clone(),
s3_client.clone(),
))
},
};
// Gotrue
info!("Connecting to GoTrue...");
let gotrue_client = get_gotrue_client(&config.gotrue).await?;
@ -306,6 +333,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
collab_access_control,
workspace_access_control,
bucket_storage,
published_collab_store,
bucket_client: s3_client,
pg_listeners,
access_control,

View file

@ -1,6 +1,5 @@
use authentication::jwt::OptionalUserUuid;
use database_entity::dto::AFWorkspaceSettingsChange;
use database_entity::dto::PublishCollabItem;
use database_entity::dto::PublishInfo;
use std::collections::HashMap;
@ -129,20 +128,6 @@ pub async fn get_workspace_publish_namespace(
select_workspace_publish_namespace(pg_pool, workspace_id).await
}
/// Validates every item's publish name, then batch-upserts the published
/// collab metadata and blobs into Postgres.
pub async fn publish_collabs(
  pg_pool: &PgPool,
  workspace_id: &Uuid,
  publisher_uuid: &Uuid,
  publish_items: &[PublishCollabItem<serde_json::Value, Vec<u8>>],
) -> Result<(), AppError> {
  // Reject the whole batch at the first invalid publish name.
  publish_items
    .iter()
    .try_for_each(|item| check_collab_publish_name(item.meta.publish_name.as_str()))?;
  insert_or_replace_publish_collab_metas(pg_pool, workspace_id, publisher_uuid, publish_items).await
}
pub async fn get_published_collab(
pg_pool: &PgPool,
publish_namespace: &str,
@ -716,23 +701,3 @@ async fn check_if_user_is_allowed_to_delete_comment(
}
Ok(())
}
/// Validates a publish name: at most 128 bytes, consisting only of
/// alphanumeric characters and hyphens.
fn check_collab_publish_name(publish_name: &str) -> Result<(), AppError> {
  if publish_name.len() > 128 {
    return Err(AppError::InvalidRequest(
      "Publish name must be at most 128 characters long".to_string(),
    ));
  }
  let is_valid_char = |c: char| c.is_alphanumeric() || c == '-';
  if !publish_name.chars().all(is_valid_char) {
    return Err(AppError::InvalidRequest(
      "Publish name must only contain alphanumeric characters and hyphens".to_string(),
    ));
  }
  Ok(())
}

View file

@ -1,68 +1,30 @@
use std::sync::Arc;
use app_error::AppError;
use async_trait::async_trait;
use database_entity::dto::{PublishCollabItem, PublishInfo};
use shared_entity::dto::publish_dto::PublishViewMetaData;
use sqlx::PgPool;
use tracing::debug;
use uuid::Uuid;
use database::{
file::{s3_client_impl::AwsS3BucketClientImpl, BucketClient, ResponseBlob},
publish::{
delete_published_collabs, insert_or_replace_publish_collabs, select_publish_collab_meta,
select_published_collab_blob, select_published_collab_info,
select_user_is_collab_publisher_for_all_views, select_workspace_publish_namespace,
select_workspace_publish_namespace_exists, update_workspace_publish_namespace,
select_published_collab_workspace_view_id, select_published_data_for_view_id,
select_published_metadata_for_view_id, select_user_is_collab_publisher_for_all_views,
select_workspace_publish_namespace, select_workspace_publish_namespace_exists,
update_workspace_publish_namespace,
},
workspace::select_user_is_workspace_owner,
};
use crate::api::metrics::PublishedCollabMetrics;
use super::ops::check_workspace_owner;
/// Validates every item's publish name, then batch-upserts the published
/// collabs into Postgres, consuming the item vector.
pub async fn publish_collabs(
  pg_pool: &PgPool,
  workspace_id: &Uuid,
  publisher_uuid: &Uuid,
  publish_items: Vec<PublishCollabItem<serde_json::Value, Vec<u8>>>,
) -> Result<(), AppError> {
  // Reject the whole batch at the first invalid publish name.
  publish_items
    .iter()
    .try_for_each(|item| check_collab_publish_name(item.meta.publish_name.as_str()))?;
  insert_or_replace_publish_collabs(pg_pool, workspace_id, publisher_uuid, publish_items).await
}
/// Fetches the publish metadata (JSON) for a collab addressed by its
/// publish namespace and name.
pub async fn get_published_collab(
  pg_pool: &PgPool,
  publish_namespace: &str,
  publish_name: &str,
) -> Result<serde_json::Value, AppError> {
  select_publish_collab_meta(pg_pool, publish_namespace, publish_name).await
}
/// Fetches the raw collab blob for a published collab addressed by its
/// publish namespace and name.
pub async fn get_published_collab_blob(
  pg_pool: &PgPool,
  publish_namespace: &str,
  publish_name: &str,
) -> Result<Vec<u8>, AppError> {
  let blob = select_published_collab_blob(pg_pool, publish_namespace, publish_name).await?;
  Ok(blob)
}
/// Fetches the `PublishInfo` record for a published view id.
pub async fn get_published_collab_info(
  pg_pool: &PgPool,
  view_id: &Uuid,
) -> Result<PublishInfo, AppError> {
  let publish_info = select_published_collab_info(pg_pool, view_id).await?;
  Ok(publish_info)
}
/// Unpublishes the given views from a workspace.
///
/// The authorization check runs first: only the workspace owner or the
/// original publisher of the views may delete them.
pub async fn delete_published_workspace_collab(
  pg_pool: &PgPool,
  workspace_id: &Uuid,
  view_ids: &[Uuid],
  user_uuid: &Uuid,
) -> Result<(), AppError> {
  check_workspace_owner_or_publisher(pg_pool, user_uuid, workspace_id, view_ids).await?;
  delete_published_collabs(pg_pool, workspace_id, view_ids).await
}
async fn check_workspace_owner_or_publisher(
pg_pool: &PgPool,
user_uuid: &Uuid,
@ -103,6 +65,10 @@ fn check_collab_publish_name(publish_name: &str) -> Result<(), AppError> {
Ok(())
}
/// Builds the S3 object key under which a published collab blob is stored:
/// `published-collab/{workspace_id}/{view_id}`.
fn get_collab_s3_key(workspace_id: &Uuid, view_id: &Uuid) -> String {
  format!("published-collab/{workspace_id}/{view_id}")
}
pub async fn set_workspace_namespace(
pg_pool: &PgPool,
user_uuid: &Uuid,
@ -154,3 +120,302 @@ async fn check_workspace_namespace(new_namespace: &str) -> Result<(), AppError>
Ok(())
}
/// Storage abstraction for published collabs, allowing blobs to live in
/// Postgres only or in S3 with a Postgres fallback (see the two impls below).
#[async_trait]
pub trait PublishedCollabStore: Sync + Send + 'static {
  /// Validates and stores a batch of published collabs for `workspace_id`
  /// on behalf of `user_uuid`.
  async fn publish_collabs(
    &self,
    published_items: Vec<PublishCollabItem<serde_json::Value, Vec<u8>>>,
    workspace_id: &Uuid,
    user_uuid: &Uuid,
  ) -> Result<(), AppError>;

  /// Fetches the published view metadata together with the collab blob,
  /// returning `None` when the view has not been published.
  async fn get_collab_with_view_metadata_by_view_id(
    &self,
    view_id: &Uuid,
  ) -> Result<Option<(PublishViewMetaData, Vec<u8>)>, AppError>;

  /// Returns the stored publish metadata (JSON) addressed by publish
  /// namespace and name.
  async fn get_collab_metadata(
    &self,
    publish_namespace: &str,
    publish_name: &str,
  ) -> Result<serde_json::Value, AppError>;

  /// Returns the `PublishInfo` record for a published view id.
  async fn get_collab_publish_info(&self, view_id: &Uuid) -> Result<PublishInfo, AppError>;

  /// Returns the raw collab blob addressed by publish namespace and name.
  async fn get_collab_blob_by_publish_namespace(
    &self,
    publish_namespace: &str,
    publish_name: &str,
  ) -> Result<Vec<u8>, AppError>;

  /// Unpublishes the given views after verifying the caller owns the
  /// workspace or published the views.
  async fn delete_collab(
    &self,
    workspace_id: &Uuid,
    view_ids: &[Uuid],
    user_uuid: &Uuid,
  ) -> Result<(), AppError>;
}
/// `PublishedCollabStore` backed solely by Postgres: both metadata and blobs
/// live in the `af_published_collab` table.
pub struct PublishedCollabPostgresStore {
  // outcome counters shared with the metrics registry
  metrics: Arc<PublishedCollabMetrics>,
  pg_pool: PgPool,
}

impl PublishedCollabPostgresStore {
  pub fn new(metrics: Arc<PublishedCollabMetrics>, pg_pool: PgPool) -> Self {
    Self { metrics, pg_pool }
  }
}
#[async_trait]
impl PublishedCollabStore for PublishedCollabPostgresStore {
async fn publish_collabs(
&self,
publish_items: Vec<PublishCollabItem<serde_json::Value, Vec<u8>>>,
workspace_id: &Uuid,
user_uuid: &Uuid,
) -> Result<(), AppError> {
for publish_item in &publish_items {
check_collab_publish_name(publish_item.meta.publish_name.as_str())?;
}
let publish_items_batch_size = publish_items.len() as i64;
let result =
insert_or_replace_publish_collabs(&self.pg_pool, workspace_id, user_uuid, publish_items)
.await;
if result.is_err() {
self
.metrics
.incr_failure_write_count(publish_items_batch_size);
} else {
self
.metrics
.incr_success_write_count(publish_items_batch_size);
}
result
}
async fn get_collab_metadata(
&self,
publish_namespace: &str,
publish_name: &str,
) -> Result<serde_json::Value, AppError> {
let metadata =
select_publish_collab_meta(&self.pg_pool, publish_namespace, publish_name).await?;
Ok(metadata)
}
async fn get_collab_with_view_metadata_by_view_id(
&self,
view_id: &Uuid,
) -> Result<Option<(PublishViewMetaData, Vec<u8>)>, AppError> {
let result = match select_published_data_for_view_id(&self.pg_pool, view_id).await? {
Some((js_val, blob)) => {
let metadata = serde_json::from_value(js_val)?;
Ok(Some((metadata, blob)))
},
None => Ok(None),
};
if result.is_err() {
self.metrics.incr_failure_read_count(1);
} else {
self.metrics.incr_success_read_count(1);
}
result
}
async fn get_collab_publish_info(&self, view_id: &Uuid) -> Result<PublishInfo, AppError> {
select_published_collab_info(&self.pg_pool, view_id).await
}
async fn get_collab_blob_by_publish_namespace(
&self,
publish_namespace: &str,
publish_name: &str,
) -> Result<Vec<u8>, AppError> {
let result = select_published_collab_blob(&self.pg_pool, publish_namespace, publish_name).await;
if result.is_err() {
self.metrics.incr_failure_read_count(1);
} else {
self.metrics.incr_success_read_count(1);
}
result
}
async fn delete_collab(
&self,
workspace_id: &Uuid,
view_ids: &[Uuid],
user_uuid: &Uuid,
) -> Result<(), AppError> {
check_workspace_owner_or_publisher(&self.pg_pool, user_uuid, workspace_id, view_ids).await?;
delete_published_collabs(&self.pg_pool, workspace_id, view_ids).await?;
Ok(())
}
}
/// `PublishedCollabStore` that uses S3 as the primary blob store while keeping
/// Postgres as a fallback copy for reads; metadata always lives in Postgres.
pub struct PublishedCollabS3StoreWithPostgresFallback {
  // outcome counters shared with the metrics registry
  metrics: Arc<PublishedCollabMetrics>,
  pg_pool: PgPool,
  bucket_client: AwsS3BucketClientImpl,
}

impl PublishedCollabS3StoreWithPostgresFallback {
  pub fn new(
    metrics: Arc<PublishedCollabMetrics>,
    pg_pool: PgPool,
    bucket_client: AwsS3BucketClientImpl,
  ) -> Self {
    Self {
      metrics,
      pg_pool,
      bucket_client,
    }
  }
}
#[async_trait]
impl PublishedCollabStore for PublishedCollabS3StoreWithPostgresFallback {
  /// Validates publish names, uploads each blob to S3 concurrently, then
  /// upserts the full batch (metadata + blob) into Postgres.
  ///
  /// An S3 upload failure is only logged at debug level: the Postgres insert
  /// below still stores the blob, so reads can be served from the fallback.
  ///
  /// NOTE(review): `success_write` is bumped once per successful S3 upload AND
  /// `fallback_write` is bumped for the whole batch after the Postgres insert,
  /// so a single publish increments both counters — confirm this is intended.
  async fn publish_collabs(
    &self,
    publish_items: Vec<PublishCollabItem<serde_json::Value, Vec<u8>>>,
    workspace_id: &Uuid,
    user_uuid: &Uuid,
  ) -> Result<(), AppError> {
    let publish_items_batch_size = publish_items.len() as i64;
    let mut handles: Vec<tokio::task::JoinHandle<()>> = vec![];
    for publish_item in &publish_items {
      // Reject the whole batch at the first invalid publish name; any uploads
      // already spawned for earlier items keep running detached.
      check_collab_publish_name(publish_item.meta.publish_name.as_str())?;
      let object_key = get_collab_s3_key(workspace_id, &publish_item.meta.view_id);
      let data = publish_item.data.clone();
      let bucket_client = self.bucket_client.clone();
      let metrics = self.metrics.clone();
      // Upload blobs concurrently; each task owns its own copy of the data.
      let handle = tokio::spawn(async move {
        let result = bucket_client.put_blob(&object_key, &data).await;
        if let Err(err) = result {
          debug!("Failed to publish collab to S3: {}", err);
        } else {
          metrics.incr_success_write_count(1);
        }
      });
      handles.push(handle);
    }
    // Wait for every upload to finish before writing to Postgres.
    for handle in handles {
      handle.await?;
    }
    let result =
      insert_or_replace_publish_collabs(&self.pg_pool, workspace_id, user_uuid, publish_items)
        .await;
    if result.is_err() {
      self
        .metrics
        .incr_failure_write_count(publish_items_batch_size);
    } else {
      self
        .metrics
        .incr_fallback_write_count(publish_items_batch_size);
    }
    result
  }

  /// Metadata is always served from Postgres; only blobs live in S3.
  async fn get_collab_metadata(
    &self,
    publish_namespace: &str,
    publish_name: &str,
  ) -> Result<serde_json::Value, AppError> {
    let metadata =
      select_publish_collab_meta(&self.pg_pool, publish_namespace, publish_name).await?;
    Ok(metadata)
  }

  /// Reads metadata from Postgres, then the blob from S3, falling back to the
  /// Postgres blob copy when the S3 read fails.
  ///
  /// NOTE(review): the `?` on the Postgres queries propagates those errors
  /// before any metrics branch runs, so such failures are not counted.
  async fn get_collab_with_view_metadata_by_view_id(
    &self,
    view_id: &Uuid,
  ) -> Result<Option<(PublishViewMetaData, Vec<u8>)>, AppError> {
    let result = select_published_metadata_for_view_id(&self.pg_pool, view_id).await?;
    match result {
      Some((workspace_id, js_val)) => {
        let metadata = serde_json::from_value(js_val)?;
        let object_key = get_collab_s3_key(&workspace_id, view_id);
        match self.bucket_client.get_blob(&object_key).await {
          Ok(resp) => {
            self.metrics.incr_success_read_count(1);
            Ok(Some((metadata, resp.to_blob())))
          },
          Err(_) => {
            // S3 miss/error: fall back to the blob stored in Postgres.
            let result = match select_published_data_for_view_id(&self.pg_pool, view_id).await? {
              Some((js_val, blob)) => {
                let metadata = serde_json::from_value(js_val)?;
                Ok(Some((metadata, blob)))
              },
              None => Ok(None),
            };
            if result.is_err() {
              self.metrics.incr_failure_read_count(1);
            } else {
              self.metrics.incr_fallback_read_count(1);
            }
            result
          },
        }
      },
      None => {
        // Unpublished view: treated as a successful (empty) read.
        self.metrics.incr_success_read_count(1);
        Ok(None)
      },
    }
  }

  async fn get_collab_publish_info(&self, view_id: &Uuid) -> Result<PublishInfo, AppError> {
    select_published_collab_info(&self.pg_pool, view_id).await
  }

  /// Resolves the S3 key via Postgres, tries S3 first, then falls back to the
  /// Postgres blob copy on any S3 error.
  async fn get_collab_blob_by_publish_namespace(
    &self,
    publish_namespace: &str,
    publish_name: &str,
  ) -> Result<Vec<u8>, AppError> {
    let collab_key =
      select_published_collab_workspace_view_id(&self.pg_pool, publish_namespace, publish_name)
        .await?;
    let object_key = get_collab_s3_key(&collab_key.workspace_id, &collab_key.view_id);
    let resp = self.bucket_client.get_blob(&object_key).await;
    match resp {
      Ok(resp) => {
        self.metrics.incr_success_read_count(1);
        Ok(resp.to_blob())
      },
      Err(err) => {
        debug!(
          "Failed to get published collab blob {} from S3 due to {}",
          object_key, err
        );
        let result =
          select_published_collab_blob(&self.pg_pool, publish_namespace, publish_name).await;
        if result.is_err() {
          self.metrics.incr_failure_read_count(1);
        } else {
          self.metrics.incr_fallback_read_count(1);
        }
        result
      },
    }
  }

  /// Deletes the S3 objects first, then the Postgres rows, after verifying the
  /// caller owns the workspace or published the views.
  ///
  /// NOTE(review): if the S3 batch delete fails, the `?` aborts before
  /// `delete_published_collabs`, leaving the Postgres rows in place.
  async fn delete_collab(
    &self,
    workspace_id: &Uuid,
    view_ids: &[Uuid],
    user_uuid: &Uuid,
  ) -> Result<(), AppError> {
    check_workspace_owner_or_publisher(&self.pg_pool, user_uuid, workspace_id, view_ids).await?;
    let object_keys = view_ids
      .iter()
      .map(|view_id| get_collab_s3_key(workspace_id, view_id))
      .collect::<Vec<String>>();
    self.bucket_client.delete_blobs(object_keys).await?;
    delete_published_collabs(&self.pg_pool, workspace_id, view_ids).await?;
    Ok(())
  }
}

View file

@ -17,7 +17,11 @@ use collab_rt_entity::{ClientCollabMessage, UpdateSync};
use collab_rt_protocol::{Message, SyncMessage};
use database::collab::GetCollabOrigin;
use database::collab::{select_workspace_database_oid, CollabStorage};
use database::file::s3_client_impl::AwsS3BucketClientImpl;
use database::file::BucketClient;
use database::file::ResponseBlob;
use database::publish::select_published_data_for_view_id;
use database::publish::select_published_metadata_for_view_id;
use database_entity::dto::CollabParams;
use shared_entity::dto::publish_dto::{PublishDatabaseData, PublishViewInfo, PublishViewMetaData};
use shared_entity::dto::workspace_dto;
@ -38,6 +42,7 @@ use crate::biz::collab::ops::get_latest_collab_encoded;
#[allow(clippy::too_many_arguments)]
pub async fn duplicate_published_collab_to_workspace(
pg_pool: &PgPool,
bucket_client: AwsS3BucketClientImpl,
collab_storage: Arc<CollabAccessControlStorage>,
dest_uid: i64,
publish_view_id: String,
@ -46,6 +51,7 @@ pub async fn duplicate_published_collab_to_workspace(
) -> Result<(), AppError> {
let copier = PublishCollabDuplicator::new(
pg_pool.clone(),
bucket_client,
collab_storage.clone(),
dest_uid,
dest_workspace_id,
@ -87,6 +93,8 @@ pub struct PublishCollabDuplicator {
/// for fetching published data
/// and writing them to dest workspace
pg_pool: PgPool,
/// for fetching published data from s3
bucket_client: AwsS3BucketClientImpl,
/// user initiating the duplication
duplicator_uid: i64,
/// workspace to duplicate into
@ -98,6 +106,7 @@ pub struct PublishCollabDuplicator {
impl PublishCollabDuplicator {
pub fn new(
pg_pool: PgPool,
bucket_client: AwsS3BucketClientImpl,
collab_storage: Arc<CollabAccessControlStorage>,
dest_uid: i64,
dest_workspace_id: String,
@ -113,8 +122,8 @@ impl PublishCollabDuplicator {
duplicated_db_main_view: HashMap::new(),
duplicated_db_view: HashMap::new(),
duplicated_db_row: HashMap::new(),
pg_pool,
bucket_client,
collab_storage,
duplicator_uid: dest_uid,
dest_workspace_id,
@ -147,6 +156,7 @@ impl PublishCollabDuplicator {
collabs_to_insert,
ts_now: _,
pg_pool,
bucket_client: _,
duplicator_uid,
dest_workspace_id,
dest_view_id,
@ -1098,10 +1108,21 @@ impl PublishCollabDuplicator {
&self,
view_id: &uuid::Uuid,
) -> Result<Option<(PublishViewMetaData, Vec<u8>)>, AppError> {
match select_published_data_for_view_id(&self.pg_pool, view_id).await? {
Some((js_val, blob)) => {
let result = select_published_metadata_for_view_id(&self.pg_pool, view_id).await?;
match result {
Some((workspace_id, js_val)) => {
let metadata = serde_json::from_value(js_val)?;
Ok(Some((metadata, blob)))
let object_key = format!("published-collab/{}/{}", workspace_id, view_id);
match self.bucket_client.get_blob(&object_key).await {
Ok(resp) => Ok(Some((metadata, resp.to_blob()))),
Err(_) => match select_published_data_for_view_id(&self.pg_pool, view_id).await? {
Some((js_val, blob)) => {
let metadata = serde_json::from_value(js_val)?;
Ok(Some((metadata, blob)))
},
None => Ok(None),
},
}
},
None => Ok(None),
}

View file

@ -21,6 +21,7 @@ pub struct Config {
pub appflowy_ai: AppFlowyAISetting,
pub grpc_history: GrpcHistorySetting,
pub collab: CollabSetting,
pub published_collab: PublishedCollabSetting,
pub mailer: MailerSetting,
pub apple_oauth: AppleOAuthSetting,
}
@ -140,6 +141,29 @@ pub struct CollabSetting {
pub edit_state_max_secs: i64,
}
/// Selects which backend stores published-collab data.
#[derive(Clone, Debug)]
pub enum PublishedCollabStorageBackend {
  // store everything in Postgres only
  Postgres,
  // store blobs in S3, with Postgres kept as a backup/fallback copy
  S3WithPostgresBackup,
}

/// Configuration section for published-collab storage; parsed from the
/// `APPFLOWY_PUBLISHED_COLLAB_STORAGE_BACKEND` environment variable.
#[derive(Clone, Debug)]
pub struct PublishedCollabSetting {
  pub storage_backend: PublishedCollabStorageBackend,
}
impl TryFrom<&str> for PublishedCollabStorageBackend {
  type Error = anyhow::Error;

  /// Parses the backend name as supplied by configuration; only `"postgres"`
  /// and `"s3_with_postgres_backup"` are accepted.
  fn try_from(value: &str) -> Result<Self, Self::Error> {
    match value {
      "postgres" => Ok(Self::Postgres),
      "s3_with_postgres_backup" => Ok(Self::S3WithPostgresBackup),
      _ => Err(anyhow::anyhow!("Invalid PublishedCollabStorageBackend")),
    }
  }
}
// Default values favor local development.
pub fn get_configuration() -> Result<Config, anyhow::Error> {
let config = Config {
@ -205,6 +229,11 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?,
},
published_collab: PublishedCollabSetting {
storage_backend: get_env_var("APPFLOWY_PUBLISHED_COLLAB_STORAGE_BACKEND", "postgres")
.as_str()
.try_into()?,
},
mailer: MailerSetting {
smtp_host: get_env_var("APPFLOWY_MAILER_SMTP_HOST", "smtp.gmail.com"),
smtp_port: get_env_var("APPFLOWY_MAILER_SMTP_PORT", "465").parse()?,

View file

@ -26,8 +26,9 @@ use snowflake::Snowflake;
use tonic_proto::history::history_client::HistoryClient;
use workspace_access::WorkspaceAccessControlImpl;
use crate::api::metrics::RequestMetrics;
use crate::api::metrics::{PublishedCollabMetrics, RequestMetrics};
use crate::biz::pg_listener::PgListeners;
use crate::biz::workspace::publish::PublishedCollabStore;
use crate::config::config::Config;
use crate::mailer::Mailer;
@ -45,6 +46,7 @@ pub struct AppState {
pub collab_access_control: CollabAccessControlImpl,
pub workspace_access_control: WorkspaceAccessControlImpl,
pub bucket_storage: Arc<S3BucketStorage>,
pub published_collab_store: Arc<dyn PublishedCollabStore>,
pub bucket_client: AwsS3BucketClientImpl,
pub pg_listeners: Arc<PgListeners>,
pub access_control: AccessControl,
@ -123,6 +125,7 @@ pub struct AppMetrics {
pub realtime_metrics: Arc<CollabRealtimeMetrics>,
pub access_control_metrics: Arc<AccessControlMetrics>,
pub collab_metrics: Arc<CollabMetrics>,
pub published_collab_metrics: Arc<PublishedCollabMetrics>,
}
impl Default for AppMetrics {
@ -138,12 +141,14 @@ impl AppMetrics {
let realtime_metrics = Arc::new(CollabRealtimeMetrics::register(&mut registry));
let access_control_metrics = Arc::new(AccessControlMetrics::register(&mut registry));
let collab_metrics = Arc::new(CollabMetrics::register(&mut registry));
let published_collab_metrics = Arc::new(PublishedCollabMetrics::register(&mut registry));
Self {
registry: Arc::new(registry),
request_metrics,
realtime_metrics,
access_control_metrics,
collab_metrics,
published_collab_metrics,
}
}
}