From c872226b442bbce9adf91abaa268ee562c975205 Mon Sep 17 00:00:00 2001 From: appflowy Date: Mon, 4 Oct 2021 21:53:06 +0800 Subject: [PATCH] send revisions to server if server rev is outdated --- backend/src/application.rs | 1 - backend/src/service/doc/edit/edit_doc.rs | 20 +++-- backend/tests/document/edit.rs | 87 +++++++++++++++++-- backend/tests/document/helper.rs | 2 +- .../src/services/doc/edit/edit_actor.rs | 2 +- .../src/services/doc/edit/edit_doc.rs | 10 ++- .../src/services/doc/revision/manager.rs | 35 ++++++-- .../src/services/doc/revision/store_actor.rs | 4 +- .../src/sql_tables/doc/rev_sql.rs | 3 +- 9 files changed, 134 insertions(+), 30 deletions(-) diff --git a/backend/src/application.rs b/backend/src/application.rs index 517bb51640..0c984681d4 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -134,7 +134,6 @@ pub async fn init_app_context(configuration: &Settings) -> AppContext { )); let ws_server = WsServer::new().start(); - AppContext::new(ws_server, pg_pool) } diff --git a/backend/src/service/doc/edit/edit_doc.rs b/backend/src/service/doc/edit/edit_doc.rs index 3b070e09a5..91c0f0ff16 100644 --- a/backend/src/service/doc/edit/edit_doc.rs +++ b/backend/src/service/doc/edit/edit_doc.rs @@ -65,12 +65,19 @@ impl ServerEditDoc { self.users.insert(user.id(), user.clone()); let cur_rev_id = self.rev_id.load(SeqCst); - if cur_rev_id > rev_id { - let doc_delta = self.document.read().delta().clone(); - let cli_revision = self.mk_revision(rev_id, doc_delta); - log::debug!("Server push rev"); - let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); - user.socket.do_send(ws_cli_revision).map_err(internal_error)?; + match cur_rev_id.cmp(&rev_id) { + Ordering::Less => { + user.socket + .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, rev_id)) + .map_err(internal_error)?; + }, + Ordering::Equal => {}, + Ordering::Greater => { + let doc_delta = self.document.read().delta().clone(); + let cli_revision = self.mk_revision(rev_id, doc_delta); + let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); + user.socket.do_send(ws_cli_revision).map_err(internal_error)?; + }, } Ok(()) @@ -99,7 +106,6 @@ impl ServerEditDoc { if cur_rev_id != revision.base_rev_id { // The server document is outdated, try to get the missing revision from the // client. - log::debug!("Server push rev"); user.socket .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id)) .map_err(internal_error)?; diff --git a/backend/tests/document/edit.rs b/backend/tests/document/edit.rs index c7182efafb..8ed18ccbd8 100644 --- a/backend/tests/document/edit.rs +++ b/backend/tests/document/edit.rs @@ -1,23 +1,49 @@ use crate::document::helper::{DocScript, DocumentTest}; use flowy_document::services::doc::{Document, FlowyDoc}; +#[rustfmt::skip] +// ┌─────────┐ ┌─────────┐ +// │ Server │ │ Client │ +// └─────────┘ └─────────┘ +// ┌────────────────┐ │ │ ┌────────────────┐ +// │ops: [] rev: 0 │◀┼───── ws ────┼─┤ops: [] rev: 0 │ +// └────────────────┘ │ │ └────────────────┘ +// ┌────────────────────┐ │ │ ┌────────────────────┐ +// │ops: ["abc"] rev: 1 │◀┼───── ws ────┼─│ops: ["abc"] rev: 1 │ +// └────────────────────┘ │ │ └────────────────────┘ +// ┌──────────────────────────┐ │ │ ┌──────────────────────┐ +// │ops: ["abc", "123"] rev: 2│◀┼───── ws ────┼─│ops: ["123"] rev: 2 │ +// └──────────────────────────┘ │ │ └──────────────────────┘ +// │ │ #[actix_rt::test] -async fn sync_doc_insert_text() { +async fn delta_sync_after_ws_connection() { let test = DocumentTest::new().await; test.run_scripts(vec![ DocScript::ConnectWs, DocScript::OpenDoc, DocScript::SendText(0, "abc"), DocScript::SendText(3, "123"), - DocScript::SendText(6, "efg"), - DocScript::AssertClient(r#"[{"insert":"abc123efg\n"}]"#), - DocScript::AssertServer(r#"[{"insert":"abc123efg\n"}]"#), + DocScript::AssertClient(r#"[{"insert":"abc123\n"}]"#), + DocScript::AssertServer(r#"[{"insert":"abc123\n"}]"#), ]) .await; } +#[rustfmt::skip] +// ┌─────────┐ ┌─────────┐ +// │ Server │ │ Client │ +// └─────────┘ └─────────┘ +// ┌──────────────────────────┐ │ │ +// │ops: ["123", "456"] rev: 2│ │ │ +// └──────────────────────────┘ │ │ +// │ │ +// ◀── http request ─┤ Open doc +// │ │ +// │ │ ┌──────────────────────────┐ +// ├──http response──┼─▶│ops: ["123", "456"] rev: 2│ +// │ │ └──────────────────────────┘ #[actix_rt::test] -async fn sync_open_empty_doc_and_sync_from_server() { +async fn delta_sync_with_http_request() { let test = DocumentTest::new().await; let mut document = Document::new::(); document.insert(0, "123").unwrap(); @@ -34,7 +60,7 @@ async fn sync_open_empty_doc_and_sync_from_server() { } #[actix_rt::test] -async fn sync_open_empty_doc_and_sync_from_server_using_ws() { +async fn delta_sync_with_server_push_delta() { let test = DocumentTest::new().await; let mut document = Document::new::(); document.insert(0, "123").unwrap(); @@ -49,8 +75,35 @@ async fn sync_open_empty_doc_and_sync_from_server_using_ws() { .await; } +#[rustfmt::skip] +// ┌─────────┐ ┌─────────┐ +// │ Server │ │ Client │ +// └─────────┘ └─────────┘ +// │ │ +// │ │ +// ◀── http request ─┤ Open doc +// │ │ +// │ │ ┌───────────────┐ +// ├──http response──┼─▶│ops: [] rev: 0 │ +// ┌───────────────────┐│ │ └───────────────┘ +// │ops: ["123"] rev: 3││ │ +// └───────────────────┘│ │ ┌────────────────────┐ +// │ │ │ops: ["abc"] rev: 1 │ +// │ │ └────────────────────┘ +// │ │ +// ◀─────────────────┤ start ws connection +// │ │ +// ◀─────────────────┤ notify with rev: 1 +// │ │ +// ┌───────────────────┐ │ │ ┌──────────────────────────┐ +// │ops: ["123"] rev: 3│ ├────Push Rev─────▶ │ops: ["abc", "123"] rev: 4│ +// └───────────────────┘ │ │ └──────────────────────────┘ +// ┌──────────────────────────┐ │ │ ┌────────────────────┐ +// │ops: ["abc", "123"] rev: 4│ ◀────Push Rev─────┤ │ops: ["abc"] rev: 4 │ +// └──────────────────────────┘ │ │ └────────────────────┘ +// │ │ #[actix_rt::test] -async fn sync_open_non_empty_doc_and_sync_with_sever() { +async fn delta_sync_while_local_rev_less_than_server_rev() { let test = DocumentTest::new().await; let mut document = Document::new::(); document.insert(0, "123").unwrap(); @@ -66,3 +119,23 @@ async fn sync_open_non_empty_doc_and_sync_with_sever() { ]) .await; } + +#[actix_rt::test] +async fn delta_sync_while_local_rev_greater_than_server_rev() { + let test = DocumentTest::new().await; + let mut document = Document::new::(); + document.insert(0, "123").unwrap(); + let json = document.to_json(); + + test.run_scripts(vec![ + DocScript::SetServerDocument(json, 1), + DocScript::OpenDoc, + DocScript::AssertClient(r#"[{"insert":"123\n"}]"#), + DocScript::SendText(3, "abc"), + DocScript::SendText(6, "efg"), + DocScript::ConnectWs, + DocScript::AssertClient(r#"[{"insert":"123abcefg\n"}]"#), + DocScript::AssertServer(r#"[{"insert":"123abcefg\n"}]"#), + ]) + .await; +} diff --git a/backend/tests/document/helper.rs b/backend/tests/document/helper.rs index aac502ae5c..072507e6c9 100644 --- a/backend/tests/document/helper.rs +++ b/backend/tests/document/helper.rs @@ -114,7 +114,7 @@ async fn run_scripts(context: Arc>, scripts: Vec { - // sleep(Duration::from_millis(300)).await; + sleep(Duration::from_millis(300)).await; let user_session = context.read().user_session.clone(); let token = user_session.token().unwrap(); let _ = user_session.start_ws_connection(&token).await.unwrap(); diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs index 17c96c416c..944abe0e94 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs @@ -128,7 +128,7 @@ impl DocumentEditActor { async fn compose_delta(&self, delta: Delta) -> DocResult<()> { let result = self.document.write().await.compose_delta(&delta); log::debug!( - "Compose push delta: {}. result: {}", + "Client compose push delta: {}. result: {}", delta.to_json(), self.document.read().await.to_json() ); diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index f528a05569..9cc274b39c 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -47,8 +47,6 @@ impl ClientEditDoc { ) -> DocResult { let rev_store = spawn_rev_store_actor(doc_id, pool.clone(), server.clone()); let DocRevision { rev_id, delta } = fetch_document(rev_store.clone()).await?; - log::info!("😁 Document delta: {:?}", delta); - let rev_manager = Arc::new(RevisionManager::new(doc_id, rev_id, rev_store)); let document = spawn_doc_edit_actor(doc_id, delta, pool.clone()); let doc_id = doc_id.to_string(); @@ -262,7 +260,8 @@ impl ClientEditDoc { }, WsDataType::PullRev => { let range = RevisionRange::try_from(bytes)?; - let _ = self.rev_manager.send_revisions(range).await?; + let revision = self.rev_manager.send_revisions(range).await?; + self.ws.send(revision.into()); }, WsDataType::NewDocUser => {}, WsDataType::Acked => { @@ -290,7 +289,10 @@ impl WsDocumentHandler for EditDocWsHandler { fn state_changed(&self, state: &WsState) { match state { WsState::Init => {}, - WsState::Connected(_) => self.0.notify_open_doc(), + WsState::Connected(_) => { + log::debug!("ws state changed: {}", state); + self.0.notify_open_doc() + }, WsState::Disconnected(_) => {}, } } diff --git a/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs index b60507ac79..99e358a40d 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -4,8 +4,10 @@ use crate::{ services::{doc::revision::store_actor::RevisionCmd, util::RevIdCounter, ws::DocumentWebSocket}, }; use flowy_infra::future::ResultFuture; -use flowy_ot::core::Delta; +use flowy_ot::core::{Delta, OperationTransformable}; +use crate::entities::doc::RevType; +use flowy_ot::errors::OTError; use tokio::sync::{mpsc, oneshot}; pub struct DocRevision { @@ -63,14 +65,35 @@ impl RevisionManager { pub fn update_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); } - pub async fn send_revisions(&self, range: RevisionRange) -> Result<(), DocError> { + pub async fn send_revisions(&self, range: RevisionRange) -> Result { debug_assert!(&range.doc_id == &self.doc_id); let (ret, rx) = oneshot::channel(); let sender = self.rev_store.clone(); - let _ = sender.send(RevisionCmd::SendRevisions { range, ret }).await; - let _revisions = rx.await.map_err(internal_error)??; + let cmd = RevisionCmd::GetRevisions { + range: range.clone(), + ret, + }; + let _ = sender.send(cmd).await; + let revisions = rx.await.map_err(internal_error)??; + let mut new_delta = Delta::new(); + for revision in revisions { + match Delta::from_bytes(revision.delta_data) { + Ok(delta) => { + new_delta = new_delta.compose(&delta)?; + }, + Err(_) => {}, + } + } - unimplemented!() - // Ok(()) + let delta_data = new_delta.to_bytes(); + let revision = Revision::new( + range.from_rev_id, + range.to_rev_id, + delta_data.to_vec(), + &self.doc_id, + RevType::Remote, + ); + + Ok(revision) } } diff --git a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs index b58980d70a..94b307c4c8 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs @@ -23,7 +23,7 @@ pub enum RevisionCmd { AckRevision { rev_id: RevId, }, - SendRevisions { + GetRevisions { range: RevisionRange, ret: oneshot::Sender>>, }, @@ -84,7 +84,7 @@ impl RevisionStoreActor { RevisionCmd::AckRevision { rev_id } => { self.handle_revision_acked(rev_id).await; }, - RevisionCmd::SendRevisions { range, ret } => { + RevisionCmd::GetRevisions { range, ret } => { let result = revs_in_range(&self.doc_id, self.persistence.clone(), range).await; let _ = ret.send(result); }, diff --git a/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs b/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs index d2a9ccb71c..a6b193ba94 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs @@ -81,8 +81,9 @@ impl RevTableSql { ) -> Result, DocError> { let rev_tables = dsl::rev_table .filter(rev_id.ge(range.from_rev_id)) - .filter(rev_id.lt(range.to_rev_id)) + .filter(rev_id.le(range.to_rev_id)) .filter(doc_id.eq(doc_id_s)) + .order(rev_id.asc()) .load::(conn)?; let revisions = rev_tables