feat: implement snapshot handler (#38)

This commit is contained in:
Nathan.fooo 2023-09-13 15:25:51 +08:00 committed by GitHub
parent 3aa4d20ad3
commit 107627f4d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 260 additions and 174 deletions

View file

@ -0,0 +1,34 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT sid as \"snapshot_id\", oid as \"object_id\", created_at\n FROM af_collab_snapshot where oid = $1 AND deleted_at IS NULL;\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "snapshot_id",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "object_id",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "created_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "47d4c0cd4cbcf016e566a80f071dcc423989f1a3fa274033852e9075b721c57f"
}

View file

@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_collab_snapshot (oid, blob, len, encrypt, workspace_id)\n VALUES ($1, $2, $3, $4, $5)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Bytea",
"Int4",
"Int4",
"Uuid"
]
},
"nullable": []
},
"hash": "d1f845717b19636e61d1d96d7a5629754f3ded9bda9116953bd1b40bd80551ae"
}

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT blob\n FROM af_collab_snapshot \n WHERE sid = $1 AND deleted_at IS NULL;\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "blob",
"type_info": "Bytea"
}
],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": [
false
]
},
"hash": "e098091c119895b2568865a3a08141378c227caa5816554b7edf53e2deef8f7b"
}

View file

@ -104,7 +104,7 @@ impl Client {
}
pub async fn create_collab(&self, params: InsertCollabParams) -> Result<(), AppError> {
let url = format!("{}/api/collab", self.base_url);
let url = format!("{}/api/collab/", self.base_url);
let resp = self
.http_client_with_auth(Method::POST, &url)?
.json(&params)
@ -114,7 +114,7 @@ impl Client {
}
pub async fn update_collab(&self, params: InsertCollabParams) -> Result<(), AppError> {
let url = format!("{}/api/collab", self.base_url);
let url = format!("{}/api/collab/", self.base_url);
let resp = self
.http_client_with_auth(Method::PUT, &url)?
.json(&params)
@ -124,7 +124,7 @@ impl Client {
}
pub async fn get_collab(&self, params: QueryCollabParams) -> Result<RawData, AppError> {
let url = format!("{}/api/collab", self.base_url);
let url = format!("{}/api/collab/", self.base_url);
let resp = self
.http_client_with_auth(Method::GET, &url)?
.json(&params)
@ -136,7 +136,7 @@ impl Client {
}
pub async fn delete_collab(&self, params: DeleteCollabParams) -> Result<(), AppError> {
let url = format!("{}/api/collab", self.base_url);
let url = format!("{}/api/collab/", self.base_url);
let resp = self
.http_client_with_auth(Method::DELETE, &url)?
.json(&params)

View file

@ -4,7 +4,10 @@ use std::sync::Arc;
use storage::collab::Result;
use storage::collab::{CollabStorage, StorageConfig};
use storage::error::StorageError;
use storage_entity::{InsertCollabParams, QueryCollabParams, RawData};
use storage_entity::{
AFCollabSnapshots, InsertCollabParams, InsertSnapshotParams, QueryCollabParams,
QueryObjectSnapshotParams, QuerySnapshotParams, RawData,
};
use tokio::sync::RwLock;
#[derive(Clone, Default)]
@ -63,4 +66,19 @@ impl CollabStorage for CollabMemoryStorageImpl {
.remove(object_id);
Ok(())
}
async fn create_snapshot(&self, _params: InsertSnapshotParams) -> Result<()> {
Ok(())
}
async fn get_snapshot_data(&self, _params: QuerySnapshotParams) -> Result<RawData> {
Ok(vec![])
}
async fn get_all_snapshots(
&self,
_params: QueryObjectSnapshotParams,
) -> Result<AFCollabSnapshots> {
Ok(AFCollabSnapshots(vec![]))
}
}

View file

@ -83,6 +83,17 @@ impl InsertCollabParams {
}
}
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct InsertSnapshotParams {
#[validate(custom = "validate_not_empty_str")]
pub object_id: String,
#[validate(custom = "validate_not_empty_payload")]
pub raw_data: Vec<u8>,
pub len: i32,
#[validate(custom = "validate_not_empty_str")]
pub workspace_id: String,
}
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct QueryCollabParams {
#[validate(custom = "validate_not_empty_str")]
@ -90,6 +101,26 @@ pub struct QueryCollabParams {
pub collab_type: CollabType,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AFCollabSnapshot {
pub snapshot_id: i64,
pub object_id: String,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AFCollabSnapshots(pub Vec<AFCollabSnapshot>);
#[derive(Debug, Clone, Deserialize)]
pub struct QuerySnapshotParams {
pub snapshot_id: i64,
}
#[derive(Debug, Clone, Deserialize)]
pub struct QueryObjectSnapshotParams {
pub object_id: String,
}
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct AFWorkspace {
pub workspace_id: uuid::Uuid,

View file

@ -6,7 +6,10 @@ use sqlx::types::{chrono, Uuid};
use sqlx::{PgPool, Transaction};
use std::ops::DerefMut;
use std::str::FromStr;
use storage_entity::{InsertCollabParams, QueryCollabParams, RawData};
use storage_entity::{
AFCollabSnapshot, AFCollabSnapshots, InsertCollabParams, InsertSnapshotParams, QueryCollabParams,
QueryObjectSnapshotParams, QuerySnapshotParams, RawData,
};
use validator::Validate;
pub type Result<T, E = StorageError> = core::result::Result<T, E>;
@ -61,6 +64,13 @@ pub trait CollabStorage: Clone + Send + Sync + 'static {
///
/// * `Result<()>` - Returns `Ok(())` if the collaboration was deleted successfully, `Err` otherwise.
async fn delete_collab(&self, object_id: &str) -> Result<()>;
async fn create_snapshot(&self, params: InsertSnapshotParams) -> Result<()>;
async fn get_snapshot_data(&self, params: QuerySnapshotParams) -> Result<RawData>;
async fn get_all_snapshots(&self, params: QueryObjectSnapshotParams)
-> Result<AFCollabSnapshots>;
}
#[derive(Debug, Clone)]
@ -117,7 +127,7 @@ impl CollabStorage for CollabPostgresDBStorageImpl {
.begin()
.await
.context("Failed to acquire a Postgres transaction to insert collab")?;
insert_or_update(&mut transaction, params).await?;
insert_af_collab(&mut transaction, params).await?;
transaction
.commit()
.await
@ -162,6 +172,62 @@ impl CollabStorage for CollabPostgresDBStorageImpl {
.await?;
Ok(())
}
async fn create_snapshot(&self, params: InsertSnapshotParams) -> Result<()> {
params.validate()?;
let encrypt = 0;
let workspace_id = Uuid::from_str(&params.workspace_id)?;
sqlx::query!(
r#"
INSERT INTO af_collab_snapshot (oid, blob, len, encrypt, workspace_id)
VALUES ($1, $2, $3, $4, $5)
"#,
params.object_id,
params.raw_data,
params.len,
encrypt,
workspace_id,
)
.execute(&self.pg_pool)
.await?;
Ok(())
}
async fn get_snapshot_data(&self, params: QuerySnapshotParams) -> Result<RawData> {
let record = sqlx::query!(
r#"
SELECT blob
FROM af_collab_snapshot
WHERE sid = $1 AND deleted_at IS NULL;
"#,
params.snapshot_id,
)
.fetch_optional(&self.pg_pool)
.await?;
match record {
Some(record) => Ok(record.blob),
None => Err(StorageError::RecordNotFound),
}
}
async fn get_all_snapshots(
&self,
params: QueryObjectSnapshotParams,
) -> Result<AFCollabSnapshots> {
let snapshots: Vec<AFCollabSnapshot> = sqlx::query_as!(
AFCollabSnapshot,
r#"
SELECT sid as "snapshot_id", oid as "object_id", created_at
FROM af_collab_snapshot where oid = $1 AND deleted_at IS NULL;
"#,
params.object_id
)
.fetch_all(&self.pg_pool)
.await?;
Ok(AFCollabSnapshots(snapshots))
}
}
/// Inserts a new row into the `af_collab` table or updates an existing row if it matches the
@ -186,12 +252,10 @@ impl CollabStorage for CollabPostgresDBStorageImpl {
/// * There's a database operation failure.
/// * There's an attempt to insert a row with an existing `object_id` but a different `workspace_id`.
///
async fn insert_or_update(
async fn insert_af_collab(
tx: &mut Transaction<'_, sqlx::Postgres>,
params: InsertCollabParams,
) -> Result<()> {
params.validate()?;
let encrypt = 0;
let partition_key = params.collab_type.value();
let workspace_id = Uuid::from_str(&params.workspace_id)?;

View file

@ -1,141 +0,0 @@
use chrono::{DateTime, Utc};
use collab_define::CollabType;
use serde::{Deserialize, Serialize};
use std::ops::Deref;
use validator::{Validate, ValidationError};
pub type RawData = Vec<u8>;
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct InsertCollabParams {
pub uid: i64,
#[validate(custom = "validate_not_empty_str")]
pub object_id: String,
#[validate(custom = "validate_not_empty_payload")]
pub raw_data: Vec<u8>,
pub len: i32,
#[validate(custom = "validate_not_empty_str")]
pub workspace_id: String,
pub collab_type: CollabType,
}
impl InsertCollabParams {
pub fn new<T: ToString>(
uid: i64,
object_id: T,
collab_type: CollabType,
raw_data: Vec<u8>,
workspace_id: String,
) -> Self {
let len = raw_data.len() as i32;
let object_id = object_id.to_string();
Self {
uid,
object_id,
collab_type,
raw_data,
len,
workspace_id,
}
}
}
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct DeleteCollabParams {
#[validate(custom = "validate_not_empty_str")]
pub object_id: String,
}
fn validate_not_empty_str(s: &str) -> Result<(), ValidationError> {
if s.is_empty() {
return Err(ValidationError::new("should not be empty string"));
}
Ok(())
}
fn validate_not_empty_payload(payload: &[u8]) -> Result<(), ValidationError> {
if payload.is_empty() {
return Err(ValidationError::new("should not be empty payload"));
}
Ok(())
}
impl InsertCollabParams {
pub fn from_raw_data(
uid: i64,
object_id: &str,
collab_type: CollabType,
raw_data: Vec<u8>,
workspace_id: &str,
) -> Self {
let len = raw_data.len() as i32;
let object_id = object_id.to_string();
let workspace_id = workspace_id.to_string();
Self {
uid,
object_id,
collab_type,
raw_data,
len,
workspace_id,
}
}
}
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct QueryCollabParams {
#[validate(custom = "validate_not_empty_str")]
pub object_id: String,
pub collab_type: CollabType,
}
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct AFWorkspace {
pub workspace_id: uuid::Uuid,
pub database_storage_id: Option<sqlx::types::uuid::Uuid>,
pub owner_uid: Option<i64>,
pub created_at: Option<DateTime<Utc>>,
pub workspace_type: i32,
pub deleted_at: Option<DateTime<Utc>>,
pub workspace_name: Option<String>,
}
#[derive(Debug, sqlx::FromRow, Deserialize, Serialize)]
pub struct AFUserProfileView {
pub uid: Option<i64>,
pub uuid: Option<uuid::Uuid>,
pub email: Option<String>,
pub password: Option<String>,
pub name: Option<String>,
pub encryption_sign: Option<String>,
pub deleted_at: Option<DateTime<Utc>>,
pub updated_at: Option<DateTime<Utc>>,
pub created_at: Option<DateTime<Utc>>,
pub latest_workspace_id: Option<uuid::Uuid>,
}
#[derive(Debug, sqlx::FromRow, Deserialize, Serialize)]
pub struct AFWorkspaces(pub Vec<AFWorkspace>);
impl Deref for AFWorkspaces {
type Target = Vec<AFWorkspace>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<Vec<AFWorkspace>> for AFWorkspaces {
fn from(v: Vec<AFWorkspace>) -> Self {
Self(v)
}
}
impl AFWorkspaces {
pub fn get_latest(&self, profile: AFUserProfileView) -> Option<AFWorkspace> {
match profile.latest_workspace_id {
Some(ws_id) => self.0.iter().find(|ws| ws.workspace_id == ws_id).cloned(),
None => None,
}
}
}

View file

@ -1,4 +1,3 @@
pub mod collab;
// pub mod entities;
pub mod error;
pub mod workspace;

View file

@ -33,13 +33,15 @@ CREATE TABLE af_collab_member (
-- collab snapshot. It will be used to store the snapshots of the collab.
CREATE TABLE IF NOT EXISTS af_collab_snapshot (
sid BIGSERIAL PRIMARY KEY,
sid BIGSERIAL PRIMARY KEY,-- snapshot id
oid TEXT NOT NULL,
name TEXT DEFAULT '',
blob BYTEA NOT NULL,
len INTEGER NOT NULL,
edit_count BIGINT NOT NULL DEFAULT 0,
encrypt INTEGER DEFAULT 0,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
deleted_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
workspace_id UUID NOT NULL REFERENCES af_workspace(workspace_id) ON DELETE CASCADE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL
);
CREATE INDEX idx_af_collab_snapshot_oid ON af_collab_snapshot(oid);

View file

@ -7,18 +7,24 @@ use shared_entity::error::AppError;
use shared_entity::server_error::ErrorCode;
use storage::collab::{CollabPostgresDBStorageImpl, CollabStorage};
use storage::error::StorageError;
use storage_entity::{DeleteCollabParams, InsertCollabParams, QueryCollabParams, RawData};
use storage_entity::{
AFCollabSnapshots, DeleteCollabParams, InsertCollabParams, QueryCollabParams,
QueryObjectSnapshotParams, QuerySnapshotParams, RawData,
};
use tracing::instrument;
use validator::Validate;
pub fn collab_scope() -> Scope {
web::scope("/api").service(
web::resource("collab")
.route(web::post().to(create_collab_handler))
.route(web::get().to(retrieve_collab_handler))
.route(web::put().to(update_collab_handler))
.route(web::delete().to(delete_collab_handler)),
)
web::scope("/api/collab")
.service(
web::resource("/")
.route(web::post().to(create_collab_handler))
.route(web::get().to(retrieve_collab_handler))
.route(web::put().to(update_collab_handler))
.route(web::delete().to(delete_collab_handler)),
)
.service(web::resource("snapshot").route(web::get().to(retrieve_snapshot_data_handler)))
.service(web::resource("snapshots").route(web::get().to(retrieve_snapshots_handler)))
}
#[instrument(level = "debug", skip_all, err)]
@ -85,3 +91,33 @@ async fn delete_collab_handler(
.map_err(|err| AppError::new(ErrorCode::StorageError, err.to_string()))?;
Ok(Json(AppResponse::Ok()))
}
async fn retrieve_snapshot_data_handler(
payload: Json<QuerySnapshotParams>,
storage: Data<Storage<CollabPostgresDBStorageImpl>>,
) -> Result<Json<AppResponse<RawData>>> {
let data = storage
.collab_storage
.get_snapshot_data(payload.into_inner())
.await
.map_err(|err| match &err {
StorageError::RecordNotFound => AppError::new(ErrorCode::RecordNotFound, err.to_string()),
_ => AppError::new(ErrorCode::StorageError, err.to_string()),
})?;
Ok(Json(AppResponse::Ok().with_data(data)))
}
async fn retrieve_snapshots_handler(
payload: Json<QueryObjectSnapshotParams>,
storage: Data<Storage<CollabPostgresDBStorageImpl>>,
) -> Result<Json<AppResponse<AFCollabSnapshots>>> {
let data = storage
.collab_storage
.get_all_snapshots(payload.into_inner())
.await
.map_err(|err| match &err {
StorageError::RecordNotFound => AppError::new(ErrorCode::RecordNotFound, err.to_string()),
_ => AppError::new(ErrorCode::StorageError, err.to_string()),
})?;
Ok(Json(AppResponse::Ok().with_data(data)))
}

View file

@ -1 +1,13 @@
use client_api::Client;
mod storage_test;
pub(crate) async fn workspace_id_from_client(c: &Client) -> String {
c.workspaces()
.await
.unwrap()
.first()
.unwrap()
.workspace_id
.to_string()
}

View file

@ -1,5 +1,6 @@
use crate::client::constants::LOCALHOST_URL;
use crate::client::utils::{REGISTERED_EMAIL, REGISTERED_PASSWORD};
use crate::collab::workspace_id_from_client;
use client_api::Client;
use collab_define::CollabType;
use shared_entity::server_error::ErrorCode;
@ -118,13 +119,3 @@ async fn fail_insert_collab_with_invalid_workspace_id_test() {
assert_eq!(error.code, ErrorCode::StorageError);
}
async fn workspace_id_from_client(c: &Client) -> String {
c.workspaces()
.await
.unwrap()
.first()
.unwrap()
.workspace_id
.to_string()
}