diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbenum.dart index 99417e0ce8..ac35cb0b03 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbenum.dart @@ -16,6 +16,7 @@ class ErrorCode extends $pb.ProtobufEnum { static const ErrorCode UndoFail = ErrorCode._(200, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UndoFail'); static const ErrorCode RedoFail = ErrorCode._(201, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'RedoFail'); static const ErrorCode OutOfBound = ErrorCode._(202, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'OutOfBound'); + static const ErrorCode DuplicateRevision = ErrorCode._(400, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DuplicateRevision'); static const ErrorCode UserUnauthorized = ErrorCode._(999, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UserUnauthorized'); static const ErrorCode InternalError = ErrorCode._(1000, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'InternalError'); @@ -26,6 +27,7 @@ class ErrorCode extends $pb.ProtobufEnum { UndoFail, RedoFail, OutOfBound, + DuplicateRevision, UserUnauthorized, InternalError, ]; diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbjson.dart index d6a4cfcc24..0af67ef178 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbjson.dart @@ -18,13 +18,14 @@ const ErrorCode$json = const { const {'1': 'UndoFail', '2': 200}, const {'1': 'RedoFail', '2': 201}, const {'1': 'OutOfBound', '2': 202}, + const {'1': 'DuplicateRevision', '2': 400}, const {'1': 'UserUnauthorized', '2': 999}, const {'1': 'InternalError', '2': 1000}, ], }; /// Descriptor for `ErrorCode`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEAoMRG9jSWRJbnZhbGlkEAASDwoLRG9jTm90Zm91bmQQARISCg5Xc0Nvbm5lY3RFcnJvchAKEg0KCFVuZG9GYWlsEMgBEg0KCFJlZG9GYWlsEMkBEg8KCk91dE9mQm91bmQQygESFQoQVXNlclVuYXV0aG9yaXplZBDnBxISCg1JbnRlcm5hbEVycm9yEOgH'); +final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEAoMRG9jSWRJbnZhbGlkEAASDwoLRG9jTm90Zm91bmQQARISCg5Xc0Nvbm5lY3RFcnJvchAKEg0KCFVuZG9GYWlsEMgBEg0KCFJlZG9GYWlsEMkBEg8KCk91dE9mQm91bmQQygESFgoRRHVwbGljYXRlUmV2aXNpb24QkAMSFQoQVXNlclVuYXV0aG9yaXplZBDnBxISCg1JbnRlcm5hbEVycm9yEOgH'); @$core.Deprecated('Use docErrorDescriptor instead') const DocError$json = const { '1': 'DocError', diff --git a/backend/Cargo.toml b/backend/Cargo.toml index d8246bef81..ed178e5ef3 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -90,10 +90,14 @@ path = "src/lib.rs" name = "backend" path = "src/main.rs" +[features] +flowy_test = [] + [dev-dependencies] parking_lot = "0.11" once_cell = "1.7.2" linkify = "0.5.0" +backend = { path = ".", features = ["flowy_test"]} flowy-user = { path = "../rust-lib/flowy-user", features = ["http_server"] } flowy-workspace = { path = "../rust-lib/flowy-workspace", features = ["http_server"] } flowy-ws = { path = "../rust-lib/flowy-ws" } diff --git a/backend/src/service/doc/crud.rs b/backend/src/service/doc/crud.rs index 56e92a9d8d..96a2ccb55f 100644 --- a/backend/src/service/doc/crud.rs +++ b/backend/src/service/doc/crud.rs @@ -50,7 +50,7 @@ pub(crate) async fn read_doc(pool: &PgPool, params: QueryDocParams) -> Result Result<(), ServerError> { +pub async fn update_doc(pool: &PgPool, mut params: UpdateDocParams) -> Result<(), ServerError> { let doc_id = Uuid::parse_str(¶ms.doc_id)?; let mut transaction = pool .begin() diff --git a/backend/src/service/doc/edit/edit_actor.rs b/backend/src/service/doc/edit/edit_actor.rs index 9deef21ada..3c17858bf2 100644 --- a/backend/src/service/doc/edit/edit_actor.rs +++ b/backend/src/service/doc/edit/edit_actor.rs @@ -107,7 +107,7 @@ impl EditDocActor { user: user.clone(), socket: socket.clone(), }; - let _ = ret.send(self.edit_doc.new_connection(user, rev_id, self.pg_pool.clone()).await); + let _ = ret.send(self.edit_doc.new_doc_user(user, rev_id).await); }, } } diff --git a/backend/src/service/doc/edit/edit_doc.rs b/backend/src/service/doc/edit/edit_doc.rs index 3e4e822240..3e47ce53c8 100644 --- a/backend/src/service/doc/edit/edit_doc.rs +++ b/backend/src/service/doc/edit/edit_doc.rs @@ -5,7 +5,6 @@ use crate::service::{ }; use actix_web::web::Data; -use crate::service::doc::edit::interval::Interval; use bytes::Bytes; use dashmap::DashMap; use flowy_document::{ @@ -54,9 +53,18 @@ impl ServerEditDoc { pub fn document_json(&self) -> String { self.document.read().to_json() } - pub async fn new_connection(&self, user: EditUser, rev_id: i64, _pg_pool: Data) -> Result<(), ServerError> { + #[tracing::instrument( + level = "debug", + skip(self, user), + fields( + user_id = %user.id(), + rev_id = %rev_id, + ) + )] + pub async fn new_doc_user(&self, user: EditUser, rev_id: i64) -> Result<(), ServerError> { self.users.insert(user.id(), user.clone()); let cur_rev_id = self.rev_id.load(SeqCst); + if cur_rev_id > rev_id { let doc_delta = self.document.read().delta().clone(); let cli_revision = self.mk_revision(rev_id, doc_delta); diff --git a/backend/tests/document/edit.rs b/backend/tests/document/edit.rs index 40500d29ff..f7599d7d9c 100644 --- a/backend/tests/document/edit.rs +++ b/backend/tests/document/edit.rs @@ -1,10 +1,13 @@ use crate::document::helper::{DocScript, DocumentTest}; +use flowy_document::services::doc::{Document, FlowyDoc}; +use flowy_ot::core::Delta; #[actix_rt::test] -async fn edit_doc_insert_text() { +async fn sync_doc_insert_text() { let test = DocumentTest::new().await; test.run_scripts(vec![ DocScript::ConnectWs, + DocScript::OpenDoc, DocScript::SendText(0, "abc"), DocScript::SendText(3, "123"), DocScript::SendText(6, "efg"), @@ -15,21 +18,52 @@ async fn edit_doc_insert_text() { } #[actix_rt::test] -async fn edit_doc_insert_large_text() { +async fn sync_open_empty_doc_and_sync_from_server() { let test = DocumentTest::new().await; + let mut document = Document::new::(); + document.insert(0, "123").unwrap(); + document.insert(3, "456").unwrap(); + let json = document.to_json(); test.run_scripts(vec![ - DocScript::ConnectWs, - DocScript::SendText(0, "abc"), - DocScript::SendText(0, "abc"), - DocScript::SendText(0, "abc"), - DocScript::SendText(0, "abc"), - DocScript::SendText(0, "abc"), - DocScript::SendText(0, "abc"), - DocScript::SendText(0, "abc"), - DocScript::SendText(0, "abc"), - /* DocScript::AssertClient(r#"[{"insert":"abc123efg\n"}]"#), - * DocScript::AssertServer(r#"[{"insert":"abc123efg\n"}]"#), */ + DocScript::SetServerDocument(json, 3), + DocScript::OpenDoc, + DocScript::AssertClient(r#"[{"insert":"123456\n"}]"#), + DocScript::AssertServer(r#"[{"insert":"123456\n"}]"#), + ]) + .await; +} + +#[actix_rt::test] +async fn sync_open_empty_doc_and_sync_from_server_using_ws() { + let test = DocumentTest::new().await; + let mut document = Document::new::(); + document.insert(0, "123").unwrap(); + let json = document.to_json(); + + test.run_scripts(vec![ + DocScript::OpenDoc, + DocScript::SetServerDocument(json, 3), + DocScript::ConnectWs, + DocScript::AssertClient(r#"[{"insert":"123\n\n"}]"#), + ]) + .await; +} + +#[actix_rt::test] +async fn sync_open_non_empty_doc_and_sync_with_sever() { + let test = DocumentTest::new().await; + let mut document = Document::new::(); + document.insert(0, "123").unwrap(); + let json = document.to_json(); + + test.run_scripts(vec![ + DocScript::OpenDoc, + DocScript::SetServerDocument(json, 3), + DocScript::SendText(0, "abc"), + DocScript::ConnectWs, + DocScript::AssertClient(r#"[{"insert":"123\nabc\n"}]"#), + // DocScript::AssertServer(r#"[{"insert":"123\nabc\n"}]"#), ]) .await; } diff --git a/backend/tests/document/helper.rs b/backend/tests/document/helper.rs index 9687df773d..58cf18ba53 100644 --- a/backend/tests/document/helper.rs +++ b/backend/tests/document/helper.rs @@ -5,7 +5,7 @@ use futures_util::{stream, stream::StreamExt}; use sqlx::PgPool; use tokio::time::{sleep, Duration}; -use backend::service::doc::doc::DocManager; +use backend::service::doc::{crud::update_doc, doc::DocManager}; use flowy_document::{entities::doc::QueryDocParams, services::doc::edit::ClientEditDoc as ClientEditDocContext}; use flowy_net::config::ServerConfig; use flowy_test::{workspace::ViewTest, FlowyTest}; @@ -13,6 +13,10 @@ use flowy_user::services::user::UserSession; // use crate::helper::*; use crate::helper::{spawn_server, TestServer}; +use flowy_document::protobuf::UpdateDocParams; +use flowy_ot::core::Delta; +use parking_lot::RwLock; +use serde::__private::Formatter; pub struct DocumentTest { server: TestServer, @@ -24,6 +28,22 @@ pub enum DocScript { SendText(usize, &'static str), AssertClient(&'static str), AssertServer(&'static str), + SetServerDocument(String, i64), // delta_json, rev_id + OpenDoc, +} + +impl std::fmt::Display for DocScript { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let name = match self { + DocScript::ConnectWs => "ConnectWs", + DocScript::SendText(_, _) => "SendText", + DocScript::AssertClient(_) => "AssertClient", + DocScript::AssertServer(_) => "AssertServer", + DocScript::SetServerDocument(_, _) => "SetServerDocument", + DocScript::OpenDoc => "OpenDoc", + }; + f.write_str(&format!("******** {} *********", name)) + } } impl DocumentTest { @@ -37,54 +57,92 @@ impl DocumentTest { pub async fn run_scripts(self, scripts: Vec) { let _ = self.flowy_test.sign_up().await; let DocumentTest { server, flowy_test } = self; - let script_context = ScriptContext { - client_edit_context: create_doc(&flowy_test).await, - user_session: flowy_test.sdk.user_session.clone(), - doc_manager: server.app_ctx.doc_biz.manager.clone(), - pool: Data::new(server.pg_pool.clone()), - }; - + let script_context = Arc::new(RwLock::new(ScriptContext::new(flowy_test, server).await)); run_scripts(script_context, scripts).await; - std::mem::forget(flowy_test); sleep(Duration::from_secs(5)).await; } } #[derive(Clone)] struct ScriptContext { - client_edit_context: Arc, + client_edit_context: Option>, + flowy_test: FlowyTest, user_session: Arc, doc_manager: Arc, pool: Data, + doc_id: String, } -async fn run_scripts(context: ScriptContext, scripts: Vec) { +impl ScriptContext { + async fn new(flowy_test: FlowyTest, server: TestServer) -> Self { + let user_session = flowy_test.sdk.user_session.clone(); + let doc_id = create_doc(&flowy_test).await; + + Self { + client_edit_context: None, + flowy_test, + user_session, + doc_manager: server.app_ctx.doc_biz.manager.clone(), + pool: Data::new(server.pg_pool.clone()), + doc_id, + } + } + + async fn open_doc(&mut self) { + let flowy_document = self.flowy_test.sdk.flowy_document.clone(); + let pool = self.user_session.db_pool().unwrap(); + let doc_id = self.doc_id.clone(); + + let edit_context = flowy_document.open(QueryDocParams { doc_id }, pool).await.unwrap(); + self.client_edit_context = Some(edit_context); + } + + fn client_edit_context(&self) -> Arc { self.client_edit_context.as_ref().unwrap().clone() } +} + +impl Drop for ScriptContext { + fn drop(&mut self) { + // std::mem::forget(self.flowy_test); + } +} + +async fn run_scripts(context: Arc>, scripts: Vec) { let mut fut_scripts = vec![]; for script in scripts { let context = context.clone(); let fut = async move { + let doc_id = context.read().doc_id.clone(); match script { DocScript::ConnectWs => { - let token = context.user_session.token().unwrap(); - let _ = context.user_session.start_ws_connection(&token).await.unwrap(); + // sleep(Duration::from_millis(300)).await; + let user_session = context.read().user_session.clone(); + let token = user_session.token().unwrap(); + let _ = user_session.start_ws_connection(&token).await.unwrap(); + }, + DocScript::OpenDoc => { + context.write().open_doc().await; }, DocScript::SendText(index, s) => { - context.client_edit_context.insert(index, s).await.unwrap(); + context.read().client_edit_context().insert(index, s).await.unwrap(); }, DocScript::AssertClient(s) => { - let json = context.client_edit_context.doc_json().await.unwrap(); + sleep(Duration::from_millis(300)).await; + let json = context.read().client_edit_context().doc_json().await.unwrap(); assert_eq(s, &json); }, DocScript::AssertServer(s) => { - let edit_doc = context - .doc_manager - .get(&context.client_edit_context.doc_id, context.pool) - .await - .unwrap() - .unwrap(); + sleep(Duration::from_millis(300)).await; + + let pg_pool = context.read().pool.clone(); + let doc_manager = context.read().doc_manager.clone(); + let edit_doc = doc_manager.get(&doc_id, pg_pool).await.unwrap().unwrap(); let json = edit_doc.document_json().await.unwrap(); assert_eq(s, &json); }, + DocScript::SetServerDocument(json, rev_id) => { + let pg_pool = context.read().pool.clone(); + save_doc(&doc_id, json, rev_id, pg_pool).await; + }, } }; fut_scripts.push(fut); @@ -94,6 +152,8 @@ async fn run_scripts(context: ScriptContext, scripts: Vec) { while let Some(script) = stream.next().await { let _ = script.await; } + + std::mem::forget(context); } fn assert_eq(expect: &str, receive: &str) { @@ -104,16 +164,16 @@ fn assert_eq(expect: &str, receive: &str) { assert_eq!(expect, receive); } -async fn create_doc(flowy_test: &FlowyTest) -> Arc { +async fn create_doc(flowy_test: &FlowyTest) -> String { let view_test = ViewTest::new(flowy_test).await; let doc_id = view_test.view.id.clone(); - let user_session = flowy_test.sdk.user_session.clone(); - let flowy_document = flowy_test.sdk.flowy_document.clone(); - - let edit_context = flowy_document - .open(QueryDocParams { doc_id }, user_session.db_pool().unwrap()) - .await - .unwrap(); - - edit_context + doc_id +} + +async fn save_doc(doc_id: &str, json: String, rev_id: i64, pool: Data) { + let mut params = UpdateDocParams::new(); + params.set_doc_id(doc_id.to_owned()); + params.set_data(json); + params.set_rev_id(rev_id); + let _ = update_doc(pool.get_ref(), params).await.unwrap(); } diff --git a/rust-lib/flowy-document/src/entities/doc/revision.rs b/rust-lib/flowy-document/src/entities/doc/revision.rs index b3a848d371..40605ea030 100644 --- a/rust-lib/flowy-document/src/entities/doc/revision.rs +++ b/rust-lib/flowy-document/src/entities/doc/revision.rs @@ -10,6 +10,10 @@ pub enum RevType { Remote = 1, } +impl RevType { + pub fn is_local(&self) -> bool { self == &RevType::Local } +} + impl std::default::Default for RevType { fn default() -> Self { RevType::Local } } @@ -54,7 +58,7 @@ impl std::fmt::Display for RevId { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("{}", self.inner)) } } -#[derive(Clone, Default, ProtoBuf)] +#[derive(PartialEq, Eq, Clone, Default, ProtoBuf)] pub struct Revision { #[pb(index = 1)] pub base_rev_id: i64, diff --git a/rust-lib/flowy-document/src/errors.rs b/rust-lib/flowy-document/src/errors.rs index 12a4651ba3..995bbcf94f 100644 --- a/rust-lib/flowy-document/src/errors.rs +++ b/rust-lib/flowy-document/src/errors.rs @@ -51,6 +51,7 @@ impl DocError { static_doc_error!(undo, ErrorCode::UndoFail); static_doc_error!(redo, ErrorCode::RedoFail); static_doc_error!(out_of_bound, ErrorCode::OutOfBound); + static_doc_error!(duplicate_rev, ErrorCode::DuplicateRevision); } pub fn internal_error(e: T) -> DocError @@ -63,27 +64,30 @@ where #[derive(Debug, Clone, ProtoBuf_Enum, Display, PartialEq, Eq)] pub enum ErrorCode { #[display(fmt = "DocIdInvalid")] - DocIdInvalid = 0, + DocIdInvalid = 0, #[display(fmt = "DocNotfound")] - DocNotfound = 1, + DocNotfound = 1, #[display(fmt = "Document websocket error")] - WsConnectError = 10, + WsConnectError = 10, #[display(fmt = "Undo failed")] - UndoFail = 200, + UndoFail = 200, #[display(fmt = "Redo failed")] - RedoFail = 201, + RedoFail = 201, #[display(fmt = "Interval out of bound")] - OutOfBound = 202, + OutOfBound = 202, + + #[display(fmt = "Duplicate revision")] + DuplicateRevision = 400, #[display(fmt = "UserUnauthorized")] - UserUnauthorized = 999, + UserUnauthorized = 999, #[display(fmt = "InternalError")] - InternalError = 1000, + InternalError = 1000, } impl std::default::Default for ErrorCode { diff --git a/rust-lib/flowy-document/src/module.rs b/rust-lib/flowy-document/src/module.rs index 4e6fc45b52..f234b3809b 100644 --- a/rust-lib/flowy-document/src/module.rs +++ b/rust-lib/flowy-document/src/module.rs @@ -29,7 +29,7 @@ pub struct FlowyDocument { impl FlowyDocument { pub fn new( user: Arc, - ws_manager: Arc>, + ws_manager: Arc, server_config: &ServerConfig, ) -> FlowyDocument { let server = construct_doc_server(server_config); diff --git a/rust-lib/flowy-document/src/protobuf/model/errors.rs b/rust-lib/flowy-document/src/protobuf/model/errors.rs index e034737dcc..c66ce5628b 100644 --- a/rust-lib/flowy-document/src/protobuf/model/errors.rs +++ b/rust-lib/flowy-document/src/protobuf/model/errors.rs @@ -221,6 +221,7 @@ pub enum ErrorCode { UndoFail = 200, RedoFail = 201, OutOfBound = 202, + DuplicateRevision = 400, UserUnauthorized = 999, InternalError = 1000, } @@ -238,6 +239,7 @@ impl ::protobuf::ProtobufEnum for ErrorCode { 200 => ::std::option::Option::Some(ErrorCode::UndoFail), 201 => ::std::option::Option::Some(ErrorCode::RedoFail), 202 => ::std::option::Option::Some(ErrorCode::OutOfBound), + 400 => ::std::option::Option::Some(ErrorCode::DuplicateRevision), 999 => ::std::option::Option::Some(ErrorCode::UserUnauthorized), 1000 => ::std::option::Option::Some(ErrorCode::InternalError), _ => ::std::option::Option::None @@ -252,6 +254,7 @@ impl ::protobuf::ProtobufEnum for ErrorCode { ErrorCode::UndoFail, ErrorCode::RedoFail, ErrorCode::OutOfBound, + ErrorCode::DuplicateRevision, ErrorCode::UserUnauthorized, ErrorCode::InternalError, ]; @@ -284,35 +287,38 @@ impl ::protobuf::reflect::ProtobufValue for ErrorCode { static file_descriptor_proto_data: &'static [u8] = b"\ \n\x0cerrors.proto\"<\n\x08DocError\x12\x1e\n\x04code\x18\x01\x20\x01(\ \x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*\ - \x9c\x01\n\tErrorCode\x12\x10\n\x0cDocIdInvalid\x10\0\x12\x0f\n\x0bDocNo\ + \xb4\x01\n\tErrorCode\x12\x10\n\x0cDocIdInvalid\x10\0\x12\x0f\n\x0bDocNo\ tfound\x10\x01\x12\x12\n\x0eWsConnectError\x10\n\x12\r\n\x08UndoFail\x10\ \xc8\x01\x12\r\n\x08RedoFail\x10\xc9\x01\x12\x0f\n\nOutOfBound\x10\xca\ - \x01\x12\x15\n\x10UserUnauthorized\x10\xe7\x07\x12\x12\n\rInternalError\ - \x10\xe8\x07J\xf8\x03\n\x06\x12\x04\0\0\x0f\x01\n\x08\n\x01\x0c\x12\x03\ - \0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\ - \x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x17\n\x0c\n\x05\ - \x04\0\x02\0\x06\x12\x03\x03\x04\r\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\ - \x03\x0e\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x15\x16\n\x0b\n\x04\ - \x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\ - \x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0b\x0e\n\x0c\n\x05\ - \x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\x02\x05\0\x12\x04\x06\0\x0f\ - \x01\n\n\n\x03\x05\0\x01\x12\x03\x06\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\ - \x03\x07\x04\x15\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x07\x04\x10\n\x0c\n\ - \x05\x05\0\x02\0\x02\x12\x03\x07\x13\x14\n\x0b\n\x04\x05\0\x02\x01\x12\ - \x03\x08\x04\x14\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x08\x04\x0f\n\x0c\ - \n\x05\x05\0\x02\x01\x02\x12\x03\x08\x12\x13\n\x0b\n\x04\x05\0\x02\x02\ - \x12\x03\t\x04\x18\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\t\x04\x12\n\x0c\ - \n\x05\x05\0\x02\x02\x02\x12\x03\t\x15\x17\n\x0b\n\x04\x05\0\x02\x03\x12\ - \x03\n\x04\x13\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\n\x04\x0c\n\x0c\n\ - \x05\x05\0\x02\x03\x02\x12\x03\n\x0f\x12\n\x0b\n\x04\x05\0\x02\x04\x12\ - \x03\x0b\x04\x13\n\x0c\n\x05\x05\0\x02\x04\x01\x12\x03\x0b\x04\x0c\n\x0c\ - \n\x05\x05\0\x02\x04\x02\x12\x03\x0b\x0f\x12\n\x0b\n\x04\x05\0\x02\x05\ - \x12\x03\x0c\x04\x15\n\x0c\n\x05\x05\0\x02\x05\x01\x12\x03\x0c\x04\x0e\n\ - \x0c\n\x05\x05\0\x02\x05\x02\x12\x03\x0c\x11\x14\n\x0b\n\x04\x05\0\x02\ - \x06\x12\x03\r\x04\x1b\n\x0c\n\x05\x05\0\x02\x06\x01\x12\x03\r\x04\x14\n\ - \x0c\n\x05\x05\0\x02\x06\x02\x12\x03\r\x17\x1a\n\x0b\n\x04\x05\0\x02\x07\ - \x12\x03\x0e\x04\x19\n\x0c\n\x05\x05\0\x02\x07\x01\x12\x03\x0e\x04\x11\n\ - \x0c\n\x05\x05\0\x02\x07\x02\x12\x03\x0e\x14\x18b\x06proto3\ + \x01\x12\x16\n\x11DuplicateRevision\x10\x90\x03\x12\x15\n\x10UserUnautho\ + rized\x10\xe7\x07\x12\x12\n\rInternalError\x10\xe8\x07J\xa1\x04\n\x06\ + \x12\x04\0\0\x10\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\ + \x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\ + \x04\0\x02\0\x12\x03\x03\x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\ + \x04\r\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0e\x12\n\x0c\n\x05\x04\0\ + \x02\0\x03\x12\x03\x03\x15\x16\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\ + \x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\ + \x02\x01\x01\x12\x03\x04\x0b\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\ + \x04\x11\x12\n\n\n\x02\x05\0\x12\x04\x06\0\x10\x01\n\n\n\x03\x05\0\x01\ + \x12\x03\x06\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x15\n\x0c\n\ + \x05\x05\0\x02\0\x01\x12\x03\x07\x04\x10\n\x0c\n\x05\x05\0\x02\0\x02\x12\ + \x03\x07\x13\x14\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x08\x04\x14\n\x0c\n\ + \x05\x05\0\x02\x01\x01\x12\x03\x08\x04\x0f\n\x0c\n\x05\x05\0\x02\x01\x02\ + \x12\x03\x08\x12\x13\n\x0b\n\x04\x05\0\x02\x02\x12\x03\t\x04\x18\n\x0c\n\ + \x05\x05\0\x02\x02\x01\x12\x03\t\x04\x12\n\x0c\n\x05\x05\0\x02\x02\x02\ + \x12\x03\t\x15\x17\n\x0b\n\x04\x05\0\x02\x03\x12\x03\n\x04\x13\n\x0c\n\ + \x05\x05\0\x02\x03\x01\x12\x03\n\x04\x0c\n\x0c\n\x05\x05\0\x02\x03\x02\ + \x12\x03\n\x0f\x12\n\x0b\n\x04\x05\0\x02\x04\x12\x03\x0b\x04\x13\n\x0c\n\ + \x05\x05\0\x02\x04\x01\x12\x03\x0b\x04\x0c\n\x0c\n\x05\x05\0\x02\x04\x02\ + \x12\x03\x0b\x0f\x12\n\x0b\n\x04\x05\0\x02\x05\x12\x03\x0c\x04\x15\n\x0c\ + \n\x05\x05\0\x02\x05\x01\x12\x03\x0c\x04\x0e\n\x0c\n\x05\x05\0\x02\x05\ + \x02\x12\x03\x0c\x11\x14\n\x0b\n\x04\x05\0\x02\x06\x12\x03\r\x04\x1c\n\ + \x0c\n\x05\x05\0\x02\x06\x01\x12\x03\r\x04\x15\n\x0c\n\x05\x05\0\x02\x06\ + \x02\x12\x03\r\x18\x1b\n\x0b\n\x04\x05\0\x02\x07\x12\x03\x0e\x04\x1b\n\ + \x0c\n\x05\x05\0\x02\x07\x01\x12\x03\x0e\x04\x14\n\x0c\n\x05\x05\0\x02\ + \x07\x02\x12\x03\x0e\x17\x1a\n\x0b\n\x04\x05\0\x02\x08\x12\x03\x0f\x04\ + \x19\n\x0c\n\x05\x05\0\x02\x08\x01\x12\x03\x0f\x04\x11\n\x0c\n\x05\x05\0\ + \x02\x08\x02\x12\x03\x0f\x14\x18b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-document/src/protobuf/proto/errors.proto b/rust-lib/flowy-document/src/protobuf/proto/errors.proto index 354bf3ce1f..3732cb3073 100644 --- a/rust-lib/flowy-document/src/protobuf/proto/errors.proto +++ b/rust-lib/flowy-document/src/protobuf/proto/errors.proto @@ -11,6 +11,7 @@ enum ErrorCode { UndoFail = 200; RedoFail = 201; OutOfBound = 202; + DuplicateRevision = 400; UserUnauthorized = 999; InternalError = 1000; } diff --git a/rust-lib/flowy-document/src/services/doc/doc_controller.rs b/rust-lib/flowy-document/src/services/doc/doc_controller.rs index b0706f8205..299ea718ef 100644 --- a/rust-lib/flowy-document/src/services/doc/doc_controller.rs +++ b/rust-lib/flowy-document/src/services/doc/doc_controller.rs @@ -27,20 +27,20 @@ use flowy_ot::core::Delta; pub(crate) struct DocController { server: Server, doc_sql: Arc, - ws: Arc>, + ws_manager: Arc, cache: Arc, user: Arc, } impl DocController { - pub(crate) fn new(server: Server, user: Arc, ws: Arc>) -> Self { + pub(crate) fn new(server: Server, user: Arc, ws: Arc) -> Self { let doc_sql = Arc::new(DocTableSql {}); let cache = Arc::new(DocCache::new()); let controller = Self { server, doc_sql, user, - ws, + ws_manager: ws, cache: cache.clone(), }; controller @@ -74,7 +74,7 @@ impl DocController { pub(crate) fn close(&self, doc_id: &str) -> Result<(), DocError> { self.cache.remove(doc_id); - self.ws.write().remove_handler(doc_id); + self.ws_manager.remove_handler(doc_id); Ok(()) } @@ -84,7 +84,7 @@ impl DocController { let _ = self.doc_sql.delete_doc(doc_id, &*conn)?; self.cache.remove(doc_id); - self.ws.write().remove_handler(doc_id); + self.ws_manager.remove_handler(doc_id); let _ = self.delete_doc_on_server(params)?; Ok(()) } @@ -118,7 +118,7 @@ impl DocController { // Opti: require upgradable_read lock and then upgrade to write lock using // RwLockUpgradableReadGuard::upgrade(xx) of ws // let doc = self.read_doc(doc_id, pool.clone()).await?; - let ws_sender = self.ws.read().sender(); + let ws = self.ws_manager.ws(); let token = self.user.token()?; let user = self.user.clone(); let server = Arc::new(RevisionServerImpl { @@ -126,8 +126,8 @@ impl DocController { server: self.server.clone(), }); - let edit_ctx = Arc::new(ClientEditDoc::new(doc_id, pool, ws_sender, server, user).await?); - self.ws.write().register_handler(doc_id, edit_ctx.clone()); + let edit_ctx = Arc::new(ClientEditDoc::new(doc_id, pool, ws, server, user).await?); + self.ws_manager.register_handler(doc_id, edit_ctx.clone()); self.cache.set(edit_ctx.clone()); Ok(edit_ctx) } diff --git a/rust-lib/flowy-document/src/services/doc/edit/actor.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs similarity index 94% rename from rust-lib/flowy-document/src/services/doc/edit/actor.rs rename to rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs index 79b18e6214..3b0a326caa 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/actor.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs @@ -62,6 +62,11 @@ impl DocumentEditActor { match msg { EditMsg::Delta { delta, ret } => { let result = self.document.write().await.compose_delta(&delta); + log::debug!( + "Compose push delta: {}. result: {}", + delta.to_json(), + self.document.read().await.to_json() + ); let _ = ret.send(result); }, EditMsg::Insert { index, data, ret } => { @@ -102,7 +107,7 @@ impl DocumentEditActor { let data = self.document.read().await.to_json(); let _ = ret.send(Ok(data)); }, - EditMsg::SaveRevision { rev_id, ret } => { + EditMsg::SaveDocument { rev_id, ret } => { let result = self.save_to_disk(rev_id).await; let _ = ret.send(result); }, diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index 2064782062..077dead001 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -7,16 +7,17 @@ use crate::{ module::DocumentUser, services::{ doc::{ - edit::{actor::DocumentEditActor, message::EditMsg}, + edit::{edit_actor::DocumentEditActor, message::EditMsg}, revision::{DocRevision, RevisionCmd, RevisionManager, RevisionServer, RevisionStoreActor}, UndoResult, }, - ws::{WsDocumentHandler, WsDocumentSender}, + ws::{DocumentWebSocket, WsDocumentHandler}, }, }; use bytes::Bytes; use flowy_database::ConnectionPool; use flowy_ot::core::{Attribute, Delta, Interval}; +use flowy_ws::WsState; use std::{convert::TryFrom, sync::Arc}; use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; @@ -33,7 +34,7 @@ impl ClientEditDoc { pub(crate) async fn new( doc_id: &str, pool: Arc, - ws: Arc, + ws: Arc, server: Arc, user: Arc, ) -> DocResult { @@ -64,7 +65,7 @@ impl ClientEditDoc { let _ = self.document.send(msg); let delta_data = rx.await.map_err(internal_error)??.to_bytes(); let rev_id = self.mk_revision(&delta_data).await?; - save(rev_id.into(), self.document.clone()).await + save_document(self.document.clone(), rev_id.into()).await } pub async fn delete(&self, interval: Interval) -> Result<(), DocError> { @@ -158,7 +159,7 @@ impl ClientEditDoc { let _ = rx.await.map_err(internal_error)??; let rev_id = self.mk_revision(&data).await?; - save(rev_id, self.document.clone()).await + save_document(self.document.clone(), rev_id).await } #[cfg(feature = "flowy_test")] @@ -182,7 +183,7 @@ impl WsDocumentHandler for ClientEditDoc { }, WsDataType::PullRev => { let range = RevisionRange::try_from(bytes)?; - let _ = rev_manager.send_revisions(range)?; + let _ = rev_manager.send_revisions(range).await?; }, WsDataType::NewDocUser => {}, WsDataType::Acked => { @@ -200,11 +201,12 @@ impl WsDocumentHandler for ClientEditDoc { } }); } + fn state_changed(&self, state: &WsState) { let _ = self.rev_manager.handle_ws_state_changed(state); } } -async fn save(rev_id: RevId, document: UnboundedSender) -> DocResult<()> { +async fn save_document(document: UnboundedSender, rev_id: RevId) -> DocResult<()> { let (ret, rx) = oneshot::channel::>(); - let _ = document.send(EditMsg::SaveRevision { rev_id, ret }); + let _ = document.send(EditMsg::SaveDocument { rev_id, ret }); let result = rx.await.map_err(internal_error)?; result } @@ -215,24 +217,16 @@ async fn handle_push_rev( document: UnboundedSender, ) -> DocResult<()> { let revision = Revision::try_from(rev_bytes)?; - let _ = rev_manager.add_revision(revision).await?; - match rev_manager.next_compose_revision() { - None => Ok(()), - Some(revision) => { - let delta = Delta::from_bytes(&revision.delta_data)?; - let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Delta { delta, ret }; - let _ = document.send(msg); + let _ = rev_manager.add_revision(revision.clone()).await?; - match rx.await.map_err(internal_error)? { - Ok(_) => save(revision.rev_id.into(), document).await, - Err(e) => { - rev_manager.push_compose_revision(revision); - Err(e) - }, - } - }, - } + let delta = Delta::from_bytes(&revision.delta_data)?; + let (ret, rx) = oneshot::channel::>(); + let msg = EditMsg::Delta { delta, ret }; + let _ = document.send(msg); + let _ = rx.await.map_err(internal_error)??; + + save_document(document, revision.rev_id.into()).await; + Ok(()) } fn spawn_rev_store_actor( diff --git a/rust-lib/flowy-document/src/services/doc/edit/message.rs b/rust-lib/flowy-document/src/services/doc/edit/message.rs index 2ce7cd81a5..6ea40668c5 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/message.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/message.rs @@ -45,7 +45,7 @@ pub enum EditMsg { Doc { ret: Ret, }, - SaveRevision { + SaveDocument { rev_id: RevId, ret: Ret<()>, }, diff --git a/rust-lib/flowy-document/src/services/doc/edit/mod.rs b/rust-lib/flowy-document/src/services/doc/edit/mod.rs index 7c75a6f473..afb459ee6c 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/mod.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/mod.rs @@ -1,4 +1,4 @@ -mod actor; +mod edit_actor; mod edit_doc; mod message; diff --git a/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 43ee987e86..5c83a0f0ad 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -1,13 +1,13 @@ use crate::{ entities::doc::{RevId, RevType, Revision, RevisionRange}, - errors::{DocError, DocResult}, + errors::{internal_error, DocError}, services::{ doc::revision::{ - actor::{RevisionCmd, RevisionStoreActor}, + store_actor::{RevisionCmd, RevisionStoreActor}, util::NotifyOpenDocAction, }, util::RevIdCounter, - ws::WsDocumentSender, + ws::DocumentWebSocket, }, }; use flowy_infra::{ @@ -15,6 +15,7 @@ use flowy_infra::{ retry::{ExponentialBackoff, Retry}, }; use flowy_ot::core::Delta; +use flowy_ws::WsState; use parking_lot::RwLock; use std::{collections::VecDeque, sync::Arc}; use tokio::sync::{mpsc, oneshot}; @@ -32,9 +33,8 @@ pub struct RevisionManager { doc_id: String, user_id: String, rev_id_counter: RevIdCounter, - ws: Arc, + ws: Arc, rev_store: mpsc::Sender, - pending_revs: RwLock>, } impl RevisionManager { @@ -42,45 +42,37 @@ impl RevisionManager { doc_id: &str, user_id: &str, rev_id: RevId, - ws: Arc, + ws: Arc, rev_store: mpsc::Sender, ) -> Self { notify_open_doc(&ws, user_id, doc_id, &rev_id); - let rev_id_counter = RevIdCounter::new(rev_id.into()); - let pending_revs = RwLock::new(VecDeque::new()); Self { doc_id: doc_id.to_string(), user_id: user_id.to_string(), rev_id_counter, ws, - pending_revs, rev_store, } } - pub fn push_compose_revision(&self, revision: Revision) { self.pending_revs.write().push_front(revision); } - - pub fn next_compose_revision(&self) -> Option { self.pending_revs.write().pop_front() } - #[tracing::instrument(level = "debug", skip(self))] pub async fn add_revision(&self, revision: Revision) -> Result<(), DocError> { + let (ret, rx) = oneshot::channel(); let cmd = RevisionCmd::Revision { revision: revision.clone(), + ret, }; let _ = self.rev_store.send(cmd).await; - - match revision.ty { - RevType::Local => match self.ws.send(revision.into()) { + let result = rx.await.map_err(internal_error)?; + if result.is_ok() && revision.ty.is_local() { + match self.ws.send(revision.into()) { Ok(_) => {}, Err(e) => log::error!("Send delta failed: {:?}", e), - }, - RevType::Remote => { - self.pending_revs.write().push_back(revision); - }, + }; } - Ok(()) + result } pub fn ack_rev(&self, rev_id: RevId) -> Result<(), DocError> { @@ -99,23 +91,41 @@ impl RevisionManager { (cur, next) } - pub fn send_revisions(&self, range: RevisionRange) -> Result<(), DocError> { + pub async fn send_revisions(&self, range: RevisionRange) -> Result<(), DocError> { debug_assert!(&range.doc_id == &self.doc_id); - let (ret, _rx) = oneshot::channel(); + let (ret, rx) = oneshot::channel(); let sender = self.rev_store.clone(); - - tokio::spawn(async move { - let _ = sender.send(RevisionCmd::SendRevisions { range, ret }).await; - }); + let _ = sender.send(RevisionCmd::SendRevisions { range, ret }).await; + let revisions = rx.await.map_err(internal_error)??; unimplemented!() + // Ok(()) + } + + #[tracing::instrument( + level = "debug", + skip(self), + fields( + doc_id = %self.doc_id.clone(), + rev_id = %self.rev_id(), + ) + )] + pub fn handle_ws_state_changed(&self, state: &WsState) { + match state { + WsState::Init => {}, + WsState::Connected(_) => { + let rev_id: RevId = self.rev_id().into(); + notify_open_doc(&self.ws, &self.user_id, &self.doc_id, &rev_id); + }, + WsState::Disconnected(_) => {}, + } } } // FIXME: // user_id may be invalid if the user switch to another account while // theNotifyOpenDocAction is flying -fn notify_open_doc(ws: &Arc, user_id: &str, doc_id: &str, rev_id: &RevId) { +fn notify_open_doc(ws: &Arc, user_id: &str, doc_id: &str, rev_id: &RevId) { let action = NotifyOpenDocAction::new(user_id, doc_id, rev_id, ws); let strategy = ExponentialBackoff::from_millis(50).take(3); let retry = Retry::spawn(strategy, action); diff --git a/rust-lib/flowy-document/src/services/doc/revision/mod.rs b/rust-lib/flowy-document/src/services/doc/revision/mod.rs index f3bd0f919e..49d14d5107 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/mod.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/mod.rs @@ -1,6 +1,6 @@ -mod actor; mod manager; +mod store_actor; mod util; -pub use actor::*; pub use manager::*; +pub use store_actor::*; diff --git a/rust-lib/flowy-document/src/services/doc/revision/actor.rs b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs similarity index 94% rename from rust-lib/flowy-document/src/services/doc/revision/actor.rs rename to rust-lib/flowy-document/src/services/doc/revision/store_actor.rs index 08d1c3ddc2..d357b576b0 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/actor.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs @@ -18,6 +18,7 @@ use tokio::{ pub enum RevisionCmd { Revision { revision: Revision, + ret: oneshot::Sender>, }, AckRevision { rev_id: RevId, @@ -76,8 +77,9 @@ impl RevisionStoreActor { async fn handle_message(&self, cmd: RevisionCmd) { match cmd { - RevisionCmd::Revision { revision } => { - self.handle_new_revision(revision).await; + RevisionCmd::Revision { revision, ret } => { + let result = self.handle_new_revision(revision).await; + let _ = ret.send(result); }, RevisionCmd::AckRevision { rev_id } => { self.handle_revision_acked(rev_id).await; @@ -93,11 +95,16 @@ impl RevisionStoreActor { } } - async fn handle_new_revision(&self, revision: Revision) { + async fn handle_new_revision(&self, revision: Revision) -> DocResult<()> { + if self.revs.contains_key(&revision.rev_id) { + return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id))); + } + let mut operation = RevisionOperation::new(&revision); let _receiver = operation.receiver(); self.revs.insert(revision.rev_id, operation); self.save_revisions().await; + Ok(()) } async fn handle_revision_acked(&self, rev_id: RevId) { diff --git a/rust-lib/flowy-document/src/services/doc/revision/util.rs b/rust-lib/flowy-document/src/services/doc/revision/util.rs index 6922f832a7..28bc19c4b5 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/util.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/util.rs @@ -1,7 +1,7 @@ use crate::{ entities::doc::{NewDocUser, RevId, Revision}, errors::{DocError, DocResult}, - services::ws::WsDocumentSender, + services::ws::DocumentWebSocket, sql_tables::RevState, }; use flowy_infra::retry::Action; @@ -54,11 +54,11 @@ pub(crate) struct NotifyOpenDocAction { user_id: String, rev_id: RevId, doc_id: String, - ws: Arc, + ws: Arc, } impl NotifyOpenDocAction { - pub(crate) fn new(user_id: &str, doc_id: &str, rev_id: &RevId, ws: &Arc) -> Self { + pub(crate) fn new(user_id: &str, doc_id: &str, rev_id: &RevId, ws: &Arc) -> Self { Self { user_id: user_id.to_owned(), rev_id: rev_id.clone(), diff --git a/rust-lib/flowy-document/src/services/ws/ws_manager.rs b/rust-lib/flowy-document/src/services/ws/ws_manager.rs index abb9fd6ce0..7aab61d5b7 100644 --- a/rust-lib/flowy-document/src/services/ws/ws_manager.rs +++ b/rust-lib/flowy-document/src/services/ws/ws_manager.rs @@ -1,43 +1,48 @@ use crate::{entities::ws::WsDocumentData, errors::DocError}; use bytes::Bytes; +use dashmap::DashMap; +use flowy_ws::WsState; use std::{collections::HashMap, convert::TryInto, sync::Arc}; +use tokio::sync::broadcast::error::RecvError; pub(crate) trait WsDocumentHandler: Send + Sync { fn receive(&self, data: WsDocumentData); + fn state_changed(&self, state: &WsState); } -pub trait WsDocumentSender: Send + Sync { +pub type WsStateReceiver = tokio::sync::broadcast::Receiver; +pub trait DocumentWebSocket: Send + Sync { fn send(&self, data: WsDocumentData) -> Result<(), DocError>; + fn state_notify(&self) -> WsStateReceiver; } pub struct WsDocumentManager { - sender: Arc, + ws: Arc, // key: the document id - ws_handlers: HashMap>, + handlers: Arc>>, } impl WsDocumentManager { - pub fn new(sender: Arc) -> Self { - Self { - sender, - ws_handlers: HashMap::new(), - } + pub fn new(ws: Arc) -> Self { + let handlers: Arc>> = Arc::new(DashMap::new()); + listen_ws_state_changed(ws.clone(), handlers.clone()); + + Self { ws, handlers } } - pub(crate) fn register_handler(&mut self, id: &str, handler: Arc) { - if self.ws_handlers.contains_key(id) { + pub(crate) fn register_handler(&self, id: &str, handler: Arc) { + if self.handlers.contains_key(id) { log::error!("Duplicate handler registered for {:?}", id); } - - self.ws_handlers.insert(id.to_string(), handler); + self.handlers.insert(id.to_string(), handler); } - pub(crate) fn remove_handler(&mut self, id: &str) { self.ws_handlers.remove(id); } + pub(crate) fn remove_handler(&self, id: &str) { self.handlers.remove(id); } - pub fn receive_data(&self, data: Bytes) { + pub fn handle_ws_data(&self, data: Bytes) { let data: WsDocumentData = data.try_into().unwrap(); - match self.ws_handlers.get(&data.doc_id) { + match self.handlers.get(&data.doc_id) { None => { log::error!("Can't find any source handler for {:?}", data.doc_id); }, @@ -47,5 +52,25 @@ impl WsDocumentManager { } } - pub fn sender(&self) -> Arc { self.sender.clone() } + pub fn ws(&self) -> Arc { self.ws.clone() } +} + +#[tracing::instrument(level = "debug", skip(ws, handlers))] +fn listen_ws_state_changed(ws: Arc, handlers: Arc>>) { + let mut notify = ws.state_notify(); + tokio::spawn(async move { + loop { + match notify.recv().await { + Ok(state) => { + handlers.iter().for_each(|handle| { + handle.value().state_changed(&state); + }); + }, + Err(e) => { + log::error!("Websocket state notify error: {:?}", e); + break; + }, + } + } + }); } diff --git a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 7158f7e60c..67a3039697 100644 --- a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -2,10 +2,10 @@ use bytes::Bytes; use flowy_document::{ errors::DocError, module::DocumentUser, - prelude::{WsDocumentManager, WsDocumentSender}, + prelude::{DocumentWebSocket, WsDocumentManager}, }; -use flowy_document::entities::ws::WsDocumentData; +use flowy_document::{entities::ws::WsDocumentData, errors::internal_error, services::ws::WsStateReceiver}; use flowy_user::{errors::ErrorCode, services::user::UserSession}; use flowy_ws::{WsMessage, WsMessageHandler, WsModule}; use parking_lot::RwLock; @@ -18,7 +18,7 @@ pub struct DocumentDepsResolver { impl DocumentDepsResolver { pub fn new(user_session: Arc) -> Self { Self { user_session } } - pub fn split_into(self) -> (Arc, Arc>) { + pub fn split_into(self) -> (Arc, Arc) { let user = Arc::new(DocumentUserImpl { user: self.user_session.clone(), }); @@ -27,7 +27,7 @@ impl DocumentDepsResolver { user: self.user_session.clone(), }); - let ws_manager = Arc::new(RwLock::new(WsDocumentManager::new(sender))); + let ws_manager = Arc::new(WsDocumentManager::new(sender)); let ws_handler = Arc::new(WsDocumentReceiver { inner: ws_manager.clone(), @@ -73,19 +73,19 @@ struct WsSenderImpl { user: Arc, } -impl WsDocumentSender for WsSenderImpl { +impl DocumentWebSocket for WsSenderImpl { fn send(&self, data: WsDocumentData) -> Result<(), DocError> { let msg: WsMessage = data.into(); - let _ = self - .user - .send_ws_msg(msg) - .map_err(|e| DocError::internal().context(e))?; + let sender = self.user.ws_controller.sender().map_err(internal_error)?; + sender.send_msg(msg).map_err(internal_error)?; Ok(()) } + + fn state_notify(&self) -> WsStateReceiver { self.user.ws_controller.state_subscribe() } } struct WsDocumentReceiver { - inner: Arc>, + inner: Arc, } impl WsMessageHandler for WsDocumentReceiver { @@ -93,6 +93,6 @@ impl WsMessageHandler for WsDocumentReceiver { fn receive_message(&self, msg: WsMessage) { let data = Bytes::from(msg.data); - self.inner.read().receive_data(data); + self.inner.handle_ws_data(data); } } diff --git a/rust-lib/flowy-user/src/services/user/user_session.rs b/rust-lib/flowy-user/src/services/user/user_session.rs index ba8ca208a3..f76f999236 100644 --- a/rust-lib/flowy-user/src/services/user/user_session.rs +++ b/rust-lib/flowy-user/src/services/user/user_session.rs @@ -16,10 +16,10 @@ use flowy_database::{ ExpressionMethods, UserDatabaseConnection, }; -use flowy_infra::{future::wrap_future, kv::KV}; +use flowy_infra::kv::KV; use flowy_net::config::ServerConfig; use flowy_sqlite::ConnectionPool; -use flowy_ws::{WsController, WsMessage, WsMessageHandler}; +use flowy_ws::{WsController, WsMessage, WsMessageHandler, WsState}; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -50,7 +50,7 @@ pub struct UserSession { #[allow(dead_code)] server: Server, session: RwLock>, - ws_controller: Arc, + pub ws_controller: Arc, status_callback: SessionStatusCallback, } @@ -185,12 +185,6 @@ impl UserSession { pub fn add_ws_handler(&self, handler: Arc) { let _ = self.ws_controller.add_handler(handler); } - - pub fn send_ws_msg>(&self, msg: T) -> Result<(), UserError> { - let sender = self.ws_controller.sender()?; - sender.send_msg(msg)?; - Ok(()) - } } impl UserSession { diff --git a/rust-lib/flowy-ws/src/ws.rs b/rust-lib/flowy-ws/src/ws.rs index 971e63ab2f..89e3f5c5f0 100644 --- a/rust-lib/flowy-ws/src/ws.rs +++ b/rust-lib/flowy-ws/src/ws.rs @@ -6,18 +6,16 @@ use crate::{ }; use bytes::Bytes; use dashmap::DashMap; -use flowy_infra::{ - future::{wrap_future, FnFuture}, - retry::{Action, ExponentialBackoff, Retry}, -}; +use flowy_infra::retry::{Action, ExponentialBackoff, Retry}; use flowy_net::errors::ServerError; -use futures::future::BoxFuture; + use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures_core::{ready, Stream}; use parking_lot::RwLock; use pin_project::pin_project; use std::{ convert::TryFrom, + fmt::Formatter, future::Future, pin::Pin, sync::Arc, @@ -45,6 +43,20 @@ pub enum WsState { Disconnected(WsError), } +impl std::fmt::Display for WsState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + WsState::Init => f.write_str("Init"), + WsState::Connected(_) => f.write_str("Connected"), + WsState::Disconnected(_) => f.write_str("Disconnected"), + } + } +} + +impl std::fmt::Debug for WsState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str(&format!("{}", self)) } +} + pub struct WsController { handlers: Handlers, state_notify: Arc>,