feat: revamp storage api (#251)

* feat: revamp storage api

* feat: client api add method to generate url

* feat: remove some unused deps

* feat: upgrade deps

* feat: add mime to resp
Author: Zack, 2024-01-11 00:32:11 +08:00 (committed by GitHub)
Parent: 284e2bf638
Commit: 97d1bb532c
32 changed files with 738 additions and 824 deletions
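In short: blob URLs are now generated on the client from a caller-chosen file id (previously the server returned a hash-derived id), the MIME type travels back with the blob on download, and S3 objects are keyed per workspace. A minimal usage sketch of the new client surface, assuming an authenticated client-api Client named `c` and a known `workspace_id` (all names as introduced in the diffs below):

let file_id = uuid::Uuid::new_v4().to_string();
let url = c.get_blob_url(&workspace_id, &file_id); // URL is generated client-side now
let mime = mime::TEXT_PLAIN_UTF_8;
c.put_blob(&url, "hello world", &mime).await?;     // returns Ok(()) rather than a URL
let (got_mime, bytes) = c.get_blob(&url).await?;   // MIME parsed from the response header
assert_eq!(got_mime, mime);
c.delete_blob(&url).await?;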


@@ -1,49 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_blob_metadata\n (workspace_id, file_id, file_type, file_size)\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (workspace_id, file_id) DO UPDATE SET\n file_type = $3,\n file_size = $4\n RETURNING *\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "workspace_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "file_id",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "file_type",
"type_info": "Varchar"
},
{
"ordinal": 3,
"name": "file_size",
"type_info": "Int8"
},
{
"ordinal": 4,
"name": "modified_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Uuid",
"Varchar",
"Varchar",
"Int8"
]
},
"nullable": [
false,
false,
false,
false,
false
]
},
"hash": "23d43a5e70852dbad75e2b5cbcc7439d4177ea04518eee95f0aadbfa3520a1d9"
}


@@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_blob_metadata\n (workspace_id, file_id, file_type, file_size)\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (workspace_id, file_id) DO UPDATE SET\n file_type = $3,\n file_size = $4\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Varchar",
"Varchar",
"Int8"
]
},
"nullable": []
},
"hash": "6cc4a7da11a37413c9951983ee3f30de933cc6357a66c8e10366fde27acaefea"
}


@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT * FROM af_blob_metadata\n WHERE workspace_id = $1 \n ",
"query": "\n SELECT * FROM af_blob_metadata\n WHERE workspace_id = $1\n ",
"describe": {
"columns": [
{
@@ -42,5 +42,5 @@
false
]
},
"hash": "022434c877aab8231e2728d917793c1296caa5762734f27be281f977685474f9"
"hash": "74de473589a405c3ab567e72a881869321095e2de497b2c1866c547f939c359c"
}


@@ -1,47 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n DELETE FROM af_blob_metadata\n WHERE workspace_id = $1 AND file_id = $2\n RETURNING *\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "workspace_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "file_id",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "file_type",
"type_info": "Varchar"
},
{
"ordinal": 3,
"name": "file_size",
"type_info": "Int8"
},
{
"ordinal": 4,
"name": "modified_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": [
false,
false,
false,
false,
false
]
},
"hash": "ce5d380d59bdc8381c370c1c3ac4ad8323677d22657b6a1223b02dac5c9d5371"
}


@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n DELETE FROM af_blob_metadata\n WHERE workspace_id = $1 AND file_id = $2\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": []
},
"hash": "d0a24b554fe420d7ebf856ae7f1525aff3695fc97e2f43041dc54a4e62a88746"
}
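Both queries above dropped RETURNING *, which is why their describe sections now list no columns: further down in this commit the Rust side switches from query_as! + fetch_one to query! + execute, and these cached .sqlx files were regenerated accordingly (cargo sqlx prepare). A minimal sketch of the new call shape, assuming a sqlx 0.7 PgPool:

let res = sqlx::query(
    "DELETE FROM af_blob_metadata WHERE workspace_id = $1 AND file_id = $2",
)
.bind(workspace_id)
.bind(file_id)
.execute(&pool)
.await?;
// (workspace_id, file_id) is a unique key, so exactly one row should go.
assert_eq!(res.rows_affected(), 1);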

Cargo.lock (generated): 586 lines changed; diff suppressed because it is too large.


@@ -7,23 +7,22 @@ edition = "2021"
[dependencies]
actix = "0.13"
actix-web = { version = "4.3.1", features = ["openssl"] }
actix-http = "3.3.1"
actix-web = { version = "4.4.1", features = ["openssl"] }
actix-http = "3.5.1"
actix-rt = "2"
actix-web-actors = { version = "4.2.0" }
actix-service = "2.0.2"
actix-identity = "0.6.0"
actix-cors = "0.6.4"
actix-router = "0.5.1"
actix-cors = "0.6.5"
actix-router = "0.5.2"
actix-session = { version = "0.8", features = ["redis-rs-tls-session"] }
openssl = { version = "0.10.45", features = ["vendored"] }
openssl = { version = "0.10.62", features = ["vendored"] }
# serde
serde_json.workspace = true
serde.workspace = true
serde-aux = "4.1.2"
tokio = { version = "1.26.0", features = [
tokio = { version = "1.35.1", features = [
"macros",
"rt-multi-thread",
"sync",
@@ -31,41 +30,39 @@ tokio = { version = "1.26.0", features = [
"time",
] }
tokio-stream = "0.1.14"
tokio-util = { version = "0.7.9", features = ["io"] }
futures = "0.3.17"
futures-util ={ version = "0.3.26" , features = ["std","io"] }
once_cell = "1.13.0"
chrono = { version = "0.4.23", features = ["serde", "clock"], default-features = false }
tokio-util = { version = "0.7.10", features = ["io"] }
futures = "0.3.30"
futures-util ={ version = "0.3.30" , features = ["std","io"] }
once_cell = "1.19.0"
chrono = { version = "0.4.31", features = ["serde", "clock"], default-features = false }
derive_more = { version = "0.99" }
argon2 = { version = "0.5", features = ["std"] }
secrecy = { version = "0.8", features = ["serde"] }
rand = { version = "0.8", features = ["std_rng"] }
anyhow = "1.0.40"
thiserror = "1.0.24"
reqwest = { version = "0.11.20", default-features = false, features = ["json", "rustls-tls", "cookies"] }
unicode-segmentation = "1.0"
anyhow = "1.0.79"
thiserror = "1.0.56"
reqwest = { version = "0.11.23", default-features = false, features = ["json", "rustls-tls", "cookies"] }
unicode-segmentation = "1.10"
lazy_static = "1.4.0"
fancy-regex = "0.11.0"
validator = "0.16.0"
bytes = "1.4.0"
validator = "0.16.1"
bytes = "1.5.0"
rcgen = { version = "0.10.0", features = ["pem", "x509-parser"] }
mime = "0.3.17"
rust-s3 = {version = "0.33.0", default-features = false, features = ["tokio-rustls-tls", "with-tokio", "no-verify-ssl"] }
redis = "0.23.3"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["registry", "env-filter", "ansi", "json"] }
tracing-bunyan-formatter = "0.3.6"
tracing-actix-web = "0.7"
tracing-log = "0.1.1"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["registry", "env-filter", "ansi", "json"] }
tracing-bunyan-formatter = "0.3.9"
tracing-log = "0.1.4"
sqlx = { version = "0.7", default-features = false, features = ["runtime-tokio-rustls", "macros", "postgres", "uuid", "chrono", "migrate"] }
async-trait = "0.1.73"
async-trait = "0.1.77"
prometheus-client = "0.22.0"
itertools = "0.11"
axum_session = "0.7.0"
uuid = "1.4.1"
uuid = "1.6.1"
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }
prost = "0.12.1"
casbin = { version = "2.0.9" }
prost = "0.12.3"
casbin = { version = "2.1.0" }
dotenvy = "0.15.7"
url = "2.5.0"
brotli = "3.4.0"
@@ -90,8 +87,8 @@ realtime-entity.workspace = true
[dev-dependencies]
once_cell = "1.7.2"
tempfile = "3.4.0"
once_cell = "1.19.0"
tempfile = "3.9.0"
assert-json-diff = "2.0.2"
scraper = "0.17.1"
client-api = { path = "libs/client-api", features = ["collab-sync", "test_util"] }
@@ -131,15 +128,15 @@ realtime-entity = { path = "libs/realtime-entity" }
realtime-protocol = { path = "libs/realtime-protocol" }
database-entity = { path = "libs/database-entity" }
app-error = { path = "libs/app_error" }
serde_json = "1.0.108"
serde = { version = "1.0.108", features = ["derive"] }
serde_repr = "0.1.17"
bytes = "1.4.0"
serde_json = "1.0.111"
serde = { version = "1.0.195", features = ["derive"] }
serde_repr = "0.1.18"
bytes = "1.5.0"
workspace-template = { path = "libs/workspace-template" }
uuid = { version = "1.4.1", features = ["v4"] }
anyhow = "1.0.75"
tokio = { version = "1.34", features = ["sync"] }
yrs = "0.17.1"
uuid = { version = "1.6.1", features = ["v4"] }
anyhow = "1.0.79"
tokio = { version = "1.35", features = ["sync"] }
yrs = "0.17.2"
bincode = "1.3.3"
[profile.release]


@@ -10,20 +10,20 @@ edition = "2021"
gotrue = { path = "../libs/gotrue" }
gotrue-entity = { path = "../libs/gotrue-entity" }
anyhow = "1.0.75"
anyhow = "1.0.79"
axum = {version = "0.6.20", features = ["json"]}
tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros"] }
tokio = { version = "1.35.1", features = ["rt-multi-thread", "macros"] }
askama = "0.12.1"
axum-extra = { version = "0.8.0", features = ["cookie"] }
serde.workspace = true
serde_json.workspace = true
redis = { version = "0.23.3", features = [ "aio", "tokio-comp", "connection-manager"] }
uuid = { version = "1.4.1", features = ["v4"] }
uuid = { version = "1.6.1", features = ["v4"] }
dotenv = "0.15.0"
reqwest = "0.11.22"
reqwest = "0.11.23"
tower-service = "0.3.2"
tower-http = { version = "0.4.4", features = ["cors", "fs"] }
tower = "0.4.13"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
jwt = "0.16.0"


@@ -6,16 +6,16 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
thiserror = "1.0.47"
serde_repr = "0.1.17"
thiserror = "1.0.56"
serde_repr = "0.1.18"
serde.workspace = true
anyhow = "1.0.75"
uuid = { version = "1.4.1", features = ["v4"] }
anyhow = "1.0.79"
uuid = { version = "1.6.1", features = ["v4"] }
sqlx = { version = "0.7", default-features = false, features = ["postgres", "json"], optional = true }
validator = { version = "0.16", optional = true }
rust-s3 = { version = "0.33.0", optional = true }
url = { version = "2.4.1"}
actix-web = { version = "4.3.1", optional = true }
url = { version = "2.5.0"}
actix-web = { version = "4.4.1", optional = true }
reqwest = { version = "0.11" }
serde_json.workspace = true
tokio = { workspace = true, optional = true }


@@ -6,15 +6,15 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
reqwest = { version = "0.11.20", default-features = false, features = ["stream","json","multipart"] }
anyhow = "1.0.75"
reqwest = { version = "0.11.23", default-features = false, features = ["stream","json","multipart"] }
anyhow = "1.0.79"
serde_json.workspace = true
serde_repr = "0.1.17"
serde_repr = "0.1.18"
gotrue = { path = "../gotrue" }
gotrue-entity = { path = "../gotrue-entity" }
shared_entity = { path = "../shared-entity" }
database-entity.workspace = true
url = "2.4.1"
url = "2.5.0"
tokio-stream = { version = "0.1.14" }
parking_lot = "0.12.1"
mime = "0.3.17"
@@ -23,15 +23,15 @@ brotli = "3.4.0"
# ws
tracing = { version = "0.1" }
thiserror = "1.0.39"
thiserror = "1.0.56"
serde.workspace = true
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }
tokio = { version = "1.26", features = ["full"] }
futures-util = "0.3.26"
futures-core = "0.3.26"
tokio = { version = "1.35", features = ["full"] }
futures-util = "0.3.30"
futures-core = "0.3.30"
tokio-retry = "0.3"
bytes = "1.0"
uuid = "1.4.1"
bytes = "1.5"
uuid = "1.6.1"
scraper = { version = "0.17.1", optional = true }
# collab sync
@@ -42,8 +42,8 @@ realtime-entity = { workspace = true, features = ["tungstenite"] }
realtime-protocol = { workspace = true }
workspace-template = { workspace = true, optional = true }
mime_guess = "2.0.4"
async-trait = { version = "0.1.73" }
prost = "0.12.1"
async-trait = { version = "0.1.77" }
prost = "0.12.3"
bincode = "1.3.3"


@@ -41,8 +41,6 @@ use shared_entity::response::{AppResponse, AppResponseError};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_retry::strategy::FixedInterval;
use tokio_retry::RetryIf;
use tokio_tungstenite::tungstenite::Message;
@@ -1055,17 +1053,23 @@ impl Client {
Ok(format!("{}/{}/{}", self.ws_addr, access_token, device_id))
}
pub async fn put_blob<T: Into<Bytes>, M: ToString>(
pub fn get_blob_url(&self, workspace_id: &str, file_id: &str) -> String {
format!(
"{}/api/file_storage/{}/blob/{}",
self.base_url, workspace_id, file_id
)
}
pub async fn put_blob<T: Into<Bytes>>(
&self,
workspace_id: &str,
url: &str,
data: T,
mime: M,
) -> Result<String, AppResponseError> {
let url = format!("{}/api/file_storage/{}/blob", self.base_url, workspace_id);
mime: &Mime,
) -> Result<(), AppResponseError> {
let data = data.into();
let content_length = data.len();
let resp = self
.http_client_with_auth(Method::PUT, &url)
.http_client_with_auth(Method::PUT, url)
.await?
.header(header::CONTENT_TYPE, mime.to_string())
.header(header::CONTENT_LENGTH, content_length)
@@ -1073,46 +1077,20 @@ impl Client {
.send()
.await?;
log_request_id(&resp);
let record = AppResponse::<AFBlobRecord>::from_response(resp)
.await?
.into_data()?;
Ok(format!(
"{}/api/file_storage/{}/blob/{}",
self.base_url, workspace_id, record.file_id
))
}
pub async fn put_blob_with_path(
&self,
workspace_id: &str,
file_path: &str,
) -> Result<String, AppResponseError> {
if file_path.is_empty() {
return Err(AppError::InvalidRequest("path is empty".to_owned()).into());
}
let mut file = File::open(&file_path).await?;
let mime = mime_guess::from_path(file_path)
.first_or_octet_stream()
.to_string();
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
self.put_blob(workspace_id, buffer, mime).await
Ok(())
}
/// Only expose this method for testing
#[cfg(debug_assertions)]
pub async fn put_blob_with_content_length<T: Into<Bytes>>(
&self,
workspace_id: &str,
url: &str,
data: T,
mime: &Mime,
content_length: usize,
) -> Result<AFBlobRecord, AppResponseError> {
let url = format!("{}/api/file_storage/{}/blob", self.base_url, workspace_id);
let resp = self
.http_client_with_auth(Method::PUT, &url)
.http_client_with_auth(Method::PUT, url)
.await?
.header(header::CONTENT_TYPE, mime.to_string())
.header(header::CONTENT_LENGTH, content_length)
@@ -1127,10 +1105,9 @@ impl Client {
/// Get the file with the given url. The url should be in the format of
/// `https://appflowy.io/api/file_storage/<workspace_id>/<file_id>`.
pub async fn get_blob<T: AsRef<str>>(&self, url: T) -> Result<Bytes, AppResponseError> {
Url::parse(url.as_ref())?;
pub async fn get_blob(&self, url: &str) -> Result<(Mime, Vec<u8>), AppResponseError> {
let resp = self
.http_client_with_auth(Method::GET, url.as_ref())
.http_client_with_auth(Method::GET, url)
.await?
.send()
.await?;
@@ -1138,15 +1115,35 @@ impl Client {
match resp.status() {
reqwest::StatusCode::OK => {
// get mime from resp header
let mime = {
match resp.headers().get(header::CONTENT_TYPE) {
Some(v) => match v.to_str() {
Ok(v) => match v.parse::<Mime>() {
Ok(v) => v,
Err(e) => {
tracing::error!("failed to parse mime from header: {:?}", e);
mime::TEXT_PLAIN
},
},
Err(e) => {
tracing::error!("failed to get mime from header: {:?}", e);
mime::TEXT_PLAIN
},
},
None => mime::TEXT_PLAIN,
}
};
let mut stream = resp.bytes_stream();
let mut acc: Vec<u8> = Vec::new();
while let Some(raw_bytes) = stream.next().await {
acc.extend_from_slice(&raw_bytes?);
}
Ok(Bytes::from(acc))
Ok((mime, acc))
},
reqwest::StatusCode::NOT_FOUND => Err(AppResponseError::from(AppError::RecordNotFound(
url.as_ref().to_owned(),
url.to_owned(),
))),
c => Err(AppResponseError::from(AppError::Unhandled(format!(
"status code: {}, message: {}",
@@ -1156,12 +1153,9 @@ impl Client {
}
}
pub async fn get_blob_metadata<T: AsRef<str>>(
&self,
url: T,
) -> Result<BlobMetadata, AppResponseError> {
pub async fn get_blob_metadata(&self, url: &str) -> Result<BlobMetadata, AppResponseError> {
let resp = self
.http_client_with_auth(Method::GET, url.as_ref())
.http_client_with_auth(Method::GET, url)
.await?
.send()
.await?;
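The nested match that recovers the MIME type in get_blob is correct but deep; the same fallback-to-text/plain logic can be expressed with Option combinators (a behavior-equivalent sketch, minus the tracing on the two error paths):

let mime = resp
    .headers()
    .get(header::CONTENT_TYPE)
    .and_then(|v| v.to_str().ok())        // non-UTF-8 header value -> fallback
    .and_then(|v| v.parse::<Mime>().ok()) // unparsable mime -> fallback
    .unwrap_or(mime::TEXT_PLAIN);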


@@ -11,10 +11,10 @@ serde_json.workspace = true
collab-entity = { version = "0.1.0" }
validator = { version = "0.16", features = ["validator_derive", "derive"] }
chrono = {version="0.4",features = ["serde"]}
uuid = { version = "1.4.1", features = ["serde", "v4"] }
thiserror = "1.0.47"
anyhow = "1.0.75"
uuid = { version = "1.6.1", features = ["serde", "v4"] }
thiserror = "1.0.56"
anyhow = "1.0.79"
tracing = "0.1"
serde_repr = "0.1.17"
serde_repr = "0.1.18"
app-error = { workspace = true }
bincode = "1.3.3"
bincode = "1.3.3"


@@ -12,24 +12,24 @@ validator = { version = "0.16", features = ["validator_derive", "derive"] }
database-entity.workspace = true
app-error = { workspace = true, features = ["sqlx_error", "validation_error", "s3_error"] }
tokio = { version = "1.26", features = ["sync"] }
async-trait = "0.1.73"
anyhow = "1.0.75"
tokio = { version = "1.35", features = ["sync"] }
async-trait = "0.1.77"
anyhow = "1.0.79"
serde.workspace = true
serde_json.workspace = true
sqlx = { version = "0.7", default-features = false, features = ["postgres", "chrono", "uuid", "macros", "runtime-tokio-rustls", "rust_decimal"] }
tracing = { version = "0.1.37" }
uuid = { version = "1.4.1", features = ["serde", "v4"] }
tracing = { version = "0.1.40" }
uuid = { version = "1.6.1", features = ["serde", "v4"] }
chrono = {version="0.4",features = ["serde"]}
redis = "0.23.3"
futures-util = "0.3.28"
bytes = "1.0"
futures-util = "0.3.30"
bytes = "1.5"
rust-s3 = { version = "0.33.0", optional = true }
sha2 = "0.10.8"
base64 = "0.21.0"
rust_decimal = "1.32.0"
base64 = "0.21.6"
rust_decimal = "1.33.1"
[features]
default = ["s3"]
s3 = ["rust-s3"]
s3 = ["rust-s3"]


@@ -16,11 +16,11 @@ pub struct BucketClientS3Impl(s3::Bucket);
impl BucketClient for BucketClientS3Impl {
type ResponseData = S3ResponseData;
async fn put_blob<P>(&self, id: P, blob: Vec<u8>) -> Result<(), AppError>
async fn pub_blob<P>(&self, id: P, content: &[u8]) -> Result<(), AppError>
where
P: AsRef<str> + Send,
{
let code = self.0.put_object(id, &blob).await?.status_code();
let code = self.0.put_object(id, content).await?.status_code();
check_s3_status_code(code)?;
Ok(())
}


@@ -1,4 +1,3 @@
use crate::file::utils::BlobStreamReader;
use crate::pg_row::AFBlobMetadataRow;
use crate::resource_usage::{
delete_blob_metadata, get_blob_metadata, get_workspace_usage_size, insert_blob_metadata,
@@ -7,8 +6,7 @@ use crate::resource_usage::{
use app_error::AppError;
use async_trait::async_trait;
use sqlx::PgPool;
use tokio::io::AsyncRead;
use tracing::{event, instrument};
use tracing::{event, instrument, warn};
use uuid::Uuid;
/// Maximum size of a blob in bytes.
@@ -23,7 +21,7 @@ pub trait ResponseBlob {
pub trait BucketClient {
type ResponseData: ResponseBlob;
async fn put_blob<P>(&self, id: P, blob: Vec<u8>) -> Result<(), AppError>
async fn pub_blob<P>(&self, id: P, content: &[u8]) -> Result<(), AppError>
where
P: AsRef<str> + Send;
@@ -51,71 +49,59 @@ where
#[instrument(skip_all, err)]
#[inline]
pub async fn put_blob<R>(
pub async fn put_blob(
&self,
blob_stream: R,
workspace_id: Uuid,
file_id: String,
file_data: Vec<u8>,
file_type: String,
file_size: i64,
) -> Result<String, AppError>
where
R: AsyncRead + Unpin,
{
let (blob, file_id) = BlobStreamReader::new(blob_stream).finish().await?;
// check file is exist or not
) -> Result<(), AppError> {
if is_blob_metadata_exists(&self.pg_pool, &workspace_id, &file_id).await? {
event!(tracing::Level::TRACE, "file:{} is already exist", file_id);
return Ok(file_id);
warn!(
"file already exists, workspace_id: {}, file_id: {}",
workspace_id, file_id
);
return Ok(());
}
// query the storage space of the workspace
let obj_key = format!("{}/{}", workspace_id, file_id);
let usage = get_workspace_usage_size(&self.pg_pool, &workspace_id).await?;
event!(
tracing::Level::TRACE,
"workspace consumed space: {}, new file:{} with size: {}",
usage,
obj_key,
file_id,
file_size
file_data.len(),
);
if usage > MAX_USAGE {
return Err(AppError::StorageSpaceNotEnough);
}
self.client.put_blob(&file_id, blob).await?;
let obj_key = format!("{}/{}", workspace_id, file_id);
// save the metadata
if let Err(err) = insert_blob_metadata(
&self.pg_pool,
let mut tx = self.pg_pool.begin().await?;
insert_blob_metadata(
&mut tx,
&file_id,
&workspace_id,
&file_type,
file_size,
file_data.len(),
)
.await
{
event!(
tracing::Level::ERROR,
"failed to save metadata, file_id: {}, err: {}",
file_id,
err
);
// if the metadata is not saved, delete the blob.
self.client.delete_blob(&file_id).await?;
return Err(err);
}
.await?;
Ok(file_id)
self.client.pub_blob(obj_key, &file_data).await?;
tx.commit().await?;
Ok(())
}
pub async fn delete_blob(
&self,
workspace_id: &Uuid,
file_id: &str,
) -> Result<AFBlobMetadataRow, AppError> {
self.client.delete_blob(file_id).await?;
let resp = delete_blob_metadata(&self.pg_pool, workspace_id, file_id).await?;
Ok(resp)
pub async fn delete_blob(&self, workspace_id: &Uuid, file_id: &str) -> Result<(), AppError> {
let obj_key = format!("{}/{}", workspace_id, file_id);
let mut tx = self.pg_pool.begin().await?;
delete_blob_metadata(&mut tx, workspace_id, file_id).await?;
self.client.delete_blob(obj_key).await?;
tx.commit().await?;
Ok(())
}
pub async fn get_blob_metadata(
@@ -127,8 +113,9 @@ where
Ok(metadata)
}
pub async fn get_blob(&self, file_id: &str) -> Result<Vec<u8>, AppError> {
let blob = self.client.get_blob(file_id).await?.to_blob();
pub async fn get_blob(&self, workspace_id: &Uuid, file_id: &str) -> Result<Vec<u8>, AppError> {
let obj_key = format!("{}/{}", workspace_id, file_id);
let blob = self.client.get_blob(obj_key).await?.to_blob();
Ok(blob)
}
}
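The ordering in the new put_blob is the load-bearing detail: the metadata row is written inside an open transaction, the S3 upload runs next, and the commit comes last, so an upload failure propagates the error, drops the transaction, and rolls the metadata back; delete_blob mirrors the same pattern. Condensed to the invariant (a sketch using the pool and bucket client fields from the struct above):

let obj_key = format!("{}/{}", workspace_id, file_id); // objects are now namespaced per workspace
let mut tx = pg_pool.begin().await?;
insert_blob_metadata(&mut tx, &file_id, &workspace_id, &file_type, file_data.len()).await?;
client.pub_blob(&obj_key, &file_data).await?; // a failure here drops `tx`, rolling back the row
tx.commit().await?;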


@@ -1,61 +1,61 @@
use base64::alphabet::URL_SAFE;
use base64::engine::general_purpose::PAD;
use base64::engine::GeneralPurpose;
use base64::Engine;
use sha2::{Digest, Sha256};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{self, AsyncRead, AsyncReadExt, ReadBuf};
pub const URL_SAFE_ENGINE: GeneralPurpose = GeneralPurpose::new(&URL_SAFE, PAD);
pub struct BlobStreamReader<R> {
reader: R,
hasher: Sha256,
}
impl<R> AsyncRead for BlobStreamReader<R>
where
R: AsyncRead + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let before = buf.filled().len();
let poll = Pin::new(&mut self.reader).poll_read(cx, buf);
let after = buf.filled().len();
if after > before {
self.hasher.update(&buf.filled()[before..after]);
}
poll
}
}
impl<R> BlobStreamReader<R>
where
R: AsyncRead + Unpin,
{
pub fn new(reader: R) -> Self {
Self {
reader,
hasher: Sha256::new(),
}
}
pub async fn finish(mut self) -> io::Result<(Vec<u8>, String)> {
let mut buffer = Vec::new();
let _ = self.read_to_end(&mut buffer).await?;
let hash = URL_SAFE_ENGINE.encode(self.hasher.finalize());
Ok((buffer, hash))
}
}
impl<R> AsRef<R> for BlobStreamReader<R>
where
R: AsyncRead + Unpin,
{
fn as_ref(&self) -> &R {
&self.reader
}
}
// use base64::Engine;
// use base64::alphabet::URL_SAFE;
// use base64::engine::general_purpose::PAD;
// use base64::engine::GeneralPurpose;
// use sha2::{Digest, Sha256};
// use std::pin::Pin;
// use std::task::{Context, Poll};
// use tokio::io::{self, AsyncRead, ReadBuf, AsyncReadExt};
//
// pub const URL_SAFE_ENGINE: GeneralPurpose = GeneralPurpose::new(&URL_SAFE, PAD);
// pub struct BlobStreamReader<R> {
// reader: R,
// hasher: Sha256,
// }
//
// impl<R> AsyncRead for BlobStreamReader<R>
// where
// R: AsyncRead + Unpin,
// {
// fn poll_read(
// mut self: Pin<&mut Self>,
// cx: &mut Context<'_>,
// buf: &mut ReadBuf<'_>,
// ) -> Poll<io::Result<()>> {
// let before = buf.filled().len();
// let poll = Pin::new(&mut self.reader).poll_read(cx, buf);
// let after = buf.filled().len();
// if after > before {
// self.hasher.update(&buf.filled()[before..after]);
// }
// poll
// }
// }
//
// impl<R> BlobStreamReader<R>
// where
// R: AsyncRead + Unpin,
// {
// pub fn new(reader: R) -> Self {
// Self {
// reader,
// hasher: Sha256::new(),
// }
// }
//
// pub async fn finish(mut self) -> io::Result<(Vec<u8>, String)> {
// let mut buffer = Vec::new();
// let _ = self.read_to_end(&mut buffer).await?;
// let hash = URL_SAFE_ENGINE.encode(self.hasher.finalize());
// Ok((buffer, hash))
// }
// }
//
// impl<R> AsRef<R> for BlobStreamReader<R>
// where
// R: AsyncRead + Unpin,
// {
// fn as_ref(&self) -> &R {
// &self.reader
// }
// }


@@ -130,7 +130,7 @@ pub struct AFCollabMemberRow {
pub permission_id: i64,
}
#[derive(FromRow, Serialize, Deserialize)]
#[derive(Debug, FromRow, Serialize, Deserialize)]
pub struct AFBlobMetadataRow {
pub workspace_id: Uuid,
pub file_id: String,


@@ -1,8 +1,10 @@
use std::ops::DerefMut;
use crate::pg_row::AFBlobMetadataRow;
use app_error::AppError;
use rust_decimal::prelude::ToPrimitive;
use sqlx::types::Decimal;
use sqlx::PgPool;
use sqlx::{PgPool, Transaction};
use tracing::instrument;
use uuid::Uuid;
@@ -32,14 +34,13 @@
#[instrument(level = "trace", skip_all, err)]
pub async fn insert_blob_metadata(
pg_pool: &PgPool,
tx: &mut Transaction<'_, sqlx::Postgres>,
file_id: &str,
workspace_id: &Uuid,
file_type: &str,
file_size: i64,
) -> Result<AFBlobMetadataRow, AppError> {
let metadata = sqlx::query_as!(
AFBlobMetadataRow,
file_size: usize,
) -> Result<(), AppError> {
let res = sqlx::query!(
r#"
INSERT INTO af_blob_metadata
(workspace_id, file_id, file_type, file_size)
@@ -47,38 +48,39 @@ pub async fn insert_blob_metadata(
ON CONFLICT (workspace_id, file_id) DO UPDATE SET
file_type = $3,
file_size = $4
RETURNING *
"#,
workspace_id,
file_id,
file_type,
file_size
file_size as i64,
)
.fetch_one(pg_pool)
.execute(tx.deref_mut())
.await?;
Ok(metadata)
let n = res.rows_affected();
assert_eq!(n, 1);
Ok(())
}
#[instrument(level = "trace", skip_all, err)]
#[inline]
pub async fn delete_blob_metadata(
pg_pool: &PgPool,
tx: &mut Transaction<'_, sqlx::Postgres>,
workspace_id: &Uuid,
file_id: &str,
) -> Result<AFBlobMetadataRow, AppError> {
let metadata = sqlx::query_as!(
AFBlobMetadataRow,
) -> Result<(), AppError> {
let result = sqlx::query!(
r#"
DELETE FROM af_blob_metadata
WHERE workspace_id = $1 AND file_id = $2
RETURNING *
"#,
workspace_id,
file_id,
)
.fetch_one(pg_pool)
.execute(tx.deref_mut())
.await?;
Ok(metadata)
let n = result.rows_affected();
assert_eq!(n, 1);
Ok(())
}
#[instrument(level = "trace", skip_all, err)]
@@ -112,7 +114,7 @@ pub async fn get_all_workspace_blob_metadata(
AFBlobMetadataRow,
r#"
SELECT * FROM af_blob_metadata
WHERE workspace_id = $1
WHERE workspace_id = $1
"#,
workspace_id,
)
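One caveat in the two helpers above: assert_eq!(n, 1) panics if the affected row count ever differs, for example a DELETE racing a second delete of the same file id. A non-panicking variant is a small change (a sketch; AppError::RecordNotFound is the same variant the client download path already uses):

let n = result.rows_affected();
if n != 1 {
    return Err(AppError::RecordNotFound(format!(
        "blob {}/{} not found", workspace_id, file_id
    )));
}
Ok(())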


@@ -9,12 +9,12 @@ edition = "2021"
x25519-dalek = { version = "2.0.0" , features = ["getrandom"] }
rand = "0.8.5"
hex = "0.4.3"
anyhow = "1.0.75"
anyhow = "1.0.79"
aes-gcm = { version = "0.10.3" }
base64 = "0.21.5"
hkdf = { version = "0.12.3" }
base64 = "0.21.6"
hkdf = { version = "0.12.4" }
sha2 = "0.10.8"
serde = { version = "1.0.188", features = ["derive"] }
serde = { version = "1.0.195", features = ["derive"] }
bincode.workspace = true
bytes.workspace = true


@@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
serde.workspace = true
serde_json.workspace = true
anyhow = "1.0.75"
anyhow = "1.0.79"
lazy_static = "1.4.0"
jsonwebtoken = "8.3.0"
app-error = { workspace = true, features = ["gotrue_error"] }
app-error = { workspace = true, features = ["gotrue_error"] }


@@ -8,10 +8,10 @@ edition = "2021"
[dependencies]
serde.workspace = true
serde_json.workspace = true
futures-util = "0.3.8"
anyhow = "1.0.75"
reqwest = { version = "0.11.20", default-features = false, features = ["json", "rustls-tls", "cookies"] }
tokio = { version = "1.0.1", features = ["sync", "macros"] }
futures-util = "0.3.30"
anyhow = "1.0.79"
reqwest = { version = "0.11.23", default-features = false, features = ["json", "rustls-tls", "cookies"] }
tokio = { version = "1.35.1", features = ["sync", "macros"] }
infra = { path = "../infra" }
gotrue-entity = { path = "../gotrue-entity" }
tracing = "0.1"
tracing = "0.1"


@@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
reqwest = { version = "0.11.20", default-features = false }
anyhow = "1.0.75"
reqwest = { version = "0.11.23", default-features = false }
anyhow = "1.0.79"
serde.workspace = true
serde_json.workspace = true
serde_json.workspace = true


@@ -10,15 +10,15 @@ collab = { version = "0.1.0" }
collab-entity = { version = "0.1.0" }
serde.workspace = true
serde_json.workspace = true
bytes = { version = "1.0", features = ["serde"] }
anyhow = "1.0.75"
bytes = { version = "1.5", features = ["serde"] }
anyhow = "1.0.79"
actix = { version = "0.13", optional = true }
bincode.workspace = true
tokio-tungstenite = { version = "0.20.1", optional = true }
prost = "0.12.1"
prost = "0.12.3"
database-entity.workspace = true
yrs.workspace = true
thiserror = "1.0.48"
thiserror = "1.0.56"
realtime-protocol.workspace = true
[features]
@@ -27,4 +27,4 @@ tungstenite = ["tokio-tungstenite"]
[build-dependencies]
protoc-bin-vendored = { version = "3.0" }
prost-build = "0.12.1"
prost-build = "0.12.3"


@@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
yrs.workspace = true
thiserror = "1.0.48"
thiserror = "1.0.56"
serde.workspace = true
collab = { version = "0.1.0" }
bincode.workspace = true


@@ -10,19 +10,19 @@ actix = "0.13"
actix-web-actors = { version = "4.2.0" }
serde.workspace = true
serde_json.workspace = true
thiserror = "1.0.30"
bytes = { version = "1.0", features = ["serde"] }
thiserror = "1.0.56"
bytes = { version = "1.5", features = ["serde"] }
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
tracing = "0.1.25"
futures-util = "0.3.26"
tracing = "0.1.40"
futures-util = "0.3.30"
tokio-util = { version = "0.7", features = ["codec"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio = { version = "1.26.0", features = ["net", "sync", "macros"] }
async-trait = "0.1.73"
anyhow = "1.0.75"
serde_repr = "0.1.17"
tokio = { version = "1.35.1", features = ["net", "sync", "macros"] }
async-trait = "0.1.77"
anyhow = "1.0.79"
serde_repr = "0.1.18"
tokio-retry = "0.3.0"
reqwest = "0.11.18"
reqwest = "0.11.23"
app-error = { workspace = true }
collab = { version = "0.1.0"}
@@ -30,22 +30,22 @@ collab-entity = { version = "0.1.0" }
database = { path = "../database" }
database-entity.workspace = true
yrs.workspace = true
chrono = "0.4.30"
chrono = "0.4.31"
realtime-entity = { workspace = true, features = ["actix_message"] }
realtime-protocol.workspace = true
uuid = { version = "1", features = ["v4"] }
[dev-dependencies]
actix = "0.13"
actix-web = { version = "4.3.1" }
actix-web = { version = "4.4.1" }
actix-rt = "2"
actix-web-actors = { version = "4.2.0" }
once_cell = "1.18.0"
reqwest = "0.11.18"
tracing = { version = "0.1.37" }
tracing-subscriber = { version = "0.3.16", features = ["registry", "env-filter", "ansi", "json"] }
tracing-bunyan-formatter = "0.3.6"
tracing-log = "0.1.1"
serde-aux = "4.2.0"
tempfile = "3.8.0"
once_cell = "1.19.0"
reqwest = "0.11.23"
tracing = { version = "0.1.40" }
tracing-subscriber = { version = "0.3.18", features = ["registry", "env-filter", "ansi", "json"] }
tracing-bunyan-formatter = "0.3.9"
tracing-log = "0.1.4"
serde-aux = "4.3.1"
tempfile = "3.9.0"
assert-json-diff = "2.0.2"


@@ -6,20 +6,20 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.75"
serde = "1.0.188"
anyhow = "1.0.79"
serde = "1.0.195"
serde_json.workspace = true
serde_repr = "0.1.17"
thiserror = "1.0.47"
reqwest = "0.11.18"
uuid = { version = "1.3.3", features = ["v4"] }
serde_repr = "0.1.18"
thiserror = "1.0.56"
reqwest = "0.11.23"
uuid = { version = "1.6.1", features = ["v4"] }
gotrue-entity = { path = "../gotrue-entity" }
database-entity.workspace = true
collab-entity = { version = "0.1.0" }
app-error = { workspace = true }
chrono = "0.4.31"
actix-web = { version = "4.4.0", default-features = false, features = ["http2"], optional = true }
actix-web = { version = "4.4.1", default-features = false, features = ["http2"], optional = true }
validator = { version = "0.16", features = ["validator_derive", "derive"], optional = true }
rust-s3 = { version = "0.33.0", optional = true }


@@ -7,8 +7,8 @@ edition = "2021"
[dependencies]
serde.workspace = true
chrono = { version = "0.4.23", features = ["serde", "clock"], default-features = false }
chrono = { version = "0.4.31", features = ["serde", "clock"], default-features = false }
jwt = "0.16.0"
thiserror = "1.0.30"
thiserror = "1.0.56"
hmac = "0.12.1"
sha2 = "0.10.6"
sha2 = "0.10.8"


@@ -11,11 +11,11 @@ collab = { version = "0.1.0"}
collab-folder = { version = "0.1.0"}
collab-document = { version = "0.1.0"}
collab-entity = { version = "0.1.0"}
async-trait = "0.1.73"
async-trait = "0.1.77"
anyhow.workspace = true
tokio.workspace = true
uuid.workspace = true
indexmap = "2.1.0"
serde_json.workspace = true
nanoid = "0.4.0"
serde = { version = "1.0.188", features = ["derive"] }
serde = { version = "1.0.195", features = ["derive"] }


@@ -14,12 +14,11 @@ use chrono::DateTime;
use database::file::{MAX_BLOB_SIZE, MAX_USAGE};
use database::resource_usage::{get_all_workspace_blob_metadata, get_workspace_usage_size};
use database_entity::dto::AFBlobRecord;
use serde::Deserialize;
use shared_entity::dto::workspace_dto::{BlobMetadata, RepeatedBlobMetaData, WorkspaceSpaceUsage};
use shared_entity::response::{AppResponse, AppResponseError, JsonAppResponse};
use sqlx::types::Uuid;
use std::pin::Pin;
use tokio::io::AsyncRead;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_util::io::StreamReader;
use tracing::{event, instrument};
@@ -28,14 +27,14 @@ use crate::state::AppState;
pub fn file_storage_scope() -> Scope {
web::scope("/api/file_storage")
.service(web::resource("/{workspace_id}/blob").route(web::put().to(put_blob_handler)))
.service(
web::resource("/{workspace_id}/blob/{file_id:.*}")
web::resource("/{workspace_id}/blob/{file_id}")
.route(web::put().to(put_blob_handler))
.route(web::get().to(get_blob_handler))
.route(web::delete().to(delete_blob_handler)),
)
.service(
web::resource("/{workspace_id}/metadata/{file_id:.*}")
web::resource("/{workspace_id}/metadata/{file_id}")
.route(web::get().to(get_blob_metadata_handler)),
)
.service(
@@ -47,43 +46,60 @@
)
}
#[derive(Deserialize, Debug)]
struct PathInfo {
workspace_id: Uuid,
file_id: String,
}
#[instrument(skip(state, payload), err)]
async fn put_blob_handler(
state: Data<AppState>,
payload: Payload,
path: web::Path<(Uuid, String)>,
content_type: web::Header<ContentType>,
content_length: web::Header<ContentLength>,
workspace_id: web::Path<Uuid>,
payload: Payload,
) -> Result<JsonAppResponse<AFBlobRecord>> {
let (workspace_id, file_id) = path.into_inner();
let content_length = content_length.into_inner().into_inner();
// Check content length, if it's too large, return error.
if content_length > MAX_BLOB_SIZE {
return Ok(
AppResponse::from(AppError::PayloadTooLarge(
"The uploading file is too large".to_string(),
))
.into(),
);
}
let file_type = content_type.into_inner().0.to_string();
let blob_stream = payload_to_async_read(payload);
let workspace_id = workspace_id.into_inner();
let content_type = content_type.into_inner().to_string();
let content = {
// Check content length, if it's too large, return error.
if content_length > MAX_BLOB_SIZE {
return Ok(
AppResponse::from(AppError::PayloadTooLarge(
"The uploading file is too large".to_string(),
))
.into(),
);
}
let mut payload_reader = payload_to_async_read(payload);
let mut content = vec![0; content_length];
let n = payload_reader.read_exact(&mut content).await?;
assert_eq!(n, content_length);
let res = payload_reader.read_u8().await;
match res {
Ok(_) => {
return Ok(
AppResponse::from(AppError::PayloadTooLarge(format!(
"Content length is {}, but the actual content is larger",
content_length
)))
.into(),
);
},
Err(e) => match e.kind() {
std::io::ErrorKind::UnexpectedEof => (),
_ => return Err(AppError::Internal(anyhow::anyhow!(e)).into()),
},
};
content
};
event!(
tracing::Level::TRACE,
"start put blob: {}:{}",
file_type,
"start put blob. workspace_id: {}, file_id: {}, content_length: {}",
workspace_id,
file_id,
content_length
);
let file_id = state
state
.bucket_storage
.put_blob(blob_stream, workspace_id, file_type, content_length as i64)
.put_blob(workspace_id, file_id.to_string(), content, content_type)
.await
.map_err(AppResponseError::from)?;
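The body-reading block above encodes a subtle guard: read exactly Content-Length bytes, then probe one more byte and require UnexpectedEof, so a body longer than its declared length is rejected rather than silently truncated. The same idea as a standalone helper (hypothetical name; the tokio APIs are the ones the handler already imports):

use tokio::io::{AsyncRead, AsyncReadExt};

async fn read_body_exact<R: AsyncRead + Unpin>(mut body: R, len: usize) -> std::io::Result<Vec<u8>> {
    let mut buf = vec![0u8; len];
    body.read_exact(&mut buf).await?; // fails if the body is shorter than `len`
    match body.read_u8().await {
        Ok(_) => Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "body is longer than the declared Content-Length",
        )),
        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(buf), // exactly `len` bytes
        Err(e) => Err(e),
    }
}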
@@ -95,13 +111,9 @@ async fn put_blob_handler(
#[instrument(level = "debug", skip(state), err)]
async fn delete_blob_handler(
state: Data<AppState>,
path: web::Path<PathInfo>,
path: web::Path<(Uuid, String)>,
) -> Result<JsonAppResponse<()>> {
let PathInfo {
workspace_id,
file_id,
} = path.into_inner();
let (workspace_id, file_id) = path.into_inner();
state
.bucket_storage
.delete_blob(&workspace_id, &file_id)
@@ -113,13 +125,10 @@ async fn delete_blob_handler(
#[instrument(level = "debug", skip(state), err)]
async fn get_blob_handler(
state: Data<AppState>,
path: web::Path<PathInfo>,
path: web::Path<(Uuid, String)>,
req: HttpRequest,
) -> Result<HttpResponse<BoxBody>> {
let PathInfo {
workspace_id,
file_id,
} = path.into_inner();
let (workspace_id, file_id) = path.into_inner();
// Get the metadata
let result = state
@@ -149,7 +158,7 @@
}
let blob = state
.bucket_storage
.get_blob(&file_id)
.get_blob(&workspace_id, &file_id)
.await
.map_err(AppResponseError::from)?;
@@ -167,12 +176,9 @@
#[instrument(level = "debug", skip(state), err)]
async fn get_blob_metadata_handler(
state: Data<AppState>,
path: web::Path<PathInfo>,
path: web::Path<(Uuid, String)>,
) -> Result<JsonAppResponse<BlobMetadata>> {
let PathInfo {
workspace_id,
file_id,
} = path.into_inner();
let (workspace_id, file_id) = path.into_inner();
// Get the metadata
let metadata = state


@@ -17,7 +17,7 @@ use database_entity::dto::{
InsertCollabMemberParams, QueryCollab, QueryCollabParams, QuerySnapshotParams, SnapshotData,
UpdateCollabMemberParams,
};
use image::io::Reader as ImageReader;
use mime::Mime;
use serde_json::Value;
use shared_entity::dto::workspace_dto::{
BlobMetadata, CreateWorkspaceMember, WorkspaceMemberChangeset, WorkspaceSpaceUsage,
@@ -28,7 +28,6 @@ use std::collections::HashMap;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::tempdir;
use tokio::time::{timeout, Duration};
use tokio_stream::StreamExt;
@@ -255,35 +254,22 @@ impl TestClient {
}
}
pub async fn download_blob<T: AsRef<str>>(&self, url: T) -> Vec<u8> {
self.api_client.get_blob(url).await.unwrap().to_vec()
}
#[allow(dead_code)]
pub async fn get_blob_metadata<T: AsRef<str>>(&self, url: T) -> BlobMetadata {
self.api_client.get_blob_metadata(url).await.unwrap()
pub async fn get_blob_metadata(&self, workspace_id: &str, file_id: &str) -> BlobMetadata {
let url = self.api_client.get_blob_url(workspace_id, file_id);
self.api_client.get_blob_metadata(&url).await.unwrap()
}
pub async fn upload_blob<T: Into<Bytes>, M: ToString>(&self, data: T, mime: M) -> String {
pub async fn upload_blob<T: Into<Bytes>>(&self, file_id: &str, data: T, mime: &Mime) {
let workspace_id = self.workspace_id().await;
self
.api_client
.put_blob(&workspace_id, data, mime)
.await
.unwrap()
let url = self.api_client.get_blob_url(&workspace_id, file_id);
self.api_client.put_blob(&url, data, mime).await.unwrap()
}
pub async fn upload_file_with_path(&self, path: &str) -> String {
pub async fn delete_file(&self, file_id: &str) {
let workspace_id = self.workspace_id().await;
self
.api_client
.put_blob_with_path(&workspace_id, path)
.await
.unwrap()
}
pub async fn delete_file(&self, url: &str) {
self.api_client.delete_blob(url).await.unwrap();
let url = self.api_client.get_blob_url(&workspace_id, file_id);
self.api_client.delete_blob(&url).await.unwrap();
}
pub async fn get_workspace_usage(&self) -> WorkspaceSpaceUsage {
@@ -710,12 +696,6 @@ pub async fn get_collab_json_from_server(
.to_json_value()
}
pub fn generate_temp_file_path<T: AsRef<Path>>(file_name: T) -> TestTempFile {
let mut path = tempdir().unwrap().into_path();
path.push(file_name);
TestTempFile(path)
}
pub struct TestTempFile(PathBuf);
impl TestTempFile {
@@ -743,18 +723,3 @@ impl Drop for TestTempFile {
Self::cleanup(&self.0)
}
}
pub fn assert_image_equal<P: AsRef<Path>>(path1: P, path2: P) {
let img1 = ImageReader::open(path1)
.unwrap()
.decode()
.unwrap()
.into_rgba8();
let img2 = ImageReader::open(path2)
.unwrap()
.decode()
.unwrap()
.into_rgba8();
assert_eq!(img1, img2)
}


@@ -1,15 +1,27 @@
use crate::collab::workspace_id_from_client;
use crate::user::utils::generate_unique_registered_user_client;
use crate::util::test_client::{assert_image_equal, generate_temp_file_path, TestClient};
use app_error::ErrorCode;
use reqwest::Url;
use std::path::Path;
#[tokio::test]
async fn get_but_not_exists() {
let (c1, _user1) = generate_unique_registered_user_client().await;
let err = c1.get_blob("not_exists_file_id").await.unwrap_err();
assert_eq!(err.code, ErrorCode::InvalidUrl);
let url = c1.get_blob_url("hello", "world");
let err = c1.get_blob(&url).await.unwrap_err();
assert_eq!(err.code, ErrorCode::RecordNotFound);
let workspace_id = c1
.get_workspaces()
.await
.unwrap()
.0
.first()
.unwrap()
.workspace_id
.to_string();
let url = c1.get_blob_url(&workspace_id, "world");
let err = c1.get_blob(&url).await.unwrap_err();
assert_eq!(err.code, ErrorCode::RecordNotFound);
}
#[tokio::test]
@@ -18,16 +30,15 @@ async fn put_and_get() {
let workspace_id = workspace_id_from_client(&c1).await;
let mime = mime::TEXT_PLAIN_UTF_8;
let data = "hello world";
let file_url = c1.put_blob(&workspace_id, data, &mime).await.unwrap();
let file_id = uuid::Uuid::new_v4().to_string();
let url = c1.get_blob_url(&workspace_id, &file_id);
c1.put_blob(&url, data, &mime).await.unwrap();
let url = Url::parse(&file_url).unwrap();
let file_id = url.path_segments().unwrap().last().unwrap();
assert_eq!(file_id, "uU0nuZNNPgilLlLX2n2r-sSE7-N6U4DukIj3rOLvzek=");
let (got_mime, got_data) = c1.get_blob(&url).await.unwrap();
assert_eq!(got_data, Vec::from(data));
assert_eq!(got_mime, mime);
let got_data = c1.get_blob(&file_url).await.unwrap();
assert_eq!(got_data, data.as_bytes());
c1.delete_blob(&file_url).await.unwrap();
c1.delete_blob(&url).await.unwrap();
}
#[tokio::test]
@@ -35,8 +46,11 @@ async fn put_giant_file() {
let (c1, _user1) = generate_unique_registered_user_client().await;
let workspace_id = workspace_id_from_client(&c1).await;
let mime = mime::TEXT_PLAIN_UTF_8;
let file_id = uuid::Uuid::new_v4().to_string();
let url = c1.get_blob_url(&workspace_id, &file_id);
let error = c1
.put_blob_with_content_length(&workspace_id, "123", &mime, 10 * 1024 * 1024 * 1024)
.put_blob_with_content_length(&url, "123", &mime, 10 * 1024 * 1024 * 1024)
.await
.unwrap_err();
@@ -50,14 +64,20 @@ async fn put_and_put_and_get() {
let mime = mime::TEXT_PLAIN_UTF_8;
let data1 = "my content 1";
let data2 = "my content 2";
let url_1 = c1.put_blob(&workspace_id, data1, &mime).await.unwrap();
let url_2 = c1.put_blob(&workspace_id, data2, &mime).await.unwrap();
let file_id_1 = uuid::Uuid::new_v4().to_string();
let file_id_2 = uuid::Uuid::new_v4().to_string();
let url_1 = c1.get_blob_url(&workspace_id, &file_id_1);
let url_2 = c1.get_blob_url(&workspace_id, &file_id_2);
c1.put_blob(&url_1, data1, &mime).await.unwrap();
c1.put_blob(&url_2, data2, &mime).await.unwrap();
let got_data = c1.get_blob(&url_1).await.unwrap();
assert_eq!(got_data, data1.as_bytes());
let (got_mime, got_data) = c1.get_blob(&url_1).await.unwrap();
assert_eq!(got_data, Vec::from(data1));
assert_eq!(got_mime, mime);
let got_data = c1.get_blob(&url_2).await.unwrap();
assert_eq!(got_data, data2.as_bytes());
let (got_mime, got_data) = c1.get_blob(&url_2).await.unwrap();
assert_eq!(got_data, Vec::from(data2));
assert_eq!(got_mime, mime);
c1.delete_blob(&url_1).await.unwrap();
c1.delete_blob(&url_2).await.unwrap();
@@ -69,29 +89,12 @@ async fn put_delete_get() {
let workspace_id = workspace_id_from_client(&c1).await;
let mime = mime::TEXT_PLAIN_UTF_8;
let data = "my contents";
let url = c1.put_blob(&workspace_id, data, &mime).await.unwrap();
let file_id = uuid::Uuid::new_v4().to_string();
let url = c1.get_blob_url(&workspace_id, &file_id);
c1.put_blob(&url, data, &mime).await.unwrap();
c1.delete_blob(&url).await.unwrap();
let url = c1.get_blob_url(&workspace_id, &file_id);
let err = c1.get_blob(&url).await.unwrap_err();
assert_eq!(err.code, ErrorCode::RecordNotFound);
}
#[tokio::test]
async fn put_and_download_png() {
let image_path = "tests/workspace/blob/asset/16kb_logo.png";
let client = TestClient::new_user_without_ws_conn().await;
let url = client.upload_file_with_path(image_path).await;
let url = Url::parse(&url).unwrap();
let file_id = url.path_segments().unwrap().last().unwrap();
assert_eq!(file_id, "u_XUU5snbo_IDNQlqacesNyT6LVkxBKbnk3d82oq1og=");
let usage = client.get_workspace_usage().await;
assert_eq!(usage.consumed_capacity, 15694);
// download the image and check the content is equal to the original file
let temp_file = generate_temp_file_path("temp.png");
std::fs::write(&temp_file, client.download_blob(&url).await).unwrap();
assert_image_equal(Path::new(image_path), &temp_file);
}


@@ -4,29 +4,33 @@ use crate::util::test_client::TestClient;
async fn workspace_usage_put_blob_test() {
let client = TestClient::new_user_without_ws_conn().await;
let mime = mime::TEXT_PLAIN_UTF_8;
let file_1 = client.upload_blob("123", &mime).await;
let file_2 = client.upload_blob("456", &mime).await;
let file_id_1 = uuid::Uuid::new_v4().to_string();
let file_id_2 = uuid::Uuid::new_v4().to_string();
client.upload_blob(&file_id_1, "123", &mime).await;
client.upload_blob(&file_id_2, "456", &mime).await;
let usage = client.get_workspace_usage().await;
assert_eq!(usage.consumed_capacity, 6);
// after the test, delete the files
client.delete_file(&file_1).await;
client.delete_file(&file_2).await;
client.delete_file(&file_id_1).await;
client.delete_file(&file_id_2).await;
}
#[tokio::test]
async fn workspace_usage_put_and_then_delete_blob_test() {
let client = TestClient::new_user_without_ws_conn().await;
let mime = mime::TEXT_PLAIN_UTF_8;
let file_1 = client.upload_blob("123", &mime).await;
let file_2 = client.upload_blob("456", &mime).await;
let file_id_1 = uuid::Uuid::new_v4().to_string();
let file_id_2 = uuid::Uuid::new_v4().to_string();
client.upload_blob(&file_id_1, "123", &mime).await;
client.upload_blob(&file_id_2, "456", &mime).await;
client.delete_file(&file_1).await;
client.delete_file(&file_id_1).await;
let usage = client.get_workspace_usage().await;
assert_eq!(usage.consumed_capacity, 3);
client.delete_file(&file_2).await;
client.delete_file(&file_id_2).await;
let usage = client.get_workspace_usage().await;
assert_eq!(usage.consumed_capacity, 0);
}