From 1744938a515872d446e28807d278327fe01448fb Mon Sep 17 00:00:00 2001 From: appflowy Date: Sat, 18 Dec 2021 00:23:26 +0800 Subject: [PATCH] fix bugs --- frontend/rust-lib/flowy-core/Flowy.toml | 2 +- frontend/rust-lib/flowy-document/Flowy.toml | 2 +- .../src/services/doc/controller.rs | 21 +- .../src/services/doc/edit/editor.rs | 60 ++-- .../src/services/doc/edit/mod.rs | 4 +- .../doc/edit/{edit_ws.rs => web_socket.rs} | 18 +- .../src/services/doc/revision/cache/cache.rs | 273 +++++------------- .../src/services/doc/revision/cache/disk.rs | 49 +++- .../src/services/doc/revision/cache/memory.rs | 167 ++++------- .../src/services/doc/revision/cache/mod.rs | 1 + .../src/services/doc/revision/cache/sync.rs | 71 +++++ .../src/services/doc/revision/manager.rs | 94 +++++- .../flowy-document/src/services/doc/ws/mod.rs | 0 .../rust-lib/flowy-test/src/doc_script.rs | 4 +- frontend/rust-lib/flowy-user/Flowy.toml | 2 +- .../src/core/sync/synchronizer.rs | 6 +- .../flowy-collaboration/src/entities/ws/ws.rs | 2 +- 17 files changed, 409 insertions(+), 367 deletions(-) rename frontend/rust-lib/flowy-document/src/services/doc/edit/{edit_ws.rs => web_socket.rs} (93%) create mode 100644 frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs delete mode 100644 frontend/rust-lib/flowy-document/src/services/doc/ws/mod.rs diff --git a/frontend/rust-lib/flowy-core/Flowy.toml b/frontend/rust-lib/flowy-core/Flowy.toml index 152585acd5..667c9f7ae4 100644 --- a/frontend/rust-lib/flowy-core/Flowy.toml +++ b/frontend/rust-lib/flowy-core/Flowy.toml @@ -1,3 +1,3 @@ -proto_crates = ["src/entities", "src/event.rs", "src/errors.rs", "src/notify"] +proto_crates = ["src/entities", "src/event.rs", "src/notify"] event_files = ["src/event.rs"] \ No newline at end of file diff --git a/frontend/rust-lib/flowy-document/Flowy.toml b/frontend/rust-lib/flowy-document/Flowy.toml index d55cb31ce9..f9ff7f7438 100644 --- a/frontend/rust-lib/flowy-document/Flowy.toml +++ b/frontend/rust-lib/flowy-document/Flowy.toml @@ -1,3 +1,3 @@ -proto_crates = ["src/event.rs", "src/errors.rs", "src/notify"] +proto_crates = ["src/notify"] event_files = [] \ No newline at end of file diff --git a/frontend/rust-lib/flowy-document/src/services/doc/controller.rs b/frontend/rust-lib/flowy-document/src/services/doc/controller.rs index 12813a1daf..7ea9a59fe1 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/controller.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/controller.rs @@ -28,7 +28,12 @@ pub(crate) struct DocController { impl DocController { pub(crate) fn new(server: Server, user: Arc, ws_handlers: Arc) -> Self { let open_cache = Arc::new(OpenDocCache::new()); - Self { server, ws_handlers, open_cache, user } + Self { + server, + ws_handlers, + open_cache, + user, + } } pub(crate) fn init(&self) -> FlowyResult<()> { @@ -93,8 +98,13 @@ impl DocController { pool: Arc, ) -> Result, FlowyError> { let user = self.user.clone(); + let token = self.user.token()?; let rev_manager = self.make_rev_manager(doc_id, pool.clone())?; - let doc_editor = ClientDocEditor::new(doc_id, user, pool, rev_manager, self.ws_handlers.ws()).await?; + let server = Arc::new(RevisionServerImpl { + token, + server: self.server.clone(), + }); + let doc_editor = ClientDocEditor::new(doc_id, user, pool, rev_manager, self.ws_handlers.ws(), server).await?; let ws_handler = doc_editor.ws_handler(); self.ws_handlers.register_handler(doc_id, ws_handler); self.open_cache.insert(&doc_id, &doc_editor); @@ -105,13 +115,8 @@ impl DocController { // Opti: require upgradable_read lock and then upgrade to write lock using // RwLockUpgradableReadGuard::upgrade(xx) of ws // let doc = self.read_doc(doc_id, pool.clone()).await?; - let token = self.user.token()?; let user_id = self.user.user_id()?; - let server = Arc::new(RevisionServerImpl { - token, - server: self.server.clone(), - }); - let cache = Arc::new(RevisionCache::new(&user_id, doc_id, pool, server)); + let cache = Arc::new(RevisionCache::new(&user_id, doc_id, pool)); Ok(RevisionManager::new(&user_id, doc_id, cache)) } } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index c5fc1ae140..8f5a697c93 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -13,16 +13,15 @@ use lib_ot::{ revision::{RevId, RevType, Revision, RevisionRange}, rich_text::{RichTextAttribute, RichTextDelta}, }; -use parking_lot::RwLock; use std::{collections::VecDeque, sync::Arc}; -use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; -type SinkVec = Arc>>; +use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot, RwLock}; + pub struct ClientDocEditor { pub doc_id: String, rev_manager: Arc, ws_manager: Arc, edit_cmd_tx: UnboundedSender, - sink_vec: SinkVec, + sink_data_provider: SinkDataProvider, user: Arc, } @@ -33,22 +32,23 @@ impl ClientDocEditor { pool: Arc, mut rev_manager: RevisionManager, ws: Arc, + server: Arc, ) -> FlowyResult> { - let delta = rev_manager.load_document().await?; + let delta = rev_manager.load_document(server).await?; let edit_cmd_tx = spawn_edit_queue(doc_id, delta, pool.clone()); let doc_id = doc_id.to_string(); let rev_manager = Arc::new(rev_manager); - let sink_vec = Arc::new(RwLock::new(VecDeque::new())); + let sink_data_provider = Arc::new(RwLock::new(VecDeque::new())); let data_provider = Arc::new(DocumentSinkDataProviderAdapter { rev_manager: rev_manager.clone(), - sink_vec: sink_vec.clone(), + data_provider: sink_data_provider.clone(), }); let stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { doc_id: doc_id.clone(), edit_cmd_tx: edit_cmd_tx.clone(), rev_manager: rev_manager.clone(), user: user.clone(), - sink_vec: sink_vec.clone(), + sink_data_provider: sink_data_provider.clone(), }); let ws_manager = Arc::new(WebSocketManager::new(&doc_id, ws, data_provider, stream_consumer)); let editor = Arc::new(Self { @@ -56,7 +56,7 @@ impl ClientDocEditor { rev_manager, ws_manager, edit_cmd_tx, - sink_vec, + sink_data_provider, user, }); Ok(editor) @@ -202,7 +202,7 @@ struct DocumentWebSocketSteamConsumerAdapter { edit_cmd_tx: UnboundedSender, rev_manager: Arc, user: Arc, - sink_vec: SinkVec, + sink_data_provider: SinkDataProvider, } impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter { @@ -210,43 +210,50 @@ impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter { let user = self.user.clone(); let rev_manager = self.rev_manager.clone(); let edit_cmd_tx = self.edit_cmd_tx.clone(); + let sink_data_provider = self.sink_data_provider.clone(); let doc_id = self.doc_id.clone(); FutureResult::new(async move { let user_id = user.user_id()?; - let _revision = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await?; + if let Some(revision) = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? { + sink_data_provider.write().await.push_back(revision.into()); + } Ok(()) }) } - fn make_revision_from_range(&self, range: RevisionRange) -> FutureResult { - let rev_manager = self.rev_manager.clone(); - FutureResult::new(async move { - let revision = rev_manager.mk_revisions(range).await?; - Ok(revision) - }) - } - - fn ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError> { + fn receive_ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError> { let rev_manager = self.rev_manager.clone(); FutureResult::new(async move { let _ = rev_manager.ack_revision(rev_id).await?; Ok(()) }) } + + fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> { + let rev_manager = self.rev_manager.clone(); + let sink_data_provider = self.sink_data_provider.clone(); + FutureResult::new(async move { + let revision = rev_manager.mk_revisions(range).await?; + sink_data_provider.write().await.push_back(revision.into()); + Ok(()) + }) + } } +type SinkDataProvider = Arc>>; + struct DocumentSinkDataProviderAdapter { rev_manager: Arc, - sink_vec: SinkVec, + data_provider: SinkDataProvider, } impl DocumentSinkDataProvider for DocumentSinkDataProviderAdapter { fn next(&self) -> FutureResult, FlowyError> { let rev_manager = self.rev_manager.clone(); - let sink_vec = self.sink_vec.clone(); + let data_provider = self.data_provider.clone(); FutureResult::new(async move { - if sink_vec.read().is_empty() { + if data_provider.read().await.is_empty() { match rev_manager.next_sync_revision().await? { Some(rev) => { tracing::debug!("[DocumentSinkDataProvider]: revision: {}:{:?}", rev.doc_id, rev.rev_id); @@ -255,9 +262,12 @@ impl DocumentSinkDataProvider for DocumentSinkDataProviderAdapter { None => Ok(None), } } else { - match sink_vec.read().front() { + match data_provider.read().await.front() { None => Ok(None), - Some(data) => Ok(Some(data.clone())), + Some(data) => { + tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", data.doc_id, data.ty); + Ok(Some(data.clone())) + }, } } }) diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs index 794da84e62..de21adfac6 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs @@ -1,8 +1,8 @@ mod edit_queue; -mod edit_ws; mod editor; mod model; +mod web_socket; pub(crate) use edit_queue::*; -pub use edit_ws::*; pub use editor::*; +pub use web_socket::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_ws.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/web_socket.rs similarity index 93% rename from frontend/rust-lib/flowy-document/src/services/doc/edit/edit_ws.rs rename to frontend/rust-lib/flowy-document/src/services/doc/edit/web_socket.rs index 59d548eaa8..adcdf39a42 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_ws.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/web_socket.rs @@ -1,10 +1,7 @@ use crate::services::doc::{DocumentWebSocket, DocumentWsHandler, SYNC_INTERVAL_IN_MILLIS}; use async_stream::stream; use bytes::Bytes; -use flowy_collaboration::{ - entities::ws::{DocumentWSData, DocumentWSDataType}, - Revision, -}; +use flowy_collaboration::entities::ws::{DocumentWSData, DocumentWSDataType}; use flowy_error::{internal_error, FlowyError, FlowyResult}; use futures::stream::StreamExt; use lib_infra::future::FutureResult; @@ -66,7 +63,6 @@ impl WebSocketManager { &self.doc_id, self.stream_consumer.clone(), ws_msg_rx, - self.ws.clone(), self.stop_sync_tx.subscribe(), ); tokio::spawn(sink.run()); @@ -117,15 +113,14 @@ impl DocumentWsHandler for WebSocketManager { pub trait DocumentWebSocketSteamConsumer: Send + Sync { fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>; - fn make_revision_from_range(&self, range: RevisionRange) -> FutureResult; - fn ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError>; + fn receive_ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError>; + fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>; } pub(crate) struct DocumentWebSocketStream { doc_id: String, consumer: Arc, ws_msg_rx: Option>, - ws_sender: Arc, stop_rx: Option, } @@ -134,14 +129,12 @@ impl DocumentWebSocketStream { doc_id: &str, consumer: Arc, ws_msg_rx: mpsc::UnboundedReceiver, - ws_sender: Arc, stop_rx: SinkStopRx, ) -> Self { DocumentWebSocketStream { doc_id: doc_id.to_owned(), consumer, ws_msg_rx: Some(ws_msg_rx), - ws_sender, stop_rx: Some(stop_rx), } } @@ -200,12 +193,11 @@ impl DocumentWebSocketStream { }, DocumentWSDataType::PullRev => { let range = RevisionRange::try_from(bytes)?; - let revision = self.consumer.make_revision_from_range(range).await?; - let _ = self.ws_sender.send(revision.into()); + let _ = self.consumer.send_revision_in_range(range).await?; }, DocumentWSDataType::Acked => { let rev_id = RevId::try_from(bytes)?; - let _ = self.consumer.ack_revision(rev_id.into()).await; + let _ = self.consumer.receive_ack_revision(rev_id.into()).await; }, DocumentWSDataType::UserConnect => {}, } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs index 6858108afa..ce85c8c4dd 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs @@ -1,56 +1,51 @@ use crate::{ errors::FlowyError, services::doc::revision::{ - cache::{disk::RevisionDiskCache, memory::RevisionMemoryCache}, + cache::{ + disk::{Persistence, RevisionDiskCache}, + memory::{RevisionMemoryCache, RevisionMemoryCacheMissing}, + sync::RevisionSyncSeq, + }, RevisionRecord, - RevisionServer, }, - sql_tables::RevTableSql, }; -use bytes::Bytes; -use flowy_collaboration::{entities::doc::Doc, util::md5}; + use flowy_database::ConnectionPool; use flowy_error::{internal_error, FlowyResult}; use lib_infra::future::FutureResult; use lib_ot::{ - core::{Operation, OperationTransformable}, - revision::{RevState, RevType, Revision, RevisionRange}, + core::Operation, + revision::{RevState, Revision, RevisionRange}, rich_text::RichTextDelta, }; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tokio::{ - sync::{mpsc, RwLock}, + sync::RwLock, task::{spawn_blocking, JoinHandle}, }; -type DocRevisionDeskCache = dyn RevisionDiskCache; +type DocRevisionDiskCache = dyn RevisionDiskCache; pub struct RevisionCache { - user_id: String, doc_id: String, - dish_cache: Arc, + pub disk_cache: Arc, memory_cache: Arc, + sync_seq: Arc, defer_save: RwLock>>, - server: Arc, } impl RevisionCache { - pub fn new( - user_id: &str, - doc_id: &str, - pool: Arc, - server: Arc, - ) -> RevisionCache { + pub fn new(user_id: &str, doc_id: &str, pool: Arc) -> RevisionCache { + let disk_cache = Arc::new(Persistence::new(user_id, pool)); + let memory_cache = Arc::new(RevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone()))); + let sync_seq = Arc::new(RevisionSyncSeq::new()); let doc_id = doc_id.to_owned(); - let dish_cache = Arc::new(Persistence::new(user_id, pool)); - let memory_cache = Arc::new(RevisionMemoryCache::new()); Self { - user_id: user_id.to_owned(), doc_id, - dish_cache, + disk_cache, memory_cache, + sync_seq, defer_save: RwLock::new(None), - server, } } @@ -63,7 +58,8 @@ impl RevisionCache { revision, state: RevState::StateLocal, }; - self.memory_cache.add_revision(record).await?; + let _ = self.memory_cache.add_revision(&record).await; + self.sync_seq.add_revision(record).await?; self.save_revisions().await; Ok(()) } @@ -77,104 +73,60 @@ impl RevisionCache { revision, state: RevState::StateLocal, }; - self.memory_cache.add_revision(record).await?; + self.memory_cache.add_revision(&record).await; self.save_revisions().await; Ok(()) } #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id))] pub async fn ack_revision(&self, rev_id: i64) { - self.memory_cache.ack_revision(&rev_id).await; + self.sync_seq.ack_revision(&rev_id).await; self.save_revisions().await; } - pub async fn query_revision(&self, doc_id: &str, rev_id: i64) -> Option { - match self.memory_cache.query_revision(&rev_id).await { - None => match self.dish_cache.read_revision(doc_id, rev_id) { - Ok(revision) => revision, - Err(e) => { - log::error!("query_revision error: {:?}", e); - None - }, - }, - Some(record) => Some(record), - } + pub async fn get_revision(&self, _doc_id: &str, rev_id: i64) -> Option { + self.memory_cache.get_revision(&rev_id).await } async fn save_revisions(&self) { + // https://github.com/async-graphql/async-graphql/blob/ed8449beec3d9c54b94da39bab33cec809903953/src/dataloader/mod.rs#L362 if let Some(handler) = self.defer_save.write().await.take() { handler.abort(); } - if self.memory_cache.is_empty() { - return; - } + // if self.sync_seq.is_empty() { + // return; + // } - let memory_cache = self.memory_cache.clone(); - let disk_cache = self.dish_cache.clone(); - *self.defer_save.write().await = Some(tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(300)).await; - let (ids, records) = memory_cache.revisions(); - match disk_cache.create_revisions(records) { - Ok(_) => { - memory_cache.remove_revisions(ids); - }, - Err(e) => log::error!("Save revision failed: {:?}", e), - } - })); + // let memory_cache = self.sync_seq.clone(); + // let disk_cache = self.disk_cache.clone(); + // *self.defer_save.write().await = Some(tokio::spawn(async move { + // tokio::time::sleep(Duration::from_millis(300)).await; + // let (ids, records) = memory_cache.revisions(); + // match disk_cache.create_revisions(records) { + // Ok(_) => { + // memory_cache.remove_revisions(ids); + // }, + // Err(e) => log::error!("Save revision failed: {:?}", e), + // } + // })); } pub async fn revisions_in_range(&self, range: RevisionRange) -> FlowyResult> { - let revs = self.memory_cache.revisions_in_range(&range).await?; - if revs.len() == range.len() as usize { - Ok(revs) - } else { - let doc_id = self.doc_id.clone(); - let disk_cache = self.dish_cache.clone(); - let records = spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range)) - .await - .map_err(internal_error)??; - - let revisions = records - .into_iter() - .map(|record| record.revision) - .collect::>(); - Ok(revisions) - } - } - - pub async fn load_document(&self) -> FlowyResult { - // Loading the document from disk and it will be sync with server. - let result = load_from_disk(&self.doc_id, self.memory_cache.clone(), self.dish_cache.clone()).await; - if result.is_ok() { - return result; - } - - // The document doesn't exist in local. Try load from server - let doc = self.server.fetch_document(&self.doc_id).await?; - let delta_data = Bytes::from(doc.data.clone()); - let doc_md5 = md5(&delta_data); - let revision = Revision::new( - &doc.id, - doc.base_rev_id, - doc.rev_id, - delta_data, - RevType::Remote, - &self.user_id, - doc_md5, - ); - - self.add_remote_revision(revision).await?; - Ok(doc) + let records = self.memory_cache.get_revisions_in_range(&range).await?; + Ok(records + .into_iter() + .map(|record| record.revision) + .collect::>()) } pub(crate) fn next_revision(&self) -> FutureResult, FlowyError> { - let memory_cache = self.memory_cache.clone(); - let disk_cache = self.dish_cache.clone(); + let sync_seq = self.sync_seq.clone(); + let disk_cache = self.disk_cache.clone(); let doc_id = self.doc_id.clone(); FutureResult::new(async move { - match memory_cache.front_local_revision().await { - None => match memory_cache.front_local_rev_id().await { + match sync_seq.next_sync_revision().await { + None => match sync_seq.next_sync_rev_id().await { None => Ok(None), Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? { None => Ok(None), @@ -187,116 +139,37 @@ impl RevisionCache { } } -async fn load_from_disk( - doc_id: &str, - memory_cache: Arc, - disk_cache: Arc, -) -> FlowyResult { - let doc_id = doc_id.to_owned(); - let (tx, mut rx) = mpsc::channel(2); - let doc = spawn_blocking(move || { - let records = disk_cache.read_revisions(&doc_id)?; - if records.is_empty() { - return Err(FlowyError::record_not_found().context("Local doesn't have this document")); +impl RevisionMemoryCacheMissing for Arc { + fn get_revision_record(&self, doc_id: &str, rev_id: i64) -> Result, FlowyError> { + match self.read_revision(&doc_id, rev_id)? { + None => { + tracing::warn!("Can't find revision in {} with rev_id: {}", doc_id, rev_id); + Ok(None) + }, + Some(record) => Ok(Some(record)), } + } - let (base_rev_id, rev_id) = records.last().unwrap().revision.pair_rev_id(); - let mut delta = RichTextDelta::new(); - for (_, record) in records.into_iter().enumerate() { - // Opti: revision's clone may cause memory issues - match RichTextDelta::from_bytes(record.revision.clone().delta_data) { - Ok(local_delta) => { - delta = delta.compose(&local_delta)?; - match tx.blocking_send(record) { - Ok(_) => {}, - Err(e) => tracing::error!("❌Load document from disk error: {}", e), - } - }, - Err(e) => { - tracing::error!("Deserialize delta from revision failed: {}", e); - }, - } - } + fn get_revision_records_with_range( + &self, + doc_id: &str, + range: RevisionRange, + ) -> FutureResult, FlowyError> { + let disk_cache = self.clone(); + let doc_id = doc_id.to_owned(); + FutureResult::new(async move { + let records = spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range)) + .await + .map_err(internal_error)??; - correct_delta_if_need(&mut delta); - Result::::Ok(Doc { - id: doc_id, - data: delta.to_json(), - rev_id, - base_rev_id, + Ok::, FlowyError>(records) }) - }) - .await - .map_err(internal_error)?; - - while let Some(record) = rx.recv().await { - match memory_cache.add_revision(record).await { - Ok(_) => {}, - Err(e) => log::error!("{:?}", e), - } - } - doc -} - -fn correct_delta_if_need(delta: &mut RichTextDelta) { - if delta.ops.last().is_none() { - return; - } - - let data = delta.ops.last().as_ref().unwrap().get_data(); - if !data.ends_with('\n') { - log::error!("❌The op must end with newline. Correcting it by inserting newline op"); - delta.ops.push(Operation::Insert("\n".into())); - } -} - -pub(crate) struct Persistence { - user_id: String, - pub(crate) pool: Arc, -} - -impl RevisionDiskCache for Persistence { - type Error = FlowyError; - - fn create_revisions(&self, revisions: Vec) -> Result<(), Self::Error> { - let conn = &*self.pool.get().map_err(internal_error)?; - conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = RevTableSql::create_rev_table(revisions, conn)?; - Ok(()) - }) - } - - fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result, Self::Error> { - let conn = &*self.pool.get().map_err(internal_error).unwrap(); - let revisions = RevTableSql::read_rev_tables_with_range(&self.user_id, doc_id, range.clone(), conn)?; - Ok(revisions) - } - - fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result, Self::Error> { - let conn = self.pool.get().map_err(internal_error)?; - let some = RevTableSql::read_rev_table(&self.user_id, doc_id, &rev_id, &*conn)?; - Ok(some) - } - - fn read_revisions(&self, doc_id: &str) -> Result, Self::Error> { - let conn = self.pool.get().map_err(internal_error)?; - let some = RevTableSql::read_rev_tables(&self.user_id, doc_id, &*conn)?; - Ok(some) - } -} - -impl Persistence { - pub(crate) fn new(user_id: &str, pool: Arc) -> Self { - Self { - user_id: user_id.to_owned(), - pool, - } } } #[cfg(feature = "flowy_unit_test")] impl RevisionCache { - pub fn dish_cache(&self) -> Arc { self.dish_cache.clone() } + pub fn disk_cache(&self) -> Arc { self.disk_cache.clone() } - pub fn memory_cache(&self) -> Arc { self.memory_cache.clone() } + pub fn memory_cache(&self) -> Arc { self.sync_seq.clone() } } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs index a72a6ede8b..1cbde7dd91 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs @@ -1,7 +1,10 @@ use crate::services::doc::revision::RevisionRecord; +use crate::sql_tables::RevTableSql; +use flowy_database::ConnectionPool; +use flowy_error::{internal_error, FlowyError}; use lib_ot::revision::RevisionRange; -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; pub trait RevisionDiskCache: Sync + Send { type Error: Debug; @@ -10,3 +13,47 @@ pub trait RevisionDiskCache: Sync + Send { fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result, Self::Error>; fn read_revisions(&self, doc_id: &str) -> Result, Self::Error>; } + +pub(crate) struct Persistence { + user_id: String, + pub(crate) pool: Arc, +} + +impl RevisionDiskCache for Persistence { + type Error = FlowyError; + + fn create_revisions(&self, revisions: Vec) -> Result<(), Self::Error> { + let conn = &*self.pool.get().map_err(internal_error)?; + conn.immediate_transaction::<_, FlowyError, _>(|| { + let _ = RevTableSql::create_rev_table(revisions, conn)?; + Ok(()) + }) + } + + fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result, Self::Error> { + let conn = &*self.pool.get().map_err(internal_error).unwrap(); + let revisions = RevTableSql::read_rev_tables_with_range(&self.user_id, doc_id, range.clone(), conn)?; + Ok(revisions) + } + + fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result, Self::Error> { + let conn = self.pool.get().map_err(internal_error)?; + let some = RevTableSql::read_rev_table(&self.user_id, doc_id, &rev_id, &*conn)?; + Ok(some) + } + + fn read_revisions(&self, doc_id: &str) -> Result, Self::Error> { + let conn = self.pool.get().map_err(internal_error)?; + let some = RevTableSql::read_rev_tables(&self.user_id, doc_id, &*conn)?; + Ok(some) + } +} + +impl Persistence { + pub(crate) fn new(user_id: &str, pool: Arc) -> Self { + Self { + user_id: user_id.to_owned(), + pool, + } + } +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs index 618a20db90..a468dffc9e 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs @@ -1,126 +1,87 @@ -use crate::services::doc::revision::RevisionRecord; +use crate::services::doc::RevisionRecord; use dashmap::DashMap; -use lib_ot::{ - errors::OTError, - revision::{RevState, Revision, RevisionRange}, -}; -use std::{collections::VecDeque, sync::Arc}; +use flowy_error::FlowyError; +use lib_infra::future::FutureResult; +use lib_ot::revision::RevisionRange; +use std::sync::Arc; use tokio::sync::RwLock; -pub struct RevisionMemoryCache { +pub(crate) trait RevisionMemoryCacheMissing: Send + Sync { + fn get_revision_record(&self, doc_id: &str, rev_id: i64) -> Result, FlowyError>; + fn get_revision_records_with_range( + &self, + doc_id: &str, + range: RevisionRange, + ) -> FutureResult, FlowyError>; +} + +pub(crate) struct RevisionMemoryCache { + doc_id: String, revs_map: Arc>, - local_revs: Arc>>, -} - -impl std::default::Default for RevisionMemoryCache { - fn default() -> Self { - let local_revs = Arc::new(RwLock::new(VecDeque::new())); - RevisionMemoryCache { - revs_map: Arc::new(DashMap::new()), - local_revs, - } - } + rev_loader: Arc, + revs_order: Arc>>, } +// TODO: remove outdated revisions to reduce memory usage impl RevisionMemoryCache { - pub fn new() -> Self { RevisionMemoryCache::default() } - - pub async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> { - // The last revision's rev_id must be greater than the new one. - if let Some(rev_id) = self.local_revs.read().await.back() { - if *rev_id >= record.revision.rev_id { - return Err(OTError::revision_id_conflict() - .context(format!("The new revision's id must be greater than {}", rev_id))); - } + pub(crate) fn new(doc_id: &str, rev_loader: Arc) -> Self { + RevisionMemoryCache { + doc_id: doc_id.to_owned(), + revs_map: Arc::new(DashMap::new()), + rev_loader, + revs_order: Arc::new(RwLock::new(vec![])), } - - match record.state { - RevState::StateLocal => { - tracing::debug!("{}:add revision {}", record.revision.doc_id, record.revision.rev_id); - self.local_revs.write().await.push_back(record.revision.rev_id); - }, - RevState::Acked => {}, - } - - self.revs_map.insert(record.revision.rev_id, record); - Ok(()) } - pub fn remove_revisions(&self, ids: Vec) { self.revs_map.retain(|k, _| !ids.contains(k)); } + pub(crate) async fn is_empty(&self) -> bool { self.revs_order.read().await.is_empty() } - pub async fn ack_revision(&self, rev_id: &i64) { - if let Some(pop_rev_id) = self.front_local_rev_id().await { - if &pop_rev_id != rev_id { + pub(crate) fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) } + + pub(crate) async fn add_revision(&self, record: &RevisionRecord) { + if let Some(rev_id) = self.revs_order.read().await.last() { + if *rev_id >= record.revision.rev_id { + tracing::error!("Duplicated revision added to memory_cache"); return; } } - - match self.local_revs.write().await.pop_front() { - None => {}, - Some(pop_rev_id) => { - if &pop_rev_id != rev_id { - tracing::error!("The front rev_id:{} not equal to ack rev_id: {}", pop_rev_id, rev_id); - assert_eq!(&pop_rev_id, rev_id); - } else { - tracing::debug!("pop revision {}", pop_rev_id); - } - }, - } + self.revs_map.insert(record.revision.rev_id, record.clone()); + self.revs_order.write().await.push(record.revision.rev_id); } - pub async fn revisions_in_range(&self, range: &RevisionRange) -> Result, OTError> { - let revs = range - .iter() - .flat_map(|rev_id| match self.revs_map.get(&rev_id) { - None => None, - Some(record) => Some(record.revision.clone()), - }) - .collect::>(); - - if revs.len() == range.len() as usize { - Ok(revs) - } else { - Ok(vec![]) - } - } - - pub fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) } - - pub fn is_empty(&self) -> bool { self.revs_map.is_empty() } - - pub fn revisions(&self) -> (Vec, Vec) { - let mut records: Vec = vec![]; - let mut ids: Vec = vec![]; - - self.revs_map.iter().for_each(|kv| { - records.push(kv.value().clone()); - ids.push(*kv.key()); - }); - (ids, records) - } - - pub async fn query_revision(&self, rev_id: &i64) -> Option { - self.revs_map.get(&rev_id).map(|r| r.value().clone()) - } - - pub async fn front_local_revision(&self) -> Option<(i64, RevisionRecord)> { - match self.local_revs.read().await.front() { - None => None, - Some(rev_id) => match self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())) { - None => None, - Some(val) => { - tracing::debug!("{}:try send revision {}", val.1.revision.doc_id, val.1.revision.rev_id); - Some(val) + pub(crate) async fn get_revision(&self, rev_id: &i64) -> Option { + match self.revs_map.get(&rev_id).map(|r| r.value().clone()) { + None => match self.rev_loader.get_revision_record(&self.doc_id, *rev_id) { + Ok(revision) => revision, + Err(e) => { + tracing::error!("{}", e); + None }, }, + Some(revision) => Some(revision), } } - pub async fn front_local_rev_id(&self) -> Option { self.local_revs.read().await.front().copied() } -} + pub(crate) async fn get_revisions_in_range( + &self, + range: &RevisionRange, + ) -> Result, FlowyError> { + let range_len = range.len() as usize; + let revs = range + .iter() + .flat_map(|rev_id| self.revs_map.get(&rev_id).map(|record| record.clone())) + .collect::>(); -#[cfg(feature = "flowy_unit_test")] -impl RevisionMemoryCache { - pub fn revs_map(&self) -> Arc> { self.revs_map.clone() } - pub fn pending_revs(&self) -> Arc>> { self.local_revs.clone() } + if revs.len() == range_len { + Ok(revs) + } else { + let revs = self + .rev_loader + .get_revision_records_with_range(&self.doc_id, range.clone()) + .await?; + if revs.len() != range_len { + log::error!("Revisions len is not equal to range required"); + } + Ok(revs) + } + } } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs index 2886d90fe0..916b334e4c 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs @@ -3,6 +3,7 @@ mod cache; mod disk; mod memory; mod model; +mod sync; pub use cache::*; pub use model::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs new file mode 100644 index 0000000000..449d3101e0 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs @@ -0,0 +1,71 @@ +use crate::services::doc::revision::RevisionRecord; +use dashmap::DashMap; +use lib_ot::errors::OTError; +use std::{collections::VecDeque, sync::Arc}; +use tokio::sync::RwLock; + +pub struct RevisionSyncSeq { + revs_map: Arc>, + local_revs: Arc>>, +} + +impl std::default::Default for RevisionSyncSeq { + fn default() -> Self { + let local_revs = Arc::new(RwLock::new(VecDeque::new())); + RevisionSyncSeq { + revs_map: Arc::new(DashMap::new()), + local_revs, + } + } +} + +impl RevisionSyncSeq { + pub fn new() -> Self { RevisionSyncSeq::default() } + + pub async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> { + // The last revision's rev_id must be greater than the new one. + if let Some(rev_id) = self.local_revs.read().await.back() { + if *rev_id >= record.revision.rev_id { + return Err(OTError::revision_id_conflict() + .context(format!("The new revision's id must be greater than {}", rev_id))); + } + } + self.revs_map.insert(record.revision.rev_id, record); + Ok(()) + } + + pub async fn ack_revision(&self, rev_id: &i64) { + if let Some(pop_rev_id) = self.next_sync_rev_id().await { + if &pop_rev_id != rev_id { + tracing::error!("The next ack rev_id must be equal to the next rev_id"); + assert_eq!(&pop_rev_id, rev_id); + return; + } + + tracing::debug!("pop revision {}", pop_rev_id); + self.revs_map.remove(&pop_rev_id); + let _ = self.local_revs.write().await.pop_front(); + } + } + + pub async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> { + match self.local_revs.read().await.front() { + None => None, + Some(rev_id) => match self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())) { + None => None, + Some(val) => { + tracing::debug!("{}:try send revision {}", val.1.revision.doc_id, val.1.revision.rev_id); + Some(val) + }, + }, + } + } + + pub async fn next_sync_rev_id(&self) -> Option { self.local_revs.read().await.front().copied() } +} + +#[cfg(feature = "flowy_unit_test")] +impl RevisionSyncSeq { + pub fn revs_map(&self) -> Arc> { self.revs_map.clone() } + pub fn pending_revs(&self) -> Arc>> { self.local_revs.clone() } +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 9cc4c6d4ed..a1c5f99cdb 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -1,4 +1,5 @@ use crate::{errors::FlowyError, services::doc::revision::RevisionCache}; +use bytes::Bytes; use flowy_collaboration::{ entities::doc::Doc, util::{md5, RevIdCounter}, @@ -6,8 +7,8 @@ use flowy_collaboration::{ use flowy_error::FlowyResult; use lib_infra::future::FutureResult; use lib_ot::{ - core::OperationTransformable, - revision::{RevType, Revision, RevisionRange}, + core::{Operation, OperationTransformable}, + revision::{RevState, RevType, Revision, RevisionRange}, rich_text::RichTextDelta, }; use std::sync::Arc; @@ -34,8 +35,16 @@ impl RevisionManager { } } - pub async fn load_document(&mut self) -> FlowyResult { - let doc = self.cache.load_document().await?; + pub async fn load_document(&mut self, server: Arc) -> FlowyResult { + let revisions = RevisionLoader { + doc_id: self.doc_id.clone(), + user_id: self.user_id.clone(), + server, + cache: self.cache.clone(), + } + .load() + .await?; + let doc = mk_doc_from_revisions(&self.doc_id, revisions)?; self.update_rev_id_counter_value(doc.rev_id); Ok(doc.delta()?) } @@ -100,3 +109,80 @@ impl RevisionManager { impl RevisionManager { pub fn revision_cache(&self) -> Arc { self.cache.clone() } } + +struct RevisionLoader { + doc_id: String, + user_id: String, + server: Arc, + cache: Arc, +} + +impl RevisionLoader { + async fn load(&self) -> Result, FlowyError> { + let records = self.cache.disk_cache.read_revisions(&self.doc_id)?; + let revisions: Vec; + if records.is_empty() { + let doc = self.server.fetch_document(&self.doc_id).await?; + let delta_data = Bytes::from(doc.data.clone()); + let doc_md5 = md5(&delta_data); + let revision = Revision::new( + &doc.id, + doc.base_rev_id, + doc.rev_id, + delta_data, + RevType::Remote, + &self.user_id, + doc_md5, + ); + let _ = self.cache.add_remote_revision(revision.clone()).await?; + revisions = vec![revision]; + } else { + for record in &records { + match record.state { + RevState::StateLocal => match self.cache.add_local_revision(record.revision.clone()).await { + Ok(_) => {}, + Err(e) => tracing::error!("{}", e), + }, + RevState::Acked => {}, + } + } + revisions = records.into_iter().map(|record| record.revision).collect::<_>(); + } + + Ok(revisions) + } +} + +fn mk_doc_from_revisions(doc_id: &str, revisions: Vec) -> FlowyResult { + let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id(); + let mut delta = RichTextDelta::new(); + for (_, revision) in revisions.into_iter().enumerate() { + match RichTextDelta::from_bytes(revision.delta_data) { + Ok(local_delta) => { + delta = delta.compose(&local_delta)?; + }, + Err(e) => { + tracing::error!("Deserialize delta from revision failed: {}", e); + }, + } + } + correct_delta_if_need(&mut delta); + + Result::::Ok(Doc { + id: doc_id.to_owned(), + data: delta.to_json(), + rev_id, + base_rev_id, + }) +} +fn correct_delta_if_need(delta: &mut RichTextDelta) { + if delta.ops.last().is_none() { + return; + } + + let data = delta.ops.last().as_ref().unwrap().get_data(); + if !data.ends_with('\n') { + log::error!("❌The op must end with newline. Correcting it by inserting newline op"); + delta.ops.push(Operation::Insert("\n".into())); + } +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/ws/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/ws/mod.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frontend/rust-lib/flowy-test/src/doc_script.rs b/frontend/rust-lib/flowy-test/src/doc_script.rs index 6c699033e2..d5cb59825a 100644 --- a/frontend/rust-lib/flowy-test/src/doc_script.rs +++ b/frontend/rust-lib/flowy-test/src/doc_script.rs @@ -47,7 +47,7 @@ impl EditorTest { let rev_manager = self.editor.rev_manager(); let cache = rev_manager.revision_cache(); let _memory_cache = cache.memory_cache(); - let _disk_cache = cache.dish_cache(); + let _disk_cache = cache.disk_cache(); let doc_id = self.editor.doc_id.clone(); let _user_id = self.sdk.user_session.user_id().unwrap(); let ws_manager = self.sdk.ws_manager.clone(); @@ -71,7 +71,7 @@ impl EditorTest { self.editor.replace(interval, s).await.unwrap(); }, EditorScript::AssertRevisionState(rev_id, state) => { - let record = cache.query_revision(&doc_id, rev_id).await.unwrap(); + let record = cache.get_revision(&doc_id, rev_id).await.unwrap(); assert_eq!(record.state, state); }, EditorScript::AssertCurrentRevId(rev_id) => { diff --git a/frontend/rust-lib/flowy-user/Flowy.toml b/frontend/rust-lib/flowy-user/Flowy.toml index 6560862303..2056e35fa5 100644 --- a/frontend/rust-lib/flowy-user/Flowy.toml +++ b/frontend/rust-lib/flowy-user/Flowy.toml @@ -1,3 +1,3 @@ -proto_crates = ["src/event.rs", "src/errors.rs", "src/notify"] +proto_crates = ["src/event.rs", "src/notify"] event_files = ["src/event.rs"] \ No newline at end of file diff --git a/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs b/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs index ec93db482e..3c01cb3a22 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs @@ -1,20 +1,16 @@ use crate::{ core::document::Document, - entities::ws::{DocumentWSData, DocumentWSDataType, WsDocumentDataBuilder}, + entities::ws::{DocumentWSData, WsDocumentDataBuilder}, }; -use bytes::Bytes; use lib_ot::{ core::OperationTransformable, errors::OTError, - protobuf::RevId, revision::{RevType, Revision, RevisionRange}, rich_text::RichTextDelta, }; use parking_lot::RwLock; -use protobuf::Message; use std::{ cmp::Ordering, - convert::TryInto, fmt::Debug, sync::{ atomic::{AtomicI64, Ordering::SeqCst}, diff --git a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs index a13ac2fa79..53fb551f16 100644 --- a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs +++ b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs @@ -84,7 +84,7 @@ impl WsDocumentDataBuilder { // DocumentWSDataType::Acked -> RevId pub fn build_acked_message(doc_id: &str, rev_id: i64) -> DocumentWSData { - let cloned_rev_id = rev_id.clone(); + let cloned_rev_id = rev_id; let rev_id: RevId = rev_id.into(); let bytes: Bytes = rev_id.try_into().unwrap();