From 15d628c7503564b1f9f1b26db94526e37b425945 Mon Sep 17 00:00:00 2001 From: appflowy Date: Sun, 3 Oct 2021 21:29:56 +0800 Subject: [PATCH] push rev if client doc is outdated --- backend/src/service/doc/edit/edit_actor.rs | 19 +++- backend/src/service/doc/edit/edit_doc.rs | 107 ++++++++++++-------- backend/src/service/doc/edit/interval.rs | 57 +++++++++++ backend/src/service/doc/edit/mod.rs | 2 + backend/src/service/doc/edit/open_handle.rs | 16 ++- backend/src/service/doc/ws_actor.rs | 41 +++++--- 6 files changed, 184 insertions(+), 58 deletions(-) create mode 100644 backend/src/service/doc/edit/interval.rs diff --git a/backend/src/service/doc/edit/edit_actor.rs b/backend/src/service/doc/edit/edit_actor.rs index 94455f4c44..9deef21ada 100644 --- a/backend/src/service/doc/edit/edit_actor.rs +++ b/backend/src/service/doc/edit/edit_actor.rs @@ -35,6 +35,12 @@ pub enum EditMsg { DocumentJson { ret: oneshot::Sender>, }, + NewDocUser { + user: Arc, + socket: Socket, + rev_id: i64, + ret: oneshot::Sender>, + }, } pub struct EditDocActor { @@ -78,7 +84,6 @@ impl EditDocActor { revision, ret, } => { - // ret.send(self.handle_client_data(client_data, pool).await); let user = EditUser { user: user.clone(), socket: socket.clone(), @@ -92,6 +97,18 @@ impl EditDocActor { .map_err(internal_error); let _ = ret.send(json); }, + EditMsg::NewDocUser { + user, + socket, + rev_id, + ret, + } => { + let user = EditUser { + user: user.clone(), + socket: socket.clone(), + }; + let _ = ret.send(self.edit_doc.new_connection(user, rev_id, self.pg_pool.clone()).await); + }, } } } diff --git a/backend/src/service/doc/edit/edit_doc.rs b/backend/src/service/doc/edit/edit_doc.rs index ffa9c78dd9..3e4e822240 100644 --- a/backend/src/service/doc/edit/edit_doc.rs +++ b/backend/src/service/doc/edit/edit_doc.rs @@ -5,6 +5,7 @@ use crate::service::{ }; use actix_web::web::Data; +use crate::service::doc::edit::interval::Interval; use bytes::Bytes; use dashmap::DashMap; use flowy_document::{ @@ -22,6 +23,7 @@ use parking_lot::RwLock; use protobuf::Message; use sqlx::PgPool; use std::{ + cmp::Ordering, convert::TryInto, sync::{ atomic::{AtomicI64, Ordering::SeqCst}, @@ -52,6 +54,28 @@ impl ServerEditDoc { pub fn document_json(&self) -> String { self.document.read().to_json() } + pub async fn new_connection(&self, user: EditUser, rev_id: i64, _pg_pool: Data) -> Result<(), ServerError> { + self.users.insert(user.id(), user.clone()); + let cur_rev_id = self.rev_id.load(SeqCst); + if cur_rev_id > rev_id { + let doc_delta = self.document.read().delta().clone(); + let cli_revision = self.mk_revision(rev_id, doc_delta); + let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); + user.socket.do_send(ws_cli_revision).map_err(internal_error)?; + } + + Ok(()) + } + + #[tracing::instrument( + level = "debug", + skip(self, user, pg_pool), + fields( + rev_id = %self.rev_id.load(SeqCst), + revision_rev_id = %revision.rev_id, + revision_base_rev_id = %revision.base_rev_id + ) + )] pub async fn apply_revision( &self, user: EditUser, @@ -60,49 +84,48 @@ impl ServerEditDoc { ) -> Result<(), ServerError> { // Opti: find out another way to keep the user socket available. self.users.insert(user.id(), user.clone()); - log::debug!( - "cur_base_rev_id: {}, expect_base_rev_id: {} rev_id: {}", - self.rev_id.load(SeqCst), - revision.base_rev_id, - revision.rev_id - ); - let cur_rev_id = self.rev_id.load(SeqCst); - if cur_rev_id > revision.rev_id { - // The client document is outdated. Transform the client revision delta and then - // send the prime delta to the client. Client should compose the this prime - // delta. - - let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?; - let _ = self.update_document_delta(server_prime)?; - - log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json()); - let cli_revision = self.mk_revision(revision.rev_id, cli_prime); - let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); - user.socket.do_send(ws_cli_revision).map_err(internal_error)?; - Ok(()) - } else if cur_rev_id < revision.rev_id { - if cur_rev_id != revision.base_rev_id { - // The server document is outdated, try to get the missing revision from the - // client. - user.socket - .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id)) - .map_err(internal_error)?; - } else { - let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?; - let _ = self.update_document_delta(delta)?; - user.socket - .do_send(mk_acked_ws_message(&revision)) - .map_err(internal_error)?; - self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); - let _ = self.save_revision(&revision, pg_pool).await?; - } - - Ok(()) - } else { - log::error!("Client rev_id should not equal to server rev_id"); - Ok(()) + match cur_rev_id.cmp(&revision.rev_id) { + Ordering::Less => { + if cur_rev_id != revision.base_rev_id { + // The server document is outdated, try to get the missing revision from the + // client. + user.socket + .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id)) + .map_err(internal_error)?; + } else { + let _ = self.compose_revision(&revision, pg_pool).await?; + user.socket + .do_send(mk_acked_ws_message(&revision)) + .map_err(internal_error)?; + } + }, + Ordering::Equal => {}, + Ordering::Greater => { + // The client document is outdated. Transform the client revision delta and then + // send the prime delta to the client. Client should compose the this prime + // delta. + let cli_revision = self.transform_client_revision(&revision)?; + let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); + user.socket.do_send(ws_cli_revision).map_err(internal_error)?; + }, } + Ok(()) + } + + async fn compose_revision(&self, revision: &Revision, pg_pool: Data) -> Result<(), ServerError> { + let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?; + let _ = self.compose_delta(delta)?; + let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); + let _ = self.save_revision(&revision, pg_pool).await?; + Ok(()) + } + + fn transform_client_revision(&self, revision: &Revision) -> Result { + let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?; + let _ = self.compose_delta(server_prime)?; + let cli_revision = self.mk_revision(revision.rev_id, cli_prime); + Ok(cli_revision) } fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision { @@ -133,7 +156,7 @@ impl ServerEditDoc { } #[tracing::instrument(level = "debug", skip(self), err)] - fn update_document_delta(&self, delta: Delta) -> Result<(), ServerError> { + fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> { // Opti: push each revision into queue and process it one by one. match self.document.try_write_for(Duration::from_millis(300)) { None => { diff --git a/backend/src/service/doc/edit/interval.rs b/backend/src/service/doc/edit/interval.rs new file mode 100644 index 0000000000..21d0342466 --- /dev/null +++ b/backend/src/service/doc/edit/interval.rs @@ -0,0 +1,57 @@ +use std::cmp::{max, min}; + +#[derive(Clone, Copy, PartialEq, Eq)] +pub struct Interval { + pub start: i64, + pub end: i64, +} + +impl Interval { + /// Construct a new `Interval` representing the range [start..end). + /// It is an invariant that `start <= end`. + pub fn new(start: i64, end: i64) -> Interval { + debug_assert!(start <= end); + Interval { start, end } + } + + pub fn start(&self) -> i64 { self.start } + + pub fn end(&self) -> i64 { self.end } + + pub fn is_before(&self, val: i64) -> bool { self.end <= val } + + pub fn contains(&self, val: i64) -> bool { self.start <= val && val < self.end } + + pub fn contains_range(&self, start: i64, end: i64) -> bool { !self.intersect(Interval::new(start, end)).is_empty() } + + pub fn is_after(&self, val: i64) -> bool { self.start > val } + + pub fn is_empty(&self) -> bool { self.end <= self.start } + + pub fn intersect(&self, other: Interval) -> Interval { + let start = max(self.start, other.start); + let end = min(self.end, other.end); + Interval { + start, + end: max(start, end), + } + } + + // the first half of self - other + pub fn prefix(&self, other: Interval) -> Interval { + Interval { + start: min(self.start, other.start), + end: min(self.end, other.start), + } + } + + // the second half of self - other + pub fn suffix(&self, other: Interval) -> Interval { + Interval { + start: max(self.start, other.end), + end: max(self.end, other.end), + } + } + + pub fn size(&self) -> i64 { self.end - self.start } +} diff --git a/backend/src/service/doc/edit/mod.rs b/backend/src/service/doc/edit/mod.rs index 7c279b1ac8..8fda81fea4 100644 --- a/backend/src/service/doc/edit/mod.rs +++ b/backend/src/service/doc/edit/mod.rs @@ -1,6 +1,8 @@ mod edit_actor; mod edit_doc; +mod interval; mod open_handle; +pub use edit_actor::*; pub use edit_doc::*; pub use open_handle::*; diff --git a/backend/src/service/doc/edit/open_handle.rs b/backend/src/service/doc/edit/open_handle.rs index ec85e45a43..2693e962d3 100644 --- a/backend/src/service/doc/edit/open_handle.rs +++ b/backend/src/service/doc/edit/open_handle.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; pub struct DocHandle { - sender: mpsc::Sender, + pub sender: mpsc::Sender, } impl DocHandle { @@ -22,6 +22,18 @@ impl DocHandle { Ok(Self { sender }) } + pub async fn handle_new_user(&self, user: Arc, rev_id: i64, socket: Socket) -> Result<(), ServerError> { + let (ret, rx) = oneshot::channel(); + let msg = EditMsg::NewDocUser { + user, + socket, + rev_id, + ret, + }; + let _ = self.send(msg, rx).await?; + Ok(()) + } + #[tracing::instrument(level = "debug", skip(self, user, socket, revision))] pub async fn apply_revision( &self, @@ -46,7 +58,7 @@ impl DocHandle { self.send(msg, rx).await? } - async fn send(&self, msg: EditMsg, rx: oneshot::Receiver) -> DocResult { + pub(crate) async fn send(&self, msg: EditMsg, rx: oneshot::Receiver) -> DocResult { let _ = self.sender.send(msg).await.map_err(internal_error)?; let result = rx.await?; Ok(result) diff --git a/backend/src/service/doc/ws_actor.rs b/backend/src/service/doc/ws_actor.rs index 5d8565b960..fdd815204e 100644 --- a/backend/src/service/doc/ws_actor.rs +++ b/backend/src/service/doc/ws_actor.rs @@ -1,5 +1,5 @@ use crate::service::{ - doc::doc::DocManager, + doc::{doc::DocManager, edit::DocHandle}, util::{md5, parse_from_bytes}, ws::{entities::Socket, WsClientData, WsUser}, }; @@ -74,21 +74,29 @@ impl DocWsActor { match document_data.ty { WsDataType::Acked => Ok(()), WsDataType::PushRev => self.handle_push_rev(user, socket, data, pool).await, - WsDataType::NewDocUser => self.handle_new_doc_user(socket, data).await, + WsDataType::NewDocUser => self.handle_new_doc_user(user, socket, data, pool).await, WsDataType::PullRev => Ok(()), WsDataType::Conflict => Ok(()), } } - async fn handle_new_doc_user(&self, _socket: Socket, data: Vec) -> DocResult<()> { - let _user = spawn_blocking(move || { + async fn handle_new_doc_user( + &self, + user: Arc, + socket: Socket, + data: Vec, + pool: Data, + ) -> DocResult<()> { + let doc_user = spawn_blocking(move || { let user: NewDocUser = parse_from_bytes(&data)?; DocResult::Ok(user) }) .await .map_err(internal_error)??; - - unimplemented!() + if let Some(handle) = self.doc_handle(&doc_user.doc_id, pool).await { + handle.handle_new_user(user, doc_user.rev_id, socket).await?; + } + Ok(()) } async fn handle_push_rev( @@ -105,15 +113,22 @@ impl DocWsActor { }) .await .map_err(internal_error)??; + if let Some(handle) = self.doc_handle(&revision.doc_id, pool).await { + handle.apply_revision(user, socket, revision).await?; + } + Ok(()) + } - match self.doc_manager.get(&revision.doc_id, pool).await? { - Some(edit_doc) => { - edit_doc.apply_revision(user, socket, revision).await?; - Ok(()) + async fn doc_handle(&self, doc_id: &str, pool: Data) -> Option> { + match self.doc_manager.get(doc_id, pool).await { + Ok(Some(edit_doc)) => Some(edit_doc), + Ok(None) => { + log::error!("Document with id: {} not exist", doc_id); + None }, - None => { - log::error!("Document with id: {} not exist", &revision.doc_id); - Ok(()) + Err(e) => { + log::error!("Get doc handle failed: {:?}", e); + None }, } }