diff --git a/backend/src/context.rs b/backend/src/context.rs index 3fb6a4b25d..ab7e6891ff 100644 --- a/backend/src/context.rs +++ b/backend/src/context.rs @@ -7,7 +7,7 @@ use actix_web::web::Data; use crate::services::document::{ persistence::DocumentKVPersistence, - ws_receiver::{make_document_ws_receiver, HttpServerDocumentPersistence}, + ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence}, }; use flowy_collaboration::sync::ServerDocumentManager; use lib_ws::WSModule; @@ -28,16 +28,16 @@ impl AppContext { let mut ws_receivers = WebSocketReceivers::new(); let kv_store = make_document_kv_store(pg_pool.clone()); - let persistence = Arc::new(FlowyPersistence { pg_pool, kv_store }); + let flowy_persistence = Arc::new(FlowyPersistence { pg_pool, kv_store }); - let document_persistence = Arc::new(HttpServerDocumentPersistence(persistence.clone())); + let document_persistence = Arc::new(HttpDocumentCloudPersistence(flowy_persistence.kv_store())); let document_manager = Arc::new(ServerDocumentManager::new(document_persistence)); - let document_ws_receiver = make_document_ws_receiver(persistence.clone(), document_manager.clone()); + let document_ws_receiver = make_document_ws_receiver(flowy_persistence.clone(), document_manager.clone()); ws_receivers.set(WSModule::Doc, document_ws_receiver); AppContext { ws_server, - persistence: Data::new(persistence), + persistence: Data::new(flowy_persistence), ws_receivers: Data::new(ws_receivers), document_manager: Data::new(document_manager), } diff --git a/backend/src/services/document/persistence.rs b/backend/src/services/document/persistence.rs index b7b5ec4ffa..18e3307ffb 100644 --- a/backend/src/services/document/persistence.rs +++ b/backend/src/services/document/persistence.rs @@ -15,8 +15,9 @@ use flowy_collaboration::{ Revision as RevisionPB, }, sync::ServerDocumentManager, + util::make_doc_from_revisions, }; -use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; + use protobuf::Message; use std::sync::Arc; use uuid::Uuid; @@ -27,7 +28,7 @@ pub(crate) async fn create_document( mut params: CreateDocParams, ) -> Result<(), ServerError> { let revisions = params.take_revisions().take_items(); - let _ = kv_store.batch_set_revision(revisions.into()).await?; + let _ = kv_store.set_revision(revisions.into()).await?; Ok(()) } @@ -37,8 +38,12 @@ pub async fn read_document( params: DocumentId, ) -> Result { let _ = Uuid::parse_str(¶ms.doc_id).context("Parse document id to uuid failed")?; - let revisions = kv_store.batch_get_revisions(¶ms.doc_id, None).await?; - make_doc_from_revisions(¶ms.doc_id, revisions) + let revisions = kv_store.get_revisions(¶ms.doc_id, None).await?; + match make_doc_from_revisions(¶ms.doc_id, revisions) { + Ok(Some(document_info)) => Ok(document_info), + Ok(None) => Err(ServerError::record_not_found().context(format!("{} not exist", params.doc_id))), + Err(e) => Err(ServerError::internal().context(e)), + } } #[tracing::instrument(level = "debug", skip(document_manager, params), fields(delta), err)] @@ -60,7 +65,8 @@ pub async fn reset_document( #[tracing::instrument(level = "debug", skip(kv_store), err)] pub(crate) async fn delete_document(kv_store: &Arc, doc_id: Uuid) -> Result<(), ServerError> { - let _ = kv_store.batch_delete_revisions(&doc_id.to_string(), None).await?; + // TODO: delete revisions may cause time issue. Maybe delete asynchronously? + let _ = kv_store.delete_revisions(&doc_id.to_string(), None).await?; Ok(()) } @@ -81,23 +87,14 @@ impl std::ops::DerefMut for DocumentKVPersistence { impl DocumentKVPersistence { pub(crate) fn new(kv_store: Arc) -> Self { DocumentKVPersistence { inner: kv_store } } - pub(crate) async fn batch_set_revision(&self, revisions: Vec) -> Result<(), ServerError> { + pub(crate) async fn set_revision(&self, revisions: Vec) -> Result<(), ServerError> { let items = revisions_to_key_value_items(revisions)?; self.inner .transaction(|mut t| Box::pin(async move { t.batch_set(items).await })) .await } - pub(crate) async fn get_doc_revisions(&self, doc_id: &str) -> Result { - let doc_id = doc_id.to_owned(); - let items = self - .inner - .transaction(|mut t| Box::pin(async move { t.batch_get_start_with(&doc_id).await })) - .await?; - Ok(key_value_items_to_revisions(items)) - } - - pub(crate) async fn batch_get_revisions>>>( + pub(crate) async fn get_revisions>>>( &self, doc_id: &str, rev_ids: T, @@ -125,7 +122,7 @@ impl DocumentKVPersistence { Ok(key_value_items_to_revisions(items)) } - pub(crate) async fn batch_delete_revisions>>>( + pub(crate) async fn delete_revisions>>>( &self, doc_id: &str, rev_ids: T, @@ -182,35 +179,3 @@ fn key_value_items_to_revisions(items: Vec) -> RepeatedRevisionPB { #[inline] fn make_revision_key(doc_id: &str, rev_id: i64) -> String { format!("{}:{}", doc_id, rev_id) } - -#[inline] -fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevisionPB) -> Result { - let revisions = revisions.take_items(); - if revisions.is_empty() { - return Err(ServerError::record_not_found().context(format!("{} not exist", doc_id))); - } - - let mut document_delta = RichTextDelta::new(); - let mut base_rev_id = 0; - let mut rev_id = 0; - // TODO: replace with make_delta_from_revisions - for revision in revisions { - base_rev_id = revision.base_rev_id; - rev_id = revision.rev_id; - - if revision.delta_data.is_empty() { - tracing::warn!("revision delta_data is empty"); - } - - let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(internal_error)?; - document_delta = document_delta.compose(&delta).map_err(internal_error)?; - } - - let text = document_delta.to_json(); - let mut document_info = DocumentInfo::new(); - document_info.set_doc_id(doc_id.to_owned()); - document_info.set_text(text); - document_info.set_base_rev_id(base_rev_id); - document_info.set_rev_id(rev_id); - Ok(document_info) -} diff --git a/backend/src/services/document/ws_actor.rs b/backend/src/services/document/ws_actor.rs index 3739416d37..0b0f66a4f1 100644 --- a/backend/src/services/document/ws_actor.rs +++ b/backend/src/services/document/ws_actor.rs @@ -62,15 +62,15 @@ impl DocumentWebSocketActor { match msg { WSActorMessage::ClientData { client_data, - persistence, + persistence: _, ret, } => { - let _ = ret.send(self.handle_client_data(client_data, persistence).await); + let _ = ret.send(self.handle_client_data(client_data).await); }, } } - async fn handle_client_data(&self, client_data: WSClientData, persistence: Arc) -> Result<()> { + async fn handle_client_data(&self, client_data: WSClientData) -> Result<()> { let WSClientData { user, socket, data } = client_data; let document_client_data = spawn_blocking(move || parse_from_bytes::(&data)) .await @@ -83,11 +83,7 @@ impl DocumentWebSocketActor { document_client_data.ty ); - let user = Arc::new(HttpDocumentUser { - user, - socket, - persistence, - }); + let user = Arc::new(HttpDocumentUser { user, socket }); match &document_client_data.ty { DocumentClientWSDataTypePB::ClientPushRev => { @@ -122,7 +118,6 @@ fn verify_md5(revision: &RevisionPB) -> Result<()> { pub struct HttpDocumentUser { pub user: Arc, pub(crate) socket: Socket, - pub persistence: Arc, } impl std::fmt::Debug for HttpDocumentUser { @@ -151,17 +146,6 @@ impl RevisionUser for HttpDocumentUser { let msg: WebSocketMessage = data.into(); self.socket.try_send(msg).map_err(internal_error) }, - SyncResponse::NewRevision(mut repeated_revision) => { - let kv_store = self.persistence.kv_store(); - tokio::task::spawn(async move { - let revisions = repeated_revision.take_items().into(); - match kv_store.batch_set_revision(revisions).await { - Ok(_) => {}, - Err(e) => log::error!("{}", e), - } - }); - Ok(()) - }, }; match result { diff --git a/backend/src/services/document/ws_receiver.rs b/backend/src/services/document/ws_receiver.rs index 17cea410ce..4aa9e75277 100644 --- a/backend/src/services/document/ws_receiver.rs +++ b/backend/src/services/document/ws_receiver.rs @@ -2,7 +2,7 @@ use crate::{ context::FlowyPersistence, services::{ document::{ - persistence::{create_document, read_document, revisions_to_key_value_items}, + persistence::{create_document, read_document, revisions_to_key_value_items, DocumentKVPersistence}, ws_actor::{DocumentWebSocketActor, WSActorMessage}, }, web_socket::{WSClientData, WebSocketReceiver}, @@ -18,7 +18,7 @@ use flowy_collaboration::{ RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB, }, - sync::{ServerDocumentManager, ServerDocumentPersistence}, + sync::{DocumentCloudPersistence, ServerDocumentManager}, util::repeated_revision_from_repeated_revision_pb, }; use lib_infra::future::BoxResultFuture; @@ -76,18 +76,19 @@ impl WebSocketReceiver for DocumentWebSocketReceiver { } } -pub struct HttpServerDocumentPersistence(pub Arc); -impl Debug for HttpServerDocumentPersistence { +pub struct HttpDocumentCloudPersistence(pub Arc); +impl Debug for HttpDocumentCloudPersistence { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") } } -impl ServerDocumentPersistence for HttpServerDocumentPersistence { +impl DocumentCloudPersistence for HttpDocumentCloudPersistence { + fn enable_sync(&self) -> bool { true } fn read_document(&self, doc_id: &str) -> BoxResultFuture { let params = DocumentId { doc_id: doc_id.to_string(), ..Default::default() }; - let kv_store = self.0.kv_store(); + let kv_store = self.0.clone(); Box::pin(async move { let mut pb_doc = read_document(&kv_store, params) .await @@ -104,7 +105,7 @@ impl ServerDocumentPersistence for HttpServerDocumentPersistence { doc_id: &str, repeated_revision: RepeatedRevisionPB, ) -> BoxResultFuture { - let kv_store = self.0.kv_store(); + let kv_store = self.0.clone(); let doc_id = doc_id.to_owned(); Box::pin(async move { let revisions = repeated_revision_from_repeated_revision_pb(repeated_revision.clone())?.into_inner(); @@ -125,19 +126,22 @@ impl ServerDocumentPersistence for HttpServerDocumentPersistence { doc_id: &str, rev_ids: Option>, ) -> BoxResultFuture, CollaborateError> { - let kv_store = self.0.kv_store(); + let kv_store = self.0.clone(); let doc_id = doc_id.to_owned(); let f = || async move { - match rev_ids { - None => { - let mut repeated_revision = kv_store.get_doc_revisions(&doc_id).await?; - Ok(repeated_revision.take_items().into()) - }, - Some(rev_ids) => { - let mut repeated_revision = kv_store.batch_get_revisions(&doc_id, rev_ids).await?; - Ok(repeated_revision.take_items().into()) - }, - } + let mut repeated_revision = kv_store.get_revisions(&doc_id, rev_ids).await?; + Ok(repeated_revision.take_items().into()) + }; + + Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) }) + } + + fn save_revisions(&self, mut repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { + let kv_store = self.0.clone(); + let f = || async move { + let revisions = repeated_revision.take_items().into(); + let _ = kv_store.set_revision(revisions).await?; + Ok(()) }; Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) }) @@ -148,7 +152,7 @@ impl ServerDocumentPersistence for HttpServerDocumentPersistence { doc_id: &str, mut repeated_revision: RepeatedRevisionPB, ) -> BoxResultFuture<(), CollaborateError> { - let kv_store = self.0.kv_store(); + let kv_store = self.0.clone(); let doc_id = doc_id.to_owned(); let f = || async move { kv_store diff --git a/backend/tests/document_test/edit_script.rs b/backend/tests/document_test/edit_script.rs index ee1352e972..3fbebeed92 100644 --- a/backend/tests/document_test/edit_script.rs +++ b/backend/tests/document_test/edit_script.rs @@ -17,7 +17,7 @@ use backend::services::document::persistence::{read_document, reset_document}; use flowy_collaboration::entities::revision::{RepeatedRevision, Revision}; use flowy_collaboration::protobuf::{RepeatedRevision as RepeatedRevisionPB, DocumentId as DocumentIdPB}; use flowy_collaboration::sync::ServerDocumentManager; -use flowy_net::services::ws_conn::FlowyWebSocketConnect; +use flowy_net::ws::connection::FlowyWebSocketConnect; use lib_ot::core::Interval; pub struct DocumentTest { diff --git a/frontend/rust-lib/flowy-document/src/core/edit/editor.rs b/frontend/rust-lib/flowy-document/src/core/edit/editor.rs index aaa8f52d6a..66b0988901 100644 --- a/frontend/rust-lib/flowy-document/src/core/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/core/edit/editor.rs @@ -2,7 +2,13 @@ use crate::{ context::DocumentUser, core::{ web_socket::{make_document_ws_manager, DocumentWebSocketManager}, - *, + DocumentMD5, + DocumentRevisionManager, + DocumentWSReceiver, + DocumentWebSocket, + EditorCommand, + EditorCommandQueue, + RevisionServer, }, errors::FlowyError, }; @@ -20,7 +26,7 @@ pub struct ClientDocumentEditor { pub doc_id: String, #[allow(dead_code)] rev_manager: Arc, - ws_manager: Arc, + ws_manager: Arc, edit_queue: UnboundedSender, } @@ -153,7 +159,7 @@ impl ClientDocumentEditor { #[tracing::instrument(level = "debug", skip(self))] pub fn stop(&self) { self.ws_manager.stop(); } - pub(crate) fn ws_handler(&self) -> Arc { self.ws_manager.receiver() } + pub(crate) fn ws_handler(&self) -> Arc { self.ws_manager.clone() } } fn spawn_edit_queue( diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs deleted file mode 100644 index f74015b509..0000000000 --- a/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs +++ /dev/null @@ -1,288 +0,0 @@ -use crate::{ - core::{web_socket::ws_manager::DocumentWebSocketManager, SYNC_INTERVAL_IN_MILLIS}, - ws_receivers::{DocumentWSReceiver, DocumentWebSocket}, -}; -use async_stream::stream; -use bytes::Bytes; -use flowy_collaboration::entities::{ - revision::{RevId, RevisionRange}, - ws::{DocumentClientWSData, DocumentServerWSData, DocumentServerWSDataType, NewDocumentUser}, -}; -use flowy_error::{internal_error, FlowyError, FlowyResult}; -use futures::stream::StreamExt; -use lib_infra::future::FutureResult; -use lib_ws::WSConnectState; -use std::{convert::TryFrom, sync::Arc}; -use tokio::{ - sync::{ - broadcast, - mpsc, - mpsc::{UnboundedReceiver, UnboundedSender}, - }, - task::spawn_blocking, - time::{interval, Duration}, -}; - -pub(crate) struct HttpWebSocketManager { - doc_id: String, - data_provider: Arc, - stream_consumer: Arc, - ws_conn: Arc, - ws_msg_tx: UnboundedSender, - ws_msg_rx: Option>, - stop_sync_tx: SinkStopTx, - state: broadcast::Sender, -} - -impl HttpWebSocketManager { - pub(crate) fn new( - doc_id: &str, - ws_conn: Arc, - data_provider: Arc, - stream_consumer: Arc, - ) -> Self { - let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel(); - let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2); - let doc_id = doc_id.to_string(); - let (state, _) = broadcast::channel(2); - let mut manager = HttpWebSocketManager { - doc_id, - data_provider, - stream_consumer, - ws_conn, - ws_msg_tx, - ws_msg_rx: Some(ws_msg_rx), - stop_sync_tx, - state, - }; - manager.run(); - manager - } - - fn run(&mut self) { - let ws_msg_rx = self.ws_msg_rx.take().expect("Only take once"); - let sink = DocumentWSSink::new( - &self.doc_id, - self.data_provider.clone(), - self.ws_conn.clone(), - self.stop_sync_tx.subscribe(), - ); - let stream = DocumentWSStream::new( - &self.doc_id, - self.stream_consumer.clone(), - ws_msg_rx, - self.stop_sync_tx.subscribe(), - ); - tokio::spawn(sink.run()); - tokio::spawn(stream.run()); - } - - pub fn scribe_state(&self) -> broadcast::Receiver { self.state.subscribe() } -} - -impl DocumentWebSocketManager for Arc { - fn stop(&self) { - if self.stop_sync_tx.send(()).is_ok() { - tracing::debug!("{} stop sync", self.doc_id) - } - } - - fn receiver(&self) -> Arc { self.clone() } -} - -impl DocumentWSReceiver for HttpWebSocketManager { - fn receive_ws_data(&self, doc_data: DocumentServerWSData) { - match self.ws_msg_tx.send(doc_data) { - Ok(_) => {}, - Err(e) => tracing::error!("❌ Propagate ws message failed. {}", e), - } - } - - fn connect_state_changed(&self, state: &WSConnectState) { - match self.state.send(state.clone()) { - Ok(_) => {}, - Err(e) => tracing::error!("{}", e), - } - } -} - -impl std::ops::Drop for HttpWebSocketManager { - fn drop(&mut self) { tracing::debug!("{} HttpWebSocketManager was drop", self.doc_id) } -} - -pub trait DocumentWSSteamConsumer: Send + Sync { - fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>; - fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError>; - fn receive_new_user_connect(&self, new_user: NewDocumentUser) -> FutureResult<(), FlowyError>; - fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>; -} - -pub struct DocumentWSStream { - doc_id: String, - consumer: Arc, - ws_msg_rx: Option>, - stop_rx: Option, -} - -impl DocumentWSStream { - pub fn new( - doc_id: &str, - consumer: Arc, - ws_msg_rx: mpsc::UnboundedReceiver, - stop_rx: SinkStopRx, - ) -> Self { - DocumentWSStream { - doc_id: doc_id.to_owned(), - consumer, - ws_msg_rx: Some(ws_msg_rx), - stop_rx: Some(stop_rx), - } - } - - pub async fn run(mut self) { - let mut receiver = self.ws_msg_rx.take().expect("Only take once"); - let mut stop_rx = self.stop_rx.take().expect("Only take once"); - let doc_id = self.doc_id.clone(); - let stream = stream! { - loop { - tokio::select! { - result = receiver.recv() => { - match result { - Some(msg) => { - yield msg - }, - None => { - tracing::debug!("[DocumentStream:{}] loop exit", doc_id); - break; - }, - } - }, - _ = stop_rx.recv() => { - tracing::debug!("[DocumentStream:{}] loop exit", doc_id); - break - }, - }; - } - }; - - stream - .for_each(|msg| async { - match self.handle_message(msg).await { - Ok(_) => {}, - Err(e) => log::error!("[DocumentStream:{}] error: {}", self.doc_id, e), - } - }) - .await; - } - - async fn handle_message(&self, msg: DocumentServerWSData) -> FlowyResult<()> { - let DocumentServerWSData { doc_id: _, ty, data } = msg; - let bytes = spawn_blocking(move || Bytes::from(data)) - .await - .map_err(internal_error)?; - - tracing::trace!("[DocumentStream]: new message: {:?}", ty); - match ty { - DocumentServerWSDataType::ServerPushRev => { - let _ = self.consumer.receive_push_revision(bytes).await?; - }, - DocumentServerWSDataType::ServerPullRev => { - let range = RevisionRange::try_from(bytes)?; - let _ = self.consumer.pull_revisions_in_range(range).await?; - }, - DocumentServerWSDataType::ServerAck => { - let rev_id = RevId::try_from(bytes).unwrap().value; - let _ = self.consumer.receive_ack(rev_id.to_string(), ty).await; - }, - DocumentServerWSDataType::UserConnect => { - let new_user = NewDocumentUser::try_from(bytes)?; - let _ = self.consumer.receive_new_user_connect(new_user).await; - // Notify the user that someone has connected to this document - }, - } - Ok(()) - } -} - -pub type Tick = (); -pub type SinkStopRx = broadcast::Receiver<()>; -pub type SinkStopTx = broadcast::Sender<()>; - -pub trait DocumentWSSinkDataProvider: Send + Sync { - fn next(&self) -> FutureResult, FlowyError>; -} - -pub struct DocumentWSSink { - provider: Arc, - ws_sender: Arc, - stop_rx: Option, - doc_id: String, -} - -impl DocumentWSSink { - pub fn new( - doc_id: &str, - provider: Arc, - ws_sender: Arc, - stop_rx: SinkStopRx, - ) -> Self { - Self { - provider, - ws_sender, - stop_rx: Some(stop_rx), - doc_id: doc_id.to_owned(), - } - } - - pub async fn run(mut self) { - let (tx, mut rx) = mpsc::unbounded_channel(); - let mut stop_rx = self.stop_rx.take().expect("Only take once"); - let doc_id = self.doc_id.clone(); - tokio::spawn(tick(tx)); - let stream = stream! { - loop { - tokio::select! { - result = rx.recv() => { - match result { - Some(msg) => yield msg, - None => break, - } - }, - _ = stop_rx.recv() => { - tracing::debug!("[DocumentSink:{}] loop exit", doc_id); - break - }, - }; - } - }; - stream - .for_each(|_| async { - match self.send_next_revision().await { - Ok(_) => {}, - Err(e) => log::error!("[DocumentSink]: Send failed, {:?}", e), - } - }) - .await; - } - - async fn send_next_revision(&self) -> FlowyResult<()> { - match self.provider.next().await? { - None => { - tracing::trace!("Finish synchronizing revisions"); - Ok(()) - }, - Some(data) => { - tracing::trace!("[DocumentSink]: send: {}:{}-{:?}", data.doc_id, data.id(), data.ty); - self.ws_sender.send(data) - // let _ = tokio::time::timeout(Duration::from_millis(2000), - }, - } - } -} - -async fn tick(sender: mpsc::UnboundedSender) { - let mut interval = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS)); - while sender.send(()).is_ok() { - interval.tick().await; - } -} diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket/local_ws_impl.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/local_ws_impl.rs deleted file mode 100644 index e3be436cbd..0000000000 --- a/frontend/rust-lib/flowy-document/src/core/web_socket/local_ws_impl.rs +++ /dev/null @@ -1,18 +0,0 @@ -use crate::core::{web_socket::DocumentWebSocketManager, DocumentWSReceiver}; -use flowy_collaboration::entities::ws::DocumentServerWSData; -use lib_ws::WSConnectState; -use std::sync::Arc; - -pub(crate) struct LocalWebSocketManager {} - -impl DocumentWebSocketManager for Arc { - fn stop(&self) {} - - fn receiver(&self) -> Arc { self.clone() } -} - -impl DocumentWSReceiver for LocalWebSocketManager { - fn receive_ws_data(&self, _doc_data: DocumentServerWSData) {} - - fn connect_state_changed(&self, _state: &WSConnectState) {} -} diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs index 5fc1dc2604..19d342b2e8 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs @@ -1,7 +1,283 @@ -#![allow(clippy::module_inception)] -mod http_ws_impl; -mod local_ws_impl; mod ws_manager; +pub use ws_manager::*; -pub(crate) use http_ws_impl::*; -pub(crate) use ws_manager::*; +use crate::core::{ + web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer}, + DocumentRevisionManager, + DocumentWSReceiver, + DocumentWebSocket, + EditorCommand, + TransformDeltas, +}; +use bytes::Bytes; +use flowy_collaboration::{ + entities::{ + revision::{RepeatedRevision, Revision, RevisionRange}, + ws::{DocumentClientWSData, DocumentServerWSDataType, NewDocumentUser}, + }, + errors::CollaborateResult, +}; +use flowy_error::{internal_error, FlowyError, FlowyResult}; +use lib_infra::future::FutureResult; +use lib_ws::WSConnectState; +use std::{collections::VecDeque, convert::TryFrom, sync::Arc}; +use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock}; + +pub(crate) async fn make_document_ws_manager( + doc_id: String, + user_id: String, + edit_cmd_tx: UnboundedSender, + rev_manager: Arc, + ws_conn: Arc, +) -> Arc { + let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); + let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { + doc_id: doc_id.clone(), + edit_cmd_tx, + rev_manager: rev_manager.clone(), + shared_sink: shared_sink.clone(), + }); + let data_provider = Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink)); + let ws_manager = Arc::new(DocumentWebSocketManager::new( + &doc_id, + ws_conn, + data_provider, + ws_stream_consumer, + )); + listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager); + ws_manager +} + +fn listen_document_ws_state( + _user_id: &str, + _doc_id: &str, + mut subscriber: broadcast::Receiver, + _rev_manager: Arc, +) { + tokio::spawn(async move { + while let Ok(state) = subscriber.recv().await { + match state { + WSConnectState::Init => {}, + WSConnectState::Connecting => {}, + WSConnectState::Connected => {}, + WSConnectState::Disconnected => {}, + } + } + }); +} + +pub(crate) struct DocumentWebSocketSteamConsumerAdapter { + pub(crate) doc_id: String, + pub(crate) edit_cmd_tx: UnboundedSender, + pub(crate) rev_manager: Arc, + pub(crate) shared_sink: Arc, +} + +impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter { + fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> { + let rev_manager = self.rev_manager.clone(); + let edit_cmd_tx = self.edit_cmd_tx.clone(); + let shared_sink = self.shared_sink.clone(); + let doc_id = self.doc_id.clone(); + FutureResult::new(async move { + if let Some(server_composed_revision) = handle_remote_revision(edit_cmd_tx, rev_manager, bytes).await? { + let data = DocumentClientWSData::from_revisions(&doc_id, vec![server_composed_revision]); + shared_sink.push_back(data).await; + } + Ok(()) + }) + } + + fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError> { + let shared_sink = self.shared_sink.clone(); + FutureResult::new(async move { shared_sink.ack(id, ty).await }) + } + + fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> { + // the _new_user will be used later + FutureResult::new(async move { Ok(()) }) + } + + fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> { + let rev_manager = self.rev_manager.clone(); + let shared_sink = self.shared_sink.clone(); + let doc_id = self.doc_id.clone(); + FutureResult::new(async move { + let revisions = rev_manager.get_revisions_in_range(range).await?; + let data = DocumentClientWSData::from_revisions(&doc_id, revisions); + shared_sink.push_back(data).await; + Ok(()) + }) + } +} + +pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc); +impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter { + fn next(&self) -> FutureResult, FlowyError> { + let shared_sink = self.0.clone(); + FutureResult::new(async move { shared_sink.next().await }) + } +} + +async fn transform_pushed_revisions( + revisions: Vec, + edit_cmd: &UnboundedSender, +) -> FlowyResult { + let (ret, rx) = oneshot::channel::>(); + let _ = edit_cmd.send(EditorCommand::TransformRevision { revisions, ret }); + Ok(rx.await.map_err(internal_error)??) +} + +#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))] +pub(crate) async fn handle_remote_revision( + edit_cmd_tx: UnboundedSender, + rev_manager: Arc, + bytes: Bytes, +) -> FlowyResult> { + let mut revisions = RepeatedRevision::try_from(bytes)?.into_inner(); + if revisions.is_empty() { + return Ok(None); + } + + let first_revision = revisions.first().unwrap(); + if let Some(local_revision) = rev_manager.get_revision(first_revision.rev_id).await { + if local_revision.md5 == first_revision.md5 { + // The local revision is equal to the pushed revision. Just ignore it. + revisions = revisions.split_off(1); + if revisions.is_empty() { + return Ok(None); + } + } else { + return Ok(None); + } + } + + let TransformDeltas { + client_prime, + server_prime, + } = transform_pushed_revisions(revisions.clone(), &edit_cmd_tx).await?; + + match server_prime { + None => { + // The server_prime is None means the client local revisions conflict with the + // server, and it needs to override the client delta. + let (ret, rx) = oneshot::channel(); + let _ = edit_cmd_tx.send(EditorCommand::OverrideDelta { + revisions, + delta: client_prime, + ret, + }); + let _ = rx.await.map_err(internal_error)??; + Ok(None) + }, + Some(server_prime) => { + let (ret, rx) = oneshot::channel(); + let _ = edit_cmd_tx.send(EditorCommand::ComposeRemoteDelta { + revisions, + client_delta: client_prime, + server_delta: server_prime, + ret, + }); + Ok(rx.await.map_err(internal_error)??) + }, + } +} + +#[derive(Clone)] +enum SourceType { + Shared, + Revision, +} + +#[derive(Clone)] +pub(crate) struct SharedWSSinkDataProvider { + shared: Arc>>, + rev_manager: Arc, + source_ty: Arc>, +} + +impl SharedWSSinkDataProvider { + pub(crate) fn new(rev_manager: Arc) -> Self { + SharedWSSinkDataProvider { + shared: Arc::new(RwLock::new(VecDeque::new())), + rev_manager, + source_ty: Arc::new(RwLock::new(SourceType::Shared)), + } + } + + #[allow(dead_code)] + pub(crate) async fn push_front(&self, data: DocumentClientWSData) { self.shared.write().await.push_front(data); } + + async fn push_back(&self, data: DocumentClientWSData) { self.shared.write().await.push_back(data); } + + async fn next(&self) -> FlowyResult> { + let source_ty = self.source_ty.read().await.clone(); + match source_ty { + SourceType::Shared => match self.shared.read().await.front() { + None => { + *self.source_ty.write().await = SourceType::Revision; + Ok(None) + }, + Some(data) => { + tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", data.doc_id, data.ty); + Ok(Some(data.clone())) + }, + }, + SourceType::Revision => { + if !self.shared.read().await.is_empty() { + *self.source_ty.write().await = SourceType::Shared; + return Ok(None); + } + + match self.rev_manager.next_sync_revision().await? { + Some(rev) => { + let doc_id = rev.doc_id.clone(); + Ok(Some(DocumentClientWSData::from_revisions(&doc_id, vec![rev]))) + }, + None => { + // + let doc_id = self.rev_manager.doc_id.clone(); + let latest_rev_id = self.rev_manager.rev_id(); + Ok(Some(DocumentClientWSData::ping(&doc_id, latest_rev_id))) + }, + } + }, + } + } + + async fn ack(&self, id: String, _ty: DocumentServerWSDataType) -> FlowyResult<()> { + // let _ = self.rev_manager.ack_revision(id).await?; + let source_ty = self.source_ty.read().await.clone(); + match source_ty { + SourceType::Shared => { + let should_pop = match self.shared.read().await.front() { + None => false, + Some(val) => { + let expected_id = val.id(); + if expected_id == id { + true + } else { + tracing::error!("The front element's {} is not equal to the {}", expected_id, id); + false + } + }, + }; + if should_pop { + let _ = self.shared.write().await.pop_front(); + } + }, + SourceType::Revision => { + match id.parse::() { + Ok(rev_id) => { + let _ = self.rev_manager.ack_revision(rev_id).await?; + }, + Err(e) => { + tracing::error!("Parse rev_id from {} failed. {}", id, e); + }, + }; + }, + } + + Ok(()) + } +} diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs index 482818dd43..ddbcdd96a0 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs @@ -1,310 +1,283 @@ -use crate::core::{ - web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer, HttpWebSocketManager}, - DocumentRevisionManager, - DocumentWSReceiver, - DocumentWebSocket, - EditorCommand, - TransformDeltas, +use crate::{ + core::SYNC_INTERVAL_IN_MILLIS, + ws_receivers::{DocumentWSReceiver, DocumentWebSocket}, }; +use async_stream::stream; use bytes::Bytes; -use flowy_collaboration::{ - entities::{ - revision::{RepeatedRevision, Revision, RevisionRange}, - ws::{DocumentClientWSData, NewDocumentUser}, - }, - errors::CollaborateResult, +use flowy_collaboration::entities::{ + revision::{RevId, RevisionRange}, + ws::{DocumentClientWSData, DocumentServerWSData, DocumentServerWSDataType, NewDocumentUser}, }; use flowy_error::{internal_error, FlowyError, FlowyResult}; +use futures::stream::StreamExt; use lib_infra::future::FutureResult; - -use flowy_collaboration::entities::ws::DocumentServerWSDataType; - use lib_ws::WSConnectState; -use std::{collections::VecDeque, convert::TryFrom, sync::Arc}; -use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock}; +use std::{convert::TryFrom, sync::Arc}; +use tokio::{ + sync::{ + broadcast, + mpsc, + mpsc::{UnboundedReceiver, UnboundedSender}, + }, + task::spawn_blocking, + time::{interval, Duration}, +}; -pub(crate) trait DocumentWebSocketManager: Send + Sync { - fn stop(&self); - fn receiver(&self) -> Arc; -} - -pub(crate) async fn make_document_ws_manager( +pub struct DocumentWebSocketManager { doc_id: String, - user_id: String, - edit_cmd_tx: UnboundedSender, - rev_manager: Arc, + data_provider: Arc, + stream_consumer: Arc, ws_conn: Arc, -) -> Arc { - // if cfg!(feature = "http_server") { - // let shared_sink = - // Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); - // let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { - // doc_id: doc_id.clone(), - // edit_cmd_tx, - // rev_manager: rev_manager.clone(), - // shared_sink: shared_sink.clone(), - // }); - // let data_provider = - // Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink)); - // let ws_manager = Arc::new(HttpWebSocketManager::new( - // &doc_id, - // ws_conn, - // data_provider, - // ws_stream_consumer, - // )); - // listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), - // rev_manager); Arc::new(ws_manager) - // } else { - // Arc::new(Arc::new(LocalWebSocketManager {})) - // } - let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); - let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { - doc_id: doc_id.clone(), - edit_cmd_tx, - rev_manager: rev_manager.clone(), - shared_sink: shared_sink.clone(), - }); - let data_provider = Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink)); - let ws_manager = Arc::new(HttpWebSocketManager::new( - &doc_id, - ws_conn, - data_provider, - ws_stream_consumer, - )); - listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager); - Arc::new(ws_manager) + ws_msg_tx: UnboundedSender, + ws_msg_rx: Option>, + stop_sync_tx: SinkStopTx, + state: broadcast::Sender, } -fn listen_document_ws_state( - _user_id: &str, - _doc_id: &str, - mut subscriber: broadcast::Receiver, - _rev_manager: Arc, -) { - tokio::spawn(async move { - while let Ok(state) = subscriber.recv().await { - match state { - WSConnectState::Init => {}, - WSConnectState::Connecting => {}, - WSConnectState::Connected => {}, - WSConnectState::Disconnected => {}, - } +impl DocumentWebSocketManager { + pub(crate) fn new( + doc_id: &str, + ws_conn: Arc, + data_provider: Arc, + stream_consumer: Arc, + ) -> Self { + let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel(); + let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2); + let doc_id = doc_id.to_string(); + let (state, _) = broadcast::channel(2); + let mut manager = DocumentWebSocketManager { + doc_id, + data_provider, + stream_consumer, + ws_conn, + ws_msg_tx, + ws_msg_rx: Some(ws_msg_rx), + stop_sync_tx, + state, + }; + manager.run(); + manager + } + + fn run(&mut self) { + let ws_msg_rx = self.ws_msg_rx.take().expect("Only take once"); + let sink = DocumentWSSink::new( + &self.doc_id, + self.data_provider.clone(), + self.ws_conn.clone(), + self.stop_sync_tx.subscribe(), + ); + let stream = DocumentWSStream::new( + &self.doc_id, + self.stream_consumer.clone(), + ws_msg_rx, + self.stop_sync_tx.subscribe(), + ); + tokio::spawn(sink.run()); + tokio::spawn(stream.run()); + } + + pub fn scribe_state(&self) -> broadcast::Receiver { self.state.subscribe() } + + pub(crate) fn stop(&self) { + if self.stop_sync_tx.send(()).is_ok() { + tracing::debug!("{} stop sync", self.doc_id) } - }); + } } -pub(crate) struct DocumentWebSocketSteamConsumerAdapter { - pub(crate) doc_id: String, - pub(crate) edit_cmd_tx: UnboundedSender, - pub(crate) rev_manager: Arc, - pub(crate) shared_sink: Arc, +impl DocumentWSReceiver for DocumentWebSocketManager { + fn receive_ws_data(&self, doc_data: DocumentServerWSData) { + match self.ws_msg_tx.send(doc_data) { + Ok(_) => {}, + Err(e) => tracing::error!("❌ Propagate ws message failed. {}", e), + } + } + + fn connect_state_changed(&self, state: &WSConnectState) { + match self.state.send(state.clone()) { + Ok(_) => {}, + Err(e) => tracing::error!("{}", e), + } + } } -impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter { - fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> { - let rev_manager = self.rev_manager.clone(); - let edit_cmd_tx = self.edit_cmd_tx.clone(); - let shared_sink = self.shared_sink.clone(); +impl std::ops::Drop for DocumentWebSocketManager { + fn drop(&mut self) { tracing::debug!("{} HttpWebSocketManager was drop", self.doc_id) } +} + +pub trait DocumentWSSteamConsumer: Send + Sync { + fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>; + fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError>; + fn receive_new_user_connect(&self, new_user: NewDocumentUser) -> FutureResult<(), FlowyError>; + fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>; +} + +pub struct DocumentWSStream { + doc_id: String, + consumer: Arc, + ws_msg_rx: Option>, + stop_rx: Option, +} + +impl DocumentWSStream { + pub fn new( + doc_id: &str, + consumer: Arc, + ws_msg_rx: mpsc::UnboundedReceiver, + stop_rx: SinkStopRx, + ) -> Self { + DocumentWSStream { + doc_id: doc_id.to_owned(), + consumer, + ws_msg_rx: Some(ws_msg_rx), + stop_rx: Some(stop_rx), + } + } + + pub async fn run(mut self) { + let mut receiver = self.ws_msg_rx.take().expect("Only take once"); + let mut stop_rx = self.stop_rx.take().expect("Only take once"); let doc_id = self.doc_id.clone(); - FutureResult::new(async move { - if let Some(server_composed_revision) = handle_remote_revision(edit_cmd_tx, rev_manager, bytes).await? { - let data = DocumentClientWSData::from_revisions(&doc_id, vec![server_composed_revision]); - shared_sink.push_back(data).await; - } - Ok(()) - }) - } - - fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError> { - let shared_sink = self.shared_sink.clone(); - FutureResult::new(async move { shared_sink.ack(id, ty).await }) - } - - fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> { - // the _new_user will be used later - FutureResult::new(async move { Ok(()) }) - } - - fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> { - let rev_manager = self.rev_manager.clone(); - let shared_sink = self.shared_sink.clone(); - let doc_id = self.doc_id.clone(); - FutureResult::new(async move { - let revisions = rev_manager.get_revisions_in_range(range).await?; - let data = DocumentClientWSData::from_revisions(&doc_id, revisions); - shared_sink.push_back(data).await; - Ok(()) - }) - } -} - -pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc); -impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter { - fn next(&self) -> FutureResult, FlowyError> { - let shared_sink = self.0.clone(); - FutureResult::new(async move { shared_sink.next().await }) - } -} - -async fn transform_pushed_revisions( - revisions: Vec, - edit_cmd: &UnboundedSender, -) -> FlowyResult { - let (ret, rx) = oneshot::channel::>(); - let _ = edit_cmd.send(EditorCommand::TransformRevision { revisions, ret }); - Ok(rx.await.map_err(internal_error)??) -} - -#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))] -pub(crate) async fn handle_remote_revision( - edit_cmd_tx: UnboundedSender, - rev_manager: Arc, - bytes: Bytes, -) -> FlowyResult> { - let mut revisions = RepeatedRevision::try_from(bytes)?.into_inner(); - if revisions.is_empty() { - return Ok(None); - } - - let first_revision = revisions.first().unwrap(); - if let Some(local_revision) = rev_manager.get_revision(first_revision.rev_id).await { - if local_revision.md5 == first_revision.md5 { - // The local revision is equal to the pushed revision. Just ignore it. - revisions = revisions.split_off(1); - if revisions.is_empty() { - return Ok(None); - } - } else { - return Ok(None); - } - } - - let TransformDeltas { - client_prime, - server_prime, - } = transform_pushed_revisions(revisions.clone(), &edit_cmd_tx).await?; - - match server_prime { - None => { - // The server_prime is None means the client local revisions conflict with the - // server, and it needs to override the client delta. - let (ret, rx) = oneshot::channel(); - let _ = edit_cmd_tx.send(EditorCommand::OverrideDelta { - revisions, - delta: client_prime, - ret, - }); - let _ = rx.await.map_err(internal_error)??; - Ok(None) - }, - Some(server_prime) => { - let (ret, rx) = oneshot::channel(); - let _ = edit_cmd_tx.send(EditorCommand::ComposeRemoteDelta { - revisions, - client_delta: client_prime, - server_delta: server_prime, - ret, - }); - Ok(rx.await.map_err(internal_error)??) - }, - } -} - -#[derive(Clone)] -enum SourceType { - Shared, - Revision, -} - -#[derive(Clone)] -pub(crate) struct SharedWSSinkDataProvider { - shared: Arc>>, - rev_manager: Arc, - source_ty: Arc>, -} - -impl SharedWSSinkDataProvider { - pub(crate) fn new(rev_manager: Arc) -> Self { - SharedWSSinkDataProvider { - shared: Arc::new(RwLock::new(VecDeque::new())), - rev_manager, - source_ty: Arc::new(RwLock::new(SourceType::Shared)), - } - } - - #[allow(dead_code)] - pub(crate) async fn push_front(&self, data: DocumentClientWSData) { self.shared.write().await.push_front(data); } - - async fn push_back(&self, data: DocumentClientWSData) { self.shared.write().await.push_back(data); } - - async fn next(&self) -> FlowyResult> { - let source_ty = self.source_ty.read().await.clone(); - match source_ty { - SourceType::Shared => match self.shared.read().await.front() { - None => { - *self.source_ty.write().await = SourceType::Revision; - Ok(None) - }, - Some(data) => { - tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", data.doc_id, data.ty); - Ok(Some(data.clone())) - }, - }, - SourceType::Revision => { - if !self.shared.read().await.is_empty() { - *self.source_ty.write().await = SourceType::Shared; - return Ok(None); - } - - match self.rev_manager.next_sync_revision().await? { - Some(rev) => { - let doc_id = rev.doc_id.clone(); - Ok(Some(DocumentClientWSData::from_revisions(&doc_id, vec![rev]))) - }, - None => { - // - let doc_id = self.rev_manager.doc_id.clone(); - let latest_rev_id = self.rev_manager.rev_id(); - Ok(Some(DocumentClientWSData::ping(&doc_id, latest_rev_id))) - }, - } - }, - } - } - - async fn ack(&self, id: String, _ty: DocumentServerWSDataType) -> FlowyResult<()> { - // let _ = self.rev_manager.ack_revision(id).await?; - let source_ty = self.source_ty.read().await.clone(); - match source_ty { - SourceType::Shared => { - let should_pop = match self.shared.read().await.front() { - None => false, - Some(val) => { - let expected_id = val.id(); - if expected_id == id { - true - } else { - tracing::error!("The front element's {} is not equal to the {}", expected_id, id); - false + let stream = stream! { + loop { + tokio::select! { + result = receiver.recv() => { + match result { + Some(msg) => { + yield msg + }, + None => { + tracing::debug!("[DocumentStream:{}] loop exit", doc_id); + break; + }, } }, + _ = stop_rx.recv() => { + tracing::debug!("[DocumentStream:{}] loop exit", doc_id); + break + }, }; - if should_pop { - let _ = self.shared.write().await.pop_front(); + } + }; + + stream + .for_each(|msg| async { + match self.handle_message(msg).await { + Ok(_) => {}, + Err(e) => log::error!("[DocumentStream:{}] error: {}", self.doc_id, e), } + }) + .await; + } + + async fn handle_message(&self, msg: DocumentServerWSData) -> FlowyResult<()> { + let DocumentServerWSData { doc_id: _, ty, data } = msg; + let bytes = spawn_blocking(move || Bytes::from(data)) + .await + .map_err(internal_error)?; + + tracing::trace!("[DocumentStream]: new message: {:?}", ty); + match ty { + DocumentServerWSDataType::ServerPushRev => { + let _ = self.consumer.receive_push_revision(bytes).await?; }, - SourceType::Revision => { - match id.parse::() { - Ok(rev_id) => { - let _ = self.rev_manager.ack_revision(rev_id).await?; - }, - Err(e) => { - tracing::error!("Parse rev_id from {} failed. {}", id, e); - }, - }; + DocumentServerWSDataType::ServerPullRev => { + let range = RevisionRange::try_from(bytes)?; + let _ = self.consumer.pull_revisions_in_range(range).await?; + }, + DocumentServerWSDataType::ServerAck => { + let rev_id = RevId::try_from(bytes).unwrap().value; + let _ = self.consumer.receive_ack(rev_id.to_string(), ty).await; + }, + DocumentServerWSDataType::UserConnect => { + let new_user = NewDocumentUser::try_from(bytes)?; + let _ = self.consumer.receive_new_user_connect(new_user).await; + // Notify the user that someone has connected to this document }, } - Ok(()) } } + +pub type Tick = (); +pub type SinkStopRx = broadcast::Receiver<()>; +pub type SinkStopTx = broadcast::Sender<()>; + +pub trait DocumentWSSinkDataProvider: Send + Sync { + fn next(&self) -> FutureResult, FlowyError>; +} + +pub struct DocumentWSSink { + provider: Arc, + ws_sender: Arc, + stop_rx: Option, + doc_id: String, +} + +impl DocumentWSSink { + pub fn new( + doc_id: &str, + provider: Arc, + ws_sender: Arc, + stop_rx: SinkStopRx, + ) -> Self { + Self { + provider, + ws_sender, + stop_rx: Some(stop_rx), + doc_id: doc_id.to_owned(), + } + } + + pub async fn run(mut self) { + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut stop_rx = self.stop_rx.take().expect("Only take once"); + let doc_id = self.doc_id.clone(); + tokio::spawn(tick(tx)); + let stream = stream! { + loop { + tokio::select! { + result = rx.recv() => { + match result { + Some(msg) => yield msg, + None => break, + } + }, + _ = stop_rx.recv() => { + tracing::debug!("[DocumentSink:{}] loop exit", doc_id); + break + }, + }; + } + }; + stream + .for_each(|_| async { + match self.send_next_revision().await { + Ok(_) => {}, + Err(e) => log::error!("[DocumentSink] send failed, {:?}", e), + } + }) + .await; + } + + async fn send_next_revision(&self) -> FlowyResult<()> { + match self.provider.next().await? { + None => { + tracing::trace!("Finish synchronizing revisions"); + Ok(()) + }, + Some(data) => { + tracing::trace!("[DocumentSink] send: {}:{}-{:?}", data.doc_id, data.id(), data.ty); + self.ws_sender.send(data) + }, + } + } +} + +async fn tick(sender: mpsc::UnboundedSender) { + let mut interval = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS)); + while sender.send(()).is_ok() { + interval.tick().await; + } +} diff --git a/frontend/rust-lib/flowy-net/Cargo.toml b/frontend/rust-lib/flowy-net/Cargo.toml index 0106c9a7e8..a5769caaf5 100644 --- a/frontend/rust-lib/flowy-net/Cargo.toml +++ b/frontend/rust-lib/flowy-net/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] lib-dispatch = { path = "../lib-dispatch" } -flowy-error = { path = "../flowy-error" } +flowy-error = { path = "../flowy-error", features = ["collaboration"] } flowy-derive = { path = "../../../shared-lib/flowy-derive" } flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"} backend-service = { path = "../../../shared-lib/backend-service" } diff --git a/frontend/rust-lib/flowy-net/src/handlers/mod.rs b/frontend/rust-lib/flowy-net/src/handlers/mod.rs index 302f9b787f..62d248748c 100644 --- a/frontend/rust-lib/flowy-net/src/handlers/mod.rs +++ b/frontend/rust-lib/flowy-net/src/handlers/mod.rs @@ -1,4 +1,4 @@ -use crate::{entities::NetworkState, services::ws_conn::FlowyWebSocketConnect}; +use crate::{entities::NetworkState, ws::connection::FlowyWebSocketConnect}; use flowy_error::FlowyError; use lib_dispatch::prelude::{Data, Unit}; use std::sync::Arc; diff --git a/frontend/rust-lib/flowy-net/src/lib.rs b/frontend/rust-lib/flowy-net/src/lib.rs index c1ac5bafb9..b3f0ce775d 100644 --- a/frontend/rust-lib/flowy-net/src/lib.rs +++ b/frontend/rust-lib/flowy-net/src/lib.rs @@ -4,4 +4,4 @@ mod event; mod handlers; pub mod module; pub mod protobuf; -pub mod services; +pub mod ws; diff --git a/frontend/rust-lib/flowy-net/src/module.rs b/frontend/rust-lib/flowy-net/src/module.rs index 4a2c62f8d2..26de3769f3 100644 --- a/frontend/rust-lib/flowy-net/src/module.rs +++ b/frontend/rust-lib/flowy-net/src/module.rs @@ -1,4 +1,4 @@ -use crate::{event::NetworkEvent, handlers::*, services::ws_conn::FlowyWebSocketConnect}; +use crate::{event::NetworkEvent, handlers::*, ws::connection::FlowyWebSocketConnect}; use lib_dispatch::prelude::*; use std::sync::Arc; diff --git a/frontend/rust-lib/flowy-net/src/services/local/mod.rs b/frontend/rust-lib/flowy-net/src/services/local/mod.rs deleted file mode 100644 index 01cc6865de..0000000000 --- a/frontend/rust-lib/flowy-net/src/services/local/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod local_server; -mod local_ws; -mod persistence; - -pub use local_ws::*; diff --git a/frontend/rust-lib/flowy-net/src/services/local/persistence.rs b/frontend/rust-lib/flowy-net/src/services/local/persistence.rs deleted file mode 100644 index 55de97fa4b..0000000000 --- a/frontend/rust-lib/flowy-net/src/services/local/persistence.rs +++ /dev/null @@ -1,74 +0,0 @@ -use dashmap::DashMap; -use flowy_collaboration::{ - entities::doc::DocumentInfo, - errors::CollaborateError, - protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, - sync::*, - util::repeated_revision_from_repeated_revision_pb, -}; -use lib_infra::future::BoxResultFuture; -use std::{ - fmt::{Debug, Formatter}, - sync::Arc, -}; - -pub(crate) struct LocalServerDocumentPersistence { - // For the moment, we use memory to cache the data, it will be implemented with other storage. - // Like the Firestore,Dropbox.etc. - inner: Arc>, -} - -impl Debug for LocalServerDocumentPersistence { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalDocServerPersistence") } -} - -impl std::default::Default for LocalServerDocumentPersistence { - fn default() -> Self { - LocalServerDocumentPersistence { - inner: Arc::new(DashMap::new()), - } - } -} - -impl ServerDocumentPersistence for LocalServerDocumentPersistence { - fn read_document(&self, doc_id: &str) -> BoxResultFuture { - let inner = self.inner.clone(); - let doc_id = doc_id.to_owned(); - Box::pin(async move { - match inner.get(&doc_id) { - None => Err(CollaborateError::record_not_found()), - Some(val) => { - // - Ok(val.value().clone()) - }, - } - }) - } - - fn create_document( - &self, - doc_id: &str, - repeated_revision: RepeatedRevisionPB, - ) -> BoxResultFuture { - let doc_id = doc_id.to_owned(); - let inner = self.inner.clone(); - Box::pin(async move { - let repeated_revision = repeated_revision_from_repeated_revision_pb(repeated_revision)?; - let document_info = DocumentInfo::from_revisions(&doc_id, repeated_revision.into_inner())?; - inner.insert(doc_id, document_info.clone()); - Ok(document_info) - }) - } - - fn read_revisions( - &self, - _doc_id: &str, - _rev_ids: Option>, - ) -> BoxResultFuture, CollaborateError> { - Box::pin(async move { Ok(vec![]) }) - } - - fn reset_document(&self, _doc_id: &str, _revisions: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { - unimplemented!() - } -} diff --git a/frontend/rust-lib/flowy-net/src/services/ws_conn.rs b/frontend/rust-lib/flowy-net/src/ws/connection.rs similarity index 79% rename from frontend/rust-lib/flowy-net/src/services/ws_conn.rs rename to frontend/rust-lib/flowy-net/src/ws/connection.rs index fdec3cf334..adbfd98fbb 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws_conn.rs +++ b/frontend/rust-lib/flowy-net/src/ws/connection.rs @@ -94,31 +94,27 @@ impl FlowyWebSocketConnect { #[tracing::instrument(level = "debug", skip(ws_conn))] pub fn listen_on_websocket(ws_conn: Arc) { - if cfg!(feature = "http_server") { - let ws = ws_conn.inner.clone(); - let mut notify = ws_conn.inner.subscribe_connect_state(); - let _ = tokio::spawn(async move { - loop { - match notify.recv().await { - Ok(state) => { - tracing::info!("Websocket state changed: {}", state); - match state { - WSConnectState::Init => {}, - WSConnectState::Connected => {}, - WSConnectState::Connecting => {}, - WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await, - } - }, - Err(e) => { - tracing::error!("Websocket state notify error: {:?}", e); - break; - }, - } + let ws = ws_conn.inner.clone(); + let mut notify = ws_conn.inner.subscribe_connect_state(); + let _ = tokio::spawn(async move { + loop { + match notify.recv().await { + Ok(state) => { + tracing::info!("Websocket state changed: {}", state); + match state { + WSConnectState::Init => {}, + WSConnectState::Connected => {}, + WSConnectState::Connecting => {}, + WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await, + } + }, + Err(e) => { + tracing::error!("Websocket state notify error: {:?}", e); + break; + }, } - }); - } else { - // do nothing - }; + } + }); } async fn retry_connect(ws: Arc, count: usize) { diff --git a/frontend/rust-lib/flowy-net/src/services/http/http_ws.rs b/frontend/rust-lib/flowy-net/src/ws/http/http_ws.rs similarity index 96% rename from frontend/rust-lib/flowy-net/src/services/http/http_ws.rs rename to frontend/rust-lib/flowy-net/src/ws/http/http_ws.rs index 4b81047095..1d019b4c5d 100644 --- a/frontend/rust-lib/flowy-net/src/services/http/http_ws.rs +++ b/frontend/rust-lib/flowy-net/src/ws/http/http_ws.rs @@ -1,4 +1,4 @@ -use crate::services::ws_conn::{FlowyRawWebSocket, FlowyWSSender}; +use crate::ws::connection::{FlowyRawWebSocket, FlowyWSSender}; use flowy_error::internal_error; pub use flowy_error::FlowyError; use lib_infra::future::FutureResult; diff --git a/frontend/rust-lib/flowy-net/src/services/http/mod.rs b/frontend/rust-lib/flowy-net/src/ws/http/mod.rs similarity index 100% rename from frontend/rust-lib/flowy-net/src/services/http/mod.rs rename to frontend/rust-lib/flowy-net/src/ws/http/mod.rs diff --git a/frontend/rust-lib/flowy-net/src/services/local/local_server.rs b/frontend/rust-lib/flowy-net/src/ws/local/local_server.rs similarity index 89% rename from frontend/rust-lib/flowy-net/src/services/local/local_server.rs rename to frontend/rust-lib/flowy-net/src/ws/local/local_server.rs index 7283af9b1e..d1e38ae3aa 100644 --- a/frontend/rust-lib/flowy-net/src/services/local/local_server.rs +++ b/frontend/rust-lib/flowy-net/src/ws/local/local_server.rs @@ -1,4 +1,4 @@ -use crate::services::local::persistence::LocalServerDocumentPersistence; +use crate::ws::local::persistence::LocalDocumentCloudPersistence; use bytes::Bytes; use flowy_collaboration::{ entities::ws::{DocumentClientWSData, DocumentClientWSDataType}, @@ -13,12 +13,12 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender}; pub struct LocalDocumentServer { pub doc_manager: Arc, sender: mpsc::UnboundedSender, - persistence: Arc, + persistence: Arc, } impl LocalDocumentServer { pub fn new(sender: mpsc::UnboundedSender) -> Self { - let persistence = Arc::new(LocalServerDocumentPersistence::default()); + let persistence = Arc::new(LocalDocumentCloudPersistence::default()); let doc_manager = Arc::new(ServerDocumentManager::new(persistence.clone())); LocalDocumentServer { doc_manager, @@ -41,7 +41,6 @@ impl LocalDocumentServer { let user = Arc::new(LocalDocumentUser { user_id, ws_sender: self.sender.clone(), - persistence: self.persistence.clone(), }); let ty = client_data.ty.clone(); let document_client_data: DocumentClientWSDataPB = client_data.try_into().unwrap(); @@ -64,7 +63,6 @@ impl LocalDocumentServer { struct LocalDocumentUser { user_id: String, ws_sender: mpsc::UnboundedSender, - persistence: Arc, } impl RevisionUser for LocalDocumentUser { @@ -105,9 +103,6 @@ impl RevisionUser for LocalDocumentUser { }; send_fn(sender, msg); }, - SyncResponse::NewRevision(mut _repeated_revision) => { - // unimplemented!() - }, } }); } diff --git a/frontend/rust-lib/flowy-net/src/services/local/local_ws.rs b/frontend/rust-lib/flowy-net/src/ws/local/local_ws.rs similarity index 98% rename from frontend/rust-lib/flowy-net/src/services/local/local_ws.rs rename to frontend/rust-lib/flowy-net/src/ws/local/local_ws.rs index 959e3d69d9..9608de4b03 100644 --- a/frontend/rust-lib/flowy-net/src/services/local/local_ws.rs +++ b/frontend/rust-lib/flowy-net/src/ws/local/local_ws.rs @@ -1,6 +1,6 @@ -use crate::services::{ +use crate::ws::{ + connection::{FlowyRawWebSocket, FlowyWSSender}, local::local_server::LocalDocumentServer, - ws_conn::{FlowyRawWebSocket, FlowyWSSender}, }; use bytes::Bytes; use dashmap::DashMap; diff --git a/frontend/rust-lib/flowy-net/src/ws/local/mod.rs b/frontend/rust-lib/flowy-net/src/ws/local/mod.rs new file mode 100644 index 0000000000..29bb454a03 --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/ws/local/mod.rs @@ -0,0 +1,24 @@ +mod local_server; +mod local_ws; +mod persistence; + +use flowy_collaboration::errors::CollaborateError; +pub use local_ws::*; + +use flowy_collaboration::protobuf::RepeatedRevision as RepeatedRevisionPB; +use lib_infra::future::BoxResultFuture; + +pub trait DocumentCloudStorage: Send + Sync { + fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; + fn get_revisions( + &self, + doc_id: &str, + rev_ids: Option>, + ) -> BoxResultFuture; + + fn reset_document( + &self, + doc_id: &str, + repeated_revision: RepeatedRevisionPB, + ) -> BoxResultFuture<(), CollaborateError>; +} diff --git a/frontend/rust-lib/flowy-net/src/ws/local/persistence.rs b/frontend/rust-lib/flowy-net/src/ws/local/persistence.rs new file mode 100644 index 0000000000..b9e3a23e9c --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/ws/local/persistence.rs @@ -0,0 +1,130 @@ +use crate::ws::local::DocumentCloudStorage; +use dashmap::DashMap; +use flowy_collaboration::{ + entities::doc::DocumentInfo, + errors::CollaborateError, + protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, + sync::*, + util::{make_doc_from_revisions, repeated_revision_from_repeated_revision_pb}, +}; +use lib_infra::future::BoxResultFuture; +use std::{ + convert::TryInto, + fmt::{Debug, Formatter}, + sync::Arc, +}; + +pub(crate) struct LocalDocumentCloudPersistence { + // For the moment, we use memory to cache the data, it will be implemented with other storage. + // Like the Firestore,Dropbox.etc. + storage: Arc, +} + +impl Debug for LocalDocumentCloudPersistence { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalDocServerPersistence") } +} + +impl std::default::Default for LocalDocumentCloudPersistence { + fn default() -> Self { + LocalDocumentCloudPersistence { + storage: Arc::new(MemoryDocumentCloudStorage::default()), + } + } +} + +impl DocumentCloudPersistence for LocalDocumentCloudPersistence { + fn enable_sync(&self) -> bool { false } + + fn read_document(&self, doc_id: &str) -> BoxResultFuture { + let storage = self.storage.clone(); + let doc_id = doc_id.to_owned(); + Box::pin(async move { + let repeated_revision = storage.get_revisions(&doc_id, None).await?; + match make_doc_from_revisions(&doc_id, repeated_revision) { + Ok(Some(mut document_info_pb)) => { + let document_info: DocumentInfo = (&mut document_info_pb) + .try_into() + .map_err(|e| CollaborateError::internal().context(e))?; + Ok(document_info) + }, + Ok(None) => Err(CollaborateError::record_not_found()), + Err(e) => Err(CollaborateError::internal().context(e)), + } + }) + } + + fn create_document( + &self, + doc_id: &str, + repeated_revision: RepeatedRevisionPB, + ) -> BoxResultFuture { + let doc_id = doc_id.to_owned(); + let storage = self.storage.clone(); + Box::pin(async move { + let _ = storage.set_revisions(repeated_revision.clone()).await?; + let repeated_revision = repeated_revision_from_repeated_revision_pb(repeated_revision)?; + let document_info = DocumentInfo::from_revisions(&doc_id, repeated_revision.into_inner())?; + Ok(document_info) + }) + } + + fn read_revisions( + &self, + doc_id: &str, + rev_ids: Option>, + ) -> BoxResultFuture, CollaborateError> { + let doc_id = doc_id.to_owned(); + let storage = self.storage.clone(); + Box::pin(async move { + let mut repeated_revision = storage.get_revisions(&doc_id, rev_ids).await?; + let revisions: Vec = repeated_revision.take_items().into(); + Ok(revisions) + }) + } + + fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { + let storage = self.storage.clone(); + Box::pin(async move { + let _ = storage.set_revisions(repeated_revision).await?; + Ok(()) + }) + } + + fn reset_document(&self, doc_id: &str, revisions: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { + let storage = self.storage.clone(); + let doc_id = doc_id.to_owned(); + Box::pin(async move { + let _ = storage.reset_document(&doc_id, revisions).await?; + Ok(()) + }) + } +} + +struct MemoryDocumentCloudStorage {} +impl std::default::Default for MemoryDocumentCloudStorage { + fn default() -> Self { Self {} } +} +impl DocumentCloudStorage for MemoryDocumentCloudStorage { + fn set_revisions(&self, _repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { + Box::pin(async move { Ok(()) }) + } + + fn get_revisions( + &self, + _doc_id: &str, + _rev_ids: Option>, + ) -> BoxResultFuture { + Box::pin(async move { + let repeated_revisions = RepeatedRevisionPB::new(); + Ok(repeated_revisions) + }) + } + + fn reset_document( + &self, + _doc_id: &str, + _repeated_revision: RepeatedRevisionPB, + ) -> BoxResultFuture<(), CollaborateError> { + Box::pin(async move { Ok(()) }) + } +} diff --git a/frontend/rust-lib/flowy-net/src/services/mod.rs b/frontend/rust-lib/flowy-net/src/ws/mod.rs similarity index 59% rename from frontend/rust-lib/flowy-net/src/services/mod.rs rename to frontend/rust-lib/flowy-net/src/ws/mod.rs index 82872d6334..b161c5ae8b 100644 --- a/frontend/rust-lib/flowy-net/src/services/mod.rs +++ b/frontend/rust-lib/flowy-net/src/ws/mod.rs @@ -1,3 +1,3 @@ +pub mod connection; pub mod http; pub mod local; -pub mod ws_conn; diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 2d7d21472b..14958cf8fa 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -13,7 +13,7 @@ use flowy_document::{ }; use flowy_net::{ cloud::document::{DocumentHttpCloudService, DocumentLocalCloudService}, - services::ws_conn::FlowyWebSocketConnect, + ws::connection::FlowyWebSocketConnect, }; use flowy_user::services::UserSession; use lib_infra::future::FutureResult; diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 93162f7320..cb97e299e2 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -6,9 +6,9 @@ use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core}; use flowy_document::context::DocumentContext; use flowy_net::{ entities::NetworkType, - services::{ + ws::{ + connection::{listen_on_websocket, FlowyRawWebSocket, FlowyWebSocketConnect}, local::LocalWebSocket, - ws_conn::{listen_on_websocket, FlowyRawWebSocket, FlowyWebSocketConnect}, }, }; use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig}; diff --git a/frontend/rust-lib/flowy-sdk/src/module.rs b/frontend/rust-lib/flowy-sdk/src/module.rs index 2bc7994139..49867864b3 100644 --- a/frontend/rust-lib/flowy-sdk/src/module.rs +++ b/frontend/rust-lib/flowy-sdk/src/module.rs @@ -1,5 +1,5 @@ use flowy_core::context::CoreContext; -use flowy_net::services::ws_conn::FlowyWebSocketConnect; +use flowy_net::ws::connection::FlowyWebSocketConnect; use flowy_user::services::UserSession; use lib_dispatch::prelude::Module; use std::sync::Arc; diff --git a/shared-lib/flowy-collaboration/src/sync/server.rs b/shared-lib/flowy-collaboration/src/sync/server.rs index 76a4276315..ba5ba87344 100644 --- a/shared-lib/flowy-collaboration/src/sync/server.rs +++ b/shared-lib/flowy-collaboration/src/sync/server.rs @@ -16,7 +16,9 @@ use tokio::{ task::spawn_blocking, }; -pub trait ServerDocumentPersistence: Send + Sync + Debug { +pub trait DocumentCloudPersistence: Send + Sync + Debug { + fn enable_sync(&self) -> bool; + fn read_document(&self, doc_id: &str) -> BoxResultFuture; fn create_document( @@ -31,6 +33,8 @@ pub trait ServerDocumentPersistence: Send + Sync + Debug { rev_ids: Option>, ) -> BoxResultFuture, CollaborateError>; + fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; + fn reset_document( &self, doc_id: &str, @@ -40,11 +44,11 @@ pub trait ServerDocumentPersistence: Send + Sync + Debug { pub struct ServerDocumentManager { open_doc_map: Arc>>>, - persistence: Arc, + persistence: Arc, } impl ServerDocumentManager { - pub fn new(persistence: Arc) -> Self { + pub fn new(persistence: Arc) -> Self { Self { open_doc_map: Arc::new(RwLock::new(HashMap::new())), persistence, @@ -91,7 +95,7 @@ impl ServerDocumentManager { let doc_id = client_data.doc_id.clone(); match self.get_document_handler(&doc_id).await { None => { - tracing::trace!("Document:{} doesn't exist, ignore pinging", doc_id); + tracing::trace!("Document:{} doesn't exist, ignore client ping", doc_id); Ok(()) }, Some(handler) => { @@ -169,23 +173,26 @@ impl std::ops::Drop for ServerDocumentManager { struct OpenDocHandle { doc_id: String, sender: mpsc::Sender, - persistence: Arc, users: DashMap>, } impl OpenDocHandle { - fn new(doc: DocumentInfo, persistence: Arc) -> Result { + fn new(doc: DocumentInfo, persistence: Arc) -> Result { let doc_id = doc.doc_id.clone(); let (sender, receiver) = mpsc::channel(100); let users = DashMap::new(); - let queue = DocumentCommandQueue::new(receiver, doc)?; - tokio::task::spawn(queue.run()); - Ok(Self { - doc_id, - sender, + + let delta = RichTextDelta::from_bytes(&doc.text)?; + let synchronizer = Arc::new(RevisionSynchronizer::new( + &doc.doc_id, + doc.rev_id, + Document::from_delta(delta), persistence, - users, - }) + )); + + let queue = DocumentCommandQueue::new(&doc.doc_id, receiver, synchronizer)?; + tokio::task::spawn(queue.run()); + Ok(Self { doc_id, sender, users }) } #[tracing::instrument(level = "debug", skip(self, user, repeated_revision), err)] @@ -195,12 +202,10 @@ impl OpenDocHandle { repeated_revision: RepeatedRevisionPB, ) -> Result<(), CollaborateError> { let (ret, rx) = oneshot::channel(); - let persistence = self.persistence.clone(); self.users.insert(user.user_id(), user.clone()); let msg = DocumentCommand::ApplyRevisions { user, repeated_revision, - persistence, ret, }; @@ -211,13 +216,7 @@ impl OpenDocHandle { async fn apply_ping(&self, rev_id: i64, user: Arc) -> Result<(), CollaborateError> { let (ret, rx) = oneshot::channel(); self.users.insert(user.user_id(), user.clone()); - let persistence = self.persistence.clone(); - let msg = DocumentCommand::Ping { - user, - persistence, - rev_id, - ret, - }; + let msg = DocumentCommand::Ping { user, rev_id, ret }; let result = self.send(msg, rx).await?; result } @@ -225,12 +224,7 @@ impl OpenDocHandle { #[tracing::instrument(level = "debug", skip(self, repeated_revision), err)] async fn apply_document_reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> { let (ret, rx) = oneshot::channel(); - let persistence = self.persistence.clone(); - let msg = DocumentCommand::Reset { - persistence, - repeated_revision, - ret, - }; + let msg = DocumentCommand::Reset { repeated_revision, ret }; let result = self.send(msg, rx).await?; result } @@ -247,8 +241,7 @@ impl OpenDocHandle { impl std::ops::Drop for OpenDocHandle { fn drop(&mut self) { - // - log::debug!("{} OpenDocHandle was drop", self.doc_id); + tracing::debug!("{} OpenDocHandle was drop", self.doc_id); } } @@ -257,17 +250,14 @@ enum DocumentCommand { ApplyRevisions { user: Arc, repeated_revision: RepeatedRevisionPB, - persistence: Arc, ret: oneshot::Sender>, }, Ping { user: Arc, - persistence: Arc, rev_id: i64, ret: oneshot::Sender>, }, Reset { - persistence: Arc, repeated_revision: RepeatedRevisionPB, ret: oneshot::Sender>, }, @@ -280,16 +270,13 @@ struct DocumentCommandQueue { } impl DocumentCommandQueue { - fn new(receiver: mpsc::Receiver, doc: DocumentInfo) -> Result { - let delta = RichTextDelta::from_bytes(&doc.text)?; - let synchronizer = Arc::new(RevisionSynchronizer::new( - &doc.doc_id, - doc.rev_id, - Document::from_delta(delta), - )); - + fn new( + doc_id: &str, + receiver: mpsc::Receiver, + synchronizer: Arc, + ) -> Result { Ok(Self { - doc_id: doc.doc_id, + doc_id: doc_id.to_owned(), receiver: Some(receiver), synchronizer, }) @@ -317,39 +304,21 @@ impl DocumentCommandQueue { DocumentCommand::ApplyRevisions { user, repeated_revision, - persistence, ret, } => { let result = self .synchronizer - .sync_revisions(user, repeated_revision, persistence) + .sync_revisions(user, repeated_revision) .await .map_err(internal_error); let _ = ret.send(result); }, - DocumentCommand::Ping { - user, - persistence, - rev_id, - ret, - } => { - let result = self - .synchronizer - .pong(user, persistence, rev_id) - .await - .map_err(internal_error); + DocumentCommand::Ping { user, rev_id, ret } => { + let result = self.synchronizer.pong(user, rev_id).await.map_err(internal_error); let _ = ret.send(result); }, - DocumentCommand::Reset { - persistence, - repeated_revision, - ret, - } => { - let result = self - .synchronizer - .reset(persistence, repeated_revision) - .await - .map_err(internal_error); + DocumentCommand::Reset { repeated_revision, ret } => { + let result = self.synchronizer.reset(repeated_revision).await.map_err(internal_error); let _ = ret.send(result); }, } @@ -358,7 +327,7 @@ impl DocumentCommandQueue { impl std::ops::Drop for DocumentCommandQueue { fn drop(&mut self) { - log::debug!("{} DocumentCommandQueue was drop", self.doc_id); + tracing::debug!("{} DocumentCommandQueue was drop", self.doc_id); } } diff --git a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs b/shared-lib/flowy-collaboration/src/sync/synchronizer.rs index 43cb98a582..14d7a9ecd2 100644 --- a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/sync/synchronizer.rs @@ -6,7 +6,7 @@ use crate::{ }, errors::CollaborateError, protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, - sync::ServerDocumentPersistence, + sync::DocumentCloudPersistence, util::*, }; use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; @@ -30,36 +30,41 @@ pub enum SyncResponse { Pull(DocumentServerWSData), Push(DocumentServerWSData), Ack(DocumentServerWSData), - NewRevision(RepeatedRevisionPB), } pub struct RevisionSynchronizer { pub doc_id: String, pub rev_id: AtomicI64, document: Arc>, + persistence: Arc, } impl RevisionSynchronizer { - pub fn new(doc_id: &str, rev_id: i64, document: Document) -> RevisionSynchronizer { + pub fn new( + doc_id: &str, + rev_id: i64, + document: Document, + persistence: Arc, + ) -> RevisionSynchronizer { let document = Arc::new(RwLock::new(document)); RevisionSynchronizer { doc_id: doc_id.to_string(), rev_id: AtomicI64::new(rev_id), document, + persistence, } } - #[tracing::instrument(level = "debug", skip(self, user, repeated_revision, persistence), err)] + #[tracing::instrument(level = "debug", skip(self, user, repeated_revision), err)] pub async fn sync_revisions( &self, user: Arc, repeated_revision: RepeatedRevisionPB, - persistence: Arc, ) -> Result<(), CollaborateError> { let doc_id = self.doc_id.clone(); if repeated_revision.get_items().is_empty() { // Return all the revisions to client - let revisions = persistence.read_revisions(&doc_id, None).await?; + let revisions = self.persistence.read_revisions(&doc_id, None).await?; let repeated_revision = repeated_revision_from_revision_pbs(revisions)?; let data = DocumentServerWSDataBuilder::build_push_message(&doc_id, repeated_revision); user.receive(SyncResponse::Push(data)); @@ -68,7 +73,7 @@ impl RevisionSynchronizer { let server_base_rev_id = self.rev_id.load(SeqCst); let first_revision = repeated_revision.get_items().first().unwrap().clone(); - if self.is_applied_before(&first_revision, &persistence).await { + if self.is_applied_before(&first_revision, &self.persistence).await { // Server has received this revision before, so ignore the following revisions return Ok(()); } @@ -81,7 +86,7 @@ impl RevisionSynchronizer { for revision in repeated_revision.get_items() { let _ = self.compose_revision(revision)?; } - user.receive(SyncResponse::NewRevision(repeated_revision)); + let _ = self.persistence.save_revisions(repeated_revision).await?; } else { // The server document is outdated, pull the missing revision from the client. let range = RevisionRange { @@ -103,24 +108,18 @@ impl RevisionSynchronizer { // delta. let from_rev_id = first_revision.rev_id; let to_rev_id = server_base_rev_id; - let _ = self - .push_revisions_to_user(user, persistence, from_rev_id, to_rev_id) - .await; + let _ = self.push_revisions_to_user(user, from_rev_id, to_rev_id).await; }, } Ok(()) } - #[tracing::instrument(level = "trace", skip(self, user, persistence), fields(server_rev_id), err)] - pub async fn pong( - &self, - user: Arc, - persistence: Arc, - client_rev_id: i64, - ) -> Result<(), CollaborateError> { + #[tracing::instrument(level = "trace", skip(self, user), fields(server_rev_id), err)] + pub async fn pong(&self, user: Arc, client_rev_id: i64) -> Result<(), CollaborateError> { let doc_id = self.doc_id.clone(); let server_rev_id = self.rev_id(); tracing::Span::current().record("server_rev_id", &server_rev_id); + match server_rev_id.cmp(&client_rev_id) { Ordering::Less => { tracing::error!("Client should not send ping and the server should pull the revisions from the client") @@ -133,27 +132,21 @@ impl RevisionSynchronizer { let from_rev_id = client_rev_id; let to_rev_id = server_rev_id; tracing::trace!("Push revisions to user"); - let _ = self - .push_revisions_to_user(user, persistence, from_rev_id, to_rev_id) - .await; + let _ = self.push_revisions_to_user(user, from_rev_id, to_rev_id).await; }, } Ok(()) } - #[tracing::instrument(level = "debug", skip(self, repeated_revision, persistence), fields(doc_id), err)] - pub async fn reset( - &self, - persistence: Arc, - repeated_revision: RepeatedRevisionPB, - ) -> Result<(), CollaborateError> { + #[tracing::instrument(level = "debug", skip(self, repeated_revision), fields(doc_id), err)] + pub async fn reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> { let doc_id = self.doc_id.clone(); tracing::Span::current().record("doc_id", &doc_id.as_str()); let revisions: Vec = repeated_revision.get_items().to_vec(); let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions); let delta = make_delta_from_revision_pb(revisions)?; - let _ = persistence.reset_document(&doc_id, repeated_revision).await?; + let _ = self.persistence.reset_document(&doc_id, repeated_revision).await?; *self.document.write() = Document::from_delta(delta); let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id)); Ok(()) @@ -194,7 +187,7 @@ impl RevisionSynchronizer { async fn is_applied_before( &self, new_revision: &RevisionPB, - persistence: &Arc, + persistence: &Arc, ) -> bool { let rev_ids = Some(vec![new_revision.rev_id]); if let Ok(revisions) = persistence.read_revisions(&self.doc_id, rev_ids).await { @@ -208,15 +201,9 @@ impl RevisionSynchronizer { false } - async fn push_revisions_to_user( - &self, - user: Arc, - persistence: Arc, - from: i64, - to: i64, - ) { + async fn push_revisions_to_user(&self, user: Arc, from: i64, to: i64) { let rev_ids: Vec = (from..=to).collect(); - let revisions = match persistence.read_revisions(&self.doc_id, Some(rev_ids)).await { + let revisions = match self.persistence.read_revisions(&self.doc_id, Some(rev_ids)).await { Ok(revisions) => { assert_eq!( revisions.is_empty(), diff --git a/shared-lib/flowy-collaboration/src/util.rs b/shared-lib/flowy-collaboration/src/util.rs index f8c43c3ce6..a5d3890c4e 100644 --- a/shared-lib/flowy-collaboration/src/util.rs +++ b/shared-lib/flowy-collaboration/src/util.rs @@ -1,10 +1,11 @@ use crate::{ entities::revision::{RepeatedRevision, Revision}, errors::{CollaborateError, CollaborateResult}, - protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, + protobuf::{DocumentInfo as DocumentInfoPB, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, }; use lib_ot::{ core::{OperationTransformable, NEW_LINE, WHITESPACE}, + errors::OTError, rich_text::RichTextDelta, }; use std::{ @@ -116,3 +117,39 @@ pub fn pair_rev_id_from_revisions(revisions: &[Revision]) -> (i64, i64) { (0, rev_id) } } + +#[inline] +pub fn make_doc_from_revisions( + doc_id: &str, + mut revisions: RepeatedRevisionPB, +) -> Result, OTError> { + let revisions = revisions.take_items(); + if revisions.is_empty() { + // return Err(CollaborateError::record_not_found().context(format!("{} not + // exist", doc_id))); + return Ok(None); + } + + let mut document_delta = RichTextDelta::new(); + let mut base_rev_id = 0; + let mut rev_id = 0; + for revision in revisions { + base_rev_id = revision.base_rev_id; + rev_id = revision.rev_id; + + if revision.delta_data.is_empty() { + tracing::warn!("revision delta_data is empty"); + } + + let delta = RichTextDelta::from_bytes(revision.delta_data)?; + document_delta = document_delta.compose(&delta)?; + } + + let text = document_delta.to_json(); + let mut document_info = DocumentInfoPB::new(); + document_info.set_doc_id(doc_id.to_owned()); + document_info.set_text(text); + document_info.set_base_rev_id(base_rev_id); + document_info.set_rev_id(rev_id); + Ok(Some(document_info)) +}