diff --git a/backend/src/service/doc/edit/edit_doc.rs b/backend/src/service/doc/edit/edit_doc.rs index 3e47ce53c8..7001214af4 100644 --- a/backend/src/service/doc/edit/edit_doc.rs +++ b/backend/src/service/doc/edit/edit_doc.rs @@ -68,6 +68,7 @@ impl ServerEditDoc { if cur_rev_id > rev_id { let doc_delta = self.document.read().delta().clone(); let cli_revision = self.mk_revision(rev_id, doc_delta); + log::debug!("Server push rev"); let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); user.socket.do_send(ws_cli_revision).map_err(internal_error)?; } @@ -98,6 +99,7 @@ impl ServerEditDoc { if cur_rev_id != revision.base_rev_id { // The server document is outdated, try to get the missing revision from the // client. + log::debug!("Server push rev"); user.socket .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id)) .map_err(internal_error)?; diff --git a/backend/tests/document/edit.rs b/backend/tests/document/edit.rs index f7599d7d9c..039c69ef31 100644 --- a/backend/tests/document/edit.rs +++ b/backend/tests/document/edit.rs @@ -45,7 +45,7 @@ async fn sync_open_empty_doc_and_sync_from_server_using_ws() { DocScript::OpenDoc, DocScript::SetServerDocument(json, 3), DocScript::ConnectWs, - DocScript::AssertClient(r#"[{"insert":"123\n\n"}]"#), + DocScript::AssertClient(r#"[{"insert":"\n123\n"}]"#), ]) .await; } @@ -62,8 +62,8 @@ async fn sync_open_non_empty_doc_and_sync_with_sever() { DocScript::SetServerDocument(json, 3), DocScript::SendText(0, "abc"), DocScript::ConnectWs, - DocScript::AssertClient(r#"[{"insert":"123\nabc\n"}]"#), - // DocScript::AssertServer(r#"[{"insert":"123\nabc\n"}]"#), + DocScript::AssertClient(r#"[{"insert":"abc\n123\n"}]"#), + DocScript::AssertServer(r#"[{"insert":"abc\n123\n"}]"#), ]) .await; } diff --git a/rust-lib/flowy-document/src/entities/doc/revision.rs b/rust-lib/flowy-document/src/entities/doc/revision.rs index 40605ea030..f6a9bc28bc 100644 --- a/rust-lib/flowy-document/src/entities/doc/revision.rs +++ b/rust-lib/flowy-document/src/entities/doc/revision.rs @@ -39,7 +39,7 @@ impl std::default::Default for RevType { #[derive(Clone, Debug, ProtoBuf, Default)] pub struct RevId { #[pb(index = 1)] - inner: i64, + pub inner: i64, } impl AsRef for RevId { diff --git a/rust-lib/flowy-document/src/services/doc/doc_controller.rs b/rust-lib/flowy-document/src/services/doc/doc_controller.rs index 299ea718ef..a272cda42f 100644 --- a/rust-lib/flowy-document/src/services/doc/doc_controller.rs +++ b/rust-lib/flowy-document/src/services/doc/doc_controller.rs @@ -14,7 +14,7 @@ use crate::{ services::{ cache::DocCache, doc::{ - edit::ClientEditDoc, + edit::{ClientEditDoc, EditDocWsHandler}, revision::{DocRevision, RevisionServer}, }, server::Server, @@ -127,7 +127,8 @@ impl DocController { }); let edit_ctx = Arc::new(ClientEditDoc::new(doc_id, pool, ws, server, user).await?); - self.ws_manager.register_handler(doc_id, edit_ctx.clone()); + let ws_handler = Arc::new(EditDocWsHandler(edit_ctx.clone())); + self.ws_manager.register_handler(doc_id, ws_handler); self.cache.set(edit_ctx.clone()); Ok(edit_ctx) } diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs index 3b0a326caa..17c96c416c 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_actor.rs @@ -1,17 +1,20 @@ use crate::{ - entities::doc::RevId, + entities::doc::{RevId, Revision}, errors::{internal_error, DocResult}, services::doc::{ - edit::{message::EditMsg, DocId}, + edit::{ + message::{EditMsg, TransformDeltas}, + DocId, + }, Document, }, sql_tables::{DocTableChangeset, DocTableSql}, }; use async_stream::stream; use flowy_database::ConnectionPool; -use flowy_ot::core::Delta; +use flowy_ot::core::{Delta, OperationTransformable}; use futures::stream::StreamExt; -use std::sync::Arc; +use std::{convert::TryFrom, sync::Arc}; use tokio::sync::{mpsc, RwLock}; pub struct DocumentEditActor { @@ -61,14 +64,21 @@ impl DocumentEditActor { async fn handle_message(&self, msg: EditMsg) -> DocResult<()> { match msg { EditMsg::Delta { delta, ret } => { - let result = self.document.write().await.compose_delta(&delta); - log::debug!( - "Compose push delta: {}. result: {}", - delta.to_json(), - self.document.read().await.to_json() - ); + let result = self.compose_delta(delta).await; let _ = ret.send(result); }, + EditMsg::RemoteRevision { bytes, ret } => { + let revision = Revision::try_from(bytes)?; + let delta = Delta::from_bytes(&revision.delta_data)?; + let rev_id: RevId = revision.rev_id.into(); + let (server_prime, client_prime) = self.document.read().await.delta().transform(&delta)?; + let transform_delta = TransformDeltas { + client_prime, + server_prime, + server_rev_id: rev_id, + }; + let _ = ret.send(Ok(transform_delta)); + }, EditMsg::Insert { index, data, ret } => { let delta = self.document.write().await.insert(index, data); let _ = ret.send(delta); @@ -115,6 +125,16 @@ impl DocumentEditActor { Ok(()) } + async fn compose_delta(&self, delta: Delta) -> DocResult<()> { + let result = self.document.write().await.compose_delta(&delta); + log::debug!( + "Compose push delta: {}. result: {}", + delta.to_json(), + self.document.read().await.to_json() + ); + result + } + #[tracing::instrument(level = "debug", skip(self, rev_id), err)] async fn save_to_disk(&self, rev_id: RevId) -> DocResult<()> { let data = self.document.read().await.to_json(); diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index 077dead001..8bcea83c58 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -7,7 +7,11 @@ use crate::{ module::DocumentUser, services::{ doc::{ - edit::{edit_actor::DocumentEditActor, message::EditMsg}, + edit::{ + edit_actor::DocumentEditActor, + message::{EditMsg, TransformDeltas}, + model::NotifyOpenDocAction, + }, revision::{DocRevision, RevisionCmd, RevisionManager, RevisionServer, RevisionStoreActor}, UndoResult, }, @@ -16,6 +20,7 @@ use crate::{ }; use bytes::Bytes; use flowy_database::ConnectionPool; +use flowy_infra::retry::{ExponentialBackoff, Retry}; use flowy_ot::core::{Attribute, Delta, Interval}; use flowy_ws::WsState; use std::{convert::TryFrom, sync::Arc}; @@ -27,7 +32,9 @@ pub struct ClientEditDoc { pub doc_id: DocId, rev_manager: Arc, document: UnboundedSender, + ws: Arc, pool: Arc, + user: Arc, } impl ClientEditDoc { @@ -38,21 +45,23 @@ impl ClientEditDoc { server: Arc, user: Arc, ) -> DocResult { - let user_id = user.user_id()?; let rev_store = spawn_rev_store_actor(doc_id, pool.clone(), server.clone()); let DocRevision { rev_id, delta } = fetch_document(rev_store.clone()).await?; - log::info!("😁 Document delta: {:?}", delta); - let rev_manager = Arc::new(RevisionManager::new(doc_id, &user_id, rev_id, ws, rev_store)); + let rev_manager = Arc::new(RevisionManager::new(doc_id, rev_id, rev_store)); let document = spawn_doc_edit_actor(doc_id, delta, pool.clone()); let doc_id = doc_id.to_string(); - Ok(Self { + let edit_doc = Self { doc_id, rev_manager, document, pool, - }) + ws, + user, + }; + edit_doc.notify_open_doc(); + Ok(edit_doc) } pub async fn insert(&self, index: usize, data: T) -> Result<(), DocError> { @@ -146,7 +155,12 @@ impl ClientEditDoc { let (base_rev_id, rev_id) = self.rev_manager.next_rev_id(); let delta_data = delta_data.to_vec(); let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local); - let _ = self.rev_manager.add_revision(revision).await?; + let _ = self.rev_manager.add_revision(&revision).await?; + match self.ws.send(revision.into()) { + Ok(_) => {}, + Err(e) => log::error!("Send delta failed: {:?}", e), + }; + Ok(rev_id.into()) } @@ -169,39 +183,117 @@ impl ClientEditDoc { let _ = self.document.send(msg); rx.await.map_err(internal_error)? } + + #[tracing::instrument(level = "debug", skip(self))] + fn notify_open_doc(&self) { + let rev_id: RevId = self.rev_manager.rev_id().into(); + + if let Ok(user_id) = self.user.user_id() { + let action = NotifyOpenDocAction::new(&user_id, &self.doc_id, &rev_id, &self.ws); + let strategy = ExponentialBackoff::from_millis(50).take(3); + let retry = Retry::spawn(strategy, action); + tokio::spawn(async move { + match retry.await { + Ok(_) => {}, + Err(e) => log::error!("Notify open doc failed: {}", e), + } + }); + } + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> { + // Transform the revision + let (ret, rx) = oneshot::channel::>(); + let _ = self.document.send(EditMsg::RemoteRevision { bytes, ret }); + let TransformDeltas { + client_prime, + server_prime, + server_rev_id, + } = rx.await.map_err(internal_error)??; + + if self.rev_manager.rev_id() >= server_rev_id.0 { + // Ignore this push revision if local_rev_id >= server_rev_id + return Ok(()); + } + + // compose delta + let (ret, rx) = oneshot::channel::>(); + let msg = EditMsg::Delta { + delta: client_prime.clone(), + ret, + }; + let _ = self.document.send(msg); + let _ = rx.await.map_err(internal_error)??; + + // update rev id + self.rev_manager.update_rev_id(server_rev_id.clone().into()); + let (_, local_rev_id) = self.rev_manager.next_rev_id(); + + // save the revision + let revision = Revision::new( + server_rev_id.0, + local_rev_id, + client_prime.to_bytes().to_vec(), + &self.doc_id, + RevType::Remote, + ); + let _ = self.rev_manager.add_revision(&revision).await?; + + // send the server_prime delta + let revision = Revision::new( + server_rev_id.0, + local_rev_id, + server_prime.to_bytes().to_vec(), + &self.doc_id, + RevType::Remote, + ); + self.ws.send(revision.into()); + + save_document(self.document.clone(), local_rev_id.into()).await; + Ok(()) + } + + async fn handle_ws_message(&self, doc_data: WsDocumentData) -> DocResult<()> { + let bytes = Bytes::from(doc_data.data); + match doc_data.ty { + WsDataType::PushRev => { + let _ = self.handle_push_rev(bytes).await?; + }, + WsDataType::PullRev => { + let range = RevisionRange::try_from(bytes)?; + let _ = self.rev_manager.send_revisions(range).await?; + }, + WsDataType::NewDocUser => {}, + WsDataType::Acked => { + let rev_id = RevId::try_from(bytes)?; + let _ = self.rev_manager.ack_rev(rev_id); + }, + WsDataType::Conflict => {}, + } + Ok(()) + } } -impl WsDocumentHandler for ClientEditDoc { - fn receive(&self, doc_data: WsDocumentData) { - let document = self.document.clone(); - let rev_manager = self.rev_manager.clone(); - let handle_ws_message = |doc_data: WsDocumentData| async move { - let bytes = Bytes::from(doc_data.data); - match doc_data.ty { - WsDataType::PushRev => { - let _ = handle_push_rev(bytes, rev_manager, document).await?; - }, - WsDataType::PullRev => { - let range = RevisionRange::try_from(bytes)?; - let _ = rev_manager.send_revisions(range).await?; - }, - WsDataType::NewDocUser => {}, - WsDataType::Acked => { - let rev_id = RevId::try_from(bytes)?; - let _ = rev_manager.ack_rev(rev_id); - }, - WsDataType::Conflict => {}, - } - Result::<(), DocError>::Ok(()) - }; +pub struct EditDocWsHandler(pub Arc); +impl WsDocumentHandler for EditDocWsHandler { + fn receive(&self, doc_data: WsDocumentData) { + let edit_doc = self.0.clone(); tokio::spawn(async move { - if let Err(e) = handle_ws_message(doc_data).await { + if let Err(e) = edit_doc.handle_ws_message(doc_data).await { log::error!("{:?}", e); } }); } - fn state_changed(&self, state: &WsState) { let _ = self.rev_manager.handle_ws_state_changed(state); } + + fn state_changed(&self, state: &WsState) { + match state { + WsState::Init => {}, + WsState::Connected(_) => self.0.notify_open_doc(), + WsState::Disconnected(_) => {}, + } + } } async fn save_document(document: UnboundedSender, rev_id: RevId) -> DocResult<()> { @@ -211,24 +303,6 @@ async fn save_document(document: UnboundedSender, rev_id: RevId) -> Doc result } -async fn handle_push_rev( - rev_bytes: Bytes, - rev_manager: Arc, - document: UnboundedSender, -) -> DocResult<()> { - let revision = Revision::try_from(rev_bytes)?; - let _ = rev_manager.add_revision(revision.clone()).await?; - - let delta = Delta::from_bytes(&revision.delta_data)?; - let (ret, rx) = oneshot::channel::>(); - let msg = EditMsg::Delta { delta, ret }; - let _ = document.send(msg); - let _ = rx.await.map_err(internal_error)??; - - save_document(document, revision.rev_id.into()).await; - Ok(()) -} - fn spawn_rev_store_actor( doc_id: &str, pool: Arc, diff --git a/rust-lib/flowy-document/src/services/doc/edit/message.rs b/rust-lib/flowy-document/src/services/doc/edit/message.rs index 6ea40668c5..06cd4afb9c 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/message.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/message.rs @@ -1,7 +1,8 @@ use crate::{errors::DocResult, services::doc::UndoResult}; use flowy_ot::core::{Attribute, Delta, Interval}; -use crate::entities::doc::RevId; +use crate::entities::doc::{RevId, Revision}; +use bytes::Bytes; use tokio::sync::oneshot; pub type Ret = oneshot::Sender>; @@ -10,6 +11,10 @@ pub enum EditMsg { delta: Delta, ret: Ret<()>, }, + RemoteRevision { + bytes: Bytes, + ret: Ret, + }, Insert { index: usize, data: String, @@ -50,3 +55,9 @@ pub enum EditMsg { ret: Ret<()>, }, } + +pub struct TransformDeltas { + pub client_prime: Delta, + pub server_prime: Delta, + pub server_rev_id: RevId, +} diff --git a/rust-lib/flowy-document/src/services/doc/edit/mod.rs b/rust-lib/flowy-document/src/services/doc/edit/mod.rs index afb459ee6c..f5ba0015bb 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/mod.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/mod.rs @@ -1,5 +1,6 @@ mod edit_actor; mod edit_doc; mod message; +mod model; pub use edit_doc::*; diff --git a/rust-lib/flowy-document/src/services/doc/edit/model.rs b/rust-lib/flowy-document/src/services/doc/edit/model.rs new file mode 100644 index 0000000000..cd760628d1 --- /dev/null +++ b/rust-lib/flowy-document/src/services/doc/edit/model.rs @@ -0,0 +1,45 @@ +use crate::{ + entities::doc::{NewDocUser, RevId}, + errors::DocError, + services::ws::DocumentWebSocket, +}; +use flowy_infra::retry::Action; +use futures::future::BoxFuture; +use std::{future, sync::Arc}; + +pub(crate) struct NotifyOpenDocAction { + user_id: String, + rev_id: RevId, + doc_id: String, + ws: Arc, +} + +impl NotifyOpenDocAction { + pub(crate) fn new(user_id: &str, doc_id: &str, rev_id: &RevId, ws: &Arc) -> Self { + Self { + user_id: user_id.to_owned(), + rev_id: rev_id.clone(), + doc_id: doc_id.to_owned(), + ws: ws.clone(), + } + } +} + +impl Action for NotifyOpenDocAction { + type Future = BoxFuture<'static, Result>; + type Item = (); + type Error = DocError; + + fn run(&mut self) -> Self::Future { + let new_doc_user = NewDocUser { + user_id: self.user_id.clone(), + rev_id: self.rev_id.clone().into(), + doc_id: self.doc_id.clone(), + }; + + match self.ws.send(new_doc_user.into()) { + Ok(_) => Box::pin(future::ready(Ok::<(), DocError>(()))), + Err(e) => Box::pin(future::ready(Err::<(), DocError>(e))), + } + } +} diff --git a/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 5c83a0f0ad..934121b92a 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -2,10 +2,7 @@ use crate::{ entities::doc::{RevId, RevType, Revision, RevisionRange}, errors::{internal_error, DocError}, services::{ - doc::revision::{ - store_actor::{RevisionCmd, RevisionStoreActor}, - util::NotifyOpenDocAction, - }, + doc::revision::store_actor::{RevisionCmd, RevisionStoreActor}, util::RevIdCounter, ws::DocumentWebSocket, }, @@ -31,33 +28,22 @@ pub trait RevisionServer: Send + Sync { pub struct RevisionManager { doc_id: String, - user_id: String, rev_id_counter: RevIdCounter, - ws: Arc, rev_store: mpsc::Sender, } impl RevisionManager { - pub fn new( - doc_id: &str, - user_id: &str, - rev_id: RevId, - ws: Arc, - rev_store: mpsc::Sender, - ) -> Self { - notify_open_doc(&ws, user_id, doc_id, &rev_id); + pub fn new(doc_id: &str, rev_id: RevId, rev_store: mpsc::Sender) -> Self { let rev_id_counter = RevIdCounter::new(rev_id.into()); Self { doc_id: doc_id.to_string(), - user_id: user_id.to_string(), rev_id_counter, - ws, rev_store, } } #[tracing::instrument(level = "debug", skip(self))] - pub async fn add_revision(&self, revision: Revision) -> Result<(), DocError> { + pub async fn add_revision(&self, revision: &Revision) -> Result<(), DocError> { let (ret, rx) = oneshot::channel(); let cmd = RevisionCmd::Revision { revision: revision.clone(), @@ -65,13 +51,6 @@ impl RevisionManager { }; let _ = self.rev_store.send(cmd).await; let result = rx.await.map_err(internal_error)?; - if result.is_ok() && revision.ty.is_local() { - match self.ws.send(revision.into()) { - Ok(_) => {}, - Err(e) => log::error!("Send delta failed: {:?}", e), - }; - } - result } @@ -91,6 +70,8 @@ impl RevisionManager { (cur, next) } + pub fn update_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); } + pub async fn send_revisions(&self, range: RevisionRange) -> Result<(), DocError> { debug_assert!(&range.doc_id == &self.doc_id); let (ret, rx) = oneshot::channel(); @@ -101,38 +82,4 @@ impl RevisionManager { unimplemented!() // Ok(()) } - - #[tracing::instrument( - level = "debug", - skip(self), - fields( - doc_id = %self.doc_id.clone(), - rev_id = %self.rev_id(), - ) - )] - pub fn handle_ws_state_changed(&self, state: &WsState) { - match state { - WsState::Init => {}, - WsState::Connected(_) => { - let rev_id: RevId = self.rev_id().into(); - notify_open_doc(&self.ws, &self.user_id, &self.doc_id, &rev_id); - }, - WsState::Disconnected(_) => {}, - } - } -} - -// FIXME: -// user_id may be invalid if the user switch to another account while -// theNotifyOpenDocAction is flying -fn notify_open_doc(ws: &Arc, user_id: &str, doc_id: &str, rev_id: &RevId) { - let action = NotifyOpenDocAction::new(user_id, doc_id, rev_id, ws); - let strategy = ExponentialBackoff::from_millis(50).take(3); - let retry = Retry::spawn(strategy, action); - tokio::spawn(async move { - match retry.await { - Ok(_) => {}, - Err(e) => log::error!("Notify open doc failed: {}", e), - } - }); } diff --git a/rust-lib/flowy-document/src/services/doc/revision/mod.rs b/rust-lib/flowy-document/src/services/doc/revision/mod.rs index 49d14d5107..b45b634038 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/mod.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/mod.rs @@ -1,6 +1,6 @@ mod manager; +mod model; mod store_actor; -mod util; pub use manager::*; pub use store_actor::*; diff --git a/rust-lib/flowy-document/src/services/doc/revision/util.rs b/rust-lib/flowy-document/src/services/doc/revision/model.rs similarity index 55% rename from rust-lib/flowy-document/src/services/doc/revision/util.rs rename to rust-lib/flowy-document/src/services/doc/revision/model.rs index 28bc19c4b5..987b14fcc2 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/util.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/model.rs @@ -49,40 +49,3 @@ impl std::ops::Deref for RevisionOperation { fn deref(&self) -> &Self::Target { &self.inner } } - -pub(crate) struct NotifyOpenDocAction { - user_id: String, - rev_id: RevId, - doc_id: String, - ws: Arc, -} - -impl NotifyOpenDocAction { - pub(crate) fn new(user_id: &str, doc_id: &str, rev_id: &RevId, ws: &Arc) -> Self { - Self { - user_id: user_id.to_owned(), - rev_id: rev_id.clone(), - doc_id: doc_id.to_owned(), - ws: ws.clone(), - } - } -} - -impl Action for NotifyOpenDocAction { - type Future = BoxFuture<'static, Result>; - type Item = (); - type Error = DocError; - - fn run(&mut self) -> Self::Future { - let new_doc_user = NewDocUser { - user_id: self.user_id.clone(), - rev_id: self.rev_id.clone().into(), - doc_id: self.doc_id.clone(), - }; - - match self.ws.send(new_doc_user.into()) { - Ok(_) => Box::pin(future::ready(Ok::<(), DocError>(()))), - Err(e) => Box::pin(future::ready(Err::<(), DocError>(e))), - } - } -} diff --git a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs index d357b576b0..b58980d70a 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs @@ -1,7 +1,7 @@ use crate::{ entities::doc::{RevId, Revision, RevisionRange}, errors::{internal_error, DocError, DocResult}, - services::doc::revision::{util::RevisionOperation, DocRevision, RevisionServer}, + services::doc::revision::{model::RevisionOperation, DocRevision, RevisionServer}, sql_tables::{RevState, RevTableSql}, }; use async_stream::stream; diff --git a/rust-lib/flowy-document/src/services/util.rs b/rust-lib/flowy-document/src/services/util.rs index 2a588312aa..d20ac48cea 100644 --- a/rust-lib/flowy-document/src/services/util.rs +++ b/rust-lib/flowy-document/src/services/util.rs @@ -34,4 +34,6 @@ impl RevIdCounter { self.value() } pub fn value(&self) -> i64 { self.0.load(SeqCst) } + + pub fn set(&self, n: i64) { let _ = self.0.fetch_update(SeqCst, SeqCst, |_| Some(n)); } }