feat: workspace usage test (#129)

* feat: workspace usage test

* test: add tests
Nathan.fooo 2023-10-22 17:52:10 +08:00 committed by GitHub
parent a30745f1c4
commit 1aba1f0cf4
21 changed files with 682 additions and 69 deletions


@ -0,0 +1,46 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT * FROM af_blob_metadata\n WHERE workspace_id = $1 \n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "workspace_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "file_id",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "file_type",
"type_info": "Varchar"
},
{
"ordinal": 3,
"name": "file_size",
"type_info": "Int8"
},
{
"ordinal": 4,
"name": "modified_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false,
false,
false,
false
]
},
"hash": "022434c877aab8231e2728d917793c1296caa5762734f27be281f977685474f9"
}
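
These .sqlx files are sqlx's offline query cache: the "describe" block records column names, Postgres types, and nullability, which the query!/query_as! macros check at compile time, and the "hash" keys the entry to its SQL. The columns above map onto a Rust row type roughly like the sketch below (field names and types are inferred from the metadata; the real AFBlobMetadata definition lives in the database-entity crate and may differ):

use chrono::{DateTime, Utc};
use uuid::Uuid;

// Inferred from the cached query description above; illustrative only.
pub struct AFBlobMetadata {
    pub workspace_id: Uuid,          // ordinal 0, Uuid, NOT NULL
    pub file_id: String,             // ordinal 1, Varchar, NOT NULL
    pub file_type: String,           // ordinal 2, Varchar, NOT NULL
    pub file_size: i64,              // ordinal 3, Int8, NOT NULL
    pub modified_at: DateTime<Utc>,  // ordinal 4, Timestamptz, NOT NULL
}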


@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT file_id FROM af_blob_metadata\n WHERE workspace_id = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "file_id",
"type_info": "Varchar"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false
]
},
"hash": "71c15686124c05a4fdef066738eadd0ab17d6af1bfeffc480c8fe52a4e6edab8"
}

Cargo.lock (generated, 190 changed lines)

@ -324,6 +324,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "adler32"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
[[package]]
name = "admin_frontend"
version = "0.1.0"
@ -492,6 +498,7 @@ dependencies = [
"futures-util",
"gotrue",
"gotrue-entity",
"image",
"infra",
"itertools",
"lazy_static",
@ -828,7 +835,7 @@ dependencies = [
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"miniz_oxide 0.7.1",
"object",
"rustc-demangle",
]
@ -1031,6 +1038,12 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "bytemuck"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6"
[[package]]
name = "byteorder"
version = "1.4.3"
@ -1111,6 +1124,7 @@ dependencies = [
"gotrue-entity",
"lib0",
"mime",
"mime_guess",
"parking_lot",
"realtime-entity",
"reqwest",
@ -1163,6 +1177,12 @@ dependencies = [
"uuid",
]
[[package]]
name = "color_quant"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b"
[[package]]
name = "combine"
version = "4.6.6"
@ -1312,6 +1332,30 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7"
dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
@ -1436,6 +1480,16 @@ dependencies = [
"validator",
]
[[package]]
name = "deflate"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73770f8e1fe7d64df17ca66ad28994a0a623ea497fa69486e14984e715c5d174"
dependencies = [
"adler32",
"byteorder",
]
[[package]]
name = "der"
version = "0.7.8"
@ -1656,7 +1710,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010"
dependencies = [
"crc32fast",
"miniz_oxide",
"miniz_oxide 0.7.1",
]
[[package]]
@ -1888,6 +1942,16 @@ dependencies = [
"polyval",
]
[[package]]
name = "gif"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3edd93c6756b4dfaf2709eafcc345ba2636565295c198a9cfbf75fa5e3e00b06"
dependencies = [
"color_quant",
"weezl",
]
[[package]]
name = "gimli"
version = "0.28.0"
@ -2190,6 +2254,25 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb56e1aa765b4b4f3aadfab769793b7087bb03a4ea4920644a6d238e2df5b9ed"
[[package]]
name = "image"
version = "0.23.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24ffcb7e7244a9bf19d35bf2883b9c080c4ced3c07a9895572178cdb8f13f6a1"
dependencies = [
"bytemuck",
"byteorder",
"color_quant",
"gif",
"jpeg-decoder",
"num-iter",
"num-rational",
"num-traits",
"png",
"scoped_threadpool",
"tiff",
]
[[package]]
name = "impl-more"
version = "0.1.6"
@ -2265,6 +2348,15 @@ dependencies = [
"libc",
]
[[package]]
name = "jpeg-decoder"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "229d53d58899083193af11e15917b5640cd40b29ff475a1fe4ef725deb02d0f2"
dependencies = [
"rayon",
]
[[package]]
name = "js-sys"
version = "0.3.64"
@ -2471,6 +2563,15 @@ version = "2.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c"
[[package]]
name = "memoffset"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c"
dependencies = [
"autocfg",
]
[[package]]
name = "mime"
version = "0.3.17"
@ -2502,6 +2603,25 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "791daaae1ed6889560f8c4359194f56648355540573244a5448a83ba1ecc7435"
dependencies = [
"adler32",
]
[[package]]
name = "miniz_oxide"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b"
dependencies = [
"adler",
"autocfg",
]
[[package]]
name = "miniz_oxide"
version = "0.7.1"
@ -2625,6 +2745,17 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12ac428b1cb17fce6f731001d307d351ec70a6d202fc2e60f7d4c5e42d8f4f07"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.16"
@ -2955,6 +3086,18 @@ version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
[[package]]
name = "png"
version = "0.16.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c3287920cb847dee3de33d301c463fba14dda99db24214ddf93f83d3021f4c6"
dependencies = [
"bitflags 1.3.2",
"crc32fast",
"deflate",
"miniz_oxide 0.3.7",
]
[[package]]
name = "polyval"
version = "0.6.1"
@ -3153,6 +3296,26 @@ dependencies = [
"rand_core 0.5.1",
]
[[package]]
name = "rayon"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "rcgen"
version = "0.10.0"
@ -3604,6 +3767,12 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "scoped_threadpool"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d51f5df5af43ab3f1360b429fa5e0152ac5ce8c0bd6485cae490332e96846a8"
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -4367,6 +4536,17 @@ dependencies = [
"once_cell",
]
[[package]]
name = "tiff"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a53f4706d65497df0c4349241deddf35f84cee19c87ed86ea8ca590f4464437"
dependencies = [
"jpeg-decoder",
"miniz_oxide 0.4.4",
"weezl",
]
[[package]]
name = "time"
version = "0.3.28"
@ -5017,6 +5197,12 @@ version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc"
[[package]]
name = "weezl"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9193164d4de03a926d909d3bc7c30543cecb35400c02114792c2cae20d5e2dbb"
[[package]]
name = "whoami"
version = "1.4.1"


@ -90,6 +90,7 @@ dotenv = "0.15.0"
scraper = "0.17.1"
client-api = { path = "libs/client-api", features = ["collab-sync"] }
opener = "0.6.1"
image = "0.23.14"
[[bin]]
name = "appflowy_cloud"


@ -38,6 +38,7 @@ collab-entity = { version = "0.1.0" }
yrs = { version = "0.16.5", optional = true }
lib0 = { version = "0.16.3", features = ["lib0-serde"], optional = true }
realtime-entity = { workspace = true }
mime_guess = "2.0.4"
[features]


@ -2,9 +2,9 @@ use crate::notify::{ClientToken, TokenStateReceiver};
use anyhow::{anyhow, Context};
use bytes::Bytes;
use database_entity::{
AFBlobRecord, AFCollabMember, AFCollabMembers, AFUserProfileView, AFWorkspaceMember,
BatchQueryCollabParams, BatchQueryCollabResult, CollabMemberIdentify, InsertCollabMemberParams,
InsertCollabParams, QueryCollabMembers, UpdateCollabMemberParams,
AFBlobMetadata, AFBlobRecord, AFCollabMember, AFCollabMembers, AFUserProfileView,
AFWorkspaceMember, BatchQueryCollabParams, BatchQueryCollabResult, CollabMemberIdentify,
InsertCollabMemberParams, InsertCollabParams, QueryCollabMembers, UpdateCollabMemberParams,
};
use database_entity::{AFWorkspaces, QueryCollabParams};
use database_entity::{DeleteCollabParams, RawData};
@ -27,14 +27,18 @@ use shared_entity::data::AppResponse;
use shared_entity::dto::auth_dto::SignInTokenResponse;
use shared_entity::dto::auth_dto::UpdateUsernameParams;
use shared_entity::dto::workspace_dto::{
CreateWorkspaceMembers, WorkspaceMemberChangeset, WorkspaceMembers,
CreateWorkspaceMembers, WorkspaceBlobMetadata, WorkspaceMemberChangeset, WorkspaceMembers,
WorkspaceSpaceUsage,
};
use shared_entity::error_code::url_missing_param;
use shared_entity::error_code::ErrorCode;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tracing::instrument;
use url::Url;
use uuid::Uuid;
/// `Client` is responsible for managing communication with the GoTrue API and cloud storage.
@ -713,13 +717,13 @@ impl Client {
Ok(format!("{}/{}/{}", self.ws_addr, access_token, device_id))
}
pub async fn put_file<T: Into<Bytes>, M: ToString>(
pub async fn put_blob<T: Into<Bytes>, M: ToString>(
&self,
workspace_id: &str,
data: T,
mime: M,
) -> Result<String, AppError> {
let url = format!("{}/api/file_storage/{}", self.base_url, workspace_id);
let url = format!("{}/api/file_storage/{}/blob", self.base_url, workspace_id);
let data = data.into();
let content_length = data.len();
let resp = self
@ -734,21 +738,43 @@ impl Client {
.await?
.into_data()?;
Ok(format!(
"{}/api/file_storage/{}/{}",
"{}/api/file_storage/{}/blob/{}",
self.base_url, workspace_id, record.file_id
))
}
pub async fn put_blob_with_path(
&self,
workspace_id: &str,
file_path: &str,
) -> Result<String, AppError> {
if file_path.is_empty() {
return Err(AppError::new(
ErrorCode::InvalidRequestParams,
"path is empty",
));
}
let mut file = File::open(&file_path).await?;
let mime = mime_guess::from_path(file_path)
.first_or_octet_stream()
.to_string();
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
self.put_blob(workspace_id, buffer, mime).await
}
/// Only expose this method for testing
#[cfg(debug_assertions)]
pub async fn put_file_with_content_length<T: Into<Bytes>>(
pub async fn put_blob_with_content_length<T: Into<Bytes>>(
&self,
workspace_id: &str,
data: T,
mime: &Mime,
content_length: usize,
) -> Result<AFBlobRecord, AppError> {
let url = format!("{}/api/file_storage/{}", self.base_url, workspace_id);
let url = format!("{}/api/file_storage/{}/blob", self.base_url, workspace_id);
let resp = self
.http_client_with_auth(Method::PUT, &url)
.await?
@ -764,10 +790,10 @@ impl Client {
/// Get the blob with the given URL. The URL should be in the format of
/// `https://appflowy.io/api/file_storage/<workspace_id>/blob/<file_id>`.
pub async fn get_file(&self, url: &str) -> Result<Bytes, AppError> {
Url::parse(url)?;
pub async fn get_blob<T: AsRef<str>>(&self, url: T) -> Result<Bytes, AppError> {
Url::parse(url.as_ref())?;
let resp = self
.http_client_with_auth(Method::GET, url)
.http_client_with_auth(Method::GET, url.as_ref())
.await?
.send()
.await?;
@ -789,7 +815,19 @@ impl Client {
}
}
pub async fn delete_file(&self, url: &str) -> Result<(), AppError> {
pub async fn get_blob_metadata<T: AsRef<str>>(&self, url: T) -> Result<AFBlobMetadata, AppError> {
let resp = self
.http_client_with_auth(Method::GET, url.as_ref())
.await?
.send()
.await?;
AppResponse::<AFBlobMetadata>::from_response(resp)
.await?
.into_data()
}
pub async fn delete_blob(&self, url: &str) -> Result<(), AppError> {
let resp = self
.http_client_with_auth(Method::DELETE, url)
.await?
@ -798,6 +836,36 @@ impl Client {
AppResponse::<()>::from_response(resp).await?.into_error()
}
pub async fn get_workspace_usage(
&self,
workspace_id: &str,
) -> Result<WorkspaceSpaceUsage, AppError> {
let url = format!("{}/api/file_storage/{}/usage", self.base_url, workspace_id);
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.send()
.await?;
AppResponse::<WorkspaceSpaceUsage>::from_response(resp)
.await?
.into_data()
}
pub async fn get_workspace_all_blob_metadata(
&self,
workspace_id: &str,
) -> Result<WorkspaceBlobMetadata, AppError> {
let url = format!("{}/api/file_storage/{}/blobs", self.base_url, workspace_id);
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.send()
.await?;
AppResponse::<WorkspaceBlobMetadata>::from_response(resp)
.await?
.into_data()
}
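
Taken together, the renamed blob methods and the two new workspace endpoints give the client a full blob lifecycle. A minimal usage sketch, assuming an authenticated Client named client, an existing workspace_id, a hypothetical ./logo.png path, and a surrounding async function that returns Result<(), AppError> (illustrative only, not part of this diff):

// Upload a local file; the MIME type is guessed from the file extension.
let blob_url = client.put_blob_with_path(&workspace_id, "./logo.png").await?;

// Read the blob back through the URL returned by the upload.
let bytes = client.get_blob(&blob_url).await?;
assert!(!bytes.is_empty());

// Inspect how much of the workspace quota is consumed, then clean up.
let usage = client.get_workspace_usage(&workspace_id).await?;
println!("{} of {} bytes used", usage.consumed_capacity, usage.total_capacity);
client.delete_blob(&blob_url).await?;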
async fn http_client_with_auth(
&self,
method: Method,


@ -347,12 +347,6 @@ impl AFBlobRecord {
}
}
impl AFBlobMetadata {
pub fn s3_path(&self) -> String {
format!("{}/{}", self.workspace_id, self.file_id)
}
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub enum QueryCollabResult {
Success { blob: RawData },


@ -12,6 +12,7 @@ use database_entity::AFBlobMetadata;
use sqlx::PgPool;
use tokio::io::AsyncRead;
use tracing::{event, instrument};
use uuid::Uuid;
/// Maximum size of a blob in bytes.
@ -54,6 +55,7 @@ where
Self { client, pg_pool }
}
#[instrument(skip_all, err)]
pub async fn put_blob<R>(
&self,
blob_stream: R,
@ -65,15 +67,22 @@ where
R: AsyncRead + Unpin,
{
let (blob, file_id) = BlobStreamReader::new(blob_stream).finish().await?;
debug_assert!(blob.len() == file_size as usize);
// check whether the file already exists
if is_blob_metadata_exists(&self.pg_pool, &workspace_id, &file_id).await? {
event!(tracing::Level::TRACE, "file:{} already exists", file_id);
return Ok(file_id);
}
// query the storage space of the workspace
let usage = get_workspace_usage_size(&self.pg_pool, &workspace_id).await?;
event!(
tracing::Level::TRACE,
"workspace consumed space: {}, new file:{} with size: {}",
usage,
file_id,
file_size
);
if usage > MAX_USAGE {
return Err(DatabaseError::StorageSpaceNotEnough);
}
@ -90,7 +99,13 @@ where
)
.await
{
// ff the metadata is not saved, delete the blob.
event!(
tracing::Level::ERROR,
"failed to save metadata, file_id: {}, err: {}",
file_id,
err
);
// if the metadata is not saved, delete the blob.
self.client.delete_blob(&file_id).await?;
return Err(err);
}
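
Note that the quota check above compares only the space already consumed against MAX_USAGE, so a single upload can still carry a workspace past the limit, and the freshly written blob is removed if the metadata insert fails. A stricter pre-check would account for the incoming file as well; a minimal sketch of that variant (hypothetical helper, not part of this commit):

// Rejects an upload if accepting incoming_size would exceed the workspace quota.
fn check_quota(consumed: u64, incoming_size: u64, max_usage: u64) -> Result<(), DatabaseError> {
    if consumed.saturating_add(incoming_size) > max_usage {
        return Err(DatabaseError::StorageSpaceNotEnough);
    }
    Ok(())
}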


@ -99,6 +99,47 @@ pub async fn get_blob_metadata(
Ok(metadata)
}
/// Return all blob metadata of a workspace
#[instrument(level = "trace", skip_all, err)]
pub async fn get_all_workspace_blob_metadata(
pg_pool: &PgPool,
workspace_id: &Uuid,
) -> Result<Vec<AFBlobMetadata>, DatabaseError> {
let all_metadata = sqlx::query_as!(
AFBlobMetadata,
r#"
SELECT * FROM af_blob_metadata
WHERE workspace_id = $1
"#,
workspace_id,
)
.fetch_all(pg_pool)
.await?;
Ok(all_metadata)
}
/// Return all blob ids of a workspace
#[instrument(level = "trace", skip_all, err)]
pub async fn get_all_workspace_blob_ids(
pg_pool: &PgPool,
workspace_id: &Uuid,
) -> Result<Vec<String>, DatabaseError> {
let file_ids = sqlx::query!(
r#"
SELECT file_id FROM af_blob_metadata
WHERE workspace_id = $1
"#,
workspace_id
)
.fetch_all(pg_pool)
.await?
.into_iter()
.map(|record| record.file_id)
.collect();
Ok(file_ids)
}
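
get_workspace_usage_size, declared just below, backs the new /usage endpoint and the quota check in put_blob; its body is outside this hunk. A sketch of the kind of aggregate such a function presumably runs (an assumption for illustration, not the repository's actual query):

pub async fn get_workspace_usage_size_sketch(
    pg_pool: &PgPool,
    workspace_id: &Uuid,
) -> Result<u64, DatabaseError> {
    // Sum the recorded blob sizes for the workspace; an empty workspace yields 0.
    let total: i64 = sqlx::query_scalar(
        "SELECT COALESCE(SUM(file_size), 0)::bigint FROM af_blob_metadata WHERE workspace_id = $1",
    )
    .bind(workspace_id)
    .fetch_one(pg_pool)
    .await?;
    Ok(total as u64)
}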
/// Return the total size of a workspace in bytes
#[instrument(level = "trace", skip_all, err)]
pub async fn get_workspace_usage_size(
pool: &PgPool,


@ -159,6 +159,12 @@ impl From<SystemTimeError> for AppError {
}
}
impl From<std::io::Error> for AppError {
fn from(value: std::io::Error) -> Self {
AppError::new(ErrorCode::IO, value.to_string())
}
}
#[cfg(feature = "cloud")]
impl From<s3::error::S3Error> for AppError {
fn from(value: s3::error::S3Error) -> Self {


@ -1,9 +1,10 @@
use database_entity::AFRole;
use database_entity::{AFBlobMetadata, AFRole};
use serde::{Deserialize, Serialize};
use std::ops::Deref;
#[derive(serde::Deserialize, serde::Serialize)]
#[derive(Deserialize, Serialize)]
pub struct WorkspaceMembers(pub Vec<WorkspaceMember>);
#[derive(serde::Deserialize, serde::Serialize)]
#[derive(Deserialize, Serialize)]
pub struct WorkspaceMember(pub String);
impl Deref for WorkspaceMember {
type Target = String;
@ -18,7 +19,7 @@ impl From<Vec<String>> for WorkspaceMembers {
}
}
#[derive(serde::Deserialize, serde::Serialize)]
#[derive(Deserialize, Serialize)]
pub struct CreateWorkspaceMembers(pub Vec<CreateWorkspaceMember>);
impl From<Vec<CreateWorkspaceMember>> for CreateWorkspaceMembers {
fn from(value: Vec<CreateWorkspaceMember>) -> Self {
@ -26,13 +27,13 @@ impl From<Vec<CreateWorkspaceMember>> for CreateWorkspaceMembers {
}
}
#[derive(serde::Deserialize, serde::Serialize)]
#[derive(Deserialize, Serialize)]
pub struct CreateWorkspaceMember {
pub email: String,
pub role: AFRole,
}
#[derive(serde::Deserialize, serde::Serialize)]
#[derive(Deserialize, Serialize)]
pub struct WorkspaceMemberChangeset {
pub email: String,
pub role: Option<AFRole>,
@ -58,3 +59,12 @@ impl WorkspaceMemberChangeset {
self
}
}
#[derive(Deserialize, Serialize)]
pub struct WorkspaceSpaceUsage {
pub total_capacity: u64,
pub consumed_capacity: u64,
}
#[derive(Serialize, Deserialize)]
pub struct WorkspaceBlobMetadata(pub Vec<AFBlobMetadata>);
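
Both new DTOs derive Serialize and Deserialize, so /usage returns a small JSON object and /blobs returns a bare array of metadata records, each wrapped in the standard AppResponse envelope. A minimal sketch of the usage payload's wire shape, assuming serde's default field naming (the numbers are arbitrary):

let usage = WorkspaceSpaceUsage {
    total_capacity: 1_073_741_824, // arbitrary 1 GiB quota, for illustration only
    consumed_capacity: 15_694,
};
// Serializes as: {"total_capacity":1073741824,"consumed_capacity":15694}
let json = serde_json::to_string(&usage).unwrap();
let back: WorkspaceSpaceUsage = serde_json::from_str(&json).unwrap();
assert_eq!(back.consumed_capacity, 15_694);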


@ -66,6 +66,9 @@ pub enum ErrorCode {
#[error("Payload too large")]
PayloadTooLarge = 1016,
#[error("io error")]
IO = 1017,
}
/// Implements conversion from `anyhow::Error` to `ErrorCode`.


@ -10,46 +10,59 @@ use actix_web::{
};
use actix_web::{HttpResponse, Result};
use chrono::DateTime;
use database::file::MAX_BLOB_SIZE;
use database_entity::AFBlobRecord;
use database::file::{MAX_BLOB_SIZE, MAX_USAGE};
use database::resource_usage::{get_all_workspace_blob_metadata, get_workspace_usage_size};
use database_entity::{AFBlobMetadata, AFBlobRecord};
use serde::Deserialize;
use shared_entity::app_error::AppError;
use shared_entity::data::{AppResponse, JsonAppResponse};
use shared_entity::dto::workspace_dto::{WorkspaceBlobMetadata, WorkspaceSpaceUsage};
use shared_entity::error_code::ErrorCode;
use sqlx::types::Uuid;
use std::pin::Pin;
use tokio::io::AsyncRead;
use tokio_stream::StreamExt;
use tokio_util::io::StreamReader;
use tracing::{instrument, trace};
use tracing::{event, instrument};
use tracing_actix_web::RequestId;
use crate::state::AppState;
pub fn file_storage_scope() -> Scope {
web::scope("/api/file_storage")
.service(web::resource("/{workspace_id}").route(web::put().to(put_handler)))
.service(web::resource("/{workspace_id}/blob").route(web::put().to(put_blob_handler)))
.service(
web::resource("/{workspace_id}/{file_id:.*}")
.route(web::get().to(get_handler))
.route(web::delete().to(delete_handler)),
web::resource("/{workspace_id}/blob/{file_id:.*}")
.route(web::get().to(get_blob_handler))
.route(web::delete().to(delete_blob_handler)),
)
.service(
web::resource("/{workspace_id}/metadata/{file_id:.*}")
.route(web::get().to(get_blob_metadata_handler)),
)
.service(
web::resource("/{workspace_id}/usage").route(web::get().to(get_workspace_usage_handler)),
)
.service(
web::resource("/{workspace_id}/blobs")
.route(web::get().to(get_all_workspace_blob_metadata_handler)),
)
}
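
For quick reference, the endpoint map defined by the scope above (the old /{workspace_id} and /{workspace_id}/{file_id} routes are replaced by the blob-prefixed ones):

// PUT    /api/file_storage/{workspace_id}/blob                -> put_blob_handler
// GET    /api/file_storage/{workspace_id}/blob/{file_id}      -> get_blob_handler
// DELETE /api/file_storage/{workspace_id}/blob/{file_id}      -> delete_blob_handler
// GET    /api/file_storage/{workspace_id}/metadata/{file_id}  -> get_blob_metadata_handler
// GET    /api/file_storage/{workspace_id}/usage               -> get_workspace_usage_handler
// GET    /api/file_storage/{workspace_id}/blobs               -> get_all_workspace_blob_metadata_handler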
#[derive(Deserialize, Debug)]
pub struct BlobPathInfo {
struct PathInfo {
workspace_id: Uuid,
file_id: String,
}
#[instrument(skip(state, payload), err)]
async fn put_handler(
async fn put_blob_handler(
state: Data<AppState>,
payload: Payload,
content_type: web::Header<ContentType>,
content_length: web::Header<ContentLength>,
workspace_id: web::Path<Uuid>,
required_id: RequestId,
request_id: RequestId,
) -> Result<JsonAppResponse<AFBlobRecord>> {
let content_length = content_length.into_inner().into_inner();
// Check the content length; if it's too large, return an error.
@ -66,7 +79,12 @@ async fn put_handler(
let blob_stream = payload_to_async_read(payload);
let workspace_id = workspace_id.into_inner();
trace!("start put blob: {}:{}", file_type, content_length);
event!(
tracing::Level::TRACE,
"start put blob: {}:{}",
file_type,
content_length
);
let file_id = state
.bucket_storage
.put_blob(blob_stream, workspace_id, file_type, content_length as i64)
@ -74,19 +92,20 @@ async fn put_handler(
.map_err(AppError::from)?;
let record = AFBlobRecord::new(file_id);
trace!("did put blob: {:?}", record);
event!(tracing::Level::TRACE, "did put blob: {:?}", record);
Ok(Json(AppResponse::Ok().with_data(record)))
}
#[instrument(level = "debug", skip(state), err)]
async fn delete_handler(
async fn delete_blob_handler(
state: Data<AppState>,
path: web::Path<BlobPathInfo>,
path: web::Path<PathInfo>,
) -> Result<JsonAppResponse<()>> {
let BlobPathInfo {
let PathInfo {
workspace_id,
file_id,
} = path.into_inner();
state
.bucket_storage
.delete_blob(&workspace_id, &file_id)
@ -96,13 +115,13 @@ async fn delete_handler(
}
#[instrument(skip(state), err)]
async fn get_handler(
async fn get_blob_handler(
state: Data<AppState>,
path: web::Path<BlobPathInfo>,
required_id: RequestId,
path: web::Path<PathInfo>,
request_id: RequestId,
req: HttpRequest,
) -> Result<HttpResponse<BoxBody>> {
let BlobPathInfo {
let PathInfo {
workspace_id,
file_id,
} = path.into_inner();
@ -150,7 +169,57 @@ async fn get_handler(
Ok(response)
}
fn payload_to_async_read(payload: actix_web::web::Payload) -> Pin<Box<dyn AsyncRead>> {
#[instrument(skip(state), err)]
async fn get_blob_metadata_handler(
state: Data<AppState>,
path: web::Path<PathInfo>,
) -> Result<JsonAppResponse<AFBlobMetadata>> {
let PathInfo {
workspace_id,
file_id,
} = path.into_inner();
// Get the metadata
let metadata = state
.bucket_storage
.get_blob_metadata(&workspace_id, &file_id)
.await
.map_err(AppError::from)?;
Ok(Json(AppResponse::Ok().with_data(metadata)))
}
#[instrument(level = "debug", skip(state), err)]
async fn get_workspace_usage_handler(
state: Data<AppState>,
workspace_id: web::Path<Uuid>,
) -> Result<JsonAppResponse<WorkspaceSpaceUsage>> {
let current = get_workspace_usage_size(&state.pg_pool, &workspace_id)
.await
.map_err(AppError::from)?;
let usage = WorkspaceSpaceUsage {
consumed_capacity: current,
total_capacity: MAX_USAGE,
};
Ok(AppResponse::Ok().with_data(usage).into())
}
// TODO(nathan): implement pagination
#[instrument(level = "debug", skip(state), err)]
async fn get_all_workspace_blob_metadata_handler(
state: Data<AppState>,
workspace_id: web::Path<Uuid>,
) -> Result<JsonAppResponse<WorkspaceBlobMetadata>> {
let workspace_blob_metadata = get_all_workspace_blob_metadata(&state.pg_pool, &workspace_id)
.await
.map_err(AppError::from)?;
Ok(
AppResponse::Ok()
.with_data(WorkspaceBlobMetadata(workspace_blob_metadata))
.into(),
)
}
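
On the pagination TODO above: one conventional shape would be optional limit/offset query parameters deserialized alongside the workspace id and passed through to a LIMIT/OFFSET clause in get_all_workspace_blob_metadata. A sketch of such a parameter struct (hypothetical, not part of this commit):

#[derive(Deserialize, Debug)]
struct BlobMetadataQuery {
    // Maximum number of records to return; the handler would cap this server-side.
    limit: Option<i64>,
    // Number of records to skip before returning results.
    offset: Option<i64>,
}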
fn payload_to_async_read(payload: Payload) -> Pin<Box<dyn AsyncRead>> {
let mapped =
payload.map(|chunk| chunk.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
let reader = StreamReader::new(mapped);


@ -1 +0,0 @@
mod file_test;


@ -1,11 +1,8 @@
use client_api::Client;
mod collab;
mod gotrue;
mod user;
mod util;
mod file_storage;
mod websocket;
mod workspace;


@ -1,4 +1,5 @@
use assert_json_diff::assert_json_eq;
use bytes::Bytes;
use client_api::collab_sync::{SinkConfig, SyncObject, SyncPlugin};
use client_api::ws::{BusinessID, WSClient, WSClientConfig};
use collab::core::collab::MutexCollab;
@ -7,14 +8,21 @@ use collab::core::origin::{CollabClient, CollabOrigin};
use collab::preclude::Collab;
use collab_entity::CollabType;
use database_entity::{
AFAccessLevel, AFRole, InsertCollabMemberParams, QueryCollabParams, UpdateCollabMemberParams,
AFAccessLevel, AFBlobMetadata, AFRole, InsertCollabMemberParams, QueryCollabParams,
UpdateCollabMemberParams,
};
use image::io::Reader as ImageReader;
use serde_json::Value;
use shared_entity::app_error::AppError;
use shared_entity::dto::workspace_dto::{CreateWorkspaceMember, WorkspaceMemberChangeset};
use shared_entity::dto::workspace_dto::{
CreateWorkspaceMember, WorkspaceMemberChangeset, WorkspaceSpaceUsage,
};
use sqlx::types::Uuid;
use std::collections::HashMap;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::tempdir;
use tokio::time::{timeout, Duration};
use tokio_stream::StreamExt;
@ -199,6 +207,46 @@ impl TestClient {
}
}
pub async fn download_blob<T: AsRef<str>>(&self, url: T) -> Vec<u8> {
self.api_client.get_blob(url).await.unwrap().to_vec()
}
#[allow(dead_code)]
pub async fn get_blob_metadata<T: AsRef<str>>(&self, url: T) -> AFBlobMetadata {
self.api_client.get_blob_metadata(url).await.unwrap()
}
pub async fn upload_blob<T: Into<Bytes>, M: ToString>(&self, data: T, mime: M) -> String {
let workspace_id = self.workspace_id().await;
self
.api_client
.put_blob(&workspace_id, data, mime)
.await
.unwrap()
}
pub async fn upload_file_with_path(&self, path: &str) -> String {
let workspace_id = self.workspace_id().await;
self
.api_client
.put_blob_with_path(&workspace_id, path)
.await
.unwrap()
}
pub async fn delete_file(&self, url: &str) {
self.api_client.delete_blob(url).await.unwrap();
}
pub async fn get_workspace_usage(&self) -> WorkspaceSpaceUsage {
let workspace_id = self.workspace_id().await;
self
.api_client
.get_workspace_usage(&workspace_id)
.await
.unwrap()
}
pub(crate) async fn workspace_id(&self) -> String {
self
.api_client
@ -426,3 +474,52 @@ pub async fn get_collab_json_from_server(
.unwrap()
.to_json_value()
}
pub fn generate_temp_file_path<T: AsRef<Path>>(file_name: T) -> TestTempFile {
let mut path = tempdir().unwrap().into_path();
path.push(file_name);
TestTempFile(path)
}
pub struct TestTempFile(PathBuf);
impl TestTempFile {
fn cleanup(dir: &PathBuf) {
let _ = std::fs::remove_dir_all(dir);
}
}
impl AsRef<Path> for TestTempFile {
fn as_ref(&self) -> &Path {
&self.0
}
}
impl Deref for TestTempFile {
type Target = PathBuf;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Drop for TestTempFile {
fn drop(&mut self) {
Self::cleanup(&self.0)
}
}
pub fn assert_image_equal<P: AsRef<Path>>(path1: P, path2: P) {
let img1 = ImageReader::open(path1)
.unwrap()
.decode()
.unwrap()
.into_rgba8();
let img2 = ImageReader::open(path2)
.unwrap()
.decode()
.unwrap()
.into_rgba8();
assert_eq!(img1, img2)
}

Binary file not shown (new image, 15 KiB)


@ -0,0 +1,2 @@
mod put_and_get;
mod usage;


@ -2,12 +2,15 @@ use crate::collab::workspace_id_from_client;
use reqwest::Url;
use shared_entity::error_code::ErrorCode;
use std::path::Path;
use crate::user::utils::generate_unique_registered_user_client;
use crate::util::test_client::{assert_image_equal, generate_temp_file_path, TestClient};
#[tokio::test]
async fn get_but_not_exists() {
let (c1, _user1) = generate_unique_registered_user_client().await;
let err = c1.get_file("not_exists_file_id").await.unwrap_err();
let err = c1.get_blob("not_exists_file_id").await.unwrap_err();
assert_eq!(err.code, ErrorCode::InvalidUrl);
}
@ -17,16 +20,16 @@ async fn put_and_get() {
let workspace_id = workspace_id_from_client(&c1).await;
let mime = mime::TEXT_PLAIN_UTF_8;
let data = "hello world";
let file_url = c1.put_file(&workspace_id, data, &mime).await.unwrap();
let file_url = c1.put_blob(&workspace_id, data, &mime).await.unwrap();
let url = Url::parse(&file_url).unwrap();
let file_id = url.path_segments().unwrap().last().unwrap();
assert_eq!(file_id, "uU0nuZNNPgilLlLX2n2r-sSE7-N6U4DukIj3rOLvzek=");
let got_data = c1.get_file(&file_url).await.unwrap();
let got_data = c1.get_blob(&file_url).await.unwrap();
assert_eq!(got_data, data.as_bytes());
c1.delete_file(&file_url).await.unwrap();
c1.delete_blob(&file_url).await.unwrap();
}
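
The hard-coded file_id asserted above is consistent with the URL-safe, padded base64 encoding of the SHA-256 digest of the uploaded bytes, which is presumably how the server derives blob ids (the derivation itself is not shown in this diff). A sketch that reproduces the constant, assuming the sha2 crate and the base64 crate's 0.21-style Engine API:

use base64::engine::general_purpose::URL_SAFE;
use base64::Engine as _;
use sha2::{Digest, Sha256};

// URL-safe base64 (with padding) of the SHA-256 digest of the blob content.
fn content_addressed_id(data: &[u8]) -> String {
    URL_SAFE.encode(Sha256::digest(data))
}

#[test]
fn file_id_matches_content_hash() {
    assert_eq!(
        content_addressed_id(b"hello world"),
        "uU0nuZNNPgilLlLX2n2r-sSE7-N6U4DukIj3rOLvzek="
    );
}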
#[tokio::test]
@ -35,7 +38,7 @@ async fn put_giant_file() {
let workspace_id = workspace_id_from_client(&c1).await;
let mime = mime::TEXT_PLAIN_UTF_8;
let error = c1
.put_file_with_content_length(&workspace_id, "123", &mime, 10 * 1024 * 1024 * 1024)
.put_blob_with_content_length(&workspace_id, "123", &mime, 10 * 1024 * 1024 * 1024)
.await
.unwrap_err();
@ -49,17 +52,17 @@ async fn put_and_put_and_get() {
let mime = mime::TEXT_PLAIN_UTF_8;
let data1 = "my content 1";
let data2 = "my content 2";
let url_1 = c1.put_file(&workspace_id, data1, &mime).await.unwrap();
let url_2 = c1.put_file(&workspace_id, data2, &mime).await.unwrap();
let url_1 = c1.put_blob(&workspace_id, data1, &mime).await.unwrap();
let url_2 = c1.put_blob(&workspace_id, data2, &mime).await.unwrap();
let got_data = c1.get_file(&url_1).await.unwrap();
let got_data = c1.get_blob(&url_1).await.unwrap();
assert_eq!(got_data, data1.as_bytes());
let got_data = c1.get_file(&url_2).await.unwrap();
let got_data = c1.get_blob(&url_2).await.unwrap();
assert_eq!(got_data, data2.as_bytes());
c1.delete_file(&url_1).await.unwrap();
c1.delete_file(&url_2).await.unwrap();
c1.delete_blob(&url_1).await.unwrap();
c1.delete_blob(&url_2).await.unwrap();
}
#[tokio::test]
@ -68,9 +71,29 @@ async fn put_delete_get() {
let workspace_id = workspace_id_from_client(&c1).await;
let mime = mime::TEXT_PLAIN_UTF_8;
let data = "my contents";
let url = c1.put_file(&workspace_id, data, &mime).await.unwrap();
c1.delete_file(&url).await.unwrap();
let url = c1.put_blob(&workspace_id, data, &mime).await.unwrap();
c1.delete_blob(&url).await.unwrap();
let err = c1.get_file(&url).await.unwrap_err();
let err = c1.get_blob(&url).await.unwrap_err();
assert_eq!(err.code, ErrorCode::RecordNotFound);
}
#[tokio::test]
async fn put_and_download_png() {
let image_path = "tests/workspace/blob/asset/16kb_logo.png";
let client = TestClient::new_user_without_ws_conn().await;
let url = client.upload_file_with_path(image_path).await;
let url = Url::parse(&url).unwrap();
let file_id = url.path_segments().unwrap().last().unwrap();
assert_eq!(file_id, "u_XUU5snbo_IDNQlqacesNyT6LVkxBKbnk3d82oq1og=");
let usage = client.get_workspace_usage().await;
assert_eq!(usage.consumed_capacity, 15694);
// download the image and check that its content matches the original file
let temp_file = generate_temp_file_path("temp.png");
std::fs::write(&temp_file, client.download_blob(&url).await).unwrap();
assert_image_equal(Path::new(image_path), &temp_file);
}


@ -0,0 +1,32 @@
use crate::util::test_client::TestClient;
#[tokio::test]
async fn workspace_usage_put_blob_test() {
let client = TestClient::new_user_without_ws_conn().await;
let mime = mime::TEXT_PLAIN_UTF_8;
let file_1 = client.upload_blob("123", &mime).await;
let file_2 = client.upload_blob("456", &mime).await;
let usage = client.get_workspace_usage().await;
assert_eq!(usage.consumed_capacity, 6);
// after the test, delete the files
client.delete_file(&file_1).await;
client.delete_file(&file_2).await;
}
#[tokio::test]
async fn workspace_usage_put_and_then_delete_blob_test() {
let client = TestClient::new_user_without_ws_conn().await;
let mime = mime::TEXT_PLAIN_UTF_8;
let file_1 = client.upload_blob("123", &mime).await;
let file_2 = client.upload_blob("456", &mime).await;
client.delete_file(&file_1).await;
let usage = client.get_workspace_usage().await;
assert_eq!(usage.consumed_capacity, 3);
client.delete_file(&file_2).await;
let usage = client.get_workspace_usage().await;
assert_eq!(usage.consumed_capacity, 0);
}


@ -1 +1,2 @@
mod blob;
mod member_crud;