chore: delete upload task when upload size exceeds the limit (#6841)

* chore: delete upload tasks

* test: update

* chore: bump client api

* chore: fix test
This commit is contained in:
Nathan.fooo 2024-11-22 18:18:24 +08:00 committed by Lucas.Xu
parent a0d8711d5c
commit 3d8e6ebd72
26 changed files with 431 additions and 246 deletions

View file

@ -172,7 +172,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "app-error"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bincode",
@ -192,7 +192,7 @@ dependencies = [
[[package]]
name = "appflowy-ai-client"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bytes",
@ -888,7 +888,7 @@ dependencies = [
[[package]]
name = "client-api"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"again",
"anyhow",
@ -943,7 +943,7 @@ dependencies = [
[[package]]
name = "client-api-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"collab-entity",
"collab-rt-entity",
@ -956,7 +956,7 @@ dependencies = [
[[package]]
name = "client-websocket"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"futures-channel",
"futures-util",
@ -1256,7 +1256,7 @@ dependencies = [
[[package]]
name = "collab-rt-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bincode",
@ -1281,7 +1281,7 @@ dependencies = [
[[package]]
name = "collab-rt-protocol"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"async-trait",
@ -1678,7 +1678,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "database-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"app-error",
@ -3266,7 +3266,7 @@ dependencies = [
[[package]]
name = "gotrue"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"futures-util",
@ -3283,7 +3283,7 @@ dependencies = [
[[package]]
name = "gotrue-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"app-error",
@ -3715,7 +3715,7 @@ dependencies = [
[[package]]
name = "infra"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bytes",
@ -6400,7 +6400,7 @@ dependencies = [
[[package]]
name = "shared-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"app-error",

View file

@ -59,7 +59,7 @@ collab-importer = { version = "0.1" }
# Run the script:
# scripts/tool/update_client_api_rev.sh new_rev_id
# ⚠️⚠️⚠️️
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "e9c374bfe2440d53a58aee21dca7c7af048fff85" }
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "0ec12c5f2fdd1bce0a0457eafb9963532b5208b3" }
[dependencies]
serde_json.workspace = true

View file

@ -163,7 +163,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "app-error"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bincode",
@ -183,7 +183,7 @@ dependencies = [
[[package]]
name = "appflowy-ai-client"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bytes",
@ -877,7 +877,7 @@ dependencies = [
[[package]]
name = "client-api"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"again",
"anyhow",
@ -932,7 +932,7 @@ dependencies = [
[[package]]
name = "client-api-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"collab-entity",
"collab-rt-entity",
@ -945,7 +945,7 @@ dependencies = [
[[package]]
name = "client-websocket"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"futures-channel",
"futures-util",
@ -1254,7 +1254,7 @@ dependencies = [
[[package]]
name = "collab-rt-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bincode",
@ -1279,7 +1279,7 @@ dependencies = [
[[package]]
name = "collab-rt-protocol"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"async-trait",
@ -1683,7 +1683,7 @@ checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5"
[[package]]
name = "database-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"app-error",
@ -3348,7 +3348,7 @@ dependencies = [
[[package]]
name = "gotrue"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"futures-util",
@ -3365,7 +3365,7 @@ dependencies = [
[[package]]
name = "gotrue-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"app-error",
@ -3802,7 +3802,7 @@ dependencies = [
[[package]]
name = "infra"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bytes",
@ -6480,7 +6480,7 @@ dependencies = [
[[package]]
name = "shared-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"app-error",

View file

@ -58,7 +58,7 @@ collab-importer = { version = "0.1" }
# Run the script:
# scripts/tool/update_client_api_rev.sh new_rev_id
# ⚠️⚠️⚠️️
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "e9c374bfe2440d53a58aee21dca7c7af048fff85" }
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "0ec12c5f2fdd1bce0a0457eafb9963532b5208b3" }
[dependencies]
serde_json.workspace = true

View file

@ -163,7 +163,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "app-error"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bincode",
@ -183,7 +183,7 @@ dependencies = [
[[package]]
name = "appflowy-ai-client"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bytes",
@ -780,7 +780,7 @@ dependencies = [
[[package]]
name = "client-api"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"again",
"anyhow",
@ -835,7 +835,7 @@ dependencies = [
[[package]]
name = "client-api-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"collab-entity",
"collab-rt-entity",
@ -848,7 +848,7 @@ dependencies = [
[[package]]
name = "client-websocket"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"futures-channel",
"futures-util",
@ -1117,7 +1117,7 @@ dependencies = [
[[package]]
name = "collab-rt-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bincode",
@ -1142,7 +1142,7 @@ dependencies = [
[[package]]
name = "collab-rt-protocol"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"async-trait",
@ -1537,7 +1537,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "database-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"app-error",
@ -2980,7 +2980,7 @@ dependencies = [
[[package]]
name = "gotrue"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"futures-util",
@ -2997,7 +2997,7 @@ dependencies = [
[[package]]
name = "gotrue-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"app-error",
@ -3359,7 +3359,7 @@ dependencies = [
[[package]]
name = "infra"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"bytes",
@ -5665,7 +5665,7 @@ dependencies = [
[[package]]
name = "shared-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=e9c374bfe2440d53a58aee21dca7c7af048fff85#e9c374bfe2440d53a58aee21dca7c7af048fff85"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=0ec12c5f2fdd1bce0a0457eafb9963532b5208b3#0ec12c5f2fdd1bce0a0457eafb9963532b5208b3"
dependencies = [
"anyhow",
"app-error",

View file

@ -106,8 +106,8 @@ dashmap = "6.0.1"
# Run the script.add_workspace_members:
# scripts/tool/update_client_api_rev.sh new_rev_id
# ⚠️⚠️⚠️️
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "e9c374bfe2440d53a58aee21dca7c7af048fff85" }
client-api-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "e9c374bfe2440d53a58aee21dca7c7af048fff85" }
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "0ec12c5f2fdd1bce0a0457eafb9963532b5208b3" }
client-api-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "0ec12c5f2fdd1bce0a0457eafb9963532b5208b3" }
[profile.dev]
opt-level = 0

View file

@ -4,6 +4,13 @@ use collab::preclude::Collab;
use collab_document::blocks::DocumentData;
use collab_document::document::Document;
use collab_entity::CollabType;
use flowy_core::config::AppFlowyCoreConfig;
use flowy_core::AppFlowyCore;
use flowy_notification::register_notification_sender;
use flowy_server::AppFlowyServer;
use flowy_user::entities::AuthenticatorPB;
use flowy_user::errors::FlowyError;
use lib_dispatch::runtime::AFPluginRuntime;
use nanoid::nanoid;
use semver::Version;
use std::env::temp_dir;
@ -15,14 +22,6 @@ use tokio::select;
use tokio::task::LocalSet;
use tokio::time::sleep;
use flowy_core::config::AppFlowyCoreConfig;
use flowy_core::AppFlowyCore;
use flowy_notification::register_notification_sender;
use flowy_server::AppFlowyServer;
use flowy_user::entities::AuthenticatorPB;
use flowy_user::errors::FlowyError;
use lib_dispatch::runtime::AFPluginRuntime;
use crate::user_event::TestNotificationSender;
mod chat_event;
@ -43,6 +42,8 @@ pub struct EventIntegrationTest {
local_set: Arc<LocalSet>,
}
pub const SINGLE_FILE_UPLOAD_SIZE: usize = 15 * 1024 * 1024;
impl EventIntegrationTest {
pub async fn new() -> Self {
Self::new_with_name(nanoid!(6)).await
@ -76,7 +77,7 @@ impl EventIntegrationTest {
pub async fn new_with_user_data_path(path_buf: PathBuf, name: String) -> Self {
let path = path_buf.to_str().unwrap().to_string();
let device_id = uuid::Uuid::new_v4().to_string();
let config = AppFlowyCoreConfig::new(
let mut config = AppFlowyCoreConfig::new(
Version::new(0, 7, 0),
path.clone(),
path,
@ -92,6 +93,10 @@ impl EventIntegrationTest {
// "lib_dispatch".to_string(),
],
);
if let Some(cloud_config) = config.cloud_config.as_mut() {
cloud_config.maximum_upload_file_size_in_bytes = Some(SINGLE_FILE_UPLOAD_SIZE as u64);
}
Self::new_with_config(config).await
}

View file

@ -444,6 +444,7 @@ pub async fn use_localhost_af_cloud() {
base_url,
ws_base_url,
gotrue_url,
maximum_upload_file_size_in_bytes: None,
}
.write_env();
std::env::set_var("GOTRUE_ADMIN_EMAIL", "admin@example.com");

View file

@ -1,6 +1,6 @@
use crate::document::generate_random_bytes;
use event_integration_test::user_event::use_localhost_af_cloud;
use event_integration_test::EventIntegrationTest;
use event_integration_test::{EventIntegrationTest, SINGLE_FILE_UPLOAD_SIZE};
use flowy_storage_pub::storage::FileUploadState;
use lib_infra::util::md5;
use std::env::temp_dir;
@ -24,7 +24,7 @@ async fn af_cloud_upload_big_file_test() {
let (created_upload, rx) = test
.storage_manager
.storage_service
.create_upload(&workspace_id, parent_dir, &file_path, false)
.create_upload(&workspace_id, parent_dir, &file_path)
.await
.unwrap();
@ -85,14 +85,14 @@ async fn af_cloud_upload_6_files_test() {
let mut created_uploads = vec![];
let mut receivers = vec![];
for file_size in [1, 2, 5, 8, 12, 20] {
for file_size in [1, 2, 5, 8, 10, 12] {
let file_path = generate_file_with_bytes_len(file_size * 1024 * 1024)
.await
.0;
let (created_upload, rx) = test
.storage_manager
.storage_service
.create_upload(&workspace_id, "temp_test", &file_path, false)
.create_upload(&workspace_id, "temp_test", &file_path)
.await
.unwrap();
receivers.push(rx.unwrap());
@ -132,6 +132,69 @@ async fn af_cloud_upload_6_files_test() {
assert_eq!(uploads.lock().await.len(), 0);
}
#[tokio::test]
async fn af_cloud_delete_upload_file_test() {
use_localhost_af_cloud().await;
let test = EventIntegrationTest::new().await;
test.af_cloud_sign_up().await;
let workspace_id = test.get_current_workspace().await.id;
// Pause sync
test.storage_manager.update_network_reachable(false);
let mut created_uploads = vec![];
let mut receivers = vec![];
// file that exceeds limit will be deleted automatically
let exceed_file_limit_size = SINGLE_FILE_UPLOAD_SIZE + 1;
for file_size in [8 * 1024 * 1024, exceed_file_limit_size, 10 * 1024 * 1024] {
let file_path = generate_file_with_bytes_len(file_size).await.0;
let (created_upload, rx) = test
.storage_manager
.storage_service
.create_upload(&workspace_id, "temp_test", &file_path)
.await
.unwrap();
receivers.push(rx.unwrap());
created_uploads.push(created_upload);
let _ = fs::remove_file(file_path).await;
}
test
.storage_manager
.storage_service
.delete_object(created_uploads[0].clone().url)
.await
.unwrap();
let mut handles = vec![];
// start sync
test.storage_manager.update_network_reachable(true);
for mut receiver in receivers {
let handle = tokio::spawn(async move {
while let Ok(Ok(value)) = timeout(Duration::from_secs(60), receiver.recv()).await {
if let FileUploadState::Finished { file_id } = value {
println!("file_id: {} was uploaded", file_id);
break;
}
}
});
handles.push(handle);
}
// join all handles
futures::future::join_all(handles).await;
let tasks = test.storage_manager.get_all_tasks().await.unwrap();
assert!(tasks.is_empty());
let state = test
.storage_manager
.get_file_state(&created_uploads[2].file_id)
.await
.unwrap();
assert!(matches!(state, FileUploadState::Finished { .. }));
}
async fn generate_file_with_bytes_len(len: usize) -> (String, Vec<u8>) {
let data = generate_random_bytes(len);
let file_dir = temp_dir().join(uuid::Uuid::new_v4().to_string());

View file

@ -27,7 +27,7 @@ pub struct AppFlowyCoreConfig {
/// the origin_application_path.
pub application_path: String,
pub(crate) log_filter: String,
cloud_config: Option<AFCloudConfiguration>,
pub cloud_config: Option<AFCloudConfiguration>,
}
impl fmt::Debug for AppFlowyCoreConfig {

View file

@ -11,7 +11,6 @@ use flowy_server::af_cloud::define::ServerUser;
use flowy_server::af_cloud::AppFlowyCloudServer;
use flowy_server::local_server::{LocalServer, LocalServerDB};
use flowy_server::{AppFlowyEncryption, AppFlowyServer, EncryptionImpl};
use flowy_server_pub::af_cloud_config::AFCloudConfiguration;
use flowy_server_pub::AuthenticatorType;
use flowy_sqlite::kv::KVStorePreferences;
use flowy_user_pub::entities::*;
@ -123,7 +122,9 @@ impl ServerProvider {
Ok::<Arc<dyn AppFlowyServer>, FlowyError>(server)
},
Server::AppFlowyCloud => {
let config = AFCloudConfiguration::from_env()?;
let config = self.config.cloud_config.clone().ok_or_else(|| {
FlowyError::internal().with_context("AppFlowyCloud configuration is missing")
})?;
let server = Arc::new(AppFlowyCloudServer::new(
config,
self.user_enable_sync.load(Ordering::Acquire),

View file

@ -106,11 +106,12 @@ impl StorageCloudService for ServerProvider {
parent_dir: &str,
file_id: &str,
content_type: &str,
file_size: u64,
) -> Result<CreateUploadResponse, FlowyError> {
let server = self.get_server();
let storage = server?.file_storage().ok_or(FlowyError::internal())?;
let server = self.get_server()?;
let storage = server.file_storage().ok_or(FlowyError::internal())?;
storage
.create_upload(workspace_id, parent_dir, file_id, content_type)
.create_upload(workspace_id, parent_dir, file_id, content_type, file_size)
.await
}

View file

@ -2,7 +2,7 @@ use std::sync::Arc;
use anyhow::Context;
use client_api::entity::billing_dto::SubscriptionPlan;
use tracing::{event, trace};
use tracing::{event, info};
use collab_entity::CollabType;
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
@ -204,7 +204,7 @@ impl UserStatusCallback for UserStatusCallbackImpl {
}
fn did_update_network(&self, reachable: bool) {
trace!("Notify did update network: reachable: {}", reachable);
info!("Notify did update network: reachable: {}", reachable);
self.collab_builder.update_network(reachable);
self.storage_manager.update_network_reachable(reachable);
}

View file

@ -490,10 +490,10 @@ pub(crate) async fn delete_file_handler(
) -> FlowyResult<()> {
let DownloadFilePB {
url,
local_file_path,
local_file_path: _,
} = params.try_into_inner()?;
let manager = upgrade_document(manager)?;
manager.delete_file(local_file_path, url).await
manager.delete_file(url).await
}
pub(crate) async fn set_awareness_local_state_handler(

View file

@ -417,7 +417,7 @@ impl DocumentManager {
) -> FlowyResult<CreatedUpload> {
let storage_service = self.storage_service_upgrade()?;
let upload = storage_service
.create_upload(&workspace_id, document_id, local_file_path, false)
.create_upload(&workspace_id, document_id, local_file_path)
.await?
.0;
Ok(upload)
@ -429,9 +429,9 @@ impl DocumentManager {
Ok(())
}
pub async fn delete_file(&self, local_file_path: String, url: String) -> FlowyResult<()> {
pub async fn delete_file(&self, url: String) -> FlowyResult<()> {
let storage_service = self.storage_service_upgrade()?;
storage_service.delete_object(url, local_file_path)?;
storage_service.delete_object(url).await?;
Ok(())
}

View file

@ -186,7 +186,7 @@ pub struct DocumentTestFileStorageService;
#[async_trait]
impl StorageService for DocumentTestFileStorageService {
fn delete_object(&self, _url: String, _local_file_path: String) -> FlowyResult<()> {
async fn delete_object(&self, _url: String) -> FlowyResult<()> {
todo!()
}
@ -199,7 +199,6 @@ impl StorageService for DocumentTestFileStorageService {
_workspace_id: &str,
_parent_dir: &str,
_local_file_path: &str,
_upload_immediately: bool,
) -> Result<(CreatedUpload, Option<FileProgressReceiver>), flowy_error::FlowyError> {
todo!()
}

View file

@ -13,6 +13,8 @@ pub struct AFCloudConfiguration {
pub base_url: String,
pub ws_base_url: String,
pub gotrue_url: String,
#[serde(default)]
pub maximum_upload_file_size_in_bytes: Option<u64>,
}
impl Display for AFCloudConfiguration {
@ -57,6 +59,7 @@ impl AFCloudConfiguration {
base_url,
ws_base_url,
gotrue_url,
maximum_upload_file_size_in_bytes: None,
})
}

View file

@ -1,15 +1,23 @@
use crate::af_cloud::AFServer;
use client_api::entity::{CompleteUploadRequest, CreateUploadRequest};
use flowy_error::FlowyError;
use flowy_error::{ErrorCode, FlowyError};
use flowy_storage_pub::cloud::{ObjectIdentity, ObjectValue, StorageCloudService};
use flowy_storage_pub::storage::{CompletedPartRequest, CreateUploadResponse, UploadPartResponse};
use lib_infra::async_trait::async_trait;
pub struct AFCloudFileStorageServiceImpl<T>(pub T);
pub struct AFCloudFileStorageServiceImpl<T> {
pub client: T,
/// Only use in debug mode
pub maximum_upload_file_size_in_bytes: Option<u64>,
}
impl<T> AFCloudFileStorageServiceImpl<T> {
pub fn new(client: T) -> Self {
Self(client)
pub fn new(client: T, maximum_upload_file_size_in_bytes: Option<u64>) -> Self {
Self {
client,
maximum_upload_file_size_in_bytes,
}
}
}
@ -21,25 +29,25 @@ where
async fn get_object_url(&self, object_id: ObjectIdentity) -> Result<String, FlowyError> {
let file_name = format!("{}.{}", object_id.file_id, object_id.ext);
let url = self
.0
.client
.try_get_client()?
.get_blob_url(&object_id.workspace_id, &file_name);
Ok(url)
}
async fn put_object(&self, url: String, file: ObjectValue) -> Result<(), FlowyError> {
let client = self.0.try_get_client()?;
let client = self.client.try_get_client()?;
client.put_blob(&url, file.raw, &file.mime).await?;
Ok(())
}
async fn delete_object(&self, url: &str) -> Result<(), FlowyError> {
self.0.try_get_client()?.delete_blob(url).await?;
self.client.try_get_client()?.delete_blob(url).await?;
Ok(())
}
async fn get_object(&self, url: String) -> Result<ObjectValue, FlowyError> {
let (mime, raw) = self.0.try_get_client()?.get_blob(&url).await?;
let (mime, raw) = self.client.try_get_client()?.get_blob(&url).await?;
Ok(ObjectValue {
raw: raw.into(),
mime,
@ -53,14 +61,14 @@ where
file_id: &str,
) -> Result<String, FlowyError> {
let url = self
.0
.client
.try_get_client()?
.get_blob_url_v1(workspace_id, parent_dir, file_id);
Ok(url)
}
async fn parse_object_url_v1(&self, url: &str) -> Option<(String, String, String)> {
let value = self.0.try_get_client().ok()?.parse_blob_url_v1(url)?;
let value = self.client.try_get_client().ok()?.parse_blob_url_v1(url)?;
Some(value)
}
@ -70,17 +78,31 @@ where
parent_dir: &str,
file_id: &str,
content_type: &str,
file_size: u64,
) -> Result<CreateUploadResponse, FlowyError> {
let parent_dir = parent_dir.to_string();
let content_type = content_type.to_string();
let file_id = file_id.to_string();
let try_get_client = self.0.try_get_client();
let try_get_client = self.client.try_get_client();
let client = try_get_client?;
let req = CreateUploadRequest {
file_id,
parent_dir,
content_type,
file_size: Some(file_size),
};
if cfg!(debug_assertions) {
if let Some(maximum_upload_size) = self.maximum_upload_file_size_in_bytes {
if file_size > maximum_upload_size {
return Err(FlowyError::new(
ErrorCode::SingleUploadLimitExceeded,
"File size exceeds the maximum limit",
));
}
}
}
let resp = client.create_upload(workspace_id, req).await?;
Ok(resp)
}
@ -94,7 +116,7 @@ where
part_number: i32,
body: Vec<u8>,
) -> Result<UploadPartResponse, FlowyError> {
let try_get_client = self.0.try_get_client();
let try_get_client = self.client.try_get_client();
let client = try_get_client?;
let resp = client
.upload_part(
@ -121,7 +143,7 @@ where
let parent_dir = parent_dir.to_string();
let upload_id = upload_id.to_string();
let file_id = file_id.to_string();
let try_get_client = self.0.try_get_client();
let try_get_client = self.client.try_get_client();
let client = try_get_client?;
let request = CompleteUploadRequest {
file_id,

View file

@ -270,7 +270,10 @@ impl AppFlowyServer for AppFlowyCloudServer {
let client = AFServerImpl {
client: self.get_client(),
};
Some(Arc::new(AFCloudFileStorageServiceImpl::new(client)))
Some(Arc::new(AFCloudFileStorageServiceImpl::new(
client,
self.config.maximum_upload_file_size_in_bytes,
)))
}
fn search_service(&self) -> Option<Arc<dyn SearchCloudService>> {

View file

@ -137,7 +137,7 @@ mod tests {
file_path.push("test_small_file");
let mut file = File::create(&file_path).await.unwrap();
file.write_all(&vec![0; 1 * 1024 * 1024]).await.unwrap(); // 1 MB
file.write_all(&vec![0; 1024 * 1024]).await.unwrap(); // 1 MB
file.flush().await.unwrap();
// Create ChunkedBytes instance
@ -148,7 +148,7 @@ mod tests {
// Validate total chunks and read the data
assert_eq!(chunked_bytes.total_chunks(), 1); // Only 1 chunk due to file size
let chunk = chunked_bytes.next_chunk().await.unwrap().unwrap();
assert_eq!(chunk.len(), 1 * 1024 * 1024); // The full 1 MB
assert_eq!(chunk.len(), 1024 * 1024); // The full 1 MB
// Ensure no more chunks are available
assert!(chunked_bytes.next_chunk().await.is_none());
@ -237,7 +237,7 @@ mod tests {
// Read the second chunk
let chunk2 = chunked_bytes.next_chunk().await.unwrap().unwrap();
assert_eq!(chunk2.len(), 1 * 1024 * 1024); // Partial chunk
assert_eq!(chunk2.len(), 1024 * 1024); // Partial chunk
// Ensure no more chunks are available
assert!(chunked_bytes.next_chunk().await.is_none());

View file

@ -52,6 +52,7 @@ pub trait StorageCloudService: Send + Sync {
file_id: &str,
) -> FlowyResult<String>;
/// Returns the `(workspace_id, parent_dir, file_id)` triple parsed from the
/// given object url, or `None` if the url cannot be parsed.
async fn parse_object_url_v1(&self, url: &str) -> Option<(String, String, String)>;
async fn create_upload(
@ -60,6 +61,7 @@ pub trait StorageCloudService: Send + Sync {
parent_dir: &str,
file_id: &str,
content_type: &str,
file_size: u64,
) -> Result<CreateUploadResponse, FlowyError>;
async fn upload_part(

View file

@ -9,7 +9,7 @@ use tokio::sync::broadcast;
#[async_trait]
pub trait StorageService: Send + Sync {
fn delete_object(&self, url: String, local_file_path: String) -> FlowyResult<()>;
async fn delete_object(&self, url: String) -> FlowyResult<()>;
fn download_object(&self, url: String, local_file_path: String) -> FlowyResult<()>;
@ -18,7 +18,6 @@ pub trait StorageService: Send + Sync {
workspace_id: &str,
parent_dir: &str,
local_file_path: &str,
upload_immediately: bool,
) -> Result<(CreatedUpload, Option<FileProgressReceiver>), FlowyError>;
async fn start_upload(&self, record: &BoxAny) -> Result<(), FlowyError>;
@ -127,6 +126,7 @@ impl ProgressNotifier {
}
}
#[derive(Clone)]
pub struct CreatedUpload {
pub url: String,
pub file_id: String,

View file

@ -2,9 +2,10 @@ use crate::entities::FileStatePB;
use crate::file_cache::FileTempStorage;
use crate::notification::{make_notification, StorageNotification};
use crate::sqlite_sql::{
batch_select_upload_file, delete_all_upload_parts, delete_upload_file, insert_upload_file,
insert_upload_part, is_upload_completed, select_upload_file, select_upload_parts,
update_upload_file_completed, update_upload_file_upload_id, UploadFilePartTable, UploadFileTable,
batch_select_upload_file, delete_all_upload_parts, delete_upload_file,
delete_upload_file_by_file_id, insert_upload_file, insert_upload_part, is_upload_completed,
is_upload_exist, select_upload_file, select_upload_parts, update_upload_file_completed,
update_upload_file_upload_id, UploadFilePartTable, UploadFileTable,
};
use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue};
use allo_isolate::Isolate;
@ -25,7 +26,6 @@ use lib_infra::util::timestamp;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::sync::{broadcast, watch};
use tracing::{debug, error, info, instrument, trace};
@ -91,8 +91,6 @@ impl StorageManager {
let weak_uploader = Arc::downgrade(&uploader);
let cloned_user_service = user_service.clone();
tokio::spawn(async move {
// Start uploading after 20 seconds
tokio::time::sleep(Duration::from_secs(20)).await;
if let Some(uploader) = weak_uploader.upgrade() {
if let Err(err) = prepare_upload_task(uploader, cloned_user_service).await {
error!("prepare upload task failed: {}", err);
@ -172,8 +170,12 @@ impl StorageManager {
Some(FileStatePB { file_id, is_finish })
}
pub async fn initialize(&self, _workspace_id: &str) {
pub async fn initialize(&self, workspace_id: &str) {
self.enable_storage_write_access();
if let Err(err) = prepare_upload_task(self.uploader.clone(), self.user_service.clone()).await {
error!("prepare {} upload task failed: {}", workspace_id, err);
}
}
pub fn update_network_reachable(&self, reachable: bool) {
@ -205,33 +207,42 @@ impl StorageManager {
.await
}
/// Returns the current upload state for the given `file_id`, or `None` if no
/// progress notifier exists for that file.
///
/// Deleting a file also removes its progress notifier, after which this
/// method returns `None` for that file_id.
pub async fn get_file_state(&self, file_id: &str) -> Option<FileUploadState> {
  let notifier = self.progress_notifiers.get(file_id)?;
  notifier.value().current_value.clone()
}
/// Returns a snapshot of every upload task currently queued in the uploader.
pub async fn get_all_tasks(&self) -> FlowyResult<Vec<UploadTask>> {
  Ok(self.uploader.all_tasks().await)
}
}
async fn prepare_upload_task(
uploader: Arc<FileUploader>,
user_service: Arc<dyn StorageUserService>,
) -> FlowyResult<()> {
let uid = user_service.user_id()?;
let conn = user_service.sqlite_connection(uid)?;
let upload_files = batch_select_upload_file(conn, 100, false)?;
let tasks = upload_files
.into_iter()
.map(|upload_file| UploadTask::BackgroundTask {
workspace_id: upload_file.workspace_id,
file_id: upload_file.file_id,
parent_dir: upload_file.parent_dir,
created_at: upload_file.created_at,
retry_count: 0,
})
.collect::<Vec<_>>();
info!("[File] prepare upload task: {}", tasks.len());
uploader.queue_tasks(tasks).await;
if let Ok(uid) = user_service.user_id() {
let workspace_id = user_service.workspace_id()?;
let conn = user_service.sqlite_connection(uid)?;
let upload_files = batch_select_upload_file(conn, &workspace_id, 100, false)?;
let tasks = upload_files
.into_iter()
.map(|upload_file| UploadTask::BackgroundTask {
workspace_id: upload_file.workspace_id,
file_id: upload_file.file_id,
parent_dir: upload_file.parent_dir,
created_at: upload_file.created_at,
retry_count: 0,
})
.collect::<Vec<_>>();
info!("[File] prepare upload task: {}", tasks.len());
uploader.queue_tasks(tasks).await;
}
Ok(())
}
@ -247,24 +258,52 @@ pub struct StorageServiceImpl {
#[async_trait]
impl StorageService for StorageServiceImpl {
fn delete_object(&self, url: String, local_file_path: String) -> FlowyResult<()> {
let cloud_service = self.cloud_service.clone();
tokio::spawn(async move {
match tokio::fs::remove_file(&local_file_path).await {
Ok(_) => {
debug!("[File] deleted file from local disk: {}", local_file_path)
async fn delete_object(&self, url: String) -> FlowyResult<()> {
if let Some((workspace_id, parent_dir, file_id)) =
self.cloud_service.parse_object_url_v1(&url).await
{
info!(
"[File] delete object: workspace: {}, parent_dir: {}, file_id: {}",
workspace_id, parent_dir, file_id
);
self
.task_queue
.remove_task(&workspace_id, &parent_dir, &file_id)
.await;
trace!("[File] delete progress notifier: {}", file_id);
self.progress_notifiers.remove(&file_id);
match delete_upload_file_by_file_id(
self
.user_service
.sqlite_connection(self.user_service.user_id()?)?,
&workspace_id,
&parent_dir,
&file_id,
) {
Ok(Some(file)) => {
let file_path = file.local_file_path;
match tokio::fs::remove_file(&file_path).await {
Ok(_) => debug!("[File] deleted file from local disk: {}", file_path),
Err(err) => {
error!("[File] delete file at {} failed: {}", file_path, err);
},
}
},
Ok(None) => {
info!(
"[File]: can not find file record for url: {} when delete",
url
);
},
Err(err) => {
error!("[File] delete file at {} failed: {}", local_file_path, err);
error!("[File] delete upload file failed: {}", err);
},
}
if let Err(e) = cloud_service.delete_object(&url).await {
// TODO: add WAL to log the delete operation.
// keep a list of files to be deleted, and retry later
error!("[File] delete file failed: {}", e);
}
debug!("[File] deleted file from cloud: {}", url);
});
}
let _ = self.cloud_service.delete_object(&url).await;
Ok(())
}
@ -301,7 +340,6 @@ impl StorageService for StorageServiceImpl {
workspace_id: &str,
parent_dir: &str,
file_path: &str,
upload_immediately: bool,
) -> Result<(CreatedUpload, Option<FileProgressReceiver>), FlowyError> {
if workspace_id.is_empty() {
return Err(FlowyError::internal().with_context("workspace id is empty"));
@ -354,28 +392,18 @@ impl StorageService for StorageServiceImpl {
match insert_upload_file(conn, &record) {
Ok(_) => {
// 3. generate url for given file
if upload_immediately {
self
.task_queue
.queue_task(UploadTask::ImmediateTask {
local_file_path,
record,
retry_count: 3,
})
.await;
} else {
self
.task_queue
.queue_task(UploadTask::Task {
local_file_path,
record,
retry_count: 0,
})
.await;
}
self
.task_queue
.queue_task(UploadTask::Task {
local_file_path,
record,
retry_count: 3,
})
.await;
let notifier = ProgressNotifier::new(file_id.to_string());
let receiver = notifier.subscribe();
trace!("[File] create upload progress notifier: {}", file_id);
self
.progress_notifiers
.insert(file_id.to_string(), notifier);
@ -397,14 +425,21 @@ impl StorageService for StorageServiceImpl {
FlowyError::internal().with_context("failed to downcast record to UploadFileTable")
})?;
start_upload(
&self.cloud_service,
&self.user_service,
&self.temp_storage,
file_record,
self.global_notifier.clone(),
)
.await?;
// If the upload record no longer exists (e.g. it was deleted after exceeding
// the upload size limit), skip the upload process
if !is_upload_exist(
self
.user_service
.sqlite_connection(self.user_service.user_id()?)?,
&file_record.upload_id,
)? {
info!(
"[File] skip upload, {} was deleted",
file_record.local_file_path
);
return Ok(());
}
start_upload(self, file_record).await?;
Ok(())
}
@ -421,16 +456,12 @@ impl StorageService for StorageServiceImpl {
.sqlite_connection(self.user_service.user_id()?)?;
if let Some(upload_file) = select_upload_file(&mut conn, workspace_id, parent_dir, file_id)? {
resume_upload(
&self.cloud_service,
&self.user_service,
&self.temp_storage,
upload_file,
self.global_notifier.clone(),
)
.await?;
resume_upload(self, upload_file).await?;
} else {
error!("[File] resume upload failed: record not found");
error!(
"[File] resume upload failed: can not found {}:{}",
parent_dir, file_id
);
}
Ok(())
}
@ -474,7 +505,7 @@ async fn create_upload_record(
// Calculate the total number of chunks
let num_chunk = calculate_offsets(file_size, MIN_CHUNK_SIZE).len();
let content_type = mime_guess::from_path(&file_path)
let content_type = mime_guess::from_path(file_path)
.first_or_octet_stream()
.to_string();
let file_id = FileId::from_path(&file_path.to_path_buf()).await?;
@ -496,12 +527,14 @@ async fn create_upload_record(
#[instrument(level = "debug", skip_all, err)]
async fn start_upload(
cloud_service: &Arc<dyn StorageCloudService>,
user_service: &Arc<dyn StorageUserService>,
temp_storage: &Arc<FileTempStorage>,
storage_service: &StorageServiceImpl,
upload_file: &UploadFileTable,
global_notifier: GlobalNotifier,
) -> FlowyResult<()> {
let temp_storage = &storage_service.temp_storage;
let user_service = &storage_service.user_service;
let global_notifier = storage_service.global_notifier.clone();
let cloud_service = &storage_service.cloud_service;
// 4. gather existing completed parts
let mut conn = user_service.sqlite_connection(user_service.user_id()?)?;
let mut completed_parts = select_upload_parts(&mut conn, &upload_file.upload_id)
@ -523,6 +556,10 @@ async fn start_upload(
}
}
}
let file_size = file_path
.metadata()
.map(|metadata| metadata.len())
.unwrap_or(0);
let mut chunked_bytes =
ChunkedBytes::from_file(&upload_file.local_file_path, MIN_CHUNK_SIZE).await?;
@ -559,10 +596,20 @@ async fn start_upload(
&upload_file.parent_dir,
&upload_file.file_id,
&upload_file.content_type,
file_size,
)
.await;
let file_url = cloud_service
.get_object_url_v1(
&upload_file.workspace_id,
&upload_file.parent_dir,
&upload_file.file_id,
)
.await?;
if let Err(err) = create_upload_resp_result.as_ref() {
handle_upload_error(user_service, &err, &upload_file.upload_id);
handle_upload_error(storage_service, err, &file_url).await;
}
let create_upload_resp = create_upload_resp_result?;
@ -602,13 +649,6 @@ async fn start_upload(
chunk_bytes.len() / 1000,
);
let file_url = cloud_service
.get_object_url_v1(
&upload_file.workspace_id,
&upload_file.parent_dir,
&upload_file.file_id,
)
.await?;
// start uploading parts
match upload_part(
cloud_service,
@ -633,8 +673,11 @@ async fn start_upload(
if progress_value >= 0.9 {
progress_value = 0.9;
}
let progress =
FileProgress::new_progress(file_url, upload_file.file_id.clone(), progress_value);
let progress = FileProgress::new_progress(
file_url.clone(),
upload_file.file_id.clone(),
progress_value,
);
trace!("[File] upload progress: {}", progress);
if let Err(err) = global_notifier.send(progress) {
@ -652,9 +695,9 @@ async fn start_upload(
"[File] {} failed to upload part: {}",
upload_file.file_id, err
);
handle_upload_error(user_service, &err, &upload_file.upload_id);
handle_upload_error(storage_service, &err, &file_url).await;
if let Err(err) = global_notifier.send(FileProgress::new_error(
file_url,
file_url.clone(),
upload_file.file_id.clone(),
err.msg.clone(),
)) {
@ -686,17 +729,17 @@ async fn start_upload(
)
.await;
if let Err(err) = complete_upload_result {
handle_upload_error(user_service, &err, &upload_file.upload_id);
handle_upload_error(storage_service, &err, &file_url).await;
return Err(err);
}
Ok(())
}
fn handle_upload_error(
user_service: &Arc<dyn StorageUserService>,
async fn handle_upload_error(
storage_service: &StorageServiceImpl,
err: &FlowyError,
upload_id: &str,
file_url: &str,
) {
if err.is_file_limit_exceeded() {
make_notification(StorageNotification::FileStorageLimitExceeded)
@ -705,13 +748,9 @@ fn handle_upload_error(
}
if err.is_single_file_limit_exceeded() {
info!("[File] file exceed limit:{}", upload_id);
if let Ok(user_id) = user_service.user_id() {
if let Ok(db_conn) = user_service.sqlite_connection(user_id) {
if let Err(err) = delete_upload_file(db_conn, upload_id) {
error!("[File] delete upload file:{} error:{}", upload_id, err);
}
}
info!("[File] file exceed limit:{}", file_url);
if let Err(err) = storage_service.delete_object(file_url.to_string()).await {
error!("[File] delete upload file:{} error:{}", file_url, err);
}
make_notification(StorageNotification::SingleFileLimitExceeded)
@ -722,11 +761,8 @@ fn handle_upload_error(
#[instrument(level = "debug", skip_all, err)]
async fn resume_upload(
cloud_service: &Arc<dyn StorageCloudService>,
user_service: &Arc<dyn StorageUserService>,
temp_storage: &Arc<FileTempStorage>,
storage_service: &StorageServiceImpl,
upload_file: UploadFileTable,
global_notifier: GlobalNotifier,
) -> FlowyResult<()> {
trace!(
"[File] resume upload for workspace: {}, parent_dir: {}, file_id: {}, local_file_path:{}",
@ -736,14 +772,7 @@ async fn resume_upload(
upload_file.local_file_path
);
start_upload(
cloud_service,
user_service,
temp_storage,
&upload_file,
global_notifier,
)
.await?;
start_upload(storage_service, &upload_file).await?;
Ok(())
}
@ -848,8 +877,8 @@ async fn complete_upload(
error!("[File] send global notifier failed: {}", send_err);
}
let conn = user_service.sqlite_connection(user_service.user_id()?)?;
if let Err(err) = delete_all_upload_parts(conn, &upload_file.upload_id) {
let mut conn = user_service.sqlite_connection(user_service.user_id()?)?;
if let Err(err) = delete_all_upload_parts(&mut conn, &upload_file.upload_id) {
error!("[File] delete all upload parts failed: {}", err);
}
return Err(err);

View file

@ -6,7 +6,7 @@ use flowy_sqlite::{
diesel, AsChangeset, BoolExpressionMethods, DBConnection, ExpressionMethods, Identifiable,
Insertable, OptionalExtension, QueryDsl, Queryable, RunQueryDsl, SqliteConnection,
};
use tracing::warn;
use tracing::{trace, warn};
#[derive(Queryable, Insertable, AsChangeset, Identifiable, Debug, Clone)]
#[diesel(table_name = upload_file_table)]
@ -55,6 +55,7 @@ pub fn insert_upload_file(
mut conn: DBConnection,
upload_file: &UploadFileTable,
) -> FlowyResult<()> {
trace!("[File]: insert upload file: {:?}", upload_file);
match diesel::insert_into(upload_file_table::table)
.values(upload_file)
.execute(&mut *conn)
@ -97,6 +98,14 @@ pub fn update_upload_file_completed(mut conn: DBConnection, upload_id: &str) ->
Ok(())
}
/// Returns `true` when a row with the given `upload_id` exists in
/// `upload_file_table`.
pub fn is_upload_exist(mut conn: DBConnection, upload_id: &str) -> FlowyResult<bool> {
  let exists = upload_file_table::dsl::upload_file_table
    .filter(upload_file_table::upload_id.eq(upload_id))
    .first::<UploadFileTable>(&mut *conn)
    .optional()?
    .is_some();
  Ok(exists)
}
pub fn is_upload_completed(
conn: &mut SqliteConnection,
workspace_id: &str,
@ -116,28 +125,65 @@ pub fn is_upload_completed(
Ok(result.is_some())
}
/// Delete upload file and its parts
pub fn delete_upload_file(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> {
conn.immediate_transaction(|conn| {
diesel::delete(
upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)),
)
.execute(&mut *conn)?;
if let Err(err) = diesel::delete(
upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)),
)
.execute(&mut *conn)
{
warn!("Failed to delete upload parts: {:?}", err)
}
_delete_upload_file(upload_id, conn)?;
Ok::<_, FlowyError>(())
})?;
Ok(())
}
pub fn delete_all_upload_parts(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> {
/// Deletes the upload record identified by (`workspace_id`, `parent_dir`,
/// `file_id`) inside a single immediate transaction.
///
/// Returns the deleted row (if one existed) so the caller can clean up the
/// corresponding local file via `local_file_path`; returns `Ok(None)` when no
/// matching record was found.
pub fn delete_upload_file_by_file_id(
  mut conn: DBConnection,
  workspace_id: &str,
  parent_dir: &str,
  file_id: &str,
) -> FlowyResult<Option<UploadFileTable>> {
  let file = conn.immediate_transaction(|conn| {
    // Look the row up first so it can be returned to the caller after deletion.
    let file = select_upload_file(&mut *conn, workspace_id, parent_dir, file_id)?;
    if let Some(file) = &file {
      // when the upload is not started, the upload id will be empty
      if file.upload_id.is_empty() {
        // No server-side upload exists yet, so delete by the
        // (workspace_id, parent_dir, file_id) triple instead of upload_id.
        diesel::delete(
          upload_file_table::dsl::upload_file_table.filter(
            upload_file_table::workspace_id
              .eq(workspace_id)
              .and(upload_file_table::parent_dir.eq(parent_dir))
              .and(upload_file_table::file_id.eq(file_id)),
          ),
        )
        .execute(&mut *conn)?;
      } else {
        // Upload was started: remove the record and its uploaded parts by upload_id.
        _delete_upload_file(&file.upload_id, &mut *conn)?;
      }
    }
    Ok::<_, FlowyError>(file)
  })?;
  Ok(file)
}
/// Deletes the upload record with the given `upload_id` and all of its
/// uploaded parts.
///
/// A failure to delete the parts is only logged, not propagated — the main
/// record has already been removed at that point.
fn _delete_upload_file(upload_id: &str, conn: &mut SqliteConnection) -> Result<(), FlowyError> {
  // An empty upload_id would match nothing; bail out early instead of issuing
  // a no-op delete.
  if upload_id.is_empty() {
    warn!("[File]: upload_id is empty when delete upload file");
    return Ok(());
  }
  trace!("[File]: delete upload file: {}", upload_id);
  diesel::delete(
    upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)),
  )
  .execute(&mut *conn)?;
  // Best-effort cleanup of the associated parts; log and continue on failure.
  if let Err(err) = delete_all_upload_parts(&mut *conn, upload_id) {
    warn!("Failed to delete upload parts: {:?}", err)
  }
  Ok(())
}
pub fn delete_all_upload_parts(conn: &mut SqliteConnection, upload_id: &str) -> FlowyResult<()> {
diesel::delete(
upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)),
)
@ -179,10 +225,12 @@ pub fn select_upload_parts(
pub fn batch_select_upload_file(
mut conn: DBConnection,
workspace_id: &str,
limit: i32,
is_finish: bool,
) -> FlowyResult<Vec<UploadFileTable>> {
let results = upload_file_table::dsl::upload_file_table
.filter(upload_file_table::workspace_id.eq(workspace_id))
.filter(upload_file_table::is_finish.eq(is_finish))
.order(upload_file_table::created_at.desc())
.limit(limit.into())

View file

@ -35,6 +35,24 @@ impl UploadTaskQueue {
self.tasks.write().await.push(task);
let _ = self.notifier.send_replace(Signal::Proceed);
}
/// Removes every queued task that targets the given
/// (`workspace_id`, `parent_dir`, `file_id`) triple.
pub async fn remove_task(&self, workspace_id: &str, parent_dir: &str, file_id: &str) {
  let mut tasks = self.tasks.write().await;
  tasks.retain(|task| {
    // Keep a task only when it does NOT match the triple being removed.
    let is_target = match task {
      UploadTask::BackgroundTask {
        workspace_id: task_workspace_id,
        parent_dir: task_parent_dir,
        file_id: task_file_id,
        ..
      } => {
        task_workspace_id == workspace_id
          && task_parent_dir == parent_dir
          && task_file_id == file_id
      },
      UploadTask::Task { record, .. } => {
        record.workspace_id == workspace_id
          && record.parent_dir == parent_dir
          && record.file_id == file_id
      },
    };
    !is_target
  });
}
}
pub struct FileUploader {
@ -43,7 +61,7 @@ pub struct FileUploader {
max_uploads: u8,
current_uploads: AtomicU8,
pause_sync: AtomicBool,
has_exceeded_limit: Arc<AtomicBool>,
disable_upload: Arc<AtomicBool>,
}
impl Drop for FileUploader {
@ -64,10 +82,15 @@ impl FileUploader {
max_uploads: 3,
current_uploads: Default::default(),
pause_sync: Default::default(),
has_exceeded_limit: is_exceed_limit,
disable_upload: is_exceed_limit,
}
}
/// Returns a cloned snapshot of all tasks currently sitting in the queue.
pub async fn all_tasks(&self) -> Vec<UploadTask> {
  let guard = self.queue.tasks.read().await;
  guard.iter().cloned().collect()
}
pub async fn queue_tasks(&self, tasks: Vec<UploadTask>) {
let mut queue_lock = self.queue.tasks.write().await;
for task in tasks {
@ -84,14 +107,14 @@ impl FileUploader {
pub fn disable_storage_write(&self) {
self
.has_exceeded_limit
.disable_upload
.store(true, std::sync::atomic::Ordering::SeqCst);
self.pause();
}
pub fn enable_storage_write(&self) {
self
.has_exceeded_limit
.disable_upload
.store(false, std::sync::atomic::Ordering::SeqCst);
self.resume();
}
@ -100,6 +123,7 @@ impl FileUploader {
self
.pause_sync
.store(false, std::sync::atomic::Ordering::SeqCst);
trace!("[File] Uploader resumed");
let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(3));
}
@ -130,7 +154,7 @@ impl FileUploader {
}
if self
.has_exceeded_limit
.disable_upload
.load(std::sync::atomic::Ordering::SeqCst)
{
// If the storage limitation is enabled, do not proceed.
@ -152,12 +176,7 @@ impl FileUploader {
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
match task {
UploadTask::ImmediateTask {
local_file_path,
record,
mut retry_count,
}
| UploadTask::Task {
UploadTask::Task {
local_file_path,
record,
mut retry_count,
@ -234,6 +253,9 @@ pub struct FileUploaderRunner;
impl FileUploaderRunner {
pub async fn run(weak_uploader: Weak<FileUploader>, mut notifier: watch::Receiver<Signal>) {
// Start uploading after 20 seconds
tokio::time::sleep(Duration::from_secs(20)).await;
loop {
// stops the runner if the notifier was closed.
if notifier.changed().await.is_err() {
@ -272,12 +294,8 @@ impl FileUploaderRunner {
}
}
#[derive(Clone)]
pub enum UploadTask {
ImmediateTask {
local_file_path: String,
record: UploadFileTable,
retry_count: u8,
},
Task {
local_file_path: String,
record: UploadFileTable,
@ -295,7 +313,6 @@ pub enum UploadTask {
impl UploadTask {
pub fn retry_count(&self) -> u8 {
match self {
UploadTask::ImmediateTask { retry_count, .. } => *retry_count,
UploadTask::Task { retry_count, .. } => *retry_count,
UploadTask::BackgroundTask { retry_count, .. } => *retry_count,
}
@ -307,7 +324,6 @@ impl Display for UploadTask {
match self {
UploadTask::Task { record, .. } => write!(f, "Task: {}", record.file_id),
UploadTask::BackgroundTask { file_id, .. } => write!(f, "BackgroundTask: {}", file_id),
UploadTask::ImmediateTask { record, .. } => write!(f, "Immediate Task: {}", record.file_id),
}
}
}
@ -317,9 +333,6 @@ impl Eq for UploadTask {}
impl PartialEq for UploadTask {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::ImmediateTask { record: lhs, .. }, Self::ImmediateTask { record: rhs, .. }) => {
lhs.local_file_path == rhs.local_file_path
},
(Self::Task { record: lhs, .. }, Self::Task { record: rhs, .. }) => {
lhs.local_file_path == rhs.local_file_path
},
@ -349,11 +362,6 @@ impl PartialOrd for UploadTask {
impl Ord for UploadTask {
fn cmp(&self, other: &Self) -> Ordering {
match (self, other) {
(Self::ImmediateTask { record: lhs, .. }, Self::ImmediateTask { record: rhs, .. }) => {
lhs.created_at.cmp(&rhs.created_at)
},
(_, Self::ImmediateTask { .. }) => Ordering::Less,
(Self::ImmediateTask { .. }, _) => Ordering::Greater,
(Self::Task { record: lhs, .. }, Self::Task { record: rhs, .. }) => {
lhs.created_at.cmp(&rhs.created_at)
},

View file

@ -42,7 +42,7 @@ async fn test_insert_new_upload() {
// select
let conn = db.get_connection().unwrap();
let records = batch_select_upload_file(conn, 100, false).unwrap();
let records = batch_select_upload_file(conn, &workspace_id, 100, false).unwrap();
assert_eq!(records.len(), 5);
// compare the upload id order is the same as upload_ids
@ -55,7 +55,7 @@ async fn test_insert_new_upload() {
}
let conn = db.get_connection().unwrap();
let records = batch_select_upload_file(conn, 100, false).unwrap();
let records = batch_select_upload_file(conn, &workspace_id, 100, false).unwrap();
assert!(records.is_empty());
}