diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 2c9571cd95..28b92efedb 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -54,11 +54,13 @@ sql-builder = "3.1.1" lazy_static = "1.4" tokio = { version = "1", features = ["full"] } parking_lot = "0.11" +md5 = "0.7.0" flowy-user = { path = "../rust-lib/flowy-user" } flowy-workspace = { path = "../rust-lib/flowy-workspace" } flowy-document = { path = "../rust-lib/flowy-document" } flowy-ws = { path = "../rust-lib/flowy-ws" } +flowy-ot = { path = "../rust-lib/flowy-ot" } flowy-net = { path = "../rust-lib/flowy-net", features = ["http_server"] } ormx = { version = "0.7", features = ["postgres"]} diff --git a/backend/src/service/doc/doc.rs b/backend/src/service/doc/doc.rs index 45dcd454f1..4ba0077f1e 100644 --- a/backend/src/service/doc/doc.rs +++ b/backend/src/service/doc/doc.rs @@ -23,10 +23,7 @@ pub(crate) async fn create_doc( Ok(()) } -pub(crate) async fn read_doc( - pool: &PgPool, - params: QueryDocParams, -) -> Result { +pub(crate) async fn read_doc(pool: &PgPool, params: QueryDocParams) -> Result { let doc_id = Uuid::parse_str(¶ms.doc_id)?; let mut transaction = pool .begin() @@ -50,7 +47,7 @@ pub(crate) async fn read_doc( .await .context("Failed to commit SQL transaction to read doc.")?; - FlowyResponse::success().pb(doc) + Ok(doc) } pub(crate) async fn update_doc( diff --git a/backend/src/service/doc/router.rs b/backend/src/service/doc/router.rs index 177aae14a6..a22ecab835 100644 --- a/backend/src/service/doc/router.rs +++ b/backend/src/service/doc/router.rs @@ -11,13 +11,15 @@ use crate::service::{ doc::{read_doc, update_doc}, util::parse_from_payload, }; +use flowy_net::response::FlowyResponse; pub async fn read_handler( payload: Payload, pool: Data, ) -> Result { let params: QueryDocParams = parse_from_payload(payload).await?; - let response = read_doc(pool.get_ref(), params).await?; + let doc = read_doc(pool.get_ref(), params).await?; + let response = FlowyResponse::success().pb(doc)?; Ok(response.into()) } diff --git a/backend/src/service/doc/ws_handler.rs b/backend/src/service/doc/ws_handler.rs index d39123efc1..9ba9d82e02 100644 --- a/backend/src/service/doc/ws_handler.rs +++ b/backend/src/service/doc/ws_handler.rs @@ -1,12 +1,14 @@ -use crate::service::{util::parse_from_bytes, ws::WsBizHandler}; +use crate::service::{doc::read_doc, util::parse_from_bytes, ws::WsBizHandler}; use actix_web::web::Data; use bytes::Bytes; -use dashmap::DashMap; +use dashmap::{mapref::one::Ref, DashMap}; use flowy_document::{ - protobuf::{Revision, WsDataType, WsDocumentData}, + protobuf::{Doc, QueryDocParams, Revision, WsDataType, WsDocumentData}, services::doc::Document, }; -use parking_lot::RwLock; +use flowy_net::errors::{internal_error, ServerError}; +use flowy_ot::core::Delta; +use parking_lot::{RawRwLock, RwLock}; use protobuf::Message; use sqlx::PgPool; use std::sync::Arc; @@ -22,33 +24,103 @@ use std::sync::Arc; // WsDocumentData────▶WsMessage ────▶ Message ─────▶WsMessage ─────▶WsDocumentData pub struct DocWsBizHandler { + inner: Arc, +} + +struct Inner { pg_pool: Data, - edit_docs: DashMap>>, + edited_docs: DashMap>>, } impl DocWsBizHandler { pub fn new(pg_pool: Data) -> Self { Self { - edit_docs: DashMap::new(), - pg_pool, + inner: Arc::new(Inner { + edited_docs: DashMap::new(), + pg_pool, + }), } } } +async fn handle_document_data(inner: Arc, data: Bytes) -> Result<(), ServerError> { + let document_data: WsDocumentData = parse_from_bytes(&data)?; + match document_data.ty { + WsDataType::Command => {}, + WsDataType::Delta => { + let revision: Revision = parse_from_bytes(&document_data.data).unwrap(); + let edited_doc = get_edit_doc(inner, &revision.doc_id).await?; + let _ = edited_doc.write().apply_revision(revision)?; + }, + } + + Ok(()) +} + +async fn get_edit_doc( + inner: Arc, + doc_id: &str, +) -> Result>, ServerError> { + let pg_pool = inner.pg_pool.clone(); + + if let Some(doc) = inner.edited_docs.get(doc_id) { + return Ok(doc.clone()); + } + + let params = QueryDocParams { + doc_id: doc_id.to_string(), + ..Default::default() + }; + + let doc = read_doc(pg_pool.get_ref(), params).await?; + let edited_doc = Arc::new(RwLock::new(EditedDoc::new(doc)?)); + inner + .edited_docs + .insert(doc_id.to_string(), edited_doc.clone()); + Ok(edited_doc) +} + impl WsBizHandler for DocWsBizHandler { fn receive_data(&self, data: Bytes) { - let document_data: WsDocumentData = parse_from_bytes(&data).unwrap(); - match document_data.ty { - WsDataType::Command => {}, - WsDataType::Delta => { - let revision: Revision = parse_from_bytes(&document_data.data).unwrap(); - log::warn!("{:?}", revision); - }, - } + let inner = self.inner.clone(); + actix_rt::spawn(async { + let result = handle_document_data(inner, data).await; + match result { + Ok(_) => {}, + Err(e) => log::error!("WsBizHandler handle data error: {:?}", e), + } + }); } } -pub struct EditDoc { +struct EditedDoc { doc_id: String, document: Document, } + +impl EditedDoc { + fn new(doc: Doc) -> Result { + let delta = Delta::from_bytes(doc.data).map_err(internal_error)?; + let document = Document::from_delta(delta); + Ok(Self { + doc_id: doc.id.clone(), + document, + }) + } + + fn apply_revision(&mut self, revision: Revision) -> Result<(), ServerError> { + let delta = Delta::from_bytes(revision.delta).map_err(internal_error)?; + let _ = self + .document + .apply_delta(delta.clone()) + .map_err(internal_error)?; + + let json = self.document.to_json(); + let md5 = format!("{:x}", md5::compute(json)); + if md5 != revision.md5 { + log::error!("Document conflict after apply delta {}", delta) + } + + Ok(()) + } +} diff --git a/rust-lib/dart-ffi/Cargo.toml b/rust-lib/dart-ffi/Cargo.toml index a3c26b0054..dbaf8eb045 100644 --- a/rust-lib/dart-ffi/Cargo.toml +++ b/rust-lib/dart-ffi/Cargo.toml @@ -7,11 +7,11 @@ edition = "2018" [lib] name = "dart_ffi" # this value will change depending on the target os -# for iOS it would be `cdylib` -# for Macos it would be `cdylib` +# for iOS it would be `rlib` +# for Macos it would be `rlib` # for android it would be `c-dylib` -# default cdylib -crate-type = ["cdylib"] +# default rlib +crate-type = ["rlib"] [dependencies] diff --git a/rust-lib/flowy-document/src/services/doc/document/document.rs b/rust-lib/flowy-document/src/services/doc/document/document.rs index 4d64a53c46..12a04e5fbf 100644 --- a/rust-lib/flowy-document/src/services/doc/document/document.rs +++ b/rust-lib/flowy-document/src/services/doc/document/document.rs @@ -53,10 +53,14 @@ impl Document { pub fn to_plain_string(&self) -> String { self.delta.apply("").unwrap() } - pub fn apply_delta(&mut self, data: Bytes) -> Result<(), DocError> { + pub fn apply_delta_data(&mut self, data: Bytes) -> Result<(), DocError> { let new_delta = Delta::from_bytes(data.to_vec())?; - log::debug!("Apply delta: {}", new_delta); - let _ = self.add_delta(&new_delta)?; + self.apply_delta(new_delta) + } + + pub fn apply_delta(&mut self, delta: Delta) -> Result<(), DocError> { + log::debug!("Apply delta: {}", delta); + let _ = self.add_delta(&delta)?; log::debug!("Document: {}", self.to_json()); Ok(()) } diff --git a/rust-lib/flowy-document/src/services/doc/edit_context.rs b/rust-lib/flowy-document/src/services/doc/edit_context.rs index 8336c6929f..4796582ae5 100644 --- a/rust-lib/flowy-document/src/services/doc/edit_context.rs +++ b/rust-lib/flowy-document/src/services/doc/edit_context.rs @@ -58,7 +58,9 @@ impl EditDocContext { let mut guard = self.document.write(); let base_rev_id = self.rev_counter.value(); let rev_id = self.rev_counter.next(); - let _ = guard.apply_delta(data.clone())?; + let delta = Delta::from_bytes(data.to_vec())?; + + let _ = guard.apply_delta(delta)?; let json = guard.to_json(); drop(guard); diff --git a/rust-lib/flowy-net/src/errors.rs b/rust-lib/flowy-net/src/errors.rs index a5088a1e56..c9cac3fb58 100644 --- a/rust-lib/flowy-net/src/errors.rs +++ b/rust-lib/flowy-net/src/errors.rs @@ -47,6 +47,13 @@ impl ServerError { pub fn is_unauthorized(&self) -> bool { self.code == ErrorCode::UserUnauthorized } } +pub fn internal_error(e: T) -> ServerError +where + T: std::fmt::Debug, +{ + ServerError::internal().context(e) +} + pub fn invalid_params(e: T) -> ServerError { ServerError::params_invalid().context(e) } impl std::fmt::Display for ServerError { diff --git a/rust-lib/flowy-ot/src/core/operation/operation_serde.rs b/rust-lib/flowy-ot/src/core/operation/operation_serde.rs index e2a28a75f2..f8e7fcc2e1 100644 --- a/rust-lib/flowy-ot/src/core/operation/operation_serde.rs +++ b/rust-lib/flowy-ot/src/core/operation/operation_serde.rs @@ -82,7 +82,9 @@ impl<'de> Deserialize<'de> for Operation { match operation { None => Err(de::Error::missing_field("operation")), Some(mut operation) => { - operation.set_attributes(attributes.unwrap_or(Attributes::default())); + if !operation.is_delete() { + operation.set_attributes(attributes.unwrap_or(Attributes::default())); + } Ok(operation) }, }