mirror of
https://github.com/AppFlowy-IO/AppFlowy-Cloud.git
synced 2025-04-19 03:24:42 -04:00
feat: add a polling api for getting database row id updates
This commit is contained in:
parent
fa80a4c716
commit
c4d52859fb
8 changed files with 190 additions and 10 deletions
30
.sqlx/query-1331f64dbbf63fc694e3358aefd2bdc4b3bcff64eda36420acde1a948884239d.json
generated
Normal file
30
.sqlx/query-1331f64dbbf63fc694e3358aefd2bdc4b3bcff64eda36420acde1a948884239d.json
generated
Normal file
|
@ -0,0 +1,30 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\n SELECT\n updated_at as updated_at,\n oid as row_id\n FROM af_collab_database_row\n WHERE workspace_id = $1\n AND oid = ANY($2)\n AND updated_at > $3\n ",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "updated_at",
|
||||
"type_info": "Timestamptz"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "row_id",
|
||||
"type_info": "Text"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid",
|
||||
"TextArray",
|
||||
"Timestamptz"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "1331f64dbbf63fc694e3358aefd2bdc4b3bcff64eda36420acde1a948884239d"
|
||||
}
|
|
@ -2,8 +2,10 @@ use crate::http::log_request_id;
|
|||
use crate::{blocking_brotli_compress, brotli_compress, Client};
|
||||
use app_error::AppError;
|
||||
use bytes::Bytes;
|
||||
use chrono::{DateTime, Utc};
|
||||
use client_api_entity::workspace_dto::{
|
||||
AFDatabase, AFDatabaseRow, AFDatabaseRowDetail, ListDatabaseRowDetailParam,
|
||||
AFDatabase, AFDatabaseRow, AFDatabaseRowDetail, DatabaseRowUpdatedItem,
|
||||
ListDatabaseRowDetailParam, ListDatabaseRowUpdatedParam,
|
||||
};
|
||||
use client_api_entity::{
|
||||
BatchQueryCollabParams, BatchQueryCollabResult, CollabParams, CreateCollabParams,
|
||||
|
@ -190,6 +192,26 @@ impl Client {
|
|||
AppResponse::from_response(resp).await?.into_data()
|
||||
}
|
||||
|
||||
pub async fn list_database_row_ids_updated(
|
||||
&self,
|
||||
workspace_id: &str,
|
||||
database_id: &str,
|
||||
after: Option<DateTime<Utc>>,
|
||||
) -> Result<Vec<DatabaseRowUpdatedItem>, AppResponseError> {
|
||||
let url = format!(
|
||||
"{}/api/workspace/{}/database/{}/row/updated",
|
||||
self.base_url, workspace_id, database_id
|
||||
);
|
||||
let resp = self
|
||||
.http_client_with_auth(Method::GET, &url)
|
||||
.await?
|
||||
.query(&ListDatabaseRowUpdatedParam { after })
|
||||
.send()
|
||||
.await?;
|
||||
log_request_id(&resp);
|
||||
AppResponse::from_response(resp).await?.into_data()
|
||||
}
|
||||
|
||||
pub async fn list_database_row_details(
|
||||
&self,
|
||||
workspace_id: &str,
|
||||
|
|
|
@ -4,12 +4,13 @@ use database_entity::dto::{
|
|||
AFAccessLevel, AFCollabMember, AFPermission, AFSnapshotMeta, AFSnapshotMetas, CollabParams,
|
||||
QueryCollab, QueryCollabResult, RawData,
|
||||
};
|
||||
use shared_entity::dto::workspace_dto::DatabaseRowUpdatedItem;
|
||||
|
||||
use crate::collab::{partition_key_from_collab_type, SNAPSHOT_PER_HOUR};
|
||||
use crate::pg_row::AFSnapshotRow;
|
||||
use crate::pg_row::{AFCollabMemberAccessLevelRow, AFCollabRowMeta};
|
||||
use app_error::AppError;
|
||||
use chrono::{Duration, Utc};
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use futures_util::stream::BoxStream;
|
||||
|
||||
use sqlx::postgres::PgRow;
|
||||
|
@ -792,3 +793,29 @@ pub async fn select_workspace_database_oid<'a, E: Executor<'a, Database = Postgr
|
|||
.fetch_one(executor)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn select_last_updated_database_row_ids(
|
||||
pg_pool: &PgPool,
|
||||
workspace_id: &Uuid,
|
||||
row_ids: &[String],
|
||||
after: &DateTime<Utc>,
|
||||
) -> Result<Vec<DatabaseRowUpdatedItem>, sqlx::Error> {
|
||||
let updated_row_items = sqlx::query_as!(
|
||||
DatabaseRowUpdatedItem,
|
||||
r#"
|
||||
SELECT
|
||||
updated_at as updated_at,
|
||||
oid as row_id
|
||||
FROM af_collab_database_row
|
||||
WHERE workspace_id = $1
|
||||
AND oid = ANY($2)
|
||||
AND updated_at > $3
|
||||
"#,
|
||||
workspace_id,
|
||||
row_ids,
|
||||
after,
|
||||
)
|
||||
.fetch_all(pg_pool)
|
||||
.await?;
|
||||
Ok(updated_row_items)
|
||||
}
|
||||
|
|
|
@ -300,6 +300,17 @@ pub struct ListDatabaseRowDetailParam {
|
|||
pub ids: String,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Deserialize, Serialize)]
|
||||
pub struct ListDatabaseRowUpdatedParam {
|
||||
pub after: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Deserialize, Serialize)]
|
||||
pub struct DatabaseRowUpdatedItem {
|
||||
pub updated_at: DateTime<Utc>,
|
||||
pub row_id: String,
|
||||
}
|
||||
|
||||
impl ListDatabaseRowDetailParam {
|
||||
pub fn from(ids: &[&str]) -> Self {
|
||||
Self { ids: ids.join(",") }
|
||||
|
|
22
migrations/20241124212630_af_collab_updated_at.sql
Normal file
22
migrations/20241124212630_af_collab_updated_at.sql
Normal file
|
@ -0,0 +1,22 @@
|
|||
-- Add `updated_at` column to `af_collab` table
|
||||
ALTER TABLE public.af_collab
|
||||
ADD COLUMN updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP;
|
||||
|
||||
-- Create or replace function to update `updated_at` column
|
||||
CREATE OR REPLACE FUNCTION update_updated_at_column()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
NEW.updated_at = CURRENT_TIMESTAMP;
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Create trigger to update `updated_at` column
|
||||
CREATE TRIGGER set_updated_at
|
||||
BEFORE INSERT OR UPDATE ON public.af_collab
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION update_updated_at_column();
|
||||
|
||||
-- Create index on `updated_at` column
|
||||
CREATE INDEX idx_af_collab_updated_at
|
||||
ON public.af_collab (updated_at);
|
|
@ -5,6 +5,7 @@ use actix_web::{web, Scope};
|
|||
use actix_web::{HttpRequest, Result};
|
||||
use anyhow::{anyhow, Context};
|
||||
use bytes::BytesMut;
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use collab::entity::EncodedCollab;
|
||||
use collab_entity::CollabType;
|
||||
use futures_util::future::try_join_all;
|
||||
|
@ -260,6 +261,10 @@ pub fn workspace_scope() -> Scope {
|
|||
web::resource("/{workspace_id}/database/{database_id}/row")
|
||||
.route(web::get().to(list_database_row_id_handler)),
|
||||
)
|
||||
.service(
|
||||
web::resource("/{workspace_id}/database/{database_id}/row/updated")
|
||||
.route(web::get().to(list_database_row_id_updated_handler)),
|
||||
)
|
||||
.service(
|
||||
web::resource("/{workspace_id}/database/{database_id}/row/detail")
|
||||
.route(web::get().to(list_database_row_details_handler)),
|
||||
|
@ -1892,9 +1897,42 @@ async fn list_database_row_id_handler(
|
|||
.enforce_action(&uid, &workspace_id, Action::Read)
|
||||
.await?;
|
||||
|
||||
let db_rows =
|
||||
biz::collab::ops::list_database_row(&state.collab_access_control_storage, workspace_id, db_id)
|
||||
.await?;
|
||||
let db_rows = biz::collab::ops::list_database_row_ids(
|
||||
&state.collab_access_control_storage,
|
||||
&workspace_id,
|
||||
&db_id,
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(AppResponse::Ok().with_data(db_rows)))
|
||||
}
|
||||
|
||||
async fn list_database_row_id_updated_handler(
|
||||
user_uuid: UserUuid,
|
||||
path_param: web::Path<(String, String)>,
|
||||
state: Data<AppState>,
|
||||
param: web::Query<ListDatabaseRowUpdatedParam>,
|
||||
) -> Result<Json<AppResponse<Vec<DatabaseRowUpdatedItem>>>> {
|
||||
let (workspace_id, db_id) = path_param.into_inner();
|
||||
let uid = state.user_cache.get_user_uid(&user_uuid).await?;
|
||||
|
||||
state
|
||||
.workspace_access_control
|
||||
.enforce_action(&uid, &workspace_id, Action::Read)
|
||||
.await?;
|
||||
|
||||
// Default to 1 hour ago
|
||||
let after: DateTime<Utc> = param
|
||||
.after
|
||||
.unwrap_or_else(|| Utc::now() - Duration::hours(1));
|
||||
|
||||
let db_rows = biz::collab::ops::list_database_row_ids_updated(
|
||||
&state.collab_access_control_storage,
|
||||
&state.pg_pool,
|
||||
&workspace_id,
|
||||
&db_id,
|
||||
&after,
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(AppResponse::Ok().with_data(db_rows)))
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,8 @@ use std::sync::Arc;
|
|||
|
||||
use app_error::AppError;
|
||||
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use collab::preclude::Collab;
|
||||
use collab_database::database::DatabaseBody;
|
||||
use collab_database::entity::FieldType;
|
||||
|
@ -15,6 +17,7 @@ use collab_entity::CollabType;
|
|||
use collab_entity::EncodedCollab;
|
||||
use collab_folder::SectionItem;
|
||||
use collab_folder::{CollabOrigin, Folder};
|
||||
use database::collab::select_last_updated_database_row_ids;
|
||||
use database::collab::select_workspace_database_oid;
|
||||
use database::collab::{CollabStorage, GetCollabOrigin};
|
||||
use database::publish::select_published_view_ids_for_workspace;
|
||||
|
@ -24,6 +27,7 @@ use database_entity::dto::{QueryCollab, QueryCollabParams};
|
|||
use shared_entity::dto::workspace_dto::AFDatabase;
|
||||
use shared_entity::dto::workspace_dto::AFDatabaseRow;
|
||||
use shared_entity::dto::workspace_dto::AFDatabaseRowDetail;
|
||||
use shared_entity::dto::workspace_dto::DatabaseRowUpdatedItem;
|
||||
use shared_entity::dto::workspace_dto::FavoriteFolderView;
|
||||
use shared_entity::dto::workspace_dto::FolderViewMinimal;
|
||||
use shared_entity::dto::workspace_dto::RecentFolderView;
|
||||
|
@ -435,16 +439,16 @@ pub async fn list_database(
|
|||
Ok(af_databases)
|
||||
}
|
||||
|
||||
pub async fn list_database_row(
|
||||
pub async fn list_database_row_ids(
|
||||
collab_storage: &CollabAccessControlStorage,
|
||||
workspace_uuid_str: String,
|
||||
database_uuid_str: String,
|
||||
workspace_uuid_str: &str,
|
||||
database_uuid_str: &str,
|
||||
) -> Result<Vec<AFDatabaseRow>, AppError> {
|
||||
let db_collab = get_latest_collab(
|
||||
collab_storage,
|
||||
GetCollabOrigin::Server,
|
||||
&workspace_uuid_str,
|
||||
&database_uuid_str,
|
||||
workspace_uuid_str,
|
||||
database_uuid_str,
|
||||
CollabType::Database,
|
||||
)
|
||||
.await?;
|
||||
|
@ -479,6 +483,25 @@ pub async fn list_database_row(
|
|||
Ok(db_rows)
|
||||
}
|
||||
|
||||
pub async fn list_database_row_ids_updated(
|
||||
collab_storage: &CollabAccessControlStorage,
|
||||
pg_pool: &PgPool,
|
||||
workspace_uuid_str: &str,
|
||||
database_uuid_str: &str,
|
||||
after: &DateTime<Utc>,
|
||||
) -> Result<Vec<DatabaseRowUpdatedItem>, AppError> {
|
||||
let row_ids = list_database_row_ids(collab_storage, workspace_uuid_str, database_uuid_str)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| row.id)
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
let workspace_uuid: Uuid = workspace_uuid_str.parse()?;
|
||||
let updated_row_ids =
|
||||
select_last_updated_database_row_ids(pg_pool, &workspace_uuid, &row_ids, after).await?;
|
||||
Ok(updated_row_ids)
|
||||
}
|
||||
|
||||
pub async fn list_database_row_details(
|
||||
collab_storage: &CollabAccessControlStorage,
|
||||
uid: i64,
|
||||
|
|
|
@ -24,6 +24,13 @@ async fn workspace_list_database() {
|
|||
.unwrap();
|
||||
assert_eq!(db_row_ids.len(), 5, "{:?}", db_row_ids);
|
||||
}
|
||||
{
|
||||
let db_row_ids = c
|
||||
.list_database_row_ids_updated(&workspace_id, &todos_db.id, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(db_row_ids.len(), 5, "{:?}", db_row_ids);
|
||||
}
|
||||
|
||||
{
|
||||
let db_row_ids = c
|
||||
|
|
Loading…
Add table
Reference in a new issue