chore: generate history using redis stream (#570)

* chore: combine test

* chore: add snapshot test

* chore: add test
This commit is contained in:
Nathan.fooo 2024-05-23 07:57:20 +08:00 committed by GitHub
parent 05e7f1cda7
commit 4eb1e6bceb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 794 additions and 415 deletions

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_snapshot_meta (oid, workspace_id, snapshot, snapshot_version, partition_key, created_at)\n VALUES ($1, $2, $3, $4, $5, $6)\n ",
"query": "\n INSERT INTO af_snapshot_meta (oid, workspace_id, snapshot, snapshot_version, partition_key, created_at)\n VALUES ($1, $2, $3, $4, $5, $6)\n ON CONFLICT DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
@ -15,5 +15,5 @@
},
"nullable": []
},
"hash": "0584173cd14ab1a975421dc4a58a42fb1e0ebc03278544928603ab2281ac5739"
"hash": "3cfb0a6d9a798f29422bc4bf4a52d3c86c3aae98c173b83c60eb57504a3d2c7c"
}

1
Cargo.lock generated
View file

@ -6341,6 +6341,7 @@ name = "tonic-proto"
version = "0.1.0"
dependencies = [
"prost",
"serde",
"tonic",
"tonic-build",
"tracing",

View file

@ -483,7 +483,7 @@ impl TestClient {
) -> String {
let object_id = Uuid::new_v4().to_string();
self
.create_and_edit_collab_with_data(object_id.clone(), workspace_id, collab_type, None)
.create_and_edit_collab_with_data(&object_id, workspace_id, collab_type, None)
.await;
object_id
}
@ -491,7 +491,7 @@ impl TestClient {
#[allow(unused_variables)]
pub async fn create_and_edit_collab_with_data(
&mut self,
object_id: String,
object_id: &str,
workspace_id: &str,
collab_type: CollabType,
encoded_collab_v1: Option<EncodedCollab>,
@ -501,14 +501,14 @@ impl TestClient {
let collab = match encoded_collab_v1 {
None => Arc::new(MutexCollab::new(Collab::new_with_origin(
origin.clone(),
&object_id,
object_id,
vec![],
false,
))),
Some(data) => Arc::new(MutexCollab::new(
Collab::new_with_source(
origin.clone(),
&object_id,
object_id,
DataSource::DocStateV1(data.doc_state.to_vec()),
vec![],
false,
@ -532,7 +532,7 @@ impl TestClient {
self
.api_client
.create_collab(CreateCollabParams {
object_id: object_id.clone(),
object_id: object_id.to_string(),
encoded_collab_v1,
collab_type: collab_type.clone(),
workspace_id: workspace_id.to_string(),
@ -542,10 +542,13 @@ impl TestClient {
#[cfg(feature = "collab-sync")]
{
let handler = self.ws_client.subscribe_collab(object_id.clone()).unwrap();
let handler = self
.ws_client
.subscribe_collab(object_id.to_string())
.unwrap();
let (sink, stream) = (handler.sink(), handler.stream());
let ws_connect_state = self.ws_client.subscribe_connect_state();
let object = SyncObject::new(&object_id, workspace_id, collab_type, &self.device_id);
let object = SyncObject::new(object_id, workspace_id, collab_type, &self.device_id);
let sync_plugin = SyncPlugin::new(
origin.clone(),
object,
@ -563,8 +566,8 @@ impl TestClient {
origin,
mutex_collab: collab,
};
self.collabs.insert(object_id.clone(), test_collab);
self.wait_object_sync_complete(&object_id).await.unwrap();
self.collabs.insert(object_id.to_string(), test_collab);
self.wait_object_sync_complete(object_id).await.unwrap();
}
pub async fn open_workspace_collab(&mut self, workspace_id: &str) {

View file

@ -0,0 +1,52 @@
use crate::http::log_request_id;
use crate::Client;
use collab_entity::CollabType;
use reqwest::Method;
use shared_entity::dto::history_dto::{RepeatedSnapshotMeta, SnapshotInfo};
use shared_entity::response::{AppResponse, AppResponseError};
impl Client {
  /// Fetches the snapshot metadata list for a collab object via
  /// `GET /api/history/{workspace_id}/{object_id}/{collab_type}`.
  pub async fn get_snapshots(
    &self,
    workspace_id: &str,
    object_id: &str,
    collab_type: CollabType,
  ) -> Result<RepeatedSnapshotMeta, AppResponseError> {
    let url = format!(
      "{}/api/history/{}/{}/{}",
      self.base_url,
      workspace_id,
      object_id,
      collab_type.value()
    );
    let resp = self
      .http_client_with_auth(Method::GET, &url)
      .await?
      .send()
      .await?;
    log_request_id(&resp);
    // Decode the wrapped AppResponse payload and unwrap its data field.
    AppResponse::<RepeatedSnapshotMeta>::from_response(resp)
      .await?
      .into_data()
  }

  /// Fetches the most recent snapshot plus its restorable history state via
  /// `GET /api/history/{workspace_id}/{object_id}/{collab_type}/latest`.
  pub async fn get_latest_history(
    &self,
    workspace_id: &str,
    object_id: &str,
    collab_type: CollabType,
  ) -> Result<SnapshotInfo, AppResponseError> {
    let url = format!(
      "{}/api/history/{}/{}/{}/latest",
      self.base_url,
      workspace_id,
      object_id,
      collab_type.value()
    );
    let resp = self
      .http_client_with_auth(Method::GET, &url)
      .await?
      .send()
      .await?;
    log_request_id(&resp);
    AppResponse::<SnapshotInfo>::from_response(resp)
      .await?
      .into_data()
  }
}

View file

@ -2,6 +2,7 @@ mod http;
mod http_ai;
mod http_blob;
mod http_collab;
mod http_history;
mod http_member;
pub use http::*;

View file

@ -1,6 +1,6 @@
use collab_entity::CollabType;
use std::collections::BTreeMap;
use std::fmt::Display;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::str::FromStr;
@ -260,6 +260,25 @@ pub enum CollabControlEvent {
},
}
impl Display for CollabControlEvent {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
CollabControlEvent::Open {
workspace_id: _,
object_id,
collab_type,
doc_state: _,
} => f.write_fmt(format_args!(
"Open collab: object_id:{}|collab_type:{:?}",
object_id, collab_type,
)),
CollabControlEvent::Close { object_id } => {
f.write_fmt(format_args!("Close collab: object_id:{}", object_id))
},
}
}
}
impl CollabControlEvent {
pub fn encode(&self) -> Result<Vec<u8>, serde_json::Error> {
serde_json::to_vec(self)

View file

@ -3,7 +3,8 @@ use collab_entity::CollabType;
use serde::{Deserialize, Serialize};
use sqlx::{Executor, FromRow, PgPool, Postgres};
use std::ops::DerefMut;
use tonic_proto::history::{HistoryState, SnapshotInfo, SnapshotMeta};
use tonic_proto::history::{HistoryStatePb, SingleSnapshotInfoPb, SnapshotMetaPb};
use tracing::trace;
use uuid::Uuid;
#[allow(clippy::too_many_arguments)]
@ -15,16 +16,20 @@ pub async fn insert_history<'a>(
deps_snapshot_id: Option<String>,
collab_type: CollabType,
created_at: i64,
snapshots: Vec<SnapshotMeta>,
snapshots: Vec<SnapshotMetaPb>,
pool: PgPool,
) -> Result<(), sqlx::Error> {
let mut transaction = pool.begin().await?;
let partition_key = partition_key_from_collab_type(&collab_type);
let to_insert: Vec<SnapshotMeta> = snapshots
let to_insert: Vec<SnapshotMetaPb> = snapshots
.into_iter()
.filter(|s| s.created_at <= created_at)
.collect();
trace!(
"Inserting {} snapshots into af_snapshot_meta",
to_insert.len()
);
for snapshot in to_insert {
insert_snapshot_meta(
workspace_id,
@ -56,7 +61,7 @@ pub async fn insert_history<'a>(
async fn insert_snapshot_meta<'a, E: Executor<'a, Database = Postgres>>(
workspace_id: &Uuid,
oid: &str,
meta: SnapshotMeta,
meta: SnapshotMetaPb,
partition_key: i32,
executor: E,
) -> Result<(), sqlx::Error> {
@ -64,6 +69,7 @@ async fn insert_snapshot_meta<'a, E: Executor<'a, Database = Postgres>>(
r#"
INSERT INTO af_snapshot_meta (oid, workspace_id, snapshot, snapshot_version, partition_key, created_at)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT DO NOTHING
"#,
oid,
workspace_id,
@ -85,13 +91,13 @@ async fn insert_snapshot_meta<'a, E: Executor<'a, Database = Postgres>>(
/// - `pool`: The PostgreSQL connection pool.
///
/// # Returns
/// Returns a vector of `AFSnapshotMetaRow` struct instances containing the snapshot data.
/// Returns a vector of `AFSnapshotMetaPbRow` struct instances containing the snapshot data.
/// This vector is empty if no records match the criteria.
pub async fn get_snapshot_meta_list<'a>(
oid: &str,
collab_type: &CollabType,
pool: &PgPool,
) -> Result<Vec<AFSnapshotMetaRow>, sqlx::Error> {
) -> Result<Vec<AFSnapshotMetaPbRow>, sqlx::Error> {
let partition_key = partition_key_from_collab_type(collab_type);
let order_clause = "DESC";
let query = format!(
@ -99,7 +105,7 @@ pub async fn get_snapshot_meta_list<'a>(
order_clause
);
let rows = sqlx::query_as::<_, AFSnapshotMetaRow>(&query)
let rows = sqlx::query_as::<_, AFSnapshotMetaPbRow>(&query)
.bind(oid)
.bind(partition_key)
.fetch_all(pool)
@ -148,8 +154,8 @@ async fn insert_snapshot_state<'a, E: Executor<'a, Database = Postgres>>(
partition_key,
created_at,
)
.execute(executor)
.await?;
.execute(executor)
.await?;
Ok(())
}
@ -186,12 +192,12 @@ pub async fn get_latest_snapshot(
oid: &str,
collab_type: &CollabType,
pool: &PgPool,
) -> Result<Option<SnapshotInfo>, sqlx::Error> {
) -> Result<Option<SingleSnapshotInfoPb>, sqlx::Error> {
let mut transaction = pool.begin().await?;
let partition_key = partition_key_from_collab_type(collab_type);
// Attempt to fetch the latest snapshot metadata
let snapshot_meta = sqlx::query_as!(
AFSnapshotMetaRow,
AFSnapshotMetaPbRow,
r#"
SELECT oid, snapshot, snapshot_version, created_at
FROM af_snapshot_meta
@ -207,33 +213,37 @@ pub async fn get_latest_snapshot(
// Return None if no metadata found
let snapshot_meta = match snapshot_meta {
Some(meta) => meta,
Some(meta) => SnapshotMetaPb {
oid: meta.oid,
snapshot: meta.snapshot,
snapshot_version: meta.snapshot_version,
created_at: meta.created_at,
},
None => return Ok(None),
};
// Fetch the corresponding state using the metadata's created_at timestamp
let snapshot_state = get_latest_snapshot_state(
// Return None if no metadata found
let snapshot_state = match get_latest_snapshot_state(
oid,
snapshot_meta.created_at,
collab_type,
transaction.deref_mut(),
)
.await?;
// Return None if no metadata found
let snapshot_state = match snapshot_state {
.await?
{
Some(state) => state,
None => return Ok(None),
};
let history_state = HistoryState {
let history_state = HistoryStatePb {
object_id: snapshot_state.oid,
doc_state: snapshot_state.doc_state,
doc_state_version: snapshot_state.doc_state_version,
};
let snapshot_info = SnapshotInfo {
snapshot: snapshot_meta.snapshot,
let snapshot_info = SingleSnapshotInfoPb {
snapshot_meta: Some(snapshot_meta),
history_state: Some(history_state),
};
@ -241,7 +251,7 @@ pub async fn get_latest_snapshot(
}
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct AFSnapshotMetaRow {
pub struct AFSnapshotMetaPbRow {
pub oid: String,
pub snapshot: Vec<u8>,
pub snapshot_version: i32,

View file

@ -0,0 +1,32 @@
use serde::{Deserialize, Serialize};
/// Metadata describing one stored snapshot of a collab object.
// Public DTO types should be debuggable/loggable; `Debug` added alongside the
// existing derives (backward compatible).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SnapshotMeta {
  /// Identifier of the collab object this snapshot belongs to.
  pub oid: String,
  /// Raw snapshot bytes; use yrs::Snapshot to deserialize the snapshot.
  pub snapshot: Vec<u8>,
  /// Specifies the version of the snapshot encoding format.
  pub snapshot_version: i32,
  /// Creation time of the snapshot (unix timestamp; unit as produced by the
  /// history service — TODO confirm seconds vs milliseconds).
  pub created_at: i64,
}
/// A list container for [`SnapshotMeta`] items, mirroring the
/// `RepeatedSnapshotMetaPb` protobuf message.
// `Debug` derive added for logging/diagnostics (backward compatible).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RepeatedSnapshotMeta {
  pub items: Vec<SnapshotMeta>,
}
/// The full document state of a collab object, sufficient to restore it.
// `Debug` derive added for logging/diagnostics (backward compatible).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct HistoryState {
  /// Identifier of the collab object.
  pub object_id: String,
  /// Encoded document state as raw bytes.
  pub doc_state: Vec<u8>,
  /// Version of the document-state encoding format.
  pub doc_state_version: i32,
}
/// Pairs a [`SnapshotMeta`] with the [`HistoryState`] needed to restore the
/// full collab state. The snapshot represents the state of the collab at a
/// certain point in time.
// `Debug` derive added for logging/diagnostics (backward compatible).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SnapshotInfo {
  /// Restorable document state corresponding to the snapshot.
  pub history: HistoryState,
  /// Metadata identifying the snapshot itself.
  pub snapshot_meta: SnapshotMeta,
}

View file

@ -1,3 +1,4 @@
pub mod ai_dto;
pub mod auth_dto;
pub mod history_dto;
pub mod workspace_dto;

View file

@ -9,6 +9,7 @@ edition = "2021"
tonic.workspace = true
prost.workspace = true
tracing.workspace = true
serde = { version = "1.0", features = ["derive"] }
[build-dependencies]
tonic-build = "0.11"

View file

@ -1,4 +1,15 @@
use tonic_build::configure;
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/history.proto")?;
configure()
// .type_attribute(
// "history.RepeatedSnapshotMeta",
// "#[derive(serde::Serialize, serde::Deserialize)]",
// )
// .type_attribute(
// "history.SnapshotMeta",
// "#[derive(serde::Serialize, serde::Deserialize)]",
// )
.compile(&["proto/history.proto"], &["proto"])?;
Ok(())
}

View file

@ -6,19 +6,20 @@ import "google/protobuf/wrappers.proto";
package history;
service History {
rpc GetSnapshots (SnapshotRequest) returns (RepeatedSnapshotMeta);
rpc GetInMemoryHistory (SnapshotRequest) returns (HistoryState);
rpc GetInDiskHistory (SnapshotMeta) returns (HistoryState);
rpc GetSnapshots (SnapshotRequestPb) returns (RepeatedSnapshotMetaPb);
rpc GetNumOfSnapshot (SnapshotRequestPb) returns (RepeatedSnapshotInfoPb);
rpc GetLatestSnapshot (SnapshotRequestPb) returns (SingleSnapshotInfoPb);
}
message SnapshotRequest {
message SnapshotRequestPb {
string workspace_id = 1;
string object_id = 2;
int32 collab_type = 3;
int32 num_snapshot = 4;
}
// SnapshotMeta represents metadata for a snapshot.
message SnapshotMeta {
// SnapshotMetaPB represents metadata for a snapshot.
message SnapshotMetaPb {
// The unique identifier for the snapshot.
string oid = 1;
// The raw binary snapshot data.
@ -31,26 +32,31 @@ message SnapshotMeta {
int64 created_at = 4;
}
// A container for repeated instances of SnapshotMeta.
message RepeatedSnapshotMeta {
repeated SnapshotMeta items = 1; // List of SnapshotMeta items
// A container for repeated instances of SnapshotMetaPB.
message RepeatedSnapshotMetaPb {
repeated SnapshotMetaPb items = 1; // List of SnapshotMetaPB items
}
// SnapshotState represents the state of a snapshot, including optional dependency IDs.
message SnapshotState {
// SnapshotStatePB represents the state of a snapshot, including optional dependency IDs.
message SnapshotStatePb {
string oid = 1; // Unique identifier for the snapshot
bytes doc_state = 2; // The document state as raw binary data
int32 doc_state_version = 3; // Version of the document state format
google.protobuf.StringValue deps_snapshot_id = 4; // Optional dependency snapshot ID
}
message SnapshotInfo {
HistoryState history_state = 1;
bytes snapshot = 2; // Additional field specific to SnapshotInfo representing the snapshot data
message SingleSnapshotInfoPb {
HistoryStatePb history_state = 1;
SnapshotMetaPb snapshot_meta = 2;
}
message HistoryState {
message RepeatedSnapshotInfoPb {
HistoryStatePb history_state = 1;
repeated SnapshotMetaPb snapshots = 2;
}
message HistoryStatePb {
string object_id = 1; // Unique identifier for the object
bytes doc_state = 2; // The document state as raw binary data
int32 doc_state_version = 3; // Version of the document state format, with decoding instructions based on version

View file

@ -3,7 +3,7 @@ use crate::biz::history::get_snapshots;
use collab_entity::CollabType;
use tonic::{Request, Response, Status};
use tonic_proto::history::history_server::History;
use tonic_proto::history::{HistoryState, SnapshotMeta, SnapshotRequest};
use tonic_proto::history::{RepeatedSnapshotInfoPb, SingleSnapshotInfoPb, SnapshotRequestPb};
pub struct HistoryImpl {
pub state: AppState,
@ -26,31 +26,32 @@ pub struct HistoryImpl {
impl History for HistoryImpl {
async fn get_snapshots(
&self,
request: Request<SnapshotRequest>,
) -> Result<Response<tonic_proto::history::RepeatedSnapshotMeta>, Status> {
request: Request<SnapshotRequestPb>,
) -> Result<Response<tonic_proto::history::RepeatedSnapshotMetaPb>, Status> {
let request = request.into_inner();
let collab_type = CollabType::from(request.collab_type);
let data = get_snapshots(&request.object_id, &collab_type, &self.state.pg_pool).await?;
Ok(Response::new(data))
}
async fn get_in_memory_history(
async fn get_num_of_snapshot(
&self,
request: Request<SnapshotRequest>,
) -> Result<Response<HistoryState>, Status> {
_request: Request<SnapshotRequestPb>,
) -> Result<Response<RepeatedSnapshotInfoPb>, Status> {
todo!()
}
async fn get_latest_snapshot(
&self,
request: Request<SnapshotRequestPb>,
) -> Result<Response<SingleSnapshotInfoPb>, Status> {
let request = request.into_inner();
let resp = self
.state
.open_collab_manager
.get_in_memory_history(request)
.get_latest_snapshot(request, &self.state.pg_pool)
.await?;
Ok(Response::new(resp))
}
async fn get_in_disk_history(
&self,
_request: Request<SnapshotMeta>,
) -> Result<Response<HistoryState>, Status> {
todo!()
}
}

View file

@ -8,7 +8,8 @@ use collab_entity::CollabType;
use database::history::ops::get_snapshot_meta_list;
use serde_json::Value;
use sqlx::PgPool;
use tonic_proto::history::{RepeatedSnapshotMeta, SnapshotMeta};
use tonic_proto::history::{RepeatedSnapshotMetaPb, SnapshotMetaPb};
use tracing::trace;
pub struct CollabHistory {
object_id: String,
@ -63,7 +64,7 @@ impl CollabHistory {
if snapshots.is_empty() {
return Ok(None);
}
trace!("[History] prepare to save snapshots to disk");
let (doc_state, state_vector) = {
let lock_guard = mutex_collab.lock();
let txn = lock_guard.try_transaction()?;
@ -131,14 +132,14 @@ pub async fn get_snapshots(
object_id: &str,
collab_type: &CollabType,
pg_pool: &PgPool,
) -> Result<RepeatedSnapshotMeta, HistoryError> {
) -> Result<RepeatedSnapshotMetaPb, HistoryError> {
let metas = get_snapshot_meta_list(object_id, collab_type, pg_pool)
.await
.unwrap();
let metas = metas
.into_iter()
.map(|meta| SnapshotMeta {
.map(|meta| SnapshotMetaPb {
oid: meta.oid,
snapshot: meta.snapshot,
snapshot_version: meta.snapshot_version,
@ -146,5 +147,5 @@ pub async fn get_snapshots(
})
.collect::<Vec<_>>();
Ok(RepeatedSnapshotMeta { items: metas })
Ok(RepeatedSnapshotMetaPb { items: metas })
}

View file

@ -3,7 +3,8 @@ use crate::error::HistoryError;
use collab_entity::CollabType;
use database::history::ops::insert_history;
use sqlx::PgPool;
use tonic_proto::history::SnapshotMeta;
use tonic_proto::history::SnapshotMetaPb;
use tracing::trace;
use uuid::Uuid;
pub struct HistoryPersistence {
@ -24,9 +25,15 @@ impl HistoryPersistence {
snapshots: Vec<CollabSnapshot>,
collab_type: CollabType,
) -> Result<(), HistoryError> {
trace!(
"[History] save {}:{}: {} snapshots and history to disk",
state.object_id,
collab_type,
snapshots.len(),
);
let snapshots = snapshots
.into_iter()
.map(SnapshotMeta::from)
.map(SnapshotMetaPb::from)
.collect::<Vec<_>>();
insert_history(

View file

@ -9,8 +9,8 @@ use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tonic_proto::history::SnapshotMeta;
use tracing::{error, warn};
use tonic_proto::history::SnapshotMetaPb;
use tracing::{error, trace, warn};
#[derive(Clone)]
pub struct SnapshotGenerator {
@ -43,7 +43,8 @@ impl SnapshotGenerator {
// keep it simple for now. we just compare the update count to determine if we need to generate a snapshot.
// in the future, we can use a more sophisticated algorithm to determine when to generate a snapshot.
if prev_apply_update_count + 1 >= gen_snapshot_threshold(&self.collab_type) {
let threshold = gen_snapshot_threshold(&self.collab_type);
if prev_apply_update_count + 1 >= threshold {
let pending_snapshots = self.pending_snapshots.clone();
let mutex_collab = self.mutex_collab.clone();
let object_id = self.object_id.clone();
@ -79,7 +80,7 @@ fn gen_snapshot_threshold(collab_type: &CollabType) -> u32 {
CollabType::UserAwareness => 50,
CollabType::Unknown => {
if cfg!(debug_assertions) {
10
5
} else {
50
}
@ -96,10 +97,12 @@ async fn attempt_gen_snapshot(
max_retries: usize,
delay: Duration,
) {
trace!("[History] attempting to generate snapshot");
let mut retries = 0;
while retries < max_retries {
match gen_snapshot(collab, object_id) {
Ok(snapshot) => {
trace!("[History] did generate snapshot for {}", snapshot.object_id);
pending_snapshots.write().push(snapshot);
return;
},
@ -117,6 +120,7 @@ async fn attempt_gen_snapshot(
warn!("Exceeded maximum retry attempts for snapshot generation");
}
#[inline]
pub fn gen_snapshot(
mutex_collab: &MutexCollab,
object_id: &str,
@ -202,7 +206,7 @@ impl CollabSnapshot {
}
}
impl From<CollabSnapshot> for SnapshotMeta {
impl From<CollabSnapshot> for SnapshotMetaPb {
fn from(snapshot: CollabSnapshot) -> Self {
let snapshot_data = snapshot.encode_v1();
Self {

View file

@ -9,11 +9,12 @@ use dashmap::mapref::entry::Entry;
use crate::config::StreamSetting;
use dashmap::DashMap;
use database::history::ops::get_latest_snapshot;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
use tonic_proto::history::{HistoryState, SnapshotRequest};
use tonic_proto::history::{HistoryStatePb, SingleSnapshotInfoPb, SnapshotRequestPb};
use tracing::{error, trace};
use uuid::Uuid;
@ -41,13 +42,25 @@ impl OpenCollabManager {
pub async fn get_in_memory_history(
&self,
req: SnapshotRequest,
) -> Result<HistoryState, HistoryError> {
req: SnapshotRequestPb,
) -> Result<HistoryStatePb, HistoryError> {
match self.handles.get(&req.object_id) {
None => Err(HistoryError::RecordNotFound(req.object_id)),
Some(handle) => handle.history_state().await,
}
}
pub async fn get_latest_snapshot(
&self,
req: SnapshotRequestPb,
pg_pool: &PgPool,
) -> Result<SingleSnapshotInfoPb, HistoryError> {
let collab_type = CollabType::from(req.collab_type);
match get_latest_snapshot(&req.object_id, &collab_type, pg_pool).await {
Ok(Some(pb)) => Ok(pb),
_ => Err(HistoryError::RecordNotFound(req.object_id)),
}
}
}
async fn spawn_control_group(
@ -104,7 +117,7 @@ async fn handle_control_event(
handles: &Arc<DashMap<String, Arc<OpenCollabHandle>>>,
pg_pool: &PgPool,
) {
trace!("Received control event: {:?}", event);
trace!("[History] received control event: {}", event);
match event {
CollabControlEvent::Open {
workspace_id,
@ -114,7 +127,7 @@ async fn handle_control_event(
} => match handles.entry(object_id.clone()) {
Entry::Occupied(_) => {},
Entry::Vacant(entry) => {
trace!("Opening collab: {}", object_id);
trace!("[History] create collab: {}", object_id);
match init_collab_handle(
redis_stream,
pg_pool,
@ -136,9 +149,9 @@ async fn handle_control_event(
},
},
CollabControlEvent::Close { object_id } => {
trace!("Close collab: {}", object_id);
trace!("[History] close collab: {}", object_id);
if let Some(handle) = handles.get(&object_id) {
if let Err(err) = handle.gen_history().await {
if let Err(err) = handle.generate_history().await {
error!(
"Failed to generate history when receiving close event: {:?}",
err

View file

@ -13,8 +13,8 @@ use parking_lot::MutexGuard;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::time::interval;
use tonic_proto::history::HistoryState;
use tracing::error;
use tonic_proto::history::HistoryStatePb;
use tracing::{error, trace};
const CONSUMER_NAME: &str = "open_collab_handle";
pub struct OpenCollabHandle {
@ -38,6 +38,7 @@ impl OpenCollabHandle {
history_persistence: Option<Arc<HistoryPersistence>>,
) -> Result<Self, HistoryError> {
let mutex_collab = {
// Must set skip_gc = true to avoid the garbage collection of the collab.
let mut collab = Collab::new_with_source(
CollabOrigin::Empty,
object_id,
@ -79,21 +80,21 @@ impl OpenCollabHandle {
})
}
pub async fn history_state(&self) -> Result<HistoryState, HistoryError> {
pub async fn history_state(&self) -> Result<HistoryStatePb, HistoryError> {
let lock_guard = self
.mutex_collab
.try_lock()
.ok_or(HistoryError::TryLockFail)?;
let encode_collab =
lock_guard.encode_collab_v1(|collab| self.collab_type.validate_require_data(collab))?;
Ok(HistoryState {
Ok(HistoryStatePb {
object_id: self.object_id.clone(),
doc_state: encode_collab.doc_state.to_vec(),
doc_state_version: 1,
})
}
pub async fn gen_history(&self) -> Result<(), HistoryError> {
pub async fn generate_history(&self) -> Result<(), HistoryError> {
if let Some(history_persistence) = &self.history_persistence {
save_history(self.history.clone(), history_persistence.clone()).await;
}
@ -126,7 +127,7 @@ async fn spawn_recv_update(
None => return Ok(()),
};
let interval_duration = Duration::from_secs(2);
let interval_duration = Duration::from_secs(5);
let object_id = object_id.to_string();
let collab_type = collab_type.clone();
@ -148,14 +149,14 @@ async fn spawn_recv_update(
{
// 2.Clear the stale messages if failed to process them.
if let Err(err) = update_stream.clear().await {
error!("Failed to clear stale update messages: {:?}", err);
error!("[History]: fail to clear stale update messages: {:?}", err);
}
return Err(HistoryError::ApplyStaleMessage(err.to_string()));
}
// 3.Acknowledge the stale messages.
if let Err(err) = update_stream.ack_message_ids(message_ids).await {
error!("Failed to ack stale messages: {:?}", err);
error!("[History ] fail to ack stale messages: {:?}", err);
}
}
@ -172,6 +173,11 @@ async fn spawn_recv_update(
.consumer_messages(CONSUMER_NAME, ReadOption::Undelivered)
.await
{
if messages.is_empty() {
continue;
}
trace!("[History] received {} update messages", messages.len());
if let Err(e) = process_messages(
&mut update_stream,
messages,
@ -240,7 +246,8 @@ fn apply_updates(
fn spawn_save_history(history: Weak<CollabHistory>, history_persistence: Weak<HistoryPersistence>) {
tokio::spawn(async move {
let mut interval = if cfg!(debug_assertions) {
interval(Duration::from_secs(60))
// In debug mode, save the history every 10 seconds.
interval(Duration::from_secs(10))
} else {
interval(Duration::from_secs(60 * 60))
};

View file

@ -1,7 +1,7 @@
use crate::edit_test::mock::mock_test_data;
use crate::util::{check_doc_state_json, redis_stream, run_test_server};
use collab_entity::CollabType;
use tonic_proto::history::SnapshotRequest;
use tonic_proto::history::SnapshotRequestPb;
#[tokio::test]
async fn apply_update_stream_updates_test() {
@ -39,10 +39,11 @@ async fn apply_update_stream_updates_test() {
.unwrap();
}
let request = SnapshotRequest {
let request = SnapshotRequestPb {
workspace_id: workspace_id.to_string(),
object_id: object_id.to_string(),
collab_type: CollabType::Unknown.value(),
num_snapshot: 1,
};
check_doc_state_json(&object_id, 60, mock.expected_json.clone(), move || {
@ -50,9 +51,9 @@ async fn apply_update_stream_updates_test() {
let cloned_request = request.clone();
Box::pin(async move {
cloned_client
.get_in_memory_history(cloned_request)
.get_latest_snapshot(cloned_request)
.await
.map(|r| r.into_inner())
.map(|r| r.into_inner().history_state.unwrap())
})
})
.await
@ -101,7 +102,7 @@ async fn apply_update_stream_updates_test() {
// update_group.insert_message(update_event).await.unwrap();
// }
//
// let request = SnapshotRequest {
// let request = SnapshotRequestPb {
// workspace_id: workspace_id.to_string(),
// object_id: object_id.to_string(),
// collab_type: CollabType::Unknown.value(),

View file

@ -1,3 +1,3 @@
mod edit_test;
// mod edit_test;
mod stream_test;
mod util;

View file

@ -21,7 +21,7 @@ use tokio::time::timeout;
use tonic::Status;
use tonic_proto::history::history_client::HistoryClient;
use tonic_proto::history::HistoryState;
use tonic_proto::history::HistoryStatePb;
pub async fn redis_client() -> redis::Client {
let redis_uri = "redis://localhost:6379";
@ -128,7 +128,7 @@ pub async fn check_doc_state_json<'a, F>(
client_action: F,
) -> Result<()>
where
F: Fn() -> BoxFuture<'a, Result<HistoryState, Status>> + Send + Sync + 'static,
F: Fn() -> BoxFuture<'a, Result<HistoryStatePb, Status>> + Send + Sync + 'static,
{
let duration = Duration::from_secs(timeout_secs);
let check_interval = Duration::from_secs(2);

110
src/api/history.rs Normal file
View file

@ -0,0 +1,110 @@
use crate::state::AppState;
use actix_web::web::Data;
use actix_web::{web, Scope};
use anyhow::anyhow;
use app_error::AppError;
use shared_entity::dto::history_dto::{
HistoryState, RepeatedSnapshotMeta, SnapshotInfo, SnapshotMeta,
};
use shared_entity::response::{AppResponse, JsonAppResponse};
use tonic_proto::history::SnapshotRequestPb;
/// Builds the `/api/history/{workspace_id}` route scope.
///
/// Registration order is preserved: the metadata-list route is registered
/// before the `/latest` route, exactly as before.
pub fn history_scope() -> Scope {
  let snapshot_list = web::resource("/{object_id}/{collab_type}")
    .route(web::get().to(get_snapshot_handler));
  let latest_snapshot = web::resource("/{object_id}/{collab_type}/latest")
    .route(web::get().to(get_latest_history_handler));
  web::scope("/api/history/{workspace_id}")
    .service(snapshot_list)
    .service(latest_snapshot)
}
/// GET handler: proxies a snapshot-metadata query to the gRPC history service
/// and converts the protobuf items into the shared DTO type.
async fn get_snapshot_handler(
  path: web::Path<(String, String, i32)>,
  state: Data<AppState>,
) -> actix_web::Result<JsonAppResponse<RepeatedSnapshotMeta>> {
  let (workspace_id, object_id, collab_type) = path.into_inner();
  let request = SnapshotRequestPb {
    workspace_id,
    object_id,
    collab_type,
    num_snapshot: 1,
  };
  // The gRPC client is shared behind a Mutex; the lock guard lives only for
  // the duration of this call chain, as in the original expression.
  let pb_items = state
    .grpc_history_client
    .lock()
    .await
    .get_snapshots(request)
    .await
    .map_err(|err| AppError::Internal(anyhow!(err.to_string())))?
    .into_inner()
    .items;
  // Convert protobuf metadata into the serializable DTO, field by field.
  let mut items = Vec::with_capacity(pb_items.len());
  for pb in pb_items {
    items.push(SnapshotMeta {
      oid: pb.oid,
      snapshot: pb.snapshot,
      snapshot_version: pb.snapshot_version,
      created_at: pb.created_at,
    });
  }
  Ok(
    AppResponse::Ok()
      .with_data(RepeatedSnapshotMeta { items })
      .into(),
  )
}
/// GET handler: fetches the latest snapshot (plus restorable history state)
/// from the gRPC history service and maps it into the shared DTO.
async fn get_latest_history_handler(
  path: web::Path<(String, String, i32)>,
  state: Data<AppState>,
) -> actix_web::Result<JsonAppResponse<SnapshotInfo>> {
  let (workspace_id, object_id, collab_type) = path.into_inner();
  let request = SnapshotRequestPb {
    workspace_id,
    object_id,
    collab_type,
    num_snapshot: 1,
  };
  let info = state
    .grpc_history_client
    .lock()
    .await
    .get_latest_snapshot(request)
    .await
    .map_err(|err| AppError::Internal(anyhow!(err.to_string())))?
    .into_inner();
  // Both optional protobuf fields must be present; missing data is an
  // internal error, with the same messages as before.
  let history_pb = info
    .history_state
    .ok_or_else(|| AppError::Internal(anyhow!("No history state found")))?;
  let meta_pb = info
    .snapshot_meta
    .ok_or_else(|| AppError::Internal(anyhow!("No snapshot meta found")))?;
  // Build the DTO in a single expression instead of two intermediates.
  let data = SnapshotInfo {
    history: HistoryState {
      object_id: history_pb.object_id,
      doc_state: history_pb.doc_state,
      doc_state_version: history_pb.doc_state_version,
    },
    snapshot_meta: SnapshotMeta {
      oid: meta_pb.oid,
      snapshot: meta_pb.snapshot,
      snapshot_version: meta_pb.snapshot_version,
      created_at: meta_pb.created_at,
    },
  };
  Ok(AppResponse::Ok().with_data(data).into())
}

View file

@ -1,6 +1,8 @@
pub mod ai;
pub mod chat;
pub mod file_storage;
pub mod history;
pub mod metrics;
pub mod user;
pub mod util;

View file

@ -9,6 +9,7 @@ use access_control::access::{enable_access_control, AccessControl};
use crate::api::ai::ai_tool_scope;
use crate::api::chat::chat_scope;
use crate::api::history::history_scope;
use crate::biz::collab::access_control::CollabMiddlewareAccessControl;
use crate::biz::pg_listener::PgListeners;
use crate::biz::workspace::access_control::WorkspaceMiddlewareAccessControl;
@ -45,7 +46,9 @@ use sqlx::{postgres::PgPoolOptions, PgPool};
use std::net::TcpListener;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::sync::{Mutex, RwLock};
use tonic_proto::history::history_client::HistoryClient;
use tracing::{info, warn};
use workspace_access::WorkspaceAccessControlImpl;
@ -140,6 +143,7 @@ pub async fn run_actix_server(
.service(ws_scope())
.service(file_storage_scope())
.service(chat_scope())
.service(history_scope())
.service(ai_tool_scope())
.service(metrics_scope())
.app_data(Data::new(state.metrics.registry.clone()))
@ -241,11 +245,13 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
metrics.collab_metrics.clone(),
));
#[cfg(feature = "history")]
let grpc_history_client =
tonic_proto::history::history_client::HistoryClient::connect(config.grpc_history.addrs.clone())
.await?;
info!("Connecting to history server");
let channel = tonic::transport::Channel::from_shared(config.grpc_history.addrs.clone())?
.keep_alive_timeout(Duration::from_secs(20))
.keep_alive_while_idle(true)
.connect_lazy();
let grpc_history_client = Arc::new(Mutex::new(HistoryClient::new(channel)));
let mailer = Mailer::new(
config.mailer.smtp_username.clone(),
config.mailer.smtp_password.expose_secret().clone(),
@ -277,7 +283,6 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
gotrue_admin,
mailer,
ai_client: appflowy_ai_client,
#[cfg(feature = "history")]
grpc_history_client,
realtime_shared_state,
})

View file

@ -4,7 +4,7 @@ use appflowy_ai_client::client::AppFlowyAIClient;
use dashmap::DashMap;
use secrecy::{ExposeSecret, Secret};
use sqlx::PgPool;
use tokio::sync::RwLock;
use tokio::sync::{Mutex, RwLock};
use tokio_stream::StreamExt;
use uuid::Uuid;
@ -21,6 +21,7 @@ use database::file::bucket_s3_impl::S3BucketStorage;
use database::user::{select_all_uid_uuid, select_uid_from_uuid};
use gotrue::grant::{Grant, PasswordGrant};
use snowflake::Snowflake;
use tonic_proto::history::history_client::HistoryClient;
use workspace_access::WorkspaceAccessControlImpl;
use crate::api::metrics::RequestMetrics;
@ -29,7 +30,6 @@ use crate::config::config::Config;
use crate::mailer::Mailer;
pub type RedisConnectionManager = redis::aio::ConnectionManager;
#[derive(Clone)]
pub struct AppState {
pub pg_pool: PgPool,
@ -50,9 +50,7 @@ pub struct AppState {
pub mailer: Mailer,
pub ai_client: AppFlowyAIClient,
pub realtime_shared_state: RealtimeSharedState,
#[cfg(feature = "history")]
pub grpc_history_client:
tonic_proto::history::history_client::HistoryClient<tonic::transport::Channel>,
pub grpc_history_client: Arc<Mutex<HistoryClient<tonic::transport::Channel>>>,
}
impl AppState {

View file

@ -1,132 +0,0 @@
use app_error::ErrorCode;
use client_api_test::TestClient;
use collab::core::transaction::DocTransactionExtension;
use collab::preclude::Doc;
use collab_entity::CollabType;
use database_entity::dto::CreateCollabParams;
use workspace_template::document::get_started::GetStartedDocumentTemplate;
use workspace_template::WorkspaceTemplateBuilder;
#[tokio::test]
async fn insert_empty_data_test() {
let test_client = TestClient::new_user().await;
let workspace_id = test_client.workspace_id().await;
let object_id = uuid::Uuid::new_v4().to_string();
// test all collab type
for collab_type in [
CollabType::Folder,
CollabType::Document,
CollabType::UserAwareness,
CollabType::WorkspaceDatabase,
CollabType::Database,
CollabType::DatabaseRow,
] {
let params = CreateCollabParams {
workspace_id: workspace_id.clone(),
object_id: object_id.clone(),
encoded_collab_v1: vec![],
collab_type,
};
let error = test_client
.api_client
.create_collab(params)
.await
.unwrap_err();
assert_eq!(error.code, ErrorCode::NoRequiredData);
}
}
#[tokio::test]
async fn insert_invalid_data_test() {
let test_client = TestClient::new_user().await;
let workspace_id = test_client.workspace_id().await;
let object_id = uuid::Uuid::new_v4().to_string();
let doc = Doc::new();
let encoded_collab_v1 = doc.get_encoded_collab_v1().encode_to_bytes().unwrap();
for collab_type in [
CollabType::Folder,
CollabType::Document,
CollabType::UserAwareness,
CollabType::WorkspaceDatabase,
CollabType::Database,
CollabType::DatabaseRow,
] {
let params = CreateCollabParams {
workspace_id: workspace_id.clone(),
object_id: object_id.clone(),
encoded_collab_v1: encoded_collab_v1.clone(),
collab_type: collab_type.clone(),
};
let error = test_client
.api_client
.create_collab(params)
.await
.unwrap_err();
assert_eq!(
error.code,
ErrorCode::NoRequiredData,
"collab_type: {:?}",
collab_type
);
}
}
// #[tokio::test]
// async fn load_document_test() {
// let test_client = TestClient::new_user().await;
// let workspace_id = test_client.workspace_id().await;
// let object_id = uuid::Uuid::new_v4().to_string();
//
// for file in [include_str!("../test_asset/document/empty_lines.json")] {
// let document_data: DocumentData = serde_json::from_str(file).unwrap();
// let template = DocumentTemplate::from_data(document_data)
// .create(object_id.clone())
// .await
// .unwrap();
//
// let data = template.object_data.encode_to_bytes().unwrap();
// let params = CreateCollabParams {
// workspace_id: workspace_id.clone(),
// object_id: object_id.clone(),
// encoded_collab_v1: data,
// collab_type: template.object_type,
// };
// test_client.api_client.create_collab(params).await.unwrap();
// }
// }
#[tokio::test]
async fn insert_folder_data_success_test() {
let test_client = TestClient::new_user().await;
let workspace_id = test_client.workspace_id().await;
let object_id = uuid::Uuid::new_v4().to_string();
let uid = test_client.uid().await;
let templates = WorkspaceTemplateBuilder::new(uid, &workspace_id)
.with_templates(vec![GetStartedDocumentTemplate])
.build()
.await
.unwrap();
assert_eq!(templates.len(), 2);
for (index, template) in templates.into_iter().enumerate() {
if index == 0 {
assert_eq!(template.object_type, CollabType::Document);
}
if index == 1 {
assert_eq!(template.object_type, CollabType::Folder);
}
let data = template.object_data.encode_to_bytes().unwrap();
let params = CreateCollabParams {
workspace_id: workspace_id.clone(),
object_id: object_id.clone(),
encoded_collab_v1: data,
collab_type: template.object_type,
};
test_client.api_client.create_collab(params).await.unwrap();
}
}

View file

@ -1,11 +1,9 @@
mod awareness_test;
mod collab_curd_test;
mod data_write_test;
mod edit_permission;
mod member_crud;
mod missing_update_test;
mod multi_devices_edit;
mod pending_write_test;
mod permission_test;
mod single_device_edit;
mod storage_test;
pub mod util;

View file

@ -1,161 +0,0 @@
use crate::collab::util::{generate_random_bytes, redis_connection_manager};
use crate::sql_test::util::{setup_db, test_create_user};
use appflowy_collaborate::collab::cache::CollabCache;
use appflowy_collaborate::collab::queue::StorageQueue;
use appflowy_collaborate::collab::WritePriority;
use client_api_test::setup_log;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use database_entity::dto::{CollabParams, QueryCollab};
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;
#[sqlx::test(migrations = false)]
async fn simulate_small_data_set_write(pool: PgPool) {
// prepare test prerequisites
setup_db(&pool).await.unwrap();
setup_log();
let conn = redis_connection_manager().await;
let user_uuid = uuid::Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = test_create_user(&pool, user_uuid, &email, &name)
.await
.unwrap();
let collab_cache = CollabCache::new(conn.clone(), pool);
let queue_name = uuid::Uuid::new_v4().to_string();
let storage_queue = StorageQueue::new(collab_cache.clone(), conn, &queue_name);
let queries = Arc::new(Mutex::new(Vec::new()));
for i in 0..10 {
// sleep random seconds less than 2 seconds. because the runtime is single-threaded,
// we need sleep a little time to let the runtime switch to other tasks.
sleep(Duration::from_millis(i % 2)).await;
let cloned_storage_queue = storage_queue.clone();
let cloned_queries = queries.clone();
let cloned_user = user.clone();
let encode_collab = EncodedCollab::new_v1(
generate_random_bytes(1024),
generate_random_bytes(1024 * 1024),
);
let params = CollabParams {
object_id: format!("object_id_{}", i),
collab_type: CollabType::Unknown,
encoded_collab_v1: encode_collab.encode_to_bytes().unwrap(),
};
cloned_storage_queue
.push(
&cloned_user.workspace_id,
&cloned_user.uid,
&params,
WritePriority::Low,
)
.await
.unwrap();
cloned_queries.lock().await.push((params, encode_collab));
}
// Allow some time for processing
sleep(Duration::from_secs(30)).await;
// Check that all items are processed correctly
for (params, original_encode_collab) in queries.lock().await.iter() {
let query = QueryCollab {
object_id: params.object_id.clone(),
collab_type: params.collab_type.clone(),
};
let encode_collab_from_disk = collab_cache
.get_encode_collab_from_disk(&user.uid, query)
.await
.unwrap();
assert_eq!(
encode_collab_from_disk.doc_state.len(),
original_encode_collab.doc_state.len(),
"doc_state length mismatch"
);
assert_eq!(
encode_collab_from_disk.doc_state,
original_encode_collab.doc_state
);
assert_eq!(
encode_collab_from_disk.state_vector.len(),
original_encode_collab.state_vector.len(),
"state_vector length mismatch"
);
assert_eq!(
encode_collab_from_disk.state_vector,
original_encode_collab.state_vector
);
}
}
#[sqlx::test(migrations = false)]
async fn simulate_large_data_set_write(pool: PgPool) {
// prepare test prerequisites
setup_db(&pool).await.unwrap();
setup_log();
let conn = redis_connection_manager().await;
let user_uuid = uuid::Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = test_create_user(&pool, user_uuid, &email, &name)
.await
.unwrap();
let collab_cache = CollabCache::new(conn.clone(), pool);
let queue_name = uuid::Uuid::new_v4().to_string();
let storage_queue = StorageQueue::new(collab_cache.clone(), conn, &queue_name);
let origin_encode_collab = EncodedCollab::new_v1(
generate_random_bytes(10 * 1024),
generate_random_bytes(2 * 1024 * 1024),
);
let params = CollabParams {
object_id: uuid::Uuid::new_v4().to_string(),
collab_type: CollabType::Unknown,
encoded_collab_v1: origin_encode_collab.encode_to_bytes().unwrap(),
};
storage_queue
.push(&user.workspace_id, &user.uid, &params, WritePriority::Low)
.await
.unwrap();
// Allow some time for processing
sleep(Duration::from_secs(30)).await;
let query = QueryCollab {
object_id: params.object_id.clone(),
collab_type: params.collab_type.clone(),
};
let encode_collab_from_disk = collab_cache
.get_encode_collab_from_disk(&user.uid, query)
.await
.unwrap();
assert_eq!(
encode_collab_from_disk.doc_state.len(),
origin_encode_collab.doc_state.len(),
"doc_state length mismatch"
);
assert_eq!(
encode_collab_from_disk.doc_state,
origin_encode_collab.doc_state
);
assert_eq!(
encode_collab_from_disk.state_vector.len(),
origin_encode_collab.state_vector.len(),
"state_vector length mismatch"
);
assert_eq!(
encode_collab_from_disk.state_vector,
origin_encode_collab.state_vector
);
}

View file

@ -414,7 +414,7 @@ async fn multiple_user_with_read_and_write_permission_edit_same_collab_test() {
let collab_type = CollabType::Unknown;
let workspace_id = owner.workspace_id().await;
owner
.create_and_edit_collab_with_data(object_id.clone(), &workspace_id, collab_type.clone(), None)
.create_and_edit_collab_with_data(&object_id, &workspace_id, collab_type.clone(), None)
.await;
let arc_owner = Arc::new(owner);

View file

@ -1,18 +1,32 @@
use collab::core::transaction::DocTransactionExtension;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use collab::entity::EncodedCollab;
use collab::preclude::Doc;
use collab_entity::CollabType;
use sqlx::types::Uuid;
use sqlx::PgPool;
use tokio::sync::Mutex;
use tokio::time::sleep;
use app_error::ErrorCode;
use appflowy_collaborate::collab::cache::CollabCache;
use appflowy_collaborate::collab::mem_cache::CollabMemCache;
use appflowy_collaborate::collab::queue::StorageQueue;
use appflowy_collaborate::collab::WritePriority;
use client_api_test::*;
use database::collab::CollabMetadata;
use database_entity::dto::{
CreateCollabParams, DeleteCollabParams, QueryCollab, QueryCollabParams, QueryCollabResult,
CollabParams, CreateCollabParams, DeleteCollabParams, QueryCollab, QueryCollabParams,
QueryCollabResult,
};
use workspace_template::document::get_started::GetStartedDocumentTemplate;
use workspace_template::WorkspaceTemplateBuilder;
use crate::collab::util::{redis_connection_manager, test_encode_collab_v1};
use crate::collab::util::{generate_random_bytes, redis_connection_manager, test_encode_collab_v1};
use crate::sql_test::util::{setup_db, test_create_user};
#[tokio::test]
async fn success_insert_collab_test() {
@ -320,3 +334,249 @@ async fn collab_meta_redis_cache_test() {
assert_eq!(meta.workspace_id, meta_from_cache.workspace_id);
assert_eq!(meta.object_id, meta_from_cache.object_id);
}
#[tokio::test]
async fn insert_empty_data_test() {
let test_client = TestClient::new_user().await;
let workspace_id = test_client.workspace_id().await;
let object_id = uuid::Uuid::new_v4().to_string();
// test all collab type
for collab_type in [
CollabType::Folder,
CollabType::Document,
CollabType::UserAwareness,
CollabType::WorkspaceDatabase,
CollabType::Database,
CollabType::DatabaseRow,
] {
let params = CreateCollabParams {
workspace_id: workspace_id.clone(),
object_id: object_id.clone(),
encoded_collab_v1: vec![],
collab_type,
};
let error = test_client
.api_client
.create_collab(params)
.await
.unwrap_err();
assert_eq!(error.code, ErrorCode::NoRequiredData);
}
}
#[tokio::test]
async fn insert_invalid_data_test() {
let test_client = TestClient::new_user().await;
let workspace_id = test_client.workspace_id().await;
let object_id = uuid::Uuid::new_v4().to_string();
let doc = Doc::new();
let encoded_collab_v1 = doc.get_encoded_collab_v1().encode_to_bytes().unwrap();
for collab_type in [
CollabType::Folder,
CollabType::Document,
CollabType::UserAwareness,
CollabType::WorkspaceDatabase,
CollabType::Database,
CollabType::DatabaseRow,
] {
let params = CreateCollabParams {
workspace_id: workspace_id.clone(),
object_id: object_id.clone(),
encoded_collab_v1: encoded_collab_v1.clone(),
collab_type: collab_type.clone(),
};
let error = test_client
.api_client
.create_collab(params)
.await
.unwrap_err();
assert_eq!(
error.code,
ErrorCode::NoRequiredData,
"collab_type: {:?}",
collab_type
);
}
}
#[tokio::test]
async fn insert_folder_data_success_test() {
let test_client = TestClient::new_user().await;
let workspace_id = test_client.workspace_id().await;
let object_id = uuid::Uuid::new_v4().to_string();
let uid = test_client.uid().await;
let templates = WorkspaceTemplateBuilder::new(uid, &workspace_id)
.with_templates(vec![GetStartedDocumentTemplate])
.build()
.await
.unwrap();
assert_eq!(templates.len(), 2);
for (index, template) in templates.into_iter().enumerate() {
if index == 0 {
assert_eq!(template.object_type, CollabType::Document);
}
if index == 1 {
assert_eq!(template.object_type, CollabType::Folder);
}
let data = template.object_data.encode_to_bytes().unwrap();
let params = CreateCollabParams {
workspace_id: workspace_id.clone(),
object_id: object_id.clone(),
encoded_collab_v1: data,
collab_type: template.object_type,
};
test_client.api_client.create_collab(params).await.unwrap();
}
}
#[sqlx::test(migrations = false)]
async fn simulate_small_data_set_write(pool: PgPool) {
// prepare test prerequisites
setup_db(&pool).await.unwrap();
setup_log();
let conn = redis_connection_manager().await;
let user_uuid = uuid::Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = test_create_user(&pool, user_uuid, &email, &name)
.await
.unwrap();
let collab_cache = CollabCache::new(conn.clone(), pool);
let queue_name = uuid::Uuid::new_v4().to_string();
let storage_queue = StorageQueue::new(collab_cache.clone(), conn, &queue_name);
let queries = Arc::new(Mutex::new(Vec::new()));
for i in 0..10 {
// sleep random seconds less than 2 seconds. because the runtime is single-threaded,
// we need sleep a little time to let the runtime switch to other tasks.
sleep(Duration::from_millis(i % 2)).await;
let cloned_storage_queue = storage_queue.clone();
let cloned_queries = queries.clone();
let cloned_user = user.clone();
let encode_collab = EncodedCollab::new_v1(
generate_random_bytes(1024),
generate_random_bytes(1024 * 1024),
);
let params = CollabParams {
object_id: format!("object_id_{}", i),
collab_type: CollabType::Unknown,
encoded_collab_v1: encode_collab.encode_to_bytes().unwrap(),
};
cloned_storage_queue
.push(
&cloned_user.workspace_id,
&cloned_user.uid,
&params,
WritePriority::Low,
)
.await
.unwrap();
cloned_queries.lock().await.push((params, encode_collab));
}
// Allow some time for processing
sleep(Duration::from_secs(30)).await;
// Check that all items are processed correctly
for (params, original_encode_collab) in queries.lock().await.iter() {
let query = QueryCollab {
object_id: params.object_id.clone(),
collab_type: params.collab_type.clone(),
};
let encode_collab_from_disk = collab_cache
.get_encode_collab_from_disk(&user.uid, query)
.await
.unwrap();
assert_eq!(
encode_collab_from_disk.doc_state.len(),
original_encode_collab.doc_state.len(),
"doc_state length mismatch"
);
assert_eq!(
encode_collab_from_disk.doc_state,
original_encode_collab.doc_state
);
assert_eq!(
encode_collab_from_disk.state_vector.len(),
original_encode_collab.state_vector.len(),
"state_vector length mismatch"
);
assert_eq!(
encode_collab_from_disk.state_vector,
original_encode_collab.state_vector
);
}
}
#[sqlx::test(migrations = false)]
async fn simulate_large_data_set_write(pool: PgPool) {
// prepare test prerequisites
setup_db(&pool).await.unwrap();
setup_log();
let conn = redis_connection_manager().await;
let user_uuid = uuid::Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = test_create_user(&pool, user_uuid, &email, &name)
.await
.unwrap();
let collab_cache = CollabCache::new(conn.clone(), pool);
let queue_name = uuid::Uuid::new_v4().to_string();
let storage_queue = StorageQueue::new(collab_cache.clone(), conn, &queue_name);
let origin_encode_collab = EncodedCollab::new_v1(
generate_random_bytes(10 * 1024),
generate_random_bytes(2 * 1024 * 1024),
);
let params = CollabParams {
object_id: uuid::Uuid::new_v4().to_string(),
collab_type: CollabType::Unknown,
encoded_collab_v1: origin_encode_collab.encode_to_bytes().unwrap(),
};
storage_queue
.push(&user.workspace_id, &user.uid, &params, WritePriority::Low)
.await
.unwrap();
// Allow some time for processing
sleep(Duration::from_secs(30)).await;
let query = QueryCollab {
object_id: params.object_id.clone(),
collab_type: params.collab_type.clone(),
};
let encode_collab_from_disk = collab_cache
.get_encode_collab_from_disk(&user.uid, query)
.await
.unwrap();
assert_eq!(
encode_collab_from_disk.doc_state.len(),
origin_encode_collab.doc_state.len(),
"doc_state length mismatch"
);
assert_eq!(
encode_collab_from_disk.doc_state,
origin_encode_collab.doc_state
);
assert_eq!(
encode_collab_from_disk.state_vector.len(),
origin_encode_collab.state_vector.len(),
"state_vector length mismatch"
);
assert_eq!(
encode_collab_from_disk.state_vector,
origin_encode_collab.state_vector
);
}

View file

@ -0,0 +1,126 @@
use assert_json_diff::assert_json_include;
use client_api_test::TestClient;
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
use collab::preclude::updates::decoder::Decode;
use collab::preclude::updates::encoder::{Encoder, EncoderV2};
use collab::preclude::{Collab, ReadTxn, Snapshot, Update};
use collab_entity::CollabType;
use serde_json::json;
use tokio::time::sleep;
#[tokio::test]
async fn collab_history_and_snapshot_test() {
// Set up all the required data
let mut test_client = TestClient::new_user().await;
let workspace_id = test_client.workspace_id().await;
let object_id = uuid::Uuid::new_v4().to_string();
// Using [CollabType::Unknown] for testing purposes.
let collab_type = CollabType::Unknown;
test_client
.create_and_edit_collab_with_data(&object_id, &workspace_id, collab_type.clone(), None)
.await;
test_client
.open_collab(&workspace_id, &object_id, collab_type.clone())
.await;
// from the beginning, there should be no snapshots
let snapshots = test_client
.api_client
.get_snapshots(&workspace_id, &object_id, collab_type.clone())
.await
.unwrap()
.items;
assert!(snapshots.is_empty());
// Simulate the client editing the collaboration object. A snapshot is generated if the number of edits
// exceeds a specific threshold. By default, [CollabType::Unknown] has a threshold of 10 edits in debug mode.
for i in 0..10 {
test_client
.collabs
.get_mut(&object_id)
.unwrap()
.mutex_collab
.lock()
.insert(&i.to_string(), i.to_string());
sleep(std::time::Duration::from_millis(500)).await;
}
// Wait for the snapshot to be generated.
sleep(std::time::Duration::from_secs(10)).await;
let snapshots = test_client
.api_client
.get_snapshots(&workspace_id, &object_id, collab_type.clone())
.await
.unwrap()
.items;
assert!(!snapshots.is_empty());
// Get the latest history
let snapshot_info = test_client
.api_client
.get_latest_history(&workspace_id, &object_id, collab_type)
.await
.unwrap();
let full_collab = Collab::new_with_source(
CollabOrigin::Empty,
&object_id,
DataSource::DocStateV2(snapshot_info.history.doc_state),
vec![],
true,
)
.unwrap();
// Collab restored from the history data may not contain all the data. So just compare part of the data.
assert_json_include!(
actual: full_collab.to_json_value(),
expected: json!({
"0": "0",
"1": "1",
"2": "2",
"3": "3",
"4": "4",
"5": "5",
})
);
// Collab restored from snapshot data might equal to full_collab or be a subset of full_collab.
let snapshot = Snapshot::decode_v1(snapshot_info.snapshot_meta.snapshot.as_slice()).unwrap();
let json_snapshot = json_from_snapshot(&full_collab, &object_id, &snapshot);
assert_json_include!(
actual: json_snapshot,
expected: json!({
"0": "0",
"1": "1",
"2": "2",
"3": "3",
"4": "4",
"5": "5",
})
);
}
fn json_from_snapshot(
full_collab: &Collab,
object_id: &str,
snapshot: &Snapshot,
) -> serde_json::Value {
let update = doc_state_v2_from_snapshot(full_collab, snapshot);
let update = Update::decode_v2(update.as_slice()).unwrap();
let snapshot_collab = Collab::new_with_origin(CollabOrigin::Empty, object_id, vec![], true);
snapshot_collab.with_origin_transact_mut(|txn| {
txn.apply_update(update);
});
snapshot_collab.to_json_value()
}
fn doc_state_v2_from_snapshot(full_collab: &Collab, snapshot: &Snapshot) -> Vec<u8> {
let txn = full_collab.try_transaction().unwrap();
let mut encoder = EncoderV2::new();
txn
.encode_state_from_snapshot(snapshot, &mut encoder)
.unwrap();
encoder.to_vec()
}

View file

@ -0,0 +1 @@
mod document_history;

View file

@ -7,4 +7,5 @@ mod websocket;
mod workspace;
mod ai_test;
mod collab_history;
mod yrs_version;

View file

@ -4,7 +4,7 @@ use database::history::ops::{
get_latest_snapshot, get_latest_snapshot_state, get_snapshot_meta_list, insert_history,
};
use sqlx::PgPool;
use tonic_proto::history::{SnapshotMeta, SnapshotState};
use tonic_proto::history::{SnapshotMetaPb, SnapshotStatePb};
use uuid::Uuid;
#[sqlx::test(migrations = false)]
@ -24,13 +24,13 @@ async fn insert_snapshot_test(pool: PgPool) {
let collab_type = CollabType::Document;
let snapshots = vec![
SnapshotMeta {
SnapshotMetaPb {
oid: object_id.clone(),
snapshot: vec![1, 2, 3],
snapshot_version: 1,
created_at: timestamp,
},
SnapshotMeta {
SnapshotMetaPb {
oid: object_id.clone(),
snapshot: vec![3, 4, 5],
snapshot_version: 1,
@ -38,7 +38,7 @@ async fn insert_snapshot_test(pool: PgPool) {
},
];
let snapshot_state = SnapshotState {
let snapshot_state = SnapshotStatePb {
oid: object_id.clone(),
doc_state: vec![10, 11, 12],
doc_state_version: 1,
@ -78,5 +78,5 @@ async fn insert_snapshot_test(pool: PgPool) {
.unwrap()
.unwrap();
assert_eq!(snapshot.history_state.unwrap().doc_state, vec![10, 11, 12]);
assert_eq!(snapshot.snapshot, vec![3, 4, 5]);
assert_eq!(snapshot.snapshot_meta.unwrap().snapshot, vec![3, 4, 5]);
}

View file

@ -17,7 +17,7 @@ async fn main() -> Result<()> {
kill_existing_process(appflowy_history_bin_name).await?;
let mut appflowy_cloud_cmd = Command::new("cargo")
.args(["run", "--features", ""])
.args(["run", "--features", "history"])
.spawn()
.context("Failed to start AppFlowy-Cloud process")?;