Mirror of https://github.com/AppFlowy-IO/AppFlowy-Cloud.git, synced 2025-04-19 03:24:42 -04:00

chore: retry import (#887)

* chore: retry import
* chore: delete temp files
* chore: bump collab

This commit is contained in: parent 22a70f241d, commit 3c02fa253f

10 changed files with 306 additions and 74 deletions
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n SELECT\n workspace_id,\n database_storage_id,\n owner_uid,\n owner_profile.name as owner_name,\n owner_profile.email as owner_email,\n af_workspace.created_at,\n workspace_type,\n af_workspace.deleted_at,\n workspace_name,\n icon\n FROM public.af_workspace\n JOIN public.af_user owner_profile ON af_workspace.owner_uid = owner_profile.uid\n WHERE workspace_id = $1\n ",
+  "query": "\n SELECT\n workspace_id,\n database_storage_id,\n owner_uid,\n owner_profile.name as owner_name,\n owner_profile.email as owner_email,\n af_workspace.created_at,\n workspace_type,\n af_workspace.deleted_at,\n workspace_name,\n icon\n FROM public.af_workspace\n JOIN public.af_user owner_profile ON af_workspace.owner_uid = owner_profile.uid\n WHERE af_workspace.workspace_id = $1\n AND COALESCE(af_workspace.is_initialized, true) = true;\n ",
   "describe": {
     "columns": [
       {
@@ -72,5 +72,5 @@
       false
     ]
   },
-  "hash": "f448ae1b28ef69f884040016072b12694e530b64a105e03a040c65b779c9d91e"
+  "hash": "c360ec37792d567535ccd2a5011d92c7a201f516e92e204db855167f381c58b1"
 }
Cargo.lock (generated): 17 changes

@@ -608,6 +608,7 @@ dependencies = [
  "assert-json-diff",
  "async-stream",
  "async-trait",
+ "async_zip",
  "authentication",
  "aws-config",
  "aws-sdk-s3",
@@ -633,6 +634,7 @@ dependencies = [
  "dotenvy",
  "fancy-regex 0.11.0",
  "futures",
+ "futures-lite",
  "futures-util",
  "gotrue",
  "gotrue-entity",
@@ -660,6 +662,7 @@ dependencies = [
  "rcgen",
  "redis 0.25.4",
  "reqwest 0.11.27",
+ "sanitize-filename",
  "scraper",
  "secrecy",
  "semver",
@@ -2228,7 +2231,7 @@ dependencies = [
 [[package]]
 name = "collab"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c"
 dependencies = [
  "anyhow",
  "arc-swap",
@@ -2253,7 +2256,7 @@ dependencies = [
 [[package]]
 name = "collab-database"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -2292,7 +2295,7 @@ dependencies = [
 [[package]]
 name = "collab-document"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c"
 dependencies = [
  "anyhow",
  "arc-swap",
@@ -2313,7 +2316,7 @@ dependencies = [
 [[package]]
 name = "collab-entity"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c"
 dependencies = [
  "anyhow",
  "bytes",
@@ -2333,7 +2336,7 @@ dependencies = [
 [[package]]
 name = "collab-folder"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c"
 dependencies = [
  "anyhow",
  "arc-swap",
@@ -2355,7 +2358,7 @@ dependencies = [
 [[package]]
 name = "collab-importer"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c"
 dependencies = [
  "anyhow",
  "async-recursion",
@@ -2456,7 +2459,7 @@ dependencies = [
 [[package]]
 name = "collab-user"
 version = "0.2.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c"
 dependencies = [
  "anyhow",
  "collab",
Cargo.toml: 20 changes

@@ -149,6 +149,9 @@ byteorder = "1.5.0"
 sha2 = "0.10.8"
 rayon.workspace = true
 mailer.workspace = true
+async_zip.workspace = true
+sanitize-filename.workspace = true
+futures-lite = "2.3.0"
 
 [dev-dependencies]
@@ -170,6 +173,7 @@ collab-rt-entity.workspace = true
 hex = "0.4.3"
+unicode-normalization = "0.1.24"
 
 [[bin]]
 name = "appflowy_cloud"
 path = "src/main.rs"
@@ -270,6 +274,8 @@ tonic-proto = { path = "libs/tonic-proto" }
 appflowy-ai-client = { path = "libs/appflowy-ai-client", default-features = false }
 pgvector = { version = "0.4", features = ["sqlx"] }
 client-api-entity = { path = "libs/client-api-entity" }
+async_zip = { version = "0.0.17", features = ["full"] }
+sanitize-filename = "0.5.0"
 
 # collaboration
 yrs = { version = "0.21.2", features = ["sync"] }
@@ -293,13 +299,13 @@ debug = true
 [patch.crates-io]
 # It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate.
 # So using patch to workaround this issue.
-collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" }
-collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" }
-collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" }
-collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" }
-collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" }
-collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" }
-collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" }
+collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" }
+collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" }
+collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" }
+collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" }
+collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" }
+collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" }
+collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" }
 
 [features]
 history = []
@@ -660,7 +660,8 @@ pub async fn select_workspace<'a, E: Executor<'a, Database = Postgres>>(
       icon
     FROM public.af_workspace
     JOIN public.af_user owner_profile ON af_workspace.owner_uid = owner_profile.uid
-    WHERE workspace_id = $1
+    WHERE af_workspace.workspace_id = $1
+      AND COALESCE(af_workspace.is_initialized, true) = true;
    "#,
    workspace_id
  )
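
The added predicate treats a NULL is_initialized as initialized, so only workspaces explicitly flagged as uninitialized (for example, an import whose collabs are still being written by the worker) are hidden from this lookup. A minimal sketch of a caller of the updated helper; the pool variable, the argument list beyond the executor, and the error handling are assumptions for illustration, not part of this commit:

    // Hypothetical caller: look up a workspace row, skipping workspaces that are
    // explicitly marked as not yet initialized (NULL counts as initialized).
    let workspace = select_workspace(&pg_pool, &workspace_id).await?;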
@@ -25,6 +25,9 @@ pub enum ImportError {
   #[error("Can not open the workspace:{0}")]
   CannotOpenWorkspace(String),
 
+  #[error("Failed to unzip file: {0}")]
+  UnZipFileError(String),
+
   #[error(transparent)]
   Internal(#[from] anyhow::Error),
 }
@@ -125,6 +128,15 @@ impl ImportError {
         ),
         format!("Task ID: {} - Internal error: {}", task_id, err),
       ),
+      ImportError::UnZipFileError(_) => {
+        (
+          format!(
+            "Task ID: {} - There was an issue unzipping the file. Please check the file and try again.",
+            task_id
+          ),
+          format!("Task ID: {} - Unzip file error", task_id),
+        )
+      }
     }
   }
 }
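
The new UnZipFileError variant carries the underlying error as a string and maps to a user-facing message plus an admin-facing summary. A hedged sketch of how the variant might be constructed at an unzip call site; the actual construction site is not part of this diff, and the surrounding names are borrowed from the worker changes below:

    // Hypothetical: surface an unzip failure as the dedicated variant instead of
    // the generic Internal error.
    let unzip_file = unzip_stream(zip_reader, output_file_path)
      .await
      .map_err(|err| ImportError::UnZipFileError(err.to_string()))?;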
@@ -1,9 +1,12 @@
 use crate::import_worker::report::{ImportNotifier, ImportProgress, ImportResult};
-use crate::s3_client::S3StreamResponse;
+use crate::s3_client::{download_file, S3StreamResponse};
 use anyhow::anyhow;
 use async_zip::base::read::stream::ZipFileReader;
 use aws_sdk_s3::primitives::ByteStream;
 
+use crate::error::ImportError;
+use crate::mailer::ImportNotionMailerParam;
+use crate::s3_client::S3Client;
 use bytes::Bytes;
 use collab::core::origin::CollabOrigin;
 use collab::entity::EncodedCollab;
@@ -15,13 +18,15 @@ use collab_importer::notion::page::CollabResource;
 use collab_importer::notion::NotionImporter;
 use collab_importer::util::FileId;
 use collab_importer::zip_tool::unzip_stream;
+use database::collab::mem_cache::{cache_exp_secs_from_collab_type, CollabMemCache};
 use database::collab::{insert_into_af_collab_bulk_for_user, select_blob_from_af_collab};
+use database::resource_usage::{insert_blob_metadata_bulk, BulkInsertMeta};
 use database::workspace::{
   delete_from_workspace, select_workspace_database_storage_id, update_import_task_status,
   update_workspace_status,
 };
 use database_entity::dto::CollabParams;
 use futures::io::BufReader;
 use futures::stream::FuturesUnordered;
 use futures::{stream, StreamExt};
 use redis::aio::ConnectionManager;
@@ -45,15 +50,9 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::fs;
-
-use crate::s3_client::S3Client;
-use database::collab::mem_cache::{cache_exp_secs_from_collab_type, CollabMemCache};
 use tokio::task::spawn_local;
 use tokio::time::interval;
 
-use crate::error::ImportError;
-use crate::mailer::ImportNotionMailerParam;
-use database::resource_usage::{insert_blob_metadata_bulk, BulkInsertMeta};
 use tokio_util::compat::TokioAsyncReadCompatExt;
 use tracing::{error, info, trace, warn};
 use uuid::Uuid;
@@ -75,7 +74,16 @@ pub async fn run_import_worker(
     error!("Failed to ensure consumer group: {:?}", err);
   }
 
+  let mut storage_dir = temp_dir().join("import_worker_temp_dir");
+  if !storage_dir.exists() {
+    if let Err(err) = fs::create_dir(&storage_dir).await {
+      error!("Failed to create importer temp dir: {:?}", err);
+      storage_dir = temp_dir();
+    }
+  }
+
   process_un_acked_tasks(
+    &storage_dir,
     &mut redis_client,
     &s3_client,
     &pg_pool,
@@ -87,6 +95,7 @@
   .await;
 
   process_upcoming_tasks(
+    &storage_dir,
     &mut redis_client,
     &s3_client,
     pg_pool,
@@ -101,7 +110,9 @@
   Ok(())
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn process_un_acked_tasks(
+  storage_dir: &Path,
   redis_client: &mut ConnectionManager,
   s3_client: &Arc<dyn S3Client>,
   pg_pool: &PgPool,
@@ -117,6 +128,7 @@ async fn process_un_acked_tasks(
   for un_ack_task in un_ack_tasks {
     // Ignore the error here since the consume task will handle the error
     let _ = consume_task(
+      storage_dir,
      stream_name,
      group_name,
      un_ack_task.task,
@@ -135,6 +147,7 @@
 
 #[allow(clippy::too_many_arguments)]
 async fn process_upcoming_tasks(
+  storage_dir: &Path,
   redis_client: &mut ConnectionManager,
   s3_client: &Arc<dyn S3Client>,
   pg_pool: PgPool,
@@ -176,8 +189,10 @@ async fn process_upcoming_tasks(
       let notifier = notifier.clone();
       let stream_name = stream_name.to_string();
       let group_name = group_name.to_string();
+      let storage_dir = storage_dir.to_path_buf();
       task_handlers.push(spawn_local(async move {
         consume_task(
+          &storage_dir,
          &stream_name,
          &group_name,
          import_task,
@@ -210,6 +225,7 @@
 
 #[allow(clippy::too_many_arguments)]
 async fn consume_task(
+  storage_dir: &Path,
   stream_name: &str,
   group_name: &str,
   import_task: ImportTask,
@@ -219,7 +235,15 @@ async fn consume_task(
   pg_pool: &Pool<Postgres>,
   notifier: Arc<dyn ImportNotifier>,
 ) -> Result<(), ImportError> {
-  let result = process_task(import_task, s3_client, redis_client, pg_pool, notifier).await;
+  let result = process_task(
+    storage_dir,
+    import_task,
+    s3_client,
+    redis_client,
+    pg_pool,
+    notifier,
+  )
+  .await;
 
   // Each task will be consumed only once, regardless of success or failure.
   let _: () = redis_client
@@ -234,6 +258,7 @@ async fn consume_task(
 }
 
 async fn process_task(
+  storage_dir: &Path,
   import_task: ImportTask,
   s3_client: &Arc<dyn S3Client>,
   redis_client: &mut ConnectionManager,
@@ -245,7 +270,10 @@ async fn process_task(
   match import_task {
     ImportTask::Notion(task) => {
       // 1. download zip file
-      match download_and_unzip_file(&task, s3_client).await {
+      let unzip_result =
+        download_and_unzip_file_retry(storage_dir, &task, s3_client, 3, Duration::from_secs(5))
+          .await;
+      match unzip_result {
        Ok(unzip_dir_path) => {
          // 2. process unzip file
          let result =
@@ -292,7 +320,38 @@ async fn process_task(
   }
 }
 
+pub async fn download_and_unzip_file_retry(
+  storage_dir: &Path,
+  import_task: &NotionImportTask,
+  s3_client: &Arc<dyn S3Client>,
+  max_retries: usize,
+  interval: Duration,
+) -> Result<PathBuf, ImportError> {
+  let mut attempt = 0;
+  loop {
+    attempt += 1;
+    match download_and_unzip_file(storage_dir, import_task, s3_client).await {
+      Ok(result) => return Ok(result),
+      Err(err) if attempt <= max_retries => {
+        warn!(
+          "Attempt {} failed: {}. Retrying in {:?}...",
+          attempt, err, interval
+        );
+        tokio::time::sleep(interval).await;
+      },
+      Err(err) => {
+        return Err(ImportError::Internal(anyhow!(
+          "Failed after {} attempts: {}",
+          attempt,
+          err
+        )));
+      },
+    }
+  }
+}
+
 async fn download_and_unzip_file(
+  storage_dir: &Path,
   import_task: &NotionImportTask,
   s3_client: &Arc<dyn S3Client>,
 ) -> Result<PathBuf, ImportError> {
@@ -305,11 +364,19 @@ async fn download_and_unzip_file(
     .await
     .map_err(|err| ImportError::Internal(err.into()))?;
   let buffer_size = buffer_size_from_content_length(content_length);
-  let reader = BufReader::with_capacity(buffer_size, stream);
-  let zip_reader = ZipFileReader::new(reader);
 
+  // Read from stream
+  // let reader = BufReader::with_capacity(buffer_size, stream);
+  // let zip_reader = async_zip::base::read::stream::ZipFileReader::new(reader);
+
+  // Download first and then read from local file
+  let zip_file_path = download_file(storage_dir, stream).await?;
+
+  let file = fs::File::open(&zip_file_path).await.unwrap();
+  let reader = tokio::io::BufReader::with_capacity(buffer_size, file).compat();
+  let zip_reader = async_zip::base::read::stream::ZipFileReader::new(reader);
   let unique_file_name = Uuid::new_v4().to_string();
-  let output_file_path = temp_dir().join(unique_file_name);
+  let output_file_path = storage_dir.join(unique_file_name);
   fs::create_dir_all(&output_file_path)
     .await
     .map_err(|err| ImportError::Internal(err.into()))?;
@@ -323,6 +390,7 @@ async fn download_and_unzip_file(
   let unzip_file = unzip_stream(zip_reader, output_file_path)
     .await
     .map_err(ImportError::Internal)?;
+
   Ok(unzip_file.unzip_dir_path)
 }
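
download_and_unzip_file_retry is a plain linear retry: a fixed sleep between attempts and a hard cap on the attempt count, with every failure treated as retryable. A generic version of the same pattern, for illustration only; the helper name and signature below are assumptions, and the commit itself only adds the import-specific function shown above:

    use std::future::Future;
    use std::time::Duration;

    // Hypothetical generic form of the linear-retry pattern: run a fallible async
    // operation, retry up to `max_retries` times with a fixed pause between
    // attempts, and return the last error once the attempts are exhausted.
    async fn retry_with_interval<T, E, F, Fut>(
      mut op: F,
      max_retries: usize,
      interval: Duration,
    ) -> Result<T, E>
    where
      F: FnMut() -> Fut,
      Fut: Future<Output = Result<T, E>>,
    {
      let mut attempt = 0;
      loop {
        attempt += 1;
        match op().await {
          Ok(value) => return Ok(value),
          // Same policy as the worker: every error is retryable until the cap is hit.
          Err(_) if attempt <= max_retries => tokio::time::sleep(interval).await,
          Err(err) => return Err(err),
        }
      }
    }

A production version might distinguish retryable failures (network errors, S3 throttling) from permanent ones such as a corrupt zip, which the blanket policy above does not.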
@@ -2,11 +2,19 @@ use crate::error::WorkerError;
 use anyhow::anyhow;
 use aws_sdk_s3::error::SdkError;
 
 use anyhow::Result;
 use aws_sdk_s3::operation::get_object::GetObjectError;
 use aws_sdk_s3::primitives::ByteStream;
 use axum::async_trait;
+use futures::AsyncReadExt;
+use std::ops::Deref;
+use std::path::{Path, PathBuf};
+use tokio::fs;
+use tokio::fs::File;
+use tokio::io::AsyncWriteExt;
 use tokio_util::compat::TokioAsyncReadCompatExt;
+use tracing::error;
+use uuid::Uuid;
 
 #[async_trait]
 pub trait S3Client: Send + Sync {
@@ -122,3 +130,66 @@ pub struct S3StreamResponse {
   pub content_type: Option<String>,
   pub content_length: Option<i64>,
 }
+
+pub struct AutoRemoveDownloadedFile(PathBuf);
+
+impl AsRef<Path> for AutoRemoveDownloadedFile {
+  fn as_ref(&self) -> &Path {
+    &self.0
+  }
+}
+
+impl AsRef<PathBuf> for AutoRemoveDownloadedFile {
+  fn as_ref(&self) -> &PathBuf {
+    &self.0
+  }
+}
+
+impl Deref for AutoRemoveDownloadedFile {
+  type Target = PathBuf;
+
+  fn deref(&self) -> &Self::Target {
+    &self.0
+  }
+}
+
+impl Drop for AutoRemoveDownloadedFile {
+  fn drop(&mut self) {
+    let path = self.0.clone();
+    tokio::spawn(async move {
+      if let Err(err) = fs::remove_file(&path).await {
+        error!(
+          "Failed to delete the auto remove downloaded file: {:?}, error: {}",
+          path, err
+        )
+      }
+    });
+  }
+}
+
+pub async fn download_file(
+  storage_dir: &Path,
+  stream: Box<dyn futures::AsyncBufRead + Unpin + Send>,
+) -> Result<AutoRemoveDownloadedFile, anyhow::Error> {
+  let zip_file_path = storage_dir.join(format!("{}.zip", Uuid::new_v4()));
+  write_stream_to_file(&zip_file_path, stream).await?;
+  Ok(AutoRemoveDownloadedFile(zip_file_path))
+}
+
+pub async fn write_stream_to_file(
+  file_path: &PathBuf,
+  mut stream: Box<dyn futures::AsyncBufRead + Unpin + Send>,
+) -> Result<(), anyhow::Error> {
+  let mut file = File::create(file_path).await?;
+  let mut buffer = vec![0u8; 1_048_576];
+  loop {
+    let bytes_read = stream.read(&mut buffer).await?;
+    if bytes_read == 0 {
+      break;
+    }
+    file.write_all(&buffer[..bytes_read]).await?;
+  }
+  file.flush().await?;
+
+  Ok(())
+}
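
download_file writes the S3 stream to <storage_dir>/<uuid>.zip and wraps the path in AutoRemoveDownloadedFile. Because Drop cannot be async, the guard's Drop impl spawns a Tokio task to delete the file, so a runtime must still be running when the guard is dropped. A hedged usage sketch; the variable names and the surrounding function are assumptions:

    // Hypothetical usage: the guard derefs to the downloaded path, and the temp
    // file is removed in the background once the guard goes out of scope.
    let zip_file = download_file(storage_dir, stream).await?;
    let bytes = tokio::fs::read(zip_file.as_path()).await?;
    // `zip_file` is dropped here; Drop spawns the remove_file task.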
@@ -15,6 +15,7 @@ use futures_util::StreamExt;
 use shared_entity::dto::import_dto::{ImportTaskDetail, ImportTaskStatus, UserImportTask};
 use shared_entity::response::{AppResponse, JsonAppResponse};
 use std::env::temp_dir;
+use std::path::PathBuf;
 use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
 use tracing::{error, info, trace};
@@ -73,45 +74,20 @@ async fn import_data_handler(
     .and_then(|s| s.parse::<usize>().ok())
     .unwrap_or(0);
 
-  let mut workspace_name = "".to_string();
-
-  // file_name must be unique
-  let file_name = format!("{}.zip", Uuid::new_v4());
-  let file_path = temp_dir().join(&file_name);
-
-  let mut file_size = 0;
-  let mut file = File::create(&file_path).await?;
-  while let Some(item) = payload.next().await {
-    let mut field = item?;
-    workspace_name = field
-      .content_disposition()
-      .and_then(|c| c.get_name().map(|f| f.to_string()))
-      .unwrap_or_else(|| format!("import-{}", chrono::Local::now().format("%d/%m/%Y %H:%M")));
-
-    while let Some(chunk) = field.next().await {
-      let data = chunk?;
-      file_size += data.len();
-      file.write_all(&data).await?;
-    }
-  }
-  file.shutdown().await?;
-  drop(file);
-
-  if workspace_name.is_empty() {
-    return Err(AppError::InvalidRequest("Invalid file".to_string()).into());
-  }
-
-  if content_length != file_size {
+  let file_path = temp_dir().join(format!("import_data_{}.zip", Uuid::new_v4()));
+  let file = write_multiple_part(&mut payload, file_path).await?;
+
+  if content_length != file.size {
     trace!(
       "Import file fail. The Content-Length:{} doesn't match file size:{}",
       content_length,
-      file_size
+      file.size
     );
 
     return Err(
       AppError::InvalidRequest(format!(
         "Content-Length:{} doesn't match file size:{}",
-        content_length, file_size
+        content_length, file.size
       ))
      .into(),
     );
@@ -123,16 +99,16 @@ async fn import_data_handler(
     &state.collab_access_control_storage,
     &user_uuid,
     uid,
-    &workspace_name,
+    &file.name,
   )
   .await?;
 
   let workspace_id = workspace.workspace_id.to_string();
   info!(
     "User:{} import data:{} to new workspace:{}, name:{}",
-    uid, file_size, workspace_id, workspace_name,
+    uid, file.size, workspace_id, file.name,
   );
-  let stream = ByteStream::from_path(&file_path).await.map_err(|e| {
+  let stream = ByteStream::from_path(&file.file_path).await.map_err(|e| {
     AppError::Internal(anyhow!("Failed to create ByteStream from file path: {}", e))
   })?;
   state
@@ -140,20 +116,13 @@
     .put_blob_as_content_type(&workspace_id, stream, "zip")
     .await?;
 
-  // delete the file after uploading
-  tokio::spawn(async move {
-    if let Err(err) = tokio::fs::remove_file(file_path).await {
-      error!("Failed to delete file after uploading: {}", err);
-    }
-  });
-
   create_upload_task(
     uid,
     &user_name,
     &user_email,
     &workspace_id,
-    &workspace_name,
-    file_size,
+    &file.name,
+    file.size,
     &host,
     &state.redis_connection_manager,
     &state.pg_pool,
@@ -163,6 +132,61 @@ async fn import_data_handler(
   Ok(AppResponse::Ok().into())
 }
 
+struct AutoDeletedFile {
+  name: String,
+  file_path: PathBuf,
+  size: usize,
+}
+
+impl Drop for AutoDeletedFile {
+  fn drop(&mut self) {
+    let path = self.file_path.clone();
+    tokio::spawn(async move {
+      trace!("[AutoDeletedFile]: delete file: {:?}", path);
+      if let Err(err) = tokio::fs::remove_file(&path).await {
+        error!(
+          "Failed to delete the auto deleted file: {:?}, error: {}",
+          path, err
+        )
+      }
+    });
+  }
+}
+
+async fn write_multiple_part(
+  payload: &mut Multipart,
+  file_path: PathBuf,
+) -> Result<AutoDeletedFile, AppError> {
+  let mut file_name = "".to_string();
+  let mut file_size = 0;
+  let mut file = File::create(&file_path).await?;
+  while let Some(Ok(mut field)) = payload.next().await {
+    file_name = field
+      .content_disposition()
+      .and_then(|c| c.get_name().map(|f| f.to_string()))
+      .unwrap_or_else(|| format!("import-{}", chrono::Local::now().format("%d/%m/%Y %H:%M")));
+
+    while let Some(Ok(data)) = field.next().await {
+      file_size += data.len();
+      file.write_all(&data).await?;
+    }
+  }
+  file.shutdown().await?;
+  drop(file);
+
+  if file_name.is_empty() {
+    return Err(AppError::InvalidRequest(
+      "Can not get the file name".to_string(),
+    ));
+  }
+
+  Ok(AutoDeletedFile {
+    name: file_name,
+    file_path,
+    size: file_size,
+  })
+}
+
 fn get_host_from_request(req: &HttpRequest) -> String {
   req
     .headers()
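
Taken together, the handler change replaces the inline multipart loop and the explicit tokio::fs::remove_file call with write_multiple_part plus the AutoDeletedFile guard: the temp zip is deleted by the guard's Drop once the handler finishes, whether or not the upload succeeded. A condensed sketch of the resulting flow, mirroring the handler code above with error handling and the S3 upload elided:

    // Hypothetical condensed view of import_data_handler after this change.
    let file_path = temp_dir().join(format!("import_data_{}.zip", Uuid::new_v4()));
    let file = write_multiple_part(&mut payload, file_path).await?;
    // ... create the workspace, upload `file.file_path` to S3, enqueue the task ...
    // `file` (an AutoDeletedFile) is dropped at the end of the handler, which
    // spawns a task that removes the temp zip from disk.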
src/biz/data_import/mod.rs (new file): 46 additions

@@ -0,0 +1,46 @@
+use actix_multipart::Multipart;
+use anyhow::Result;
+use async_zip::base::write::ZipFileWriter;
+use async_zip::{Compression, ZipEntryBuilder};
+use futures_lite::{AsyncWriteExt, StreamExt};
+use std::path::PathBuf;
+use tokio::fs::File;
+use tokio_util::compat::TokioAsyncWriteCompatExt;
+use uuid::Uuid;
+
+#[allow(dead_code)]
+pub async fn create_archive(
+  mut body: Multipart,
+  file_path: &PathBuf,
+) -> Result<(String, usize), anyhow::Error> {
+  let mut file_name = "".to_string();
+  let mut file_size = 0;
+
+  let archive = File::create(file_path).await?.compat_write();
+  let mut writer = ZipFileWriter::new(archive);
+
+  while let Some(Ok(mut field)) = body.next().await {
+    let name = match field.content_disposition().and_then(|c| c.get_filename()) {
+      Some(filename) => sanitize_filename::sanitize(filename),
+      None => Uuid::new_v4().to_string(),
+    };
+
+    if file_name.is_empty() {
+      file_name = field
+        .content_disposition()
+        .and_then(|c| c.get_name().map(|f| f.to_string()))
+        .unwrap_or_else(|| format!("import-{}", chrono::Local::now().format("%d/%m/%Y %H:%M")));
+    }
+
+    // Build the zip entry
+    let builder = ZipEntryBuilder::new(name.into(), Compression::Deflate);
+    let mut entry_writer = writer.write_entry_stream(builder).await?;
+    while let Some(Ok(chunk)) = field.next().await {
+      file_size += chunk.len();
+      entry_writer.write_all(&chunk).await?;
+    }
+    entry_writer.close().await?;
+  }
+  writer.close().await?;
+  Ok((file_name, file_size))
+}
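
create_archive streams each multipart field straight into a zip entry on disk instead of buffering it in memory, and returns the derived archive name together with the total number of field bytes written. It is currently unused, hence the #[allow(dead_code)]. A hedged usage sketch; the temp path and the surrounding handler are assumptions:

    // Hypothetical call site inside a multipart upload handler.
    let archive_path = std::env::temp_dir().join(format!("{}.zip", uuid::Uuid::new_v4()));
    let (file_name, file_size) = create_archive(payload, &archive_path).await?;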
@@ -1,6 +1,7 @@
 pub mod access_request;
 pub mod chat;
 pub mod collab;
+pub mod data_import;
 pub mod pg_listener;
 pub mod search;
 pub mod template;