From d97abcc99f418b51d56ff45dc4519cda0f444888 Mon Sep 17 00:00:00 2001 From: appflowy Date: Mon, 24 Jan 2022 21:17:31 +0800 Subject: [PATCH] rm unuse code & rename folder -> client_folder --- backend/src/application.rs | 2 +- frontend/Makefile.toml | 2 +- .../rust-lib/flowy-core/src/controller.rs | 2 +- .../flowy-core/src/services/folder_editor.rs | 2 +- .../src/services/persistence/migration.rs | 2 +- .../src/services/persistence/mod.rs | 2 +- .../flowy-core/src/services/web_socket.rs | 2 +- .../rust-lib/flowy-sync/src/ws_manager.rs | 4 +- shared-lib/flowy-collaboration/Cargo.toml | 2 - .../src/{folder => client_folder}/builder.rs | 2 +- .../{folder => client_folder}/folder_pad.rs | 4 +- .../src/{folder => client_folder}/mod.rs | 0 .../src/entities/revision.rs | 6 + shared-lib/flowy-collaboration/src/lib.rs | 2 +- .../src/sync/synchronizer.rs | 243 ------------------ 15 files changed, 20 insertions(+), 257 deletions(-) rename shared-lib/flowy-collaboration/src/{folder => client_folder}/builder.rs (97%) rename shared-lib/flowy-collaboration/src/{folder => client_folder}/folder_pad.rs (99%) rename shared-lib/flowy-collaboration/src/{folder => client_folder}/mod.rs (100%) delete mode 100644 shared-lib/flowy-collaboration/src/sync/synchronizer.rs diff --git a/backend/src/application.rs b/backend/src/application.rs index 455a893268..5092f22751 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -135,7 +135,7 @@ fn user_scope() -> Scope { } pub async fn init_app_context(configuration: &Settings) -> AppContext { - let level = std::env::var("RUST_LOG").unwrap_or("info".to_owned()); + let level = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned()); let _ = crate::services::log::Builder::new("flowy-server") .env_filter(&level) .build(); diff --git a/frontend/Makefile.toml b/frontend/Makefile.toml index 9f3d9ea57f..33b1833615 100644 --- a/frontend/Makefile.toml +++ b/frontend/Makefile.toml @@ -15,7 +15,7 @@ CARGO_MAKE_EXTEND_WORKSPACE_MAKEFILE = true CARGO_MAKE_CRATE_FS_NAME = "dart_ffi" CARGO_MAKE_CRATE_NAME = "dart-ffi" VERSION = "0.0.2" -FEATURES = "flutter" +FEATURES = "flutter,http_server" PRODUCT_NAME = "AppFlowy" #CRATE_TYPE: https://doc.rust-lang.org/reference/linkage.html CRATE_TYPE = "staticlib" diff --git a/frontend/rust-lib/flowy-core/src/controller.rs b/frontend/rust-lib/flowy-core/src/controller.rs index a450dd8842..5d5aba5cfa 100755 --- a/frontend/rust-lib/flowy-core/src/controller.rs +++ b/frontend/rust-lib/flowy-core/src/controller.rs @@ -5,7 +5,7 @@ use flowy_core_data_model::user_default; use flowy_sync::RevisionWebSocket; use lazy_static::lazy_static; -use flowy_collaboration::{entities::ws_data::ServerRevisionWSData, folder::FolderPad}; +use flowy_collaboration::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData}; use flowy_document::FlowyDocumentManager; use std::{collections::HashMap, convert::TryInto, fmt::Formatter, sync::Arc}; diff --git a/frontend/rust-lib/flowy-core/src/services/folder_editor.rs b/frontend/rust-lib/flowy-core/src/services/folder_editor.rs index 6136e8257a..4b4d981507 100755 --- a/frontend/rust-lib/flowy-core/src/services/folder_editor.rs +++ b/frontend/rust-lib/flowy-core/src/services/folder_editor.rs @@ -1,7 +1,7 @@ use crate::services::web_socket::make_folder_ws_manager; use flowy_collaboration::{ + client_folder::{FolderChange, FolderPad}, entities::{revision::Revision, ws_data::ServerRevisionWSData}, - folder::{FolderChange, FolderPad}, }; use crate::controller::FolderId; diff --git a/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs b/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs index 27c6229d36..c0e5744961 100755 --- a/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs @@ -2,7 +2,7 @@ use crate::{ module::WorkspaceDatabase, services::persistence::{AppTableSql, TrashTableSql, ViewTableSql, WorkspaceTableSql}, }; -use flowy_collaboration::{entities::revision::md5, folder::FolderPad}; +use flowy_collaboration::{client_folder::FolderPad, entities::revision::md5}; use flowy_core_data_model::entities::{ app::{App, RepeatedApp}, view::{RepeatedView, View}, diff --git a/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs b/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs index bfeb87fb07..448a9ef02a 100755 --- a/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs @@ -3,8 +3,8 @@ pub mod version_1; mod version_2; use flowy_collaboration::{ + client_folder::FolderPad, entities::revision::{Revision, RevisionState}, - folder::FolderPad, }; use std::sync::Arc; use tokio::sync::RwLock; diff --git a/frontend/rust-lib/flowy-core/src/services/web_socket.rs b/frontend/rust-lib/flowy-core/src/services/web_socket.rs index 0073296377..c6e752769c 100755 --- a/frontend/rust-lib/flowy-core/src/services/web_socket.rs +++ b/frontend/rust-lib/flowy-core/src/services/web_socket.rs @@ -1,11 +1,11 @@ use crate::services::FOLDER_SYNC_INTERVAL_IN_MILLIS; use bytes::Bytes; use flowy_collaboration::{ + client_folder::FolderPad, entities::{ revision::RevisionRange, ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSDataType}, }, - folder::FolderPad, }; use flowy_error::FlowyError; use flowy_sync::*; diff --git a/frontend/rust-lib/flowy-sync/src/ws_manager.rs b/frontend/rust-lib/flowy-sync/src/ws_manager.rs index aa108a366a..850bd2fb33 100755 --- a/frontend/rust-lib/flowy-sync/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/ws_manager.rs @@ -200,17 +200,19 @@ impl RevisionWSStream { async fn handle_message(&self, msg: ServerRevisionWSData) -> FlowyResult<()> { let ServerRevisionWSData { object_id, ty, data } = msg; let bytes = Bytes::from(data); - tracing::trace!("[{}]: new message: {}:{:?}", self, object_id, ty); match ty { ServerRevisionWSDataType::ServerPushRev => { + tracing::trace!("[{}]: new push revision: {}:{:?}", self, object_id, ty); let _ = self.consumer.receive_push_revision(bytes).await?; } ServerRevisionWSDataType::ServerPullRev => { let range = RevisionRange::try_from(bytes)?; + tracing::trace!("[{}]: new pull: {}:{}-{:?}", self, object_id, range, ty); let _ = self.consumer.pull_revisions_in_range(range).await?; } ServerRevisionWSDataType::ServerAck => { let rev_id = RevId::try_from(bytes).unwrap().value; + tracing::trace!("[{}]: new ack: {}:{}-{:?}", self, object_id, rev_id, ty); let _ = self.consumer.receive_ack(rev_id.to_string(), ty).await; } ServerRevisionWSDataType::UserConnect => { diff --git a/shared-lib/flowy-collaboration/Cargo.toml b/shared-lib/flowy-collaboration/Cargo.toml index 930d5e34e8..2802406986 100644 --- a/shared-lib/flowy-collaboration/Cargo.toml +++ b/shared-lib/flowy-collaboration/Cargo.toml @@ -27,5 +27,3 @@ parking_lot = "0.11" dashmap = "4.0" futures = "0.3.15" async-stream = "0.3.2" - -[dev-dependencies] diff --git a/shared-lib/flowy-collaboration/src/folder/builder.rs b/shared-lib/flowy-collaboration/src/client_folder/builder.rs similarity index 97% rename from shared-lib/flowy-collaboration/src/folder/builder.rs rename to shared-lib/flowy-collaboration/src/client_folder/builder.rs index 3da8a65215..3e33189b8f 100644 --- a/shared-lib/flowy-collaboration/src/folder/builder.rs +++ b/shared-lib/flowy-collaboration/src/client_folder/builder.rs @@ -1,7 +1,7 @@ use crate::{ + client_folder::{default_folder_delta, FolderPad}, entities::revision::Revision, errors::{CollaborateError, CollaborateResult}, - folder::{default_folder_delta, FolderPad}, }; use flowy_core_data_model::entities::{trash::Trash, workspace::Workspace}; use lib_ot::core::{OperationTransformable, PlainDelta, PlainDeltaBuilder}; diff --git a/shared-lib/flowy-collaboration/src/folder/folder_pad.rs b/shared-lib/flowy-collaboration/src/client_folder/folder_pad.rs similarity index 99% rename from shared-lib/flowy-collaboration/src/folder/folder_pad.rs rename to shared-lib/flowy-collaboration/src/client_folder/folder_pad.rs index 5800922c50..f1d914bc85 100644 --- a/shared-lib/flowy-collaboration/src/folder/folder_pad.rs +++ b/shared-lib/flowy-collaboration/src/client_folder/folder_pad.rs @@ -1,10 +1,10 @@ use crate::{ + client_folder::builder::FolderPadBuilder, entities::{ folder_info::FolderDelta, revision::{md5, Revision}, }, errors::{CollaborateError, CollaborateResult}, - folder::builder::FolderPadBuilder, }; use dissimilar::*; use flowy_core_data_model::entities::{app::App, trash::Trash, view::View, workspace::Workspace}; @@ -401,7 +401,7 @@ fn cal_diff(old: String, new: String) -> Delta { #[cfg(test)] mod tests { #![allow(clippy::all)] - use crate::{entities::folder_info::FolderDelta, folder::folder_pad::FolderPad}; + use crate::{client_folder::folder_pad::FolderPad, entities::folder_info::FolderDelta}; use chrono::Utc; use flowy_core_data_model::entities::{app::App, trash::Trash, view::View, workspace::Workspace}; use lib_ot::core::{OperationTransformable, PlainDelta, PlainDeltaBuilder}; diff --git a/shared-lib/flowy-collaboration/src/folder/mod.rs b/shared-lib/flowy-collaboration/src/client_folder/mod.rs similarity index 100% rename from shared-lib/flowy-collaboration/src/folder/mod.rs rename to shared-lib/flowy-collaboration/src/client_folder/mod.rs diff --git a/shared-lib/flowy-collaboration/src/entities/revision.rs b/shared-lib/flowy-collaboration/src/entities/revision.rs index d36ab1b986..21ec8630f8 100644 --- a/shared-lib/flowy-collaboration/src/entities/revision.rs +++ b/shared-lib/flowy-collaboration/src/entities/revision.rs @@ -182,6 +182,12 @@ pub struct RevisionRange { pub end: i64, } +impl std::fmt::Display for RevisionRange { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("[{},{}]", self.start, self.end)) + } +} + impl RevisionRange { pub fn len(&self) -> i64 { debug_assert!(self.end >= self.start); diff --git a/shared-lib/flowy-collaboration/src/lib.rs b/shared-lib/flowy-collaboration/src/lib.rs index d787a23042..801c954605 100644 --- a/shared-lib/flowy-collaboration/src/lib.rs +++ b/shared-lib/flowy-collaboration/src/lib.rs @@ -1,7 +1,7 @@ pub mod client_document; +pub mod client_folder; pub mod entities; pub mod errors; -pub mod folder; pub mod protobuf; pub mod server_document; pub mod server_folder; diff --git a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs b/shared-lib/flowy-collaboration/src/sync/synchronizer.rs deleted file mode 100644 index 5e43281ff1..0000000000 --- a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs +++ /dev/null @@ -1,243 +0,0 @@ -use crate::{ - document::Document, - entities::{ - revision::RevisionRange, - ws::{DocumentServerWSData, DocumentServerWSDataBuilder}, - }, - errors::CollaborateError, - protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, - sync::DocumentPersistence, - util::*, -}; -use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; -use parking_lot::RwLock; -use std::{ - cmp::Ordering, - fmt::Debug, - sync::{ - atomic::{AtomicI64, Ordering::SeqCst}, - Arc, - }, - time::Duration, -}; - -pub trait RevisionUser: Send + Sync + Debug { - fn user_id(&self) -> String; - fn receive(&self, resp: SyncResponse); -} - -pub enum SyncResponse { - Pull(DocumentServerWSData), - Push(DocumentServerWSData), - Ack(DocumentServerWSData), - NewRevision(RepeatedRevisionPB), -} - -pub struct RevisionSynchronizer { - pub doc_id: String, - pub rev_id: AtomicI64, - document: Arc>, -} - -impl RevisionSynchronizer { - pub fn new(doc_id: &str, rev_id: i64, document: Document) -> RevisionSynchronizer { - let document = Arc::new(RwLock::new(document)); - RevisionSynchronizer { - doc_id: doc_id.to_string(), - rev_id: AtomicI64::new(rev_id), - document, - } - } - - #[tracing::instrument(level = "debug", skip(self, user, repeated_revision, persistence), err)] - pub async fn sync_revisions( - &self, - user: Arc, - repeated_revision: RepeatedRevisionPB, - persistence: Arc, - ) -> Result<(), CollaborateError> { - let doc_id = self.doc_id.clone(); - if repeated_revision.get_items().is_empty() { - // Return all the revisions to client - let revisions = persistence.get_doc_revisions(&doc_id).await?; - let repeated_revision = repeated_revision_from_revision_pbs(revisions)?; - let data = DocumentServerWSDataBuilder::build_push_message(&doc_id, repeated_revision); - user.receive(SyncResponse::Push(data)); - return Ok(()); - } - - let server_base_rev_id = self.rev_id.load(SeqCst); - let first_revision = repeated_revision.get_items().first().unwrap().clone(); - if self.is_applied_before(&first_revision, &persistence).await { - // Server has received this revision before, so ignore the following revisions - return Ok(()); - } - - match server_base_rev_id.cmp(&first_revision.rev_id) { - Ordering::Less => { - let server_rev_id = next(server_base_rev_id); - if server_base_rev_id == first_revision.base_rev_id || server_rev_id == first_revision.rev_id { - // The rev is in the right order, just compose it. - for revision in repeated_revision.get_items() { - let _ = self.compose_revision(revision)?; - } - user.receive(SyncResponse::NewRevision(repeated_revision)); - } else { - // The server document is outdated, pull the missing revision from the client. - let range = RevisionRange { - doc_id: self.doc_id.clone(), - start: server_rev_id, - end: first_revision.rev_id, - }; - let msg = DocumentServerWSDataBuilder::build_pull_message(&self.doc_id, range); - user.receive(SyncResponse::Pull(msg)); - } - } - Ordering::Equal => { - // Do nothing - log::warn!("Applied revision rev_id is the same as cur_rev_id"); - } - Ordering::Greater => { - // The client document is outdated. Transform the client revision delta and then - // send the prime delta to the client. Client should compose the this prime - // delta. - let from_rev_id = first_revision.rev_id; - let to_rev_id = server_base_rev_id; - let _ = self - .push_revisions_to_user(user, persistence, from_rev_id, to_rev_id) - .await; - } - } - Ok(()) - } - - #[tracing::instrument(level = "trace", skip(self, user, persistence), fields(server_rev_id), err)] - pub async fn pong( - &self, - user: Arc, - persistence: Arc, - client_rev_id: i64, - ) -> Result<(), CollaborateError> { - let doc_id = self.doc_id.clone(); - let server_rev_id = self.rev_id(); - tracing::Span::current().record("server_rev_id", &server_rev_id); - match server_rev_id.cmp(&client_rev_id) { - Ordering::Less => { - tracing::error!("Client should not send ping and the server should pull the revisions from the client") - } - Ordering::Equal => tracing::trace!("{} is up to date.", doc_id), - Ordering::Greater => { - // The client document is outdated. Transform the client revision delta and then - // send the prime delta to the client. Client should compose the this prime - // delta. - let from_rev_id = client_rev_id; - let to_rev_id = server_rev_id; - tracing::trace!("Push revisions to user"); - let _ = self - .push_revisions_to_user(user, persistence, from_rev_id, to_rev_id) - .await; - } - } - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self, repeated_revision, persistence), fields(doc_id), err)] - pub async fn reset( - &self, - persistence: Arc, - repeated_revision: RepeatedRevisionPB, - ) -> Result<(), CollaborateError> { - let doc_id = self.doc_id.clone(); - tracing::Span::current().record("doc_id", &doc_id.as_str()); - let revisions: Vec = repeated_revision.get_items().to_vec(); - let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions); - let delta = make_delta_from_revision_pb(revisions)?; - - let _ = persistence.reset_document(&doc_id, repeated_revision).await?; - *self.document.write() = Document::from_delta(delta); - let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id)); - Ok(()) - } - - pub fn doc_json(&self) -> String { - self.document.read().to_json() - } - - fn compose_revision(&self, revision: &RevisionPB) -> Result<(), CollaborateError> { - let delta = RichTextDelta::from_bytes(&revision.delta_data)?; - let _ = self.compose_delta(delta)?; - let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self, revision))] - fn transform_revision(&self, revision: &RevisionPB) -> Result<(RichTextDelta, RichTextDelta), CollaborateError> { - let cli_delta = RichTextDelta::from_bytes(&revision.delta_data)?; - let result = self.document.read().delta().transform(&cli_delta)?; - Ok(result) - } - - fn compose_delta(&self, delta: RichTextDelta) -> Result<(), CollaborateError> { - if delta.is_empty() { - log::warn!("Composed delta is empty"); - } - - match self.document.try_write_for(Duration::from_millis(300)) { - None => log::error!("Failed to acquire write lock of document"), - Some(mut write_guard) => { - let _ = write_guard.compose_delta(delta); - } - } - Ok(()) - } - - pub(crate) fn rev_id(&self) -> i64 { - self.rev_id.load(SeqCst) - } - - async fn is_applied_before(&self, new_revision: &RevisionPB, persistence: &Arc) -> bool { - if let Ok(revisions) = persistence.get_revisions(&self.doc_id, vec![new_revision.rev_id]).await { - if let Some(revision) = revisions.first() { - if revision.md5 == new_revision.md5 { - return true; - } - } - }; - - false - } - - async fn push_revisions_to_user( - &self, - user: Arc, - persistence: Arc, - from: i64, - to: i64, - ) { - let rev_ids: Vec = (from..=to).collect(); - let revisions = match persistence.get_revisions(&self.doc_id, rev_ids).await { - Ok(revisions) => { - debug_assert!(!revisions.is_empty(), "revisions should not be empty if the doc exists"); - revisions - } - Err(e) => { - tracing::error!("{}", e); - vec![] - } - }; - - tracing::debug!("Push revision: {} -> {} to client", from, to); - match repeated_revision_from_revision_pbs(revisions) { - Ok(repeated_revision) => { - let data = DocumentServerWSDataBuilder::build_push_message(&self.doc_id, repeated_revision); - user.receive(SyncResponse::Push(data)); - } - Err(e) => tracing::error!("{}", e), - } - } -} - -#[inline] -fn next(rev_id: i64) -> i64 { - rev_id + 1 -}