diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index a8871e6302..566a6169e2 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -1072,6 +1072,7 @@ dependencies = [ "bytes", "dashmap", "flowy-error", + "flowy-revision", "flowy-sync", "futures-util", "lib-infra", diff --git a/frontend/rust-lib/flowy-document/src/editor/editor.rs b/frontend/rust-lib/flowy-document/src/editor/editor.rs index 5270932a1b..eee8eaa6de 100644 --- a/frontend/rust-lib/flowy-document/src/editor/editor.rs +++ b/frontend/rust-lib/flowy-document/src/editor/editor.rs @@ -29,7 +29,9 @@ impl AppFlowyDocumentEditor { mut rev_manager: RevisionManager>, cloud_service: Arc, ) -> FlowyResult> { - let document = rev_manager.load::(Some(cloud_service)).await?; + let document = rev_manager + .initialize::(Some(cloud_service)) + .await?; let rev_manager = Arc::new(rev_manager); let command_sender = spawn_edit_queue(user, rev_manager.clone(), document); let doc_id = doc_id.to_string(); diff --git a/frontend/rust-lib/flowy-document/src/old_editor/editor.rs b/frontend/rust-lib/flowy-document/src/old_editor/editor.rs index 35de2de668..e48842351d 100644 --- a/frontend/rust-lib/flowy-document/src/old_editor/editor.rs +++ b/frontend/rust-lib/flowy-document/src/old_editor/editor.rs @@ -45,7 +45,7 @@ impl DeltaDocumentEditor { cloud_service: Arc, ) -> FlowyResult> { let document = rev_manager - .load::(Some(cloud_service)) + .initialize::(Some(cloud_service)) .await?; let operations = DeltaTextOperations::from_bytes(&document.content)?; let rev_manager = Arc::new(rev_manager); diff --git a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs index 93ec061565..2ad6a58f55 100644 --- a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs +++ b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs @@ -38,7 +38,9 @@ impl FolderEditor { let cloud = Arc::new(FolderRevisionCloudService { token: token.to_string(), }); - let folder = Arc::new(RwLock::new(rev_manager.load::(Some(cloud)).await?)); + let folder = Arc::new(RwLock::new( + rev_manager.initialize::(Some(cloud)).await?, + )); let rev_manager = Arc::new(rev_manager); #[cfg(feature = "sync")] diff --git a/frontend/rust-lib/flowy-grid/src/services/block_editor.rs b/frontend/rust-lib/flowy-grid/src/services/block_editor.rs index 2a8d01cb54..9ee6278bd6 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block_editor.rs @@ -34,7 +34,7 @@ impl GridBlockRevisionEditor { let cloud = Arc::new(GridBlockRevisionCloudService { token: token.to_owned(), }); - let block_revision_pad = rev_manager.load::(Some(cloud)).await?; + let block_revision_pad = rev_manager.initialize::(Some(cloud)).await?; let pad = Arc::new(RwLock::new(block_revision_pad)); let rev_manager = Arc::new(rev_manager); let user_id = user_id.to_owned(); diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs index 20e8cf34cb..8598652abc 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs @@ -60,7 +60,7 @@ impl GridRevisionEditor { ) -> FlowyResult> { let token = user.token()?; let cloud = Arc::new(GridRevisionCloudService { token }); - let grid_pad = rev_manager.load::(Some(cloud)).await?; + let grid_pad = rev_manager.initialize::(Some(cloud)).await?; let rev_manager = Arc::new(rev_manager); let grid_pad = Arc::new(RwLock::new(grid_pad)); diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_view_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_view_editor.rs index 4be9043861..4e0f707708 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_view_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_view_editor.rs @@ -55,7 +55,7 @@ impl GridViewRevisionEditor { let cloud = Arc::new(GridViewRevisionCloudService { token: token.to_owned(), }); - let view_revision_pad = rev_manager.load::(Some(cloud)).await?; + let view_revision_pad = rev_manager.initialize::(Some(cloud)).await?; let pad = Arc::new(RwLock::new(view_revision_pad)); let rev_manager = Arc::new(rev_manager); let group_controller = new_group_controller( diff --git a/frontend/rust-lib/flowy-revision/Cargo.toml b/frontend/rust-lib/flowy-revision/Cargo.toml index 54f1902afa..8e6800e823 100644 --- a/frontend/rust-lib/flowy-revision/Cargo.toml +++ b/frontend/rust-lib/flowy-revision/Cargo.toml @@ -23,6 +23,7 @@ serde_json = {version = "1.0"} [dev-dependencies] nanoid = "0.4.0" +flowy-revision = {path = ".", features = ["flowy_unit_test"]} serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } parking_lot = "0.11" diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index a978c32760..7033894ca5 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -108,7 +108,7 @@ impl RevisionManager { } #[tracing::instrument(level = "debug", skip_all, fields(object_id) err)] - pub async fn load(&mut self, cloud: Option>) -> FlowyResult + pub async fn initialize(&mut self, cloud: Option>) -> FlowyResult where B: RevisionObjectDeserializer, { @@ -199,6 +199,10 @@ impl RevisionManager { self.rev_persistence.number_of_sync_records() } + pub fn number_of_revisions_in_disk(&self) -> usize { + self.rev_persistence.number_of_records_in_disk() + } + pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result, FlowyError> { let revisions = self.rev_persistence.revisions_in_range(&range).await?; Ok(revisions) @@ -230,13 +234,16 @@ impl WSDataProviderDataSource for Arc RevisionManager { +impl RevisionManager { pub async fn revision_cache(&self) -> Arc> { self.rev_persistence.clone() } pub fn ack_notify(&self) -> tokio::sync::broadcast::Receiver { self.rev_ack_notifier.subscribe() } + pub fn get_all_revision_records(&self) -> FlowyResult> { + self.rev_persistence.load_all_records(&self.object_id) + } } pub struct RevisionLoader { @@ -248,7 +255,7 @@ pub struct RevisionLoader { impl RevisionLoader { pub async fn load(&self) -> Result<(Vec, i64), FlowyError> { - let records = self.rev_persistence.batch_get(&self.object_id)?; + let records = self.rev_persistence.load_all_records(&self.object_id)?; let revisions: Vec; let mut rev_id = 0; if records.is_empty() && self.cloud.is_some() { @@ -282,7 +289,7 @@ impl RevisionLoader { } pub async fn load_revisions(&self) -> Result, FlowyError> { - let records = self.rev_persistence.batch_get(&self.object_id)?; + let records = self.rev_persistence.load_all_records(&self.object_id)?; let revisions = records.into_iter().map(|record| record.revision).collect::<_>(); Ok(revisions) } diff --git a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs index 62d4a0e408..c74cb4e21e 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs @@ -14,6 +14,7 @@ use tokio::task::spawn_blocking; pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600; +#[derive(Clone)] pub struct RevisionPersistenceConfiguration { merge_threshold: usize, } @@ -24,14 +25,14 @@ impl RevisionPersistenceConfiguration { if merge_threshold > 1 { Self { merge_threshold } } else { - Self { merge_threshold: 2 } + Self { merge_threshold: 100 } } } } impl std::default::Default for RevisionPersistenceConfiguration { fn default() -> Self { - Self { merge_threshold: 2 } + Self { merge_threshold: 100 } } } @@ -93,7 +94,7 @@ where pub(crate) async fn sync_revision(&self, revision: &Revision) -> FlowyResult<()> { tracing::Span::current().record("rev_id", &revision.rev_id); self.add(revision.clone(), RevisionState::Sync, false).await?; - self.sync_seq.write().await.dry_push(revision.rev_id)?; + self.sync_seq.write().await.recv(revision.rev_id)?; Ok(()) } @@ -105,13 +106,17 @@ where rev_compress: &Arc, ) -> FlowyResult { let mut sync_seq = self.sync_seq.write().await; - let step = sync_seq.step; + let compact_length = sync_seq.compact_length; - // Before the new_revision pushed into the sync_seq, we check if the current `step` of the - // sync_seq is less equal or greater than the merge threshold. If yes, it's need to merged + // Before the new_revision is pushed into the sync_seq, we check if the current `step` of the + // sync_seq is less equal to or greater than the merge threshold. If yes, it's needs to merged // with the new_revision into one revision. - if step >= self.configuration.merge_threshold - 1 { - let compact_seq = sync_seq.compact(); + let mut compact_seq = VecDeque::default(); + // tracing::info!("{}", compact_seq) + if compact_length >= self.configuration.merge_threshold - 1 { + compact_seq.extend(sync_seq.compact()); + } + if !compact_seq.is_empty() { let range = RevisionRange { start: *compact_seq.front().unwrap(), end: *compact_seq.back().unwrap(), @@ -127,7 +132,7 @@ where let merged_revision = rev_compress.merge_revisions(&self.user_id, &self.object_id, revisions)?; let rev_id = merged_revision.rev_id; tracing::Span::current().record("rev_id", &merged_revision.rev_id); - let _ = sync_seq.dry_push(merged_revision.rev_id)?; + let _ = sync_seq.recv(merged_revision.rev_id)?; // replace the revisions in range with compact revision self.compact(&range, merged_revision).await?; @@ -135,7 +140,7 @@ where } else { tracing::Span::current().record("rev_id", &new_revision.rev_id); self.add(new_revision.clone(), RevisionState::Sync, true).await?; - sync_seq.push(new_revision.rev_id)?; + sync_seq.merge_recv(new_revision.rev_id)?; Ok(new_revision.rev_id) } } @@ -163,6 +168,16 @@ where self.memory_cache.number_of_sync_records() } + pub(crate) fn number_of_records_in_disk(&self) -> usize { + match self.disk_cache.read_revision_records(&self.object_id, None) { + Ok(records) => records.len(), + Err(e) => { + tracing::error!("Read revision records failed: {:?}", e); + 0 + } + } + } + /// The cache gets reset while it conflicts with the remote revisions. #[tracing::instrument(level = "trace", skip(self, revisions), err)] pub(crate) async fn reset(&self, revisions: Vec) -> FlowyResult<()> { @@ -228,8 +243,8 @@ where } } - pub fn batch_get(&self, doc_id: &str) -> FlowyResult> { - self.disk_cache.read_revision_records(doc_id, None) + pub fn load_all_records(&self, object_id: &str) -> FlowyResult> { + self.disk_cache.read_revision_records(object_id, None) } // Read the revision which rev_id >= range.start && rev_id <= range.end @@ -289,8 +304,8 @@ impl RevisionMemoryCacheDelegate for Arc, - start: Option, - step: usize, + compact_index: Option, + compact_length: usize, } impl DeferSyncSequence { @@ -298,17 +313,22 @@ impl DeferSyncSequence { DeferSyncSequence::default() } - fn push(&mut self, new_rev_id: i64) -> FlowyResult<()> { - let _ = self.dry_push(new_rev_id)?; + /// Pushes the new_rev_id to the end of the list and marks this new_rev_id is mergeable. + /// + /// When calling `compact` method, it will return a list of revision ids started from + /// the `compact_start_pos`, and ends with the `compact_length`. + fn merge_recv(&mut self, new_rev_id: i64) -> FlowyResult<()> { + let _ = self.recv(new_rev_id)?; - self.step += 1; - if self.start.is_none() && !self.rev_ids.is_empty() { - self.start = Some(self.rev_ids.len() - 1); + self.compact_length += 1; + if self.compact_index.is_none() && !self.rev_ids.is_empty() { + self.compact_index = Some(self.rev_ids.len() - 1); } Ok(()) } - fn dry_push(&mut self, new_rev_id: i64) -> FlowyResult<()> { + /// Pushes the new_rev_id to the end of the list. + fn recv(&mut self, new_rev_id: i64) -> FlowyResult<()> { // The last revision's rev_id must be greater than the new one. if let Some(rev_id) = self.rev_ids.back() { if *rev_id >= new_rev_id { @@ -321,6 +341,7 @@ impl DeferSyncSequence { Ok(()) } + /// Removes the rev_id from the list fn ack(&mut self, rev_id: &i64) -> FlowyResult<()> { let cur_rev_id = self.rev_ids.front().cloned(); if let Some(pop_rev_id) = cur_rev_id { @@ -331,7 +352,20 @@ impl DeferSyncSequence { ); return Err(FlowyError::internal().context(desc)); } - let _ = self.rev_ids.pop_front(); + + let mut compact_rev_id = None; + if let Some(compact_index) = self.compact_index { + compact_rev_id = self.rev_ids.get(compact_index).cloned(); + } + + let pop_rev_id = self.rev_ids.pop_front(); + if let (Some(compact_rev_id), Some(pop_rev_id)) = (compact_rev_id, pop_rev_id) { + if compact_rev_id <= pop_rev_id { + if self.compact_length > 0 { + self.compact_length -= 1; + } + } + } } Ok(()) } @@ -341,28 +375,22 @@ impl DeferSyncSequence { } fn clear(&mut self) { - self.start = None; - self.step = 0; + self.compact_index = None; + self.compact_length = 0; self.rev_ids.clear(); } // Compact the rev_ids into one except the current synchronizing rev_id. fn compact(&mut self) -> VecDeque { - if self.start.is_none() { - return VecDeque::default(); + let mut compact_seq = VecDeque::with_capacity(self.rev_ids.len()); + if let Some(start) = self.compact_index { + if start < self.rev_ids.len() { + let seq = self.rev_ids.split_off(start); + compact_seq.extend(seq); + } } - - let start = self.start.unwrap(); - let compact_seq = self.rev_ids.split_off(start); - self.start = None; - self.step = 0; + self.compact_index = None; + self.compact_length = 0; compact_seq - - // let mut new_seq = self.rev_ids.clone(); - // let mut drained = new_seq.drain(1..).collect::>(); - // - // let start = drained.pop_front()?; - // let end = drained.pop_back().unwrap_or(start); - // Some((RevisionRange { start, end }, new_seq)) } } diff --git a/frontend/rust-lib/flowy-revision/tests/revision_test/local_revision_test.rs b/frontend/rust-lib/flowy-revision/tests/revision_test/local_revision_test.rs index b91d32386d..2e4d3119f5 100644 --- a/frontend/rust-lib/flowy-revision/tests/revision_test/local_revision_test.rs +++ b/frontend/rust-lib/flowy-revision/tests/revision_test/local_revision_test.rs @@ -19,37 +19,31 @@ async fn revision_sync_test() { } #[tokio::test] -async fn revision_sync_multiple_revisions() { +async fn revision_compress_2_revisions_with_2_threshold_test() { let test = RevisionTest::new_with_configuration(2).await; - let (base_rev_id, rev_id_1) = test.next_rev_id_pair(); - test.run_script(AddLocalRevision { + test.run_script(AddLocalRevision2 { content: "123".to_string(), - base_rev_id, - rev_id: rev_id_1, + pair_rev_id: test.next_rev_id_pair(), }) .await; - let (base_rev_id, rev_id_2) = test.next_rev_id_pair(); - test.run_script(AddLocalRevision { + test.run_script(AddLocalRevision2 { content: "456".to_string(), - base_rev_id, - rev_id: rev_id_2, + pair_rev_id: test.next_rev_id_pair(), }) .await; test.run_scripts(vec![ - AssertNextSyncRevisionId { rev_id: Some(rev_id_1) }, - AckRevision { rev_id: rev_id_1 }, - AssertNextSyncRevisionId { rev_id: Some(rev_id_2) }, - AckRevision { rev_id: rev_id_2 }, + AssertNextSyncRevisionId { rev_id: Some(1) }, + AckRevision { rev_id: 1 }, AssertNextSyncRevisionId { rev_id: None }, ]) .await; } #[tokio::test] -async fn revision_compress_three_revisions_test() { +async fn revision_compress_4_revisions_with_threshold_2_test() { let test = RevisionTest::new_with_configuration(2).await; let (base_rev_id, rev_id_1) = test.next_rev_id_pair(); @@ -86,23 +80,23 @@ async fn revision_compress_three_revisions_test() { // rev_id_2,rev_id_3,rev_id4 will be merged into rev_id_1 test.run_scripts(vec![ - Wait { - milliseconds: REVISION_WRITE_INTERVAL_IN_MILLIS, - }, - AssertNumberOfSyncRevisions { num: 1 }, + AssertNumberOfSyncRevisions { num: 2 }, AssertNextSyncRevisionId { rev_id: Some(rev_id_1) }, AssertNextSyncRevisionContent { - expected: "1234".to_string(), + expected: "12".to_string(), }, AckRevision { rev_id: rev_id_1 }, - AssertNextSyncRevisionId { rev_id: None }, + AssertNextSyncRevisionId { rev_id: Some(rev_id_2) }, + AssertNextSyncRevisionContent { + expected: "34".to_string(), + }, ]) .await; } #[tokio::test] -async fn revision_compress_three_revisions_test2() { - let test = RevisionTest::new_with_configuration(2).await; +async fn revision_compress_8_revisions_with_threshold_4_test() { + let test = RevisionTest::new_with_configuration(4).await; let (base_rev_id, rev_id_1) = test.next_rev_id_pair(); test.run_script(AddLocalRevision { @@ -169,9 +163,6 @@ async fn revision_compress_three_revisions_test2() { .await; test.run_scripts(vec![ - // Wait { - // milliseconds: REVISION_WRITE_INTERVAL_IN_MILLIS, - // }, AssertNumberOfSyncRevisions { num: 2 }, AssertNextSyncRevisionId { rev_id: Some(rev_id_1) }, AssertNextSyncRevisionContent { @@ -241,3 +232,88 @@ async fn revision_merge_per_100_revision_test() { test.run_scripts(vec![AssertNumberOfSyncRevisions { num: 10 }]).await; } + +#[tokio::test] +async fn revision_merge_per_100_revision_test2() { + let test = RevisionTest::new_with_configuration(100).await; + for i in 0..50 { + let (base_rev_id, rev_id) = test.next_rev_id_pair(); + test.run_script(AddLocalRevision { + content: format!("{}", i), + base_rev_id, + rev_id, + }) + .await; + } + + test.run_scripts(vec![AssertNumberOfSyncRevisions { num: 50 }]).await; +} + +#[tokio::test] +async fn revision_merge_per_1000_revision_test() { + let test = RevisionTest::new_with_configuration(1000).await; + for i in 0..100000 { + let (base_rev_id, rev_id) = test.next_rev_id_pair(); + test.run_script(AddLocalRevision { + content: format!("{}", i), + base_rev_id, + rev_id, + }) + .await; + } + + test.run_scripts(vec![AssertNumberOfSyncRevisions { num: 100 }]).await; +} + +#[tokio::test] +async fn revision_compress_revision_test() { + let test = RevisionTest::new_with_configuration(2).await; + + test.run_scripts(vec![ + AddLocalRevision2 { + content: "1".to_string(), + pair_rev_id: test.next_rev_id_pair(), + }, + AddLocalRevision2 { + content: "2".to_string(), + pair_rev_id: test.next_rev_id_pair(), + }, + AddLocalRevision2 { + content: "3".to_string(), + pair_rev_id: test.next_rev_id_pair(), + }, + AddLocalRevision2 { + content: "4".to_string(), + pair_rev_id: test.next_rev_id_pair(), + }, + AssertNumberOfSyncRevisions { num: 2 }, + ]) + .await; +} +#[tokio::test] +async fn revision_compress_revision_while_recv_ack_test() { + let test = RevisionTest::new_with_configuration(2).await; + test.run_scripts(vec![ + AddLocalRevision2 { + content: "1".to_string(), + pair_rev_id: test.next_rev_id_pair(), + }, + AckRevision { rev_id: 1 }, + AddLocalRevision2 { + content: "2".to_string(), + pair_rev_id: test.next_rev_id_pair(), + }, + AckRevision { rev_id: 2 }, + AddLocalRevision2 { + content: "3".to_string(), + pair_rev_id: test.next_rev_id_pair(), + }, + AckRevision { rev_id: 3 }, + AddLocalRevision2 { + content: "4".to_string(), + pair_rev_id: test.next_rev_id_pair(), + }, + AssertNumberOfSyncRevisions { num: 4 }, + ]) + .await; +} diff --git a/frontend/rust-lib/flowy-revision/tests/revision_test/mod.rs b/frontend/rust-lib/flowy-revision/tests/revision_test/mod.rs index 91300b4b71..f0362f1436 100644 --- a/frontend/rust-lib/flowy-revision/tests/revision_test/mod.rs +++ b/frontend/rust-lib/flowy-revision/tests/revision_test/mod.rs @@ -1,2 +1,3 @@ mod local_revision_test; +mod revision_disk_test; mod script; diff --git a/frontend/rust-lib/flowy-revision/tests/revision_test/revision_disk_test.rs b/frontend/rust-lib/flowy-revision/tests/revision_test/revision_disk_test.rs new file mode 100644 index 0000000000..878e75ed58 --- /dev/null +++ b/frontend/rust-lib/flowy-revision/tests/revision_test/revision_disk_test.rs @@ -0,0 +1,103 @@ +use crate::revision_test::script::RevisionScript::*; +use crate::revision_test::script::{InvalidRevisionObject, RevisionTest}; +use flowy_revision::REVISION_WRITE_INTERVAL_IN_MILLIS; + +#[tokio::test] +async fn revision_write_to_disk_test() { + let test = RevisionTest::new_with_configuration(2).await; + let (base_rev_id, rev_id) = test.next_rev_id_pair(); + + test.run_script(AddLocalRevision { + content: "123".to_string(), + base_rev_id, + rev_id, + }) + .await; + + test.run_scripts(vec![ + AssertNumberOfRevisionsInDisk { num: 0 }, + WaitWhenWriteToDisk, + AssertNumberOfRevisionsInDisk { num: 1 }, + ]) + .await; +} + +#[tokio::test] +async fn revision_write_to_disk_with_merge_test() { + let test = RevisionTest::new_with_configuration(100).await; + for i in 0..1000 { + let (base_rev_id, rev_id) = test.next_rev_id_pair(); + test.run_script(AddLocalRevision { + content: format!("{}", i), + base_rev_id, + rev_id, + }) + .await; + } + + test.run_scripts(vec![ + AssertNumberOfRevisionsInDisk { num: 0 }, + AssertNumberOfSyncRevisions { num: 10 }, + WaitWhenWriteToDisk, + AssertNumberOfRevisionsInDisk { num: 10 }, + ]) + .await; +} + +#[tokio::test] +async fn revision_read_from_disk_test() { + let test = RevisionTest::new_with_configuration(2).await; + let (base_rev_id, rev_id) = test.next_rev_id_pair(); + test.run_scripts(vec![ + AddLocalRevision { + content: "123".to_string(), + base_rev_id, + rev_id, + }, + AssertNumberOfRevisionsInDisk { num: 0 }, + WaitWhenWriteToDisk, + AssertNumberOfRevisionsInDisk { num: 1 }, + ]) + .await; + + let test = RevisionTest::new_with_other(test).await; + let (base_rev_id, rev_id) = test.next_rev_id_pair(); + test.run_scripts(vec![ + AssertNextSyncRevisionId { rev_id: Some(1) }, + AddLocalRevision { + content: "456".to_string(), + base_rev_id, + rev_id: rev_id.clone(), + }, + AckRevision { rev_id: 1 }, + AssertNextSyncRevisionId { rev_id: Some(rev_id) }, + ]) + .await; +} + +#[tokio::test] +#[should_panic] +async fn revision_read_from_disk_with_invalid_record_test() { + let test = RevisionTest::new_with_configuration(2).await; + let (base_rev_id, rev_id) = test.next_rev_id_pair(); + test.run_script(AddLocalRevision { + content: "123".to_string(), + base_rev_id, + rev_id, + }) + .await; + + let (base_rev_id, rev_id) = test.next_rev_id_pair(); + test.run_script(AddInvalidLocalRevision { + bytes: InvalidRevisionObject::new(), + base_rev_id, + rev_id, + }) + .await; + + let test = RevisionTest::new_with_other(test).await; + test.run_scripts(vec![AssertNextSyncRevisionContent { + expected: "123".to_string(), + }]) + .await; +} diff --git a/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs b/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs index dce38a0c09..b49cff9a6f 100644 --- a/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs +++ b/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs @@ -2,11 +2,13 @@ use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, SyncRecord}; use flowy_revision::{ - RevisionManager, RevisionMergeable, RevisionPersistence, RevisionPersistenceConfiguration, - RevisionSnapshotDiskCache, RevisionSnapshotInfo, + RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence, + RevisionPersistenceConfiguration, RevisionSnapshotDiskCache, RevisionSnapshotInfo, + REVISION_WRITE_INTERVAL_IN_MILLIS, }; +use flowy_sync::entities::document::DocumentPayloadPB; use flowy_sync::entities::revision::{Revision, RevisionRange}; -use flowy_sync::util::md5; +use flowy_sync::util::{make_operations_from_revisions, md5}; use nanoid::nanoid; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; @@ -19,6 +21,15 @@ pub enum RevisionScript { base_rev_id: i64, rev_id: i64, }, + AddLocalRevision2 { + content: String, + pair_rev_id: (i64, i64), + }, + AddInvalidLocalRevision { + bytes: Vec, + base_rev_id: i64, + rev_id: i64, + }, AckRevision { rev_id: i64, }, @@ -28,15 +39,19 @@ pub enum RevisionScript { AssertNumberOfSyncRevisions { num: usize, }, + AssertNumberOfRevisionsInDisk { + num: usize, + }, AssertNextSyncRevisionContent { expected: String, }, - Wait { - milliseconds: u64, - }, + WaitWhenWriteToDisk, } pub struct RevisionTest { + user_id: String, + object_id: String, + configuration: RevisionPersistenceConfiguration, rev_manager: Arc>, } @@ -45,19 +60,47 @@ impl RevisionTest { Self::new_with_configuration(2).await } - pub async fn new_with_configuration(merge_when_excess_number_of_version: i64) -> Self { + pub async fn new_with_configuration(merge_threshold: i64) -> Self { let user_id = nanoid!(10); let object_id = nanoid!(6); - let configuration = RevisionPersistenceConfiguration::new(merge_when_excess_number_of_version as usize); - let persistence = RevisionPersistence::new(&user_id, &object_id, RevisionDiskCacheMock::new(), configuration); + let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize); + let disk_cache = RevisionDiskCacheMock::new(vec![]); + let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone()); let compress = RevisionCompressMock {}; let snapshot = RevisionSnapshotMock {}; - let rev_manager = RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot); + let mut rev_manager = RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot); + rev_manager.initialize::(None).await.unwrap(); Self { + user_id, + object_id, + configuration, rev_manager: Arc::new(rev_manager), } } + pub async fn new_with_other(old_test: RevisionTest) -> Self { + let records = old_test.rev_manager.get_all_revision_records().unwrap(); + let disk_cache = RevisionDiskCacheMock::new(records); + let configuration = old_test.configuration; + let persistence = RevisionPersistence::new( + &old_test.user_id, + &old_test.object_id, + disk_cache, + configuration.clone(), + ); + + let compress = RevisionCompressMock {}; + let snapshot = RevisionSnapshotMock {}; + let mut rev_manager = + RevisionManager::new(&old_test.user_id, &old_test.object_id, persistence, compress, snapshot); + rev_manager.initialize::(None).await.unwrap(); + Self { + user_id: old_test.user_id, + object_id: old_test.object_id, + configuration, + rev_manager: Arc::new(rev_manager), + } + } pub async fn run_scripts(&self, scripts: Vec) { for script in scripts { self.run_script(script).await; @@ -87,6 +130,34 @@ impl RevisionTest { ); self.rev_manager.add_local_revision(&revision).await.unwrap(); } + RevisionScript::AddLocalRevision2 { content, pair_rev_id } => { + let object = RevisionObjectMock::new(&content); + let bytes = object.to_bytes(); + let md5 = md5(&bytes); + let revision = Revision::new( + &self.rev_manager.object_id, + pair_rev_id.0, + pair_rev_id.1, + Bytes::from(bytes), + md5, + ); + self.rev_manager.add_local_revision(&revision).await.unwrap(); + } + RevisionScript::AddInvalidLocalRevision { + bytes, + base_rev_id, + rev_id, + } => { + let md5 = md5(&bytes); + let revision = Revision::new( + &self.rev_manager.object_id, + base_rev_id, + rev_id, + Bytes::from(bytes), + md5, + ); + self.rev_manager.add_local_revision(&revision).await.unwrap(); + } RevisionScript::AckRevision { rev_id } => { // self.rev_manager.ack_revision(rev_id).await.unwrap() @@ -97,6 +168,9 @@ impl RevisionTest { RevisionScript::AssertNumberOfSyncRevisions { num } => { assert_eq!(self.rev_manager.number_of_sync_revisions(), num) } + RevisionScript::AssertNumberOfRevisionsInDisk { num } => { + assert_eq!(self.rev_manager.number_of_revisions_in_disk(), num) + } RevisionScript::AssertNextSyncRevisionContent { expected } => { // let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap(); @@ -104,7 +178,8 @@ impl RevisionTest { let object = RevisionObjectMock::from_bytes(&revision.bytes); assert_eq!(object.content, expected); } - RevisionScript::Wait { milliseconds } => { + RevisionScript::WaitWhenWriteToDisk => { + let milliseconds = 2 * REVISION_WRITE_INTERVAL_IN_MILLIS; tokio::time::sleep(Duration::from_millis(milliseconds)).await; } } @@ -116,9 +191,9 @@ pub struct RevisionDiskCacheMock { } impl RevisionDiskCacheMock { - pub fn new() -> Self { + pub fn new(records: Vec) -> Self { Self { - records: RwLock::new(vec![]), + records: RwLock::new(records), } } } @@ -138,17 +213,36 @@ impl RevisionDiskCache for RevisionDiskCacheMock { fn read_revision_records( &self, _object_id: &str, - _rev_ids: Option>, + rev_ids: Option>, ) -> Result, Self::Error> { - todo!() + match rev_ids { + None => Ok(self.records.read().clone()), + Some(rev_ids) => Ok(self + .records + .read() + .iter() + .filter(|record| rev_ids.contains(&record.revision.rev_id)) + .cloned() + .collect::>()), + } } fn read_revision_records_with_range( &self, _object_id: &str, - _range: &RevisionRange, + range: &RevisionRange, ) -> Result, Self::Error> { - todo!() + let read_guard = self.records.read(); + let records = range + .iter() + .flat_map(|rev_id| { + read_guard + .iter() + .find(|record| record.revision.rev_id == rev_id) + .cloned() + }) + .collect::>(); + Ok(records) } fn update_revision_record(&self, changesets: Vec) -> FlowyResult<()> { @@ -195,9 +289,7 @@ impl RevisionDiskCache for RevisionDiskCacheMock { } pub struct RevisionConnectionMock {} - pub struct RevisionSnapshotMock {} - impl RevisionSnapshotDiskCache for RevisionSnapshotMock { fn write_snapshot(&self, _object_id: &str, _rev_id: i64, _data: Vec) -> FlowyResult<()> { todo!() @@ -215,12 +307,31 @@ impl RevisionMergeable for RevisionCompressMock { let mut object = RevisionObjectMock::new(""); for revision in revisions { let other = RevisionObjectMock::from_bytes(&revision.bytes); - object.compose(other); + let _ = object.compose(other)?; } Ok(Bytes::from(object.to_bytes())) } } +#[derive(Serialize, Deserialize)] +pub struct InvalidRevisionObject { + data: String, +} + +impl InvalidRevisionObject { + pub fn new() -> Vec { + let object = InvalidRevisionObject { data: "".to_string() }; + object.to_bytes() + } + fn to_bytes(&self) -> Vec { + serde_json::to_vec(self).unwrap() + } + + fn from_bytes(bytes: &[u8]) -> Self { + serde_json::from_slice(bytes).unwrap() + } +} + #[derive(Serialize, Deserialize)] pub struct RevisionObjectMock { content: String, @@ -231,8 +342,9 @@ impl RevisionObjectMock { Self { content: s.to_owned() } } - pub fn compose(&mut self, other: RevisionObjectMock) { + pub fn compose(&mut self, other: RevisionObjectMock) -> FlowyResult<()> { self.content.push_str(other.content.as_str()); + Ok(()) } pub fn to_bytes(&self) -> Vec { @@ -243,3 +355,22 @@ impl RevisionObjectMock { serde_json::from_slice(bytes).unwrap() } } + +pub struct RevisionObjectMockSerde(); +impl RevisionObjectDeserializer for RevisionObjectMockSerde { + type Output = RevisionObjectMock; + + fn deserialize_revisions(object_id: &str, revisions: Vec) -> FlowyResult { + let mut object = RevisionObjectMock::new(""); + if revisions.is_empty() { + return Ok(object); + } + + for revision in revisions { + let revision_object = RevisionObjectMock::from_bytes(&revision.bytes); + let _ = object.compose(revision_object)?; + } + + Ok(object) + } +} diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 58bae2594d..66e1816637 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -86,7 +86,7 @@ fn crate_log_filter(level: String) -> String { filters.push(format!("lib_ws={}", level)); filters.push(format!("lib_infra={}", level)); filters.push(format!("flowy_sync={}", level)); - // filters.push(format!("flowy_revision={}", level)); + filters.push(format!("flowy_revision={}", level)); // filters.push(format!("lib_dispatch={}", level)); filters.push(format!("dart_ffi={}", "info"));