diff --git a/backend/src/service/doc/edit/edit_doc.rs b/backend/src/service/doc/edit/edit_doc.rs index 755a85c550..7b5849d541 100644 --- a/backend/src/service/doc/edit/edit_doc.rs +++ b/backend/src/service/doc/edit/edit_doc.rs @@ -48,8 +48,6 @@ impl ServerEditDoc { }) } - pub fn document_json(&self) -> String { self.document.read().to_json() } - #[tracing::instrument( level = "debug", skip(self, user), @@ -64,14 +62,14 @@ impl ServerEditDoc { match cur_rev_id.cmp(&rev_id) { Ordering::Less => { user.socket - .do_send(mk_pull_rev_ws_message(&self.doc_id, next(cur_rev_id), rev_id)) + .do_send(mk_pull_message(&self.doc_id, next(cur_rev_id), rev_id)) .map_err(internal_error)?; }, Ordering::Equal => {}, Ordering::Greater => { let doc_delta = self.document.read().delta().clone(); let cli_revision = self.mk_revision(rev_id, doc_delta); - let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); + let ws_cli_revision = mk_push_message(&self.doc_id, cli_revision); user.socket.do_send(ws_cli_revision).map_err(internal_error)?; }, } @@ -94,38 +92,42 @@ impl ServerEditDoc { revision: Revision, pg_pool: Data, ) -> Result<(), ServerError> { - // Opti: find out another way to keep the user socket available. self.users.insert(user.id(), user.clone()); let cur_rev_id = self.rev_id.load(SeqCst); match cur_rev_id.cmp(&revision.rev_id) { Ordering::Less => { let next_rev_id = next(cur_rev_id); - if next_rev_id == revision.base_rev_id || cur_rev_id == revision.base_rev_id { + if cur_rev_id == revision.base_rev_id || next_rev_id == revision.base_rev_id { + // The rev is in the right order, just compose it. let _ = self.compose_revision(&revision, pg_pool).await?; user.socket - .do_send(mk_acked_ws_message(&revision)) + .do_send(mk_acked_message(&revision)) + .map_err(internal_error)?; + } else { + // The server document is outdated, pull the missing revision from the client. + user.socket + .do_send(mk_pull_message(&self.doc_id, next_rev_id, revision.rev_id)) .map_err(internal_error)?; } - - // The server document is outdated, try to get the missing revision from the - // client. - user.socket - .do_send(mk_pull_rev_ws_message(&self.doc_id, next_rev_id, revision.rev_id)) - .map_err(internal_error)?; }, - Ordering::Equal => {}, + Ordering::Equal => { + // Do nothing + }, Ordering::Greater => { // The client document is outdated. Transform the client revision delta and then // send the prime delta to the client. Client should compose the this prime // delta. let cli_revision = self.transform_revision(&revision)?; - let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); - user.socket.do_send(ws_cli_revision).map_err(internal_error)?; + user.socket + .do_send(mk_push_message(&self.doc_id, cli_revision)) + .map_err(internal_error)?; }, } Ok(()) } + pub fn document_json(&self) -> String { self.document.read().to_json() } + async fn compose_revision(&self, revision: &Revision, pg_pool: Data) -> Result<(), ServerError> { let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?; let _ = self.compose_delta(delta)?; @@ -197,7 +199,7 @@ impl ServerEditDoc { } } -fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor { +fn mk_push_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor { let bytes = revision.write_to_bytes().unwrap(); let data = WsDocumentData { doc_id: doc_id.to_string(), @@ -207,7 +209,7 @@ fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor mk_ws_message(data) } -fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor { +fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor { let range = RevisionRange { doc_id: doc_id.to_string(), start: from_rev_id, @@ -224,7 +226,7 @@ fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsM mk_ws_message(data) } -fn mk_acked_ws_message(revision: &Revision) -> WsMessageAdaptor { +fn mk_acked_message(revision: &Revision) -> WsMessageAdaptor { // let mut wtr = vec![]; // let _ = wtr.write_i64::(revision.rev_id); diff --git a/backend/tests/document/edit.rs b/backend/tests/document/edit.rs index aea9965627..1656cc388d 100644 --- a/backend/tests/document/edit.rs +++ b/backend/tests/document/edit.rs @@ -30,6 +30,20 @@ async fn delta_sync_while_editing() { .await; } +#[actix_rt::test] +async fn delta_sync_multi_revs() { + let test = DocumentTest::new().await; + test.run_scripts(vec![ + DocScript::ConnectWs, + DocScript::OpenDoc, + DocScript::InsertText(0, "abc"), + DocScript::InsertText(3, "123"), + DocScript::InsertText(6, "efg"), + DocScript::InsertText(9, "456"), + ]) + .await; +} + #[actix_rt::test] async fn delta_sync_while_editing_with_attribute() { let test = DocumentTest::new().await; diff --git a/rust-lib/flowy-document/Cargo.toml b/rust-lib/flowy-document/Cargo.toml index 27103efb37..ce09ef7642 100644 --- a/rust-lib/flowy-document/Cargo.toml +++ b/rust-lib/flowy-document/Cargo.toml @@ -41,6 +41,7 @@ md5 = "0.7.0" byteorder = {version = "1.3.4"} async-stream = "0.3.2" futures = "0.3.15" +pin-project = "1.0.0" [dev-dependencies] flowy-test = { path = "../flowy-test" } diff --git a/rust-lib/flowy-document/src/entities/doc/revision.rs b/rust-lib/flowy-document/src/entities/doc/revision.rs index f0bc89a9eb..f1f19a826e 100644 --- a/rust-lib/flowy-document/src/entities/doc/revision.rs +++ b/rust-lib/flowy-document/src/entities/doc/revision.rs @@ -80,6 +80,10 @@ pub struct Revision { pub ty: RevType, } +impl Revision { + pub fn is_empty(&self) -> bool { self.base_rev_id == self.rev_id } +} + impl std::fmt::Debug for Revision { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { let _ = f.write_fmt(format_args!("doc_id {}, ", self.doc_id))?; @@ -108,7 +112,10 @@ impl Revision { let delta_data = delta.as_ref().to_vec(); let base_rev_id = base_rev_id.into(); let rev_id = rev_id.into(); - debug_assert!(base_rev_id != rev_id); + + if base_rev_id != 0 { + debug_assert!(base_rev_id != rev_id); + } Self { base_rev_id, diff --git a/rust-lib/flowy-document/src/lib.rs b/rust-lib/flowy-document/src/lib.rs index 3656b825d0..73594c434c 100644 --- a/rust-lib/flowy-document/src/lib.rs +++ b/rust-lib/flowy-document/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(vecdeque_binary_search)] + pub mod entities; pub mod errors; pub mod module; diff --git a/rust-lib/flowy-document/src/services/doc/doc_controller.rs b/rust-lib/flowy-document/src/services/doc/doc_controller.rs index c320cf25a8..6a8fcb7e6e 100644 --- a/rust-lib/flowy-document/src/services/doc/doc_controller.rs +++ b/rust-lib/flowy-document/src/services/doc/doc_controller.rs @@ -10,7 +10,7 @@ use crate::{ cache::DocCache, doc::{ edit::{ClientEditDoc, EditDocWsHandler}, - revision::{DocRevision, RevisionServer}, + revision::RevisionServer, }, server::Server, ws::WsDocumentManager, @@ -18,7 +18,7 @@ use crate::{ }; use flowy_database::ConnectionPool; use flowy_infra::future::{wrap_future, FnFuture, ResultFuture}; -use flowy_ot::core::Delta; + use tokio::time::{interval, Duration}; pub(crate) struct DocController { diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index 0731f97f98..6fdc3446c7 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -1,6 +1,6 @@ use crate::{ entities::{ - doc::{Doc, DocDelta, RevId, RevType, Revision, RevisionRange}, + doc::{DocDelta, RevId, RevType, Revision, RevisionRange}, ws::{WsDataType, WsDocumentData}, }, errors::{internal_error, DocError, DocResult}, @@ -12,7 +12,7 @@ use crate::{ message::{DocumentMsg, TransformDeltas}, model::OpenDocAction, }, - revision::{DocRevision, RevisionCmd, RevisionManager, RevisionServer, RevisionStoreActor}, + revision::{DocRevision, RevisionManager, RevisionServer, RevisionStoreActor}, UndoResult, }, ws::{DocumentWebSocket, WsDocumentHandler}, @@ -45,12 +45,14 @@ impl ClientEditDoc { server: Arc, user: Arc, ) -> DocResult { - let rev_store = spawn_rev_store_actor(doc_id, pool.clone(), server.clone()); - let doc = load_document(rev_store.clone()).await?; - let delta = doc.delta()?; - let rev_manager = Arc::new(RevisionManager::new(doc_id, doc.rev_id.into(), rev_store)); + let (sender, mut receiver) = mpsc::channel(1); + let mut rev_manager = RevisionManager::new(doc_id, pool.clone(), server.clone(), sender); + spawn_rev_receiver(receiver, ws.clone()); + + let delta = rev_manager.load_document().await?; let document = spawn_doc_edit_actor(doc_id, delta, pool.clone()); let doc_id = doc_id.to_string(); + let rev_manager = Arc::new(rev_manager); let edit_doc = Self { doc_id, rev_manager, @@ -72,7 +74,7 @@ impl ClientEditDoc { }; let _ = self.document.send(msg); let delta = rx.await.map_err(internal_error)??; - let rev_id = self.mk_revision(delta).await?; + let rev_id = self.save_revision(delta).await?; save_document(self.document.clone(), rev_id.into()).await } @@ -81,7 +83,7 @@ impl ClientEditDoc { let msg = DocumentMsg::Delete { interval, ret }; let _ = self.document.send(msg); let delta = rx.await.map_err(internal_error)??; - let _ = self.mk_revision(delta).await?; + let _ = self.save_revision(delta).await?; Ok(()) } @@ -94,7 +96,7 @@ impl ClientEditDoc { }; let _ = self.document.send(msg); let delta = rx.await.map_err(internal_error)??; - let _ = self.mk_revision(delta).await?; + let _ = self.save_revision(delta).await?; Ok(()) } @@ -107,7 +109,7 @@ impl ClientEditDoc { }; let _ = self.document.send(msg); let delta = rx.await.map_err(internal_error)??; - let _ = self.mk_revision(delta).await?; + let _ = self.save_revision(delta).await?; Ok(()) } @@ -152,7 +154,7 @@ impl ClientEditDoc { } #[tracing::instrument(level = "debug", skip(self, delta), fields(revision_delta = %delta.to_json(), send_state, base_rev_id, rev_id))] - async fn mk_revision(&self, delta: Delta) -> Result { + async fn save_revision(&self, delta: Delta) -> Result { let delta_data = delta.to_bytes(); let (base_rev_id, rev_id) = self.rev_manager.next_rev_id(); tracing::Span::current().record("base_rev_id", &base_rev_id); @@ -161,15 +163,6 @@ impl ClientEditDoc { let delta_data = delta_data.to_vec(); let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local); let _ = self.rev_manager.add_revision(&revision).await?; - match self.ws.send(revision.into()) { - Ok(_) => { - tracing::Span::current().record("send_state", &"success"); - }, - Err(e) => { - tracing::Span::current().record("send_state", &format!("failed: {:?}", e).as_str()); - }, - }; - Ok(rev_id.into()) } @@ -184,7 +177,7 @@ impl ClientEditDoc { let _ = self.document.send(msg); let _ = rx.await.map_err(internal_error)??; - let rev_id = self.mk_revision(delta).await?; + let rev_id = self.save_revision(delta).await?; save_document(self.document.clone(), rev_id).await } @@ -304,14 +297,24 @@ impl WsDocumentHandler for EditDocWsHandler { match state { WsState::Init => {}, WsState::Connected(_) => self.0.notify_open_doc(), - WsState::Disconnected(e) => { - log::error!("websocket error: {:?}", e); - // - }, + WsState::Disconnected(_e) => {}, } } } +fn spawn_rev_receiver(mut receiver: mpsc::Receiver, ws: Arc) { + tokio::spawn(async move { + loop { + while let Some(revision) = receiver.recv().await { + match ws.send(revision.into()) { + Ok(_) => {}, + Err(e) => log::error!("Send revision failed: {:?}", e), + }; + } + } + }); +} + async fn save_document(document: UnboundedSender, rev_id: RevId) -> DocResult<()> { let (ret, rx) = oneshot::channel::>(); let _ = document.send(DocumentMsg::SaveDocument { rev_id, ret }); @@ -319,27 +322,9 @@ async fn save_document(document: UnboundedSender, rev_id: RevId) -> result } -fn spawn_rev_store_actor( - doc_id: &str, - pool: Arc, - server: Arc, -) -> mpsc::Sender { - let (sender, receiver) = mpsc::channel::(50); - let actor = RevisionStoreActor::new(doc_id, pool, receiver, server); - tokio::spawn(actor.run()); - sender -} - fn spawn_doc_edit_actor(doc_id: &str, delta: Delta, pool: Arc) -> UnboundedSender { let (sender, receiver) = mpsc::unbounded_channel::(); let actor = DocumentActor::new(&doc_id, delta, pool.clone(), receiver); tokio::spawn(actor.run()); sender } - -async fn load_document(sender: mpsc::Sender) -> DocResult { - let (ret, rx) = oneshot::channel(); - let _ = sender.send(RevisionCmd::DocumentDelta { ret }).await; - let result = rx.await.map_err(internal_error)?; - result -} diff --git a/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 714d2cacb7..fd77902849 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -1,10 +1,12 @@ use crate::{ entities::doc::{Doc, RevId, RevType, Revision, RevisionRange}, - errors::{internal_error, DocError}, - services::{doc::revision::store_actor::RevisionCmd, util::RevIdCounter, ws::DocumentWebSocket}, + errors::{internal_error, DocError, DocResult}, + services::{doc::revision::RevisionStoreActor, util::RevIdCounter, ws::DocumentWebSocket}, }; +use flowy_database::ConnectionPool; use flowy_infra::future::ResultFuture; use flowy_ot::core::{Delta, OperationTransformable}; +use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; pub struct DocRevision { @@ -20,12 +22,18 @@ pub trait RevisionServer: Send + Sync { pub struct RevisionManager { doc_id: String, rev_id_counter: RevIdCounter, - rev_store: mpsc::Sender, + rev_store: Arc, } impl RevisionManager { - pub fn new(doc_id: &str, rev_id: RevId, rev_store: mpsc::Sender) -> Self { - let rev_id_counter = RevIdCounter::new(rev_id.into()); + pub fn new( + doc_id: &str, + pool: Arc, + server: Arc, + pending_rev_sender: mpsc::Sender, + ) -> Self { + let rev_store = Arc::new(RevisionStoreActor::new(doc_id, pool, server, pending_rev_sender)); + let rev_id_counter = RevIdCounter::new(0); Self { doc_id: doc_id.to_string(), rev_id_counter, @@ -33,21 +41,21 @@ impl RevisionManager { } } + pub async fn load_document(&mut self) -> DocResult { + let doc = self.rev_store.fetch_document().await?; + self.set_rev_id(doc.rev_id); + Ok(doc.delta()?) + } + pub async fn add_revision(&self, revision: &Revision) -> Result<(), DocError> { - let (ret, rx) = oneshot::channel(); - let cmd = RevisionCmd::Revision { - revision: revision.clone(), - ret, - }; - let _ = self.rev_store.send(cmd).await; - let result = rx.await.map_err(internal_error)?; - result + let _ = self.rev_store.handle_new_revision(revision.clone()).await?; + Ok(()) } pub fn ack_rev(&self, rev_id: RevId) -> Result<(), DocError> { - let sender = self.rev_store.clone(); + let rev_store = self.rev_store.clone(); tokio::spawn(async move { - let _ = sender.send(RevisionCmd::AckRevision { rev_id }).await; + rev_store.handle_revision_acked(rev_id).await; }); Ok(()) } @@ -64,14 +72,7 @@ impl RevisionManager { pub async fn construct_revisions(&self, range: RevisionRange) -> Result { debug_assert!(&range.doc_id == &self.doc_id); - let (ret, rx) = oneshot::channel(); - let sender = self.rev_store.clone(); - let cmd = RevisionCmd::GetRevisions { - range: range.clone(), - ret, - }; - let _ = sender.send(cmd).await; - let revisions = rx.await.map_err(internal_error)??; + let revisions = self.rev_store.revs_in_range(range.clone()).await?; let mut new_delta = Delta::new(); for revision in revisions { match Delta::from_bytes(revision.delta_data) { diff --git a/rust-lib/flowy-document/src/services/doc/revision/mod.rs b/rust-lib/flowy-document/src/services/doc/revision/mod.rs index b45b634038..0c04f436b2 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/mod.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/mod.rs @@ -1,6 +1,6 @@ mod manager; mod model; -mod store_actor; +mod rev_store; pub use manager::*; -pub use store_actor::*; +pub use rev_store::*; diff --git a/rust-lib/flowy-document/src/services/doc/revision/model.rs b/rust-lib/flowy-document/src/services/doc/revision/model.rs index d420c8b9cd..cfe2dbff8d 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/model.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/model.rs @@ -2,43 +2,28 @@ use crate::{entities::doc::Revision, errors::DocResult, services::ws::DocumentWe use tokio::sync::oneshot; -pub type Sender = oneshot::Sender>; -pub type Receiver = oneshot::Receiver>; +pub type PendingRevSender = oneshot::Sender>; +pub type PendingRevReceiver = oneshot::Receiver>; -pub struct RevisionOperation { - inner: Revision, - ret: Option, - receiver: Option, +pub struct RevisionContext { + pub revision: Revision, pub state: RevState, } -impl RevisionOperation { - pub fn new(revision: &Revision) -> Self { - let (ret, receiver) = oneshot::channel::>(); - +impl RevisionContext { + pub fn new(revision: Revision) -> Self { Self { - inner: revision.clone(), - ret: Some(ret), - receiver: Some(receiver), + revision, state: RevState::Local, } } - - pub fn receiver(&mut self) -> Receiver { self.receiver.take().expect("Receiver should not be called twice") } - - pub fn finish(&mut self) { - self.state = RevState::Acked; - match self.ret.take() { - None => {}, - Some(ret) => { - let _ = ret.send(Ok(())); - }, - } - } } -impl std::ops::Deref for RevisionOperation { - type Target = Revision; - - fn deref(&self) -> &Self::Target { &self.inner } +pub(crate) struct PendingRevId { + pub rev_id: i64, + pub sender: PendingRevSender, +} + +impl PendingRevId { + pub(crate) fn new(rev_id: i64, sender: PendingRevSender) -> Self { Self { rev_id, sender } } } diff --git a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs b/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs similarity index 52% rename from rust-lib/flowy-document/src/services/doc/revision/store_actor.rs rename to rust-lib/flowy-document/src/services/doc/revision/rev_store.rs index a8b9003bc2..315e2ed0cb 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs @@ -1,43 +1,34 @@ use crate::{ entities::doc::{revision_from_doc, Doc, RevId, RevType, Revision, RevisionRange}, errors::{internal_error, DocError, DocResult}, - services::doc::revision::{model::RevisionOperation, DocRevision, RevisionServer}, + services::doc::revision::{ + model::{PendingRevId, PendingRevReceiver, RevisionContext}, + RevisionServer, + }, sql_tables::{RevState, RevTableSql}, }; use async_stream::stream; -use dashmap::DashMap; +use dashmap::{mapref::one::Ref, DashMap, DashSet}; use flowy_database::ConnectionPool; -use flowy_ot::core::{Attributes, Delta, OperationTransformable}; +use flowy_ot::core::{Delta, OperationTransformable}; use futures::{stream::StreamExt, TryFutureExt}; -use std::{sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, + time::Duration, +}; use tokio::{ - sync::{mpsc, oneshot, RwLock}, + sync::{mpsc, oneshot, RwLock, RwLockWriteGuard}, task::{spawn_blocking, JoinHandle}, }; -pub enum RevisionCmd { - Revision { - revision: Revision, - ret: oneshot::Sender>, - }, - AckRevision { - rev_id: RevId, - }, - GetRevisions { - range: RevisionRange, - ret: oneshot::Sender>>, - }, - DocumentDelta { - ret: oneshot::Sender>, - }, -} - pub struct RevisionStoreActor { doc_id: String, persistence: Arc, - revs: Arc>, + revs_map: Arc>, + pending_revs_sender: RevSender, + pending_revs: Arc>>, delay_save: RwLock>>, - receiver: Option>, server: Arc, } @@ -45,74 +36,70 @@ impl RevisionStoreActor { pub fn new( doc_id: &str, pool: Arc, - receiver: mpsc::Receiver, server: Arc, + pending_rev_sender: mpsc::Sender, ) -> RevisionStoreActor { - let persistence = Arc::new(Persistence::new(pool)); - let revs = Arc::new(DashMap::new()); let doc_id = doc_id.to_owned(); + let persistence = Arc::new(Persistence::new(pool)); + let revs_map = Arc::new(DashMap::new()); + let (pending_revs_sender, receiver) = mpsc::unbounded_channel(); + let pending_revs = Arc::new(RwLock::new(VecDeque::new())); + let pending = PendingRevision::new( + &doc_id, + receiver, + persistence.clone(), + revs_map.clone(), + pending_rev_sender, + pending_revs.clone(), + ); + tokio::spawn(pending.run()); Self { doc_id, persistence, - revs, + revs_map, + pending_revs_sender, + pending_revs, delay_save: RwLock::new(None), - receiver: Some(receiver), server, } } - pub async fn run(mut self) { - let mut receiver = self.receiver.take().expect("Should only call once"); - let stream = stream! { - loop { - match receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - stream.for_each(|msg| self.handle_message(msg)).await; - } - - async fn handle_message(&self, cmd: RevisionCmd) { - match cmd { - RevisionCmd::Revision { revision, ret } => { - let result = self.handle_new_revision(revision).await; - let _ = ret.send(result); - }, - RevisionCmd::AckRevision { rev_id } => { - self.handle_revision_acked(rev_id).await; - }, - RevisionCmd::GetRevisions { range, ret } => { - let result = self.revs_in_range(range).await; - let _ = ret.send(result); - }, - RevisionCmd::DocumentDelta { ret } => { - let delta = self.fetch_document().await; - let _ = ret.send(delta); - }, - } - } - #[tracing::instrument(level = "debug", skip(self, revision))] - async fn handle_new_revision(&self, revision: Revision) -> DocResult<()> { - if self.revs.contains_key(&revision.rev_id) { + pub async fn handle_new_revision(&self, revision: Revision) -> DocResult<()> { + if self.revs_map.contains_key(&revision.rev_id) { return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id))); } - let mut operation = RevisionOperation::new(&revision); - let _receiver = operation.receiver(); - self.revs.insert(revision.rev_id, operation); + self.pending_revs_sender.send(PendingRevisionMsg::Revision { + revision: revision.clone(), + }); + self.revs_map.insert(revision.rev_id, RevisionContext::new(revision)); self.save_revisions().await; Ok(()) } #[tracing::instrument(level = "debug", skip(self, rev_id))] - async fn handle_revision_acked(&self, rev_id: RevId) { - match self.revs.get_mut(rev_id.as_ref()) { + pub async fn handle_revision_acked(&self, rev_id: RevId) { + let rev_id = rev_id.value; + log::debug!("Receive revision acked: {}", rev_id); + match self.pending_revs.write().await.pop_front() { None => {}, - Some(mut rev) => rev.value_mut().finish(), + Some(pending) => { + debug_assert!(pending.rev_id == rev_id); + if pending.rev_id != rev_id { + log::error!( + "Acked: expected rev_id: {:?}, but receive: {:?}", + pending.rev_id, + rev_id + ); + } + pending.sender.send(Ok(())); + }, + } + match self.revs_map.get_mut(&rev_id) { + None => {}, + Some(mut rev) => rev.value_mut().state = RevState::Acked, } self.save_revisions().await; } @@ -122,42 +109,41 @@ impl RevisionStoreActor { handler.abort(); } - if self.revs.is_empty() { + if self.revs_map.is_empty() { return; } - let revs = self.revs.clone(); + let revs_map = self.revs_map.clone(); let persistence = self.persistence.clone(); *self.delay_save.write().await = Some(tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(300)).await; - - let ids = revs.iter().map(|kv| kv.key().clone()).collect::>(); - let revisions = revs + let ids = revs_map.iter().map(|kv| kv.key().clone()).collect::>(); + let revisions_state = revs_map .iter() - .map(|kv| ((*kv.value()).clone(), kv.state)) + .map(|kv| (kv.revision.clone(), kv.state)) .collect::>(); // TODO: Ok to unwrap? let conn = &*persistence.pool.get().map_err(internal_error).unwrap(); let result = conn.immediate_transaction::<_, DocError, _>(|| { - let _ = persistence.rev_sql.create_rev_table(revisions, conn).unwrap(); + let _ = persistence.rev_sql.create_rev_table(revisions_state, conn).unwrap(); Ok(()) }); match result { - Ok(_) => revs.retain(|k, _| !ids.contains(k)), + Ok(_) => revs_map.retain(|k, _| !ids.contains(k)), Err(e) => log::error!("Save revision failed: {:?}", e), } })); } - async fn revs_in_range(&self, range: RevisionRange) -> DocResult> { + pub async fn revs_in_range(&self, range: RevisionRange) -> DocResult> { let revs = range .iter() - .flat_map(|rev_id| match self.revs.get(&rev_id) { + .flat_map(|rev_id| match self.revs_map.get(&rev_id) { None => None, - Some(rev) => Some((&*(*rev)).clone()), + Some(rev) => Some(rev.revision.clone()), }) .collect::>(); @@ -167,7 +153,7 @@ impl RevisionStoreActor { let doc_id = self.doc_id.clone(); let persistence = self.persistence.clone(); let result = spawn_blocking(move || { - let conn = &*persistence.pool.get().map_err(internal_error)?; + let conn = &*persistence.pool.get().map_err(internal_error).unwrap(); let revisions = persistence.rev_sql.read_rev_tables_with_range(&doc_id, range, conn)?; Ok(revisions) }) @@ -178,7 +164,7 @@ impl RevisionStoreActor { } } - async fn fetch_document(&self) -> DocResult { + pub async fn fetch_document(&self) -> DocResult { let result = fetch_from_local(&self.doc_id, self.persistence.clone()).await; if result.is_ok() { return result; @@ -186,7 +172,16 @@ impl RevisionStoreActor { let doc = self.server.fetch_document_from_remote(&self.doc_id).await?; let revision = revision_from_doc(doc.clone(), RevType::Remote); - let _ = self.handle_new_revision(revision).await?; + let conn = &*self.persistence.pool.get().map_err(internal_error).unwrap(); + let _ = conn.immediate_transaction::<_, DocError, _>(|| { + let _ = self + .persistence + .rev_sql + .create_rev_table(vec![(revision, RevState::Acked)], conn) + .unwrap(); + Ok(()) + })?; + Ok(doc) } } @@ -237,6 +232,106 @@ impl Persistence { } } +enum PendingRevisionMsg { + Revision { revision: Revision }, +} + +type RevSender = mpsc::UnboundedSender; +type RevReceiver = mpsc::UnboundedReceiver; + +struct PendingRevision { + doc_id: String, + pending_revs: Arc>>, + persistence: Arc, + revs_map: Arc>, + msg_receiver: Option, + next_rev: mpsc::Sender, +} + +impl PendingRevision { + pub fn new( + doc_id: &str, + msg_receiver: RevReceiver, + persistence: Arc, + revs_map: Arc>, + next_rev: mpsc::Sender, + pending_revs: Arc>>, + ) -> Self { + Self { + doc_id: doc_id.to_owned(), + pending_revs, + msg_receiver: Some(msg_receiver), + persistence, + revs_map, + next_rev, + } + } + + pub async fn run(mut self) { + let mut receiver = self.msg_receiver.take().expect("Should only call once"); + let stream = stream! { + loop { + match receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + stream + .for_each(|msg| async { + match self.handle_msg(msg).await { + Ok(_) => {}, + Err(e) => log::error!("{:?}", e), + } + }) + .await; + } + + async fn handle_msg(&self, msg: PendingRevisionMsg) -> DocResult<()> { + match msg { + PendingRevisionMsg::Revision { revision } => self.handle_revision(revision).await, + } + } + + async fn handle_revision(&self, revision: Revision) -> DocResult<()> { + let (sender, receiver) = oneshot::channel(); + let pending_rev = PendingRevId { + rev_id: revision.rev_id, + sender, + }; + self.pending_revs.write().await.push_back(pending_rev); + let _ = self.prepare_next_pending_rev(receiver).await?; + Ok(()) + } + + async fn prepare_next_pending_rev(&self, done: PendingRevReceiver) -> DocResult<()> { + let next_rev_notify = self.next_rev.clone(); + let doc_id = self.doc_id.clone(); + let _ = match self.pending_revs.read().await.front() { + None => Ok(()), + Some(pending) => match self.revs_map.get(&pending.rev_id) { + None => { + let conn = self.persistence.pool.get().map_err(internal_error)?; + let some = self + .persistence + .rev_sql + .read_rev_table(&doc_id, &pending.rev_id, &*conn)?; + match some { + Some(revision) => next_rev_notify.send(revision).await.map_err(internal_error), + None => Ok(()), + } + }, + Some(context) => next_rev_notify + .send(context.revision.clone()) + .await + .map_err(internal_error), + }, + }?; + let _ = tokio::time::timeout(Duration::from_millis(2000), done).await; + Ok(()) + } +} + // fn update_revisions(&self) { // let rev_ids = self // .revs diff --git a/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs b/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs index 128c44b7b1..a96cdf04e8 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs @@ -51,21 +51,16 @@ impl RevTableSql { pub(crate) fn read_rev_tables( &self, - doc_id_s: &str, - rev_id_s: Option, + did: &str, + rid: Option, conn: &SqliteConnection, ) -> Result, DocError> { - let mut filter = dsl::rev_table - .filter(doc_id.eq(doc_id_s)) - .order(rev_id.asc()) - .into_boxed(); - - if let Some(rev_id_s) = rev_id_s { - filter = filter.filter(rev_id.eq(rev_id_s)) + let mut filter = dsl::rev_table.filter(doc_id.eq(did)).order(rev_id.asc()).into_boxed(); + if let Some(rid) = rid { + filter = filter.filter(rev_id.eq(rid)) } let rev_tables = filter.load::(conn)?; - let revisions = rev_tables .into_iter() .map(|table| table.into()) @@ -73,6 +68,22 @@ impl RevTableSql { Ok(revisions) } + pub(crate) fn read_rev_table( + &self, + did: &str, + rid: &i64, + conn: &SqliteConnection, + ) -> Result, DocError> { + let filter = dsl::rev_table.filter(doc_id.eq(did)).filter(rev_id.eq(rid)); + let result = filter.first::(conn); + + if Err(diesel::NotFound) == result { + Ok(None) + } else { + Ok(Some(result?.into())) + } + } + pub(crate) fn read_rev_tables_with_range( &self, doc_id_s: &str,