From cb4398eab08f35ed2cdace68d5ce734ffde81a37 Mon Sep 17 00:00:00 2001 From: appflowy Date: Wed, 22 Dec 2021 18:53:52 +0800 Subject: [PATCH] config document kv store --- backend/migrations/20210909115140_doc.sql | 2 - backend/src/application.rs | 9 +- backend/src/context.rs | 42 ++- backend/src/entities/doc.rs | 20 -- backend/src/entities/mod.rs | 1 - backend/src/services/core/view/controller.rs | 7 +- backend/src/services/core/view/router.rs | 12 +- .../document/{manager.rs => controller.rs} | 59 ++-- backend/src/services/document/mod.rs | 11 +- .../src/services/document/persistence/kv.rs | 48 +++ .../src/services/document/persistence/mod.rs | 5 + .../{crud.rs => persistence/postgres.rs} | 70 +++-- backend/src/services/document/router.rs | 7 +- backend/src/services/document/ws_actor.rs | 18 +- backend/src/services/kv_store/kv.rs | 4 +- backend/src/services/web_socket/ws_client.rs | 4 +- backend/tests/api_test/kv_test.rs | 6 +- .../lib/protobuf/lib-ot/model.pb.dart | 41 +++ .../lib/protobuf/lib-ot/model.pbenum.dart | 4 +- .../lib/protobuf/lib-ot/model.pbjson.dart | 14 +- .../src/services/doc/revision/cache/cache.rs | 4 +- .../src/services/doc/revision/manager.rs | 2 +- .../src/sql_tables/doc/rev_table.rs | 4 +- .../flowy-net/src/services/mock/ws_mock.rs | 8 +- .../src/core/sync/server_editor.rs | 31 +- .../src/derive_cache/derive_cache.rs | 1 + shared-lib/lib-ot/src/protobuf/model/model.rs | 280 ++++++++++++++---- .../lib-ot/src/protobuf/proto/model.proto | 5 +- shared-lib/lib-ot/src/revision/model.rs | 22 +- 29 files changed, 503 insertions(+), 238 deletions(-) delete mode 100644 backend/src/entities/doc.rs rename backend/src/services/document/{manager.rs => controller.rs} (68%) create mode 100644 backend/src/services/document/persistence/kv.rs create mode 100644 backend/src/services/document/persistence/mod.rs rename backend/src/services/document/{crud.rs => persistence/postgres.rs} (74%) diff --git a/backend/migrations/20210909115140_doc.sql b/backend/migrations/20210909115140_doc.sql index 53bf662eb5..40782f9875 100644 --- a/backend/migrations/20210909115140_doc.sql +++ b/backend/migrations/20210909115140_doc.sql @@ -2,7 +2,5 @@ CREATE TABLE IF NOT EXISTS doc_table( id uuid NOT NULL, PRIMARY KEY (id), --- data bytea NOT NULL DEFAULT '', - data TEXT NOT NULL DEFAULT '', rev_id bigint NOT NULL DEFAULT 0 ); \ No newline at end of file diff --git a/backend/src/application.rs b/backend/src/application.rs index beb674a40d..ccf15a90ab 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -42,7 +42,7 @@ impl Application { pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result { let domain = domain(); let secret: String = secret(); - actix_rt::spawn(period_check(app_ctx.pg_pool.clone())); + actix_rt::spawn(period_check(app_ctx.persistence.pg_pool())); let server = HttpServer::new(move || { App::new() @@ -54,16 +54,17 @@ pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result) { +#[allow(dead_code)] +async fn period_check(_pool: PgPool) { let mut i = interval(Duration::from_secs(60)); loop { i.tick().await; diff --git a/backend/src/context.rs b/backend/src/context.rs index 9a3940a024..1d58e99c82 100644 --- a/backend/src/context.rs +++ b/backend/src/context.rs @@ -1,10 +1,11 @@ use crate::services::{ - document::manager::DocumentManager, kv_store::{KVStore, PostgresKV}, web_socket::{WSServer, WebSocketReceivers}, }; use actix::Addr; use actix_web::web::Data; + +use crate::services::document::{controller::make_document_ws_receiver, persistence::DocumentKVPersistence}; use lib_ws::WSModule; use sqlx::PgPool; use std::sync::Arc; @@ -12,30 +13,41 @@ use std::sync::Arc; #[derive(Clone)] pub struct AppContext { pub ws_server: Data>, - pub pg_pool: Data, + pub persistence: Data>, pub ws_receivers: Data, - pub document_mng: Data>, - pub kv_store: Data>, } impl AppContext { - pub fn new(ws_server: Addr, db_pool: PgPool) -> Self { + pub fn new(ws_server: Addr, pg_pool: PgPool) -> Self { let ws_server = Data::new(ws_server); - let pg_pool = Data::new(db_pool); - let mut ws_receivers = WebSocketReceivers::new(); - let document_mng = Arc::new(DocumentManager::new(pg_pool.clone())); - ws_receivers.set(WSModule::Doc, document_mng.clone()); - let kv_store = Arc::new(PostgresKV { - pg_pool: pg_pool.clone(), - }); + let kv_store = make_document_kv_store(pg_pool.clone()); + let persistence = Arc::new(FlowyPersistence { pg_pool, kv_store }); + + let document_ws_receiver = make_document_ws_receiver(persistence.clone()); + ws_receivers.set(WSModule::Doc, document_ws_receiver); AppContext { ws_server, - pg_pool, + persistence: Data::new(persistence), ws_receivers: Data::new(ws_receivers), - document_mng: Data::new(document_mng), - kv_store: Data::new(kv_store), } } } + +fn make_document_kv_store(pg_pool: PgPool) -> Arc { + let kv_impl = Arc::new(PostgresKV { pg_pool }); + Arc::new(DocumentKVPersistence::new(kv_impl)) +} + +#[derive(Clone)] +pub struct FlowyPersistence { + pg_pool: PgPool, + kv_store: Arc, +} + +impl FlowyPersistence { + pub fn pg_pool(&self) -> PgPool { self.pg_pool.clone() } + + pub fn kv_store(&self) -> Arc { self.kv_store.clone() } +} diff --git a/backend/src/entities/doc.rs b/backend/src/entities/doc.rs deleted file mode 100644 index a6d7b9fd03..0000000000 --- a/backend/src/entities/doc.rs +++ /dev/null @@ -1,20 +0,0 @@ -use flowy_collaboration::protobuf::Doc; - -pub(crate) const DOC_TABLE: &str = "doc_table"; - -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct DocTable { - pub(crate) id: uuid::Uuid, - pub(crate) data: String, - pub(crate) rev_id: i64, -} - -impl std::convert::From for Doc { - fn from(table: DocTable) -> Self { - let mut doc = Doc::new(); - doc.set_id(table.id.to_string()); - doc.set_data(table.data); - doc.set_rev_id(table.rev_id); - doc - } -} diff --git a/backend/src/entities/mod.rs b/backend/src/entities/mod.rs index 5e55cd9ee1..70409638ca 100644 --- a/backend/src/entities/mod.rs +++ b/backend/src/entities/mod.rs @@ -1,4 +1,3 @@ -pub mod doc; pub mod logged_user; pub mod token; pub mod user; diff --git a/backend/src/services/core/view/controller.rs b/backend/src/services/core/view/controller.rs index 900b3ad64a..549f06d67c 100644 --- a/backend/src/services/core/view/controller.rs +++ b/backend/src/services/core/view/controller.rs @@ -1,8 +1,9 @@ -use crate::services::document::{create_doc_with_transaction, delete_doc}; - use crate::{ entities::logged_user::LoggedUser, - services::core::{trash::read_trash_ids, view::persistence::*}, + services::{ + core::{trash::read_trash_ids, view::persistence::*}, + document::persistence::{create_doc_with_transaction, delete_doc}, + }, util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, }; use backend_service::errors::{invalid_params, ServerError}; diff --git a/backend/src/services/core/view/router.rs b/backend/src/services/core/view/router.rs index 917f1ad392..3813bf7790 100644 --- a/backend/src/services/core/view/router.rs +++ b/backend/src/services/core/view/router.rs @@ -1,9 +1,6 @@ use crate::{ entities::logged_user::LoggedUser, - services::{ - core::view::{create_view, delete_view, persistence::check_view_ids, read_view, update_view}, - document::manager::DocumentManager, - }, + services::core::view::{create_view, delete_view, persistence::check_view_ids, read_view, update_view}, util::serde_ext::parse_from_payload, }; use actix_web::{ @@ -20,13 +17,8 @@ use flowy_core_data_model::{ protobuf::{CreateViewParams, QueryViewRequest, UpdateViewParams, ViewIdentifier}, }; use sqlx::PgPool; -use std::sync::Arc; -pub async fn create_handler( - payload: Payload, - pool: Data, - _doc_biz: Data>, -) -> Result { +pub async fn create_handler(payload: Payload, pool: Data) -> Result { let params: CreateViewParams = parse_from_payload(payload).await?; let mut transaction = pool .begin() diff --git a/backend/src/services/document/manager.rs b/backend/src/services/document/controller.rs similarity index 68% rename from backend/src/services/document/manager.rs rename to backend/src/services/document/controller.rs index 6888eab6b5..99327b7b92 100644 --- a/backend/src/services/document/manager.rs +++ b/backend/src/services/document/controller.rs @@ -1,46 +1,53 @@ use crate::services::{ document::{ - create_doc, - read_doc, - update_doc, + persistence::{create_doc, read_doc, update_doc}, ws_actor::{DocumentWebSocketActor, WSActorMessage}, }, web_socket::{WSClientData, WebSocketReceiver}, }; -use actix_web::web::Data; + use backend_service::errors::ServerError; + +use crate::context::FlowyPersistence; use flowy_collaboration::{ - core::sync::{ServerDocManager, ServerDocPersistence}, + core::sync::{DocumentPersistence, ServerDocumentManager}, entities::doc::Doc, errors::CollaborateError, protobuf::{CreateDocParams, DocIdentifier, UpdateDocParams}, }; use lib_infra::future::FutureResultSend; use lib_ot::{revision::Revision, rich_text::RichTextDelta}; -use sqlx::PgPool; + use std::{convert::TryInto, sync::Arc}; use tokio::sync::{mpsc, oneshot}; -pub struct DocumentManager { - ws_sender: mpsc::Sender, - pg_pool: Data, +pub fn make_document_ws_receiver(persistence: Arc) -> Arc { + let document_persistence = Arc::new(DocumentPersistenceImpl(persistence.clone())); + let document_manager = Arc::new(ServerDocumentManager::new(document_persistence)); + + let (ws_sender, rx) = tokio::sync::mpsc::channel(100); + let actor = DocumentWebSocketActor::new(rx, document_manager); + tokio::task::spawn(actor.run()); + + Arc::new(DocumentWebSocketReceiver::new(persistence, ws_sender)) } -impl DocumentManager { - pub fn new(pg_pool: Data) -> Self { - let inner = Arc::new(ServerDocManager::new(Arc::new(DocPersistenceImpl(pg_pool.clone())))); - let (ws_sender, rx) = mpsc::channel(100); - let actor = DocumentWebSocketActor::new(rx, inner); - tokio::task::spawn(actor.run()); - Self { ws_sender, pg_pool } +pub struct DocumentWebSocketReceiver { + ws_sender: mpsc::Sender, + persistence: Arc, +} + +impl DocumentWebSocketReceiver { + pub fn new(persistence: Arc, ws_sender: mpsc::Sender) -> Self { + Self { ws_sender, persistence } } } -impl WebSocketReceiver for DocumentManager { +impl WebSocketReceiver for DocumentWebSocketReceiver { fn receive(&self, data: WSClientData) { let (ret, rx) = oneshot::channel(); let sender = self.ws_sender.clone(); - let pool = self.pg_pool.clone(); + let pool = self.persistence.pg_pool(); actix_rt::spawn(async move { let msg = WSActorMessage::ClientData { @@ -60,10 +67,10 @@ impl WebSocketReceiver for DocumentManager { } } -struct DocPersistenceImpl(Data); -impl ServerDocPersistence for DocPersistenceImpl { +struct DocumentPersistenceImpl(Arc); +impl DocumentPersistence for DocumentPersistenceImpl { fn update_doc(&self, doc_id: &str, rev_id: i64, delta: RichTextDelta) -> FutureResultSend<(), CollaborateError> { - let pg_pool = self.0.clone(); + let pg_pool = self.0.pg_pool(); let mut params = UpdateDocParams::new(); let doc_json = delta.to_json(); params.set_doc_id(doc_id.to_string()); @@ -71,7 +78,7 @@ impl ServerDocPersistence for DocPersistenceImpl { params.set_rev_id(rev_id); FutureResultSend::new(async move { - let _ = update_doc(pg_pool.get_ref(), params) + let _ = update_doc(&pg_pool, params) .await .map_err(server_error_to_collaborate_error)?; Ok(()) @@ -83,9 +90,9 @@ impl ServerDocPersistence for DocPersistenceImpl { doc_id: doc_id.to_string(), ..Default::default() }; - let pg_pool = self.0.clone(); + let pg_pool = self.0.pg_pool(); FutureResultSend::new(async move { - let mut pb_doc = read_doc(pg_pool.get_ref(), params) + let mut pb_doc = read_doc(&pg_pool, params) .await .map_err(server_error_to_collaborate_error)?; let doc = (&mut pb_doc) @@ -96,7 +103,7 @@ impl ServerDocPersistence for DocPersistenceImpl { } fn create_doc(&self, revision: Revision) -> FutureResultSend { - let pg_pool = self.0.clone(); + let pg_pool = self.0.pg_pool(); FutureResultSend::new(async move { let delta = RichTextDelta::from_bytes(&revision.delta_data)?; let doc_json = delta.to_json(); @@ -108,7 +115,7 @@ impl ServerDocPersistence for DocPersistenceImpl { cached_size: Default::default(), }; - let _ = create_doc(pg_pool.get_ref(), params) + let _ = create_doc(&pg_pool, params) .await .map_err(server_error_to_collaborate_error)?; let doc: Doc = revision.try_into()?; diff --git a/backend/src/services/document/mod.rs b/backend/src/services/document/mod.rs index 361de5c8b5..1c27265fce 100644 --- a/backend/src/services/document/mod.rs +++ b/backend/src/services/document/mod.rs @@ -1,9 +1,6 @@ #![allow(clippy::module_inception)] -pub(crate) use crud::*; -pub use router::*; - -pub mod crud; -pub mod manager; -pub mod router; -mod ws_actor; +pub(crate) mod controller; +pub(crate) mod persistence; +pub(crate) mod router; +pub(crate) mod ws_actor; diff --git a/backend/src/services/document/persistence/kv.rs b/backend/src/services/document/persistence/kv.rs new file mode 100644 index 0000000000..85eb85fc3b --- /dev/null +++ b/backend/src/services/document/persistence/kv.rs @@ -0,0 +1,48 @@ +use crate::{services::kv_store::KVStore, util::serde_ext::parse_from_bytes}; +use backend_service::errors::ServerError; +use bytes::Bytes; +use lib_ot::protobuf::{RepeatedRevision, Revision}; +use protobuf::Message; +use std::sync::Arc; + +pub struct DocumentKVPersistence { + inner: Arc, +} + +impl std::ops::Deref for DocumentKVPersistence { + type Target = Arc; + + fn deref(&self) -> &Self::Target { &self.inner } +} + +impl std::ops::DerefMut for DocumentKVPersistence { + fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } +} + +impl DocumentKVPersistence { + pub(crate) fn new(kv_store: Arc) -> Self { DocumentKVPersistence { inner: kv_store } } + + pub(crate) async fn set_revision(&self, revision: Revision) -> Result<(), ServerError> { + let key = revision.rev_id.to_string(); + let bytes = revision.write_to_bytes()?; + let _ = self.inner.set(&key, Bytes::from(bytes)).await?; + Ok(()) + } + + pub(crate) async fn batch_get_revisions(&self, rev_ids: Vec) -> Result { + let keys = rev_ids + .into_iter() + .map(|rev_id| rev_id.to_string()) + .collect::>(); + + let items = self.inner.batch_get(keys).await?; + let revisions = items + .into_iter() + .filter_map(|kv| parse_from_bytes::(&kv.value).ok()) + .collect::>(); + + let mut repeated_revision = RepeatedRevision::new(); + repeated_revision.set_items(revisions.into()); + Ok(repeated_revision) + } +} diff --git a/backend/src/services/document/persistence/mod.rs b/backend/src/services/document/persistence/mod.rs new file mode 100644 index 0000000000..2f958e7d67 --- /dev/null +++ b/backend/src/services/document/persistence/mod.rs @@ -0,0 +1,5 @@ +mod kv; +mod postgres; + +pub use kv::*; +pub use postgres::*; diff --git a/backend/src/services/document/crud.rs b/backend/src/services/document/persistence/postgres.rs similarity index 74% rename from backend/src/services/document/crud.rs rename to backend/src/services/document/persistence/postgres.rs index d003adbba2..a649ccd1c5 100644 --- a/backend/src/services/document/crud.rs +++ b/backend/src/services/document/persistence/postgres.rs @@ -1,20 +1,29 @@ use crate::{ - entities::doc::{DocTable, DOC_TABLE}, + services::kv_store::KVStore, util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, }; use anyhow::Context; use backend_service::errors::ServerError; use flowy_collaboration::protobuf::{CreateDocParams, Doc, DocIdentifier, UpdateDocParams}; +use protobuf::Message; use sqlx::{postgres::PgArguments, PgPool, Postgres}; use uuid::Uuid; +const DOC_TABLE: &str = "doc_table"; + #[tracing::instrument(level = "debug", skip(transaction), err)] pub(crate) async fn create_doc_with_transaction( transaction: &mut DBTransaction<'_>, params: CreateDocParams, + // kv_store: Data>, ) -> Result<(), ServerError> { let uuid = Uuid::parse_str(¶ms.id)?; - let (sql, args) = NewDocSqlBuilder::new(uuid).data(params.data).build()?; + let (sql, args) = SqlBuilder::create(DOC_TABLE) + .add_field_with_arg("id", uuid) + .add_field_with_arg("rev_id", 0) + .build()?; + + // TODO kv let _ = sqlx::query_with(&sql, args) .execute(transaction) .await @@ -51,18 +60,20 @@ pub(crate) async fn read_doc(pool: &PgPool, params: DocIdentifier) -> Result(&sql, args) + let _table = sqlx::query_as_with::(&sql, args) .fetch_one(&mut transaction) .await - .map_err(map_sqlx_error)? - .into(); + .map_err(map_sqlx_error)?; - transaction - .commit() - .await - .context("Failed to commit SQL transaction to read document.")?; + // TODO: kv + panic!("") - Ok(doc) + // transaction + // .commit() + // .await + // .context("Failed to commit SQL transaction to read document.")?; + // + // Ok(doc) } #[tracing::instrument(level = "debug", skip(pool, params), fields(delta), err)] @@ -107,32 +118,17 @@ pub(crate) async fn delete_doc(transaction: &mut DBTransaction<'_>, doc_id: Uuid Ok(()) } -pub struct NewDocSqlBuilder { - table: DocTable, +#[derive(Debug, Clone, sqlx::FromRow)] +struct DocTable { + id: uuid::Uuid, + rev_id: i64, } -impl NewDocSqlBuilder { - pub fn new(id: Uuid) -> Self { - let table = DocTable { - id, - data: "".to_owned(), - rev_id: 0, - }; - Self { table } - } - - pub fn data(mut self, data: String) -> Self { - self.table.data = data; - self - } - - pub fn build(self) -> Result<(String, PgArguments), ServerError> { - let (sql, args) = SqlBuilder::create(DOC_TABLE) - .add_field_with_arg("id", self.table.id) - .add_field_with_arg("data", self.table.data) - .add_field_with_arg("rev_id", self.table.rev_id) - .build()?; - - Ok((sql, args)) - } -} +// impl std::convert::From for Doc { +// fn from(table: DocTable) -> Self { +// let mut doc = Doc::new(); +// doc.set_id(table.id.to_string()); +// doc.set_rev_id(table.rev_id); +// doc +// } +// } diff --git a/backend/src/services/document/router.rs b/backend/src/services/document/router.rs index 95f9a3146c..149ec1f487 100644 --- a/backend/src/services/document/router.rs +++ b/backend/src/services/document/router.rs @@ -1,10 +1,11 @@ -use crate::services::document::{create_doc, read_doc, update_doc}; +use crate::{ + services::document::persistence::{create_doc, read_doc, update_doc}, + util::serde_ext::parse_from_payload, +}; use actix_web::{ web::{Data, Payload}, HttpResponse, }; - -use crate::util::serde_ext::parse_from_payload; use backend_service::{errors::ServerError, response::FlowyResponse}; use flowy_collaboration::protobuf::{CreateDocParams, DocIdentifier, UpdateDocParams}; use sqlx::PgPool; diff --git a/backend/src/services/document/ws_actor.rs b/backend/src/services/document/ws_actor.rs index 6594a0aa69..7357eb87c6 100644 --- a/backend/src/services/document/ws_actor.rs +++ b/backend/src/services/document/ws_actor.rs @@ -1,16 +1,16 @@ use crate::{ services::{ - document::update_doc, + document::persistence::update_doc, web_socket::{entities::Socket, WSClientData, WSMessageAdaptor, WSUser}, }, util::serde_ext::{md5, parse_from_bytes}, }; use actix_rt::task::spawn_blocking; -use actix_web::web::Data; + use async_stream::stream; use backend_service::errors::{internal_error, Result, ServerError}; use flowy_collaboration::{ - core::sync::{RevisionUser, ServerDocManager, SyncResponse}, + core::sync::{RevisionUser, ServerDocumentManager, SyncResponse}, protobuf::{DocumentWSData, DocumentWSDataType, NewDocumentUser, UpdateDocParams}, }; use futures::stream::StreamExt; @@ -22,18 +22,18 @@ use tokio::sync::{mpsc, oneshot}; pub enum WSActorMessage { ClientData { client_data: WSClientData, - pool: Data, + pool: PgPool, ret: oneshot::Sender>, }, } pub struct DocumentWebSocketActor { receiver: Option>, - doc_manager: Arc, + doc_manager: Arc, } impl DocumentWebSocketActor { - pub fn new(receiver: mpsc::Receiver, manager: Arc) -> Self { + pub fn new(receiver: mpsc::Receiver, manager: Arc) -> Self { Self { receiver: Some(receiver), doc_manager: manager, @@ -66,7 +66,7 @@ impl DocumentWebSocketActor { } } - async fn handle_client_data(&self, client_data: WSClientData, pg_pool: Data) -> Result<()> { + async fn handle_client_data(&self, client_data: WSClientData, pg_pool: PgPool) -> Result<()> { let WSClientData { user, socket, data } = client_data; let document_data = spawn_blocking(move || { let document_data: DocumentWSData = parse_from_bytes(&data)?; @@ -151,7 +151,7 @@ fn verify_md5(revision: &Revision) -> Result<()> { pub struct ServerDocUser { pub user: Arc, pub(crate) socket: Socket, - pub pg_pool: Data, + pub pg_pool: PgPool, } impl RevisionUser for ServerDocUser { @@ -182,7 +182,7 @@ impl RevisionUser for ServerDocUser { params.set_doc_id(doc_id); params.set_data(doc_json); params.set_rev_id(rev_id); - match update_doc(pg_pool.get_ref(), params).await { + match update_doc(&pg_pool, params).await { Ok(_) => {}, Err(e) => log::error!("{}", e), } diff --git a/backend/src/services/kv_store/kv.rs b/backend/src/services/kv_store/kv.rs index 138aa9cd6b..ffe7f3c415 100644 --- a/backend/src/services/kv_store/kv.rs +++ b/backend/src/services/kv_store/kv.rs @@ -2,7 +2,7 @@ use crate::{ services::kv_store::{KVStore, KeyValue}, util::sqlx_ext::{map_sqlx_error, SqlBuilder}, }; -use actix_web::web::Data; + use anyhow::Context; use backend_service::errors::ServerError; use bytes::Bytes; @@ -13,7 +13,7 @@ use sqlx::{postgres::PgArguments, Error, PgPool, Postgres, Row}; const KV_TABLE: &str = "kv_table"; pub(crate) struct PostgresKV { - pub(crate) pg_pool: Data, + pub(crate) pg_pool: PgPool, } impl KVStore for PostgresKV { diff --git a/backend/src/services/web_socket/ws_client.rs b/backend/src/services/web_socket/ws_client.rs index 7101e689d9..70021de6fe 100644 --- a/backend/src/services/web_socket/ws_client.rs +++ b/backend/src/services/web_socket/ws_client.rs @@ -29,7 +29,9 @@ impl std::default::Default for WebSocketReceivers { impl WebSocketReceivers { pub fn new() -> Self { WebSocketReceivers::default() } - pub fn set(&mut self, source: WSModule, handler: Arc) { self.inner.insert(source, handler); } + pub fn set(&mut self, source: WSModule, receiver: Arc) { + self.inner.insert(source, receiver); + } pub fn get(&self, source: &WSModule) -> Option> { self.inner.get(source).cloned() } } diff --git a/backend/tests/api_test/kv_test.rs b/backend/tests/api_test/kv_test.rs index bab205eabd..7b0491d0a6 100644 --- a/backend/tests/api_test/kv_test.rs +++ b/backend/tests/api_test/kv_test.rs @@ -5,7 +5,7 @@ use std::str; #[actix_rt::test] async fn kv_set_test() { let server = spawn_server().await; - let kv = server.app_ctx.kv_store.clone(); + let kv = server.app_ctx.persistence.kv_store(); let s1 = "123".to_string(); let key = "1"; @@ -18,7 +18,7 @@ async fn kv_set_test() { #[actix_rt::test] async fn kv_delete_test() { let server = spawn_server().await; - let kv = server.app_ctx.kv_store.clone(); + let kv = server.app_ctx.persistence.kv_store(); let s1 = "123".to_string(); let key = "1"; @@ -30,7 +30,7 @@ async fn kv_delete_test() { #[actix_rt::test] async fn kv_batch_set_test() { let server = spawn_server().await; - let kv = server.app_ctx.kv_store.clone(); + let kv = server.app_ctx.persistence.kv_store(); let kvs = vec![ KeyValue { key: "1".to_string(), diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pb.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pb.dart index 85a13ba547..7ade684beb 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pb.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pb.dart @@ -145,6 +145,47 @@ class Revision extends $pb.GeneratedMessage { void clearUserId() => clearField(7); } +class RepeatedRevision extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RepeatedRevision', createEmptyInstance: create) + ..pc(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'items', $pb.PbFieldType.PM, subBuilder: Revision.create) + ..hasRequiredFields = false + ; + + RepeatedRevision._() : super(); + factory RepeatedRevision({ + $core.Iterable? items, + }) { + final _result = create(); + if (items != null) { + _result.items.addAll(items); + } + return _result; + } + factory RepeatedRevision.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory RepeatedRevision.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + RepeatedRevision clone() => RepeatedRevision()..mergeFromMessage(this); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + RepeatedRevision copyWith(void Function(RepeatedRevision) updates) => super.copyWith((message) => updates(message as RepeatedRevision)) as RepeatedRevision; // ignore: deprecated_member_use + $pb.BuilderInfo get info_ => _i; + @$core.pragma('dart2js:noInline') + static RepeatedRevision create() => RepeatedRevision._(); + RepeatedRevision createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static RepeatedRevision getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static RepeatedRevision? _defaultInstance; + + @$pb.TagNumber(1) + $core.List get items => $_getList(0); +} + class RevId extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevId', createEmptyInstance: create) ..aInt64(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'value') diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbenum.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbenum.dart index 633223586d..dea5bb4a27 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbenum.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbenum.dart @@ -26,11 +26,11 @@ class RevType extends $pb.ProtobufEnum { class RevState extends $pb.ProtobufEnum { static const RevState StateLocal = RevState._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'StateLocal'); - static const RevState Acked = RevState._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked'); + static const RevState Ack = RevState._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Ack'); static const $core.List values = [ StateLocal, - Acked, + Ack, ]; static final $core.Map<$core.int, RevState> _byValue = $pb.ProtobufEnum.initByValue(values); diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbjson.dart index df78b21498..aeeff92034 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbjson.dart @@ -24,12 +24,12 @@ const RevState$json = const { '1': 'RevState', '2': const [ const {'1': 'StateLocal', '2': 0}, - const {'1': 'Acked', '2': 1}, + const {'1': 'Ack', '2': 1}, ], }; /// Descriptor for `RevState`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List revStateDescriptor = $convert.base64Decode('CghSZXZTdGF0ZRIOCgpTdGF0ZUxvY2FsEAASCQoFQWNrZWQQAQ=='); +final $typed_data.Uint8List revStateDescriptor = $convert.base64Decode('CghSZXZTdGF0ZRIOCgpTdGF0ZUxvY2FsEAASBwoDQWNrEAE='); @$core.Deprecated('Use revisionDescriptor instead') const Revision$json = const { '1': 'Revision', @@ -46,6 +46,16 @@ const Revision$json = const { /// Descriptor for `Revision`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List revisionDescriptor = $convert.base64Decode('CghSZXZpc2lvbhIeCgtiYXNlX3Jldl9pZBgBIAEoA1IJYmFzZVJldklkEhUKBnJldl9pZBgCIAEoA1IFcmV2SWQSHQoKZGVsdGFfZGF0YRgDIAEoDFIJZGVsdGFEYXRhEhAKA21kNRgEIAEoCVIDbWQ1EhUKBmRvY19pZBgFIAEoCVIFZG9jSWQSGAoCdHkYBiABKA4yCC5SZXZUeXBlUgJ0eRIXCgd1c2VyX2lkGAcgASgJUgZ1c2VySWQ='); +@$core.Deprecated('Use repeatedRevisionDescriptor instead') +const RepeatedRevision$json = const { + '1': 'RepeatedRevision', + '2': const [ + const {'1': 'items', '3': 1, '4': 3, '5': 11, '6': '.Revision', '10': 'items'}, + ], +}; + +/// Descriptor for `RepeatedRevision`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List repeatedRevisionDescriptor = $convert.base64Decode('ChBSZXBlYXRlZFJldmlzaW9uEh8KBWl0ZW1zGAEgAygLMgkuUmV2aXNpb25SBWl0ZW1z'); @$core.Deprecated('Use revIdDescriptor instead') const RevId$json = const { '1': 'RevId', diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs index b4c76e975a..ebef49031d 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs @@ -72,7 +72,7 @@ impl RevisionCache { let rev_id = revision.rev_id; let record = RevisionRecord { revision, - state: RevState::Acked, + state: RevState::Ack, }; self.memory_cache.add_revision(&record).await; let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id)); @@ -170,7 +170,7 @@ pub struct RevisionRecord { } impl RevisionRecord { - pub fn ack(&mut self) { self.state = RevState::Acked; } + pub fn ack(&mut self) { self.state = RevState::Ack; } } struct RevisionSyncSeq { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 736962a365..f2b4edbda4 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -145,7 +145,7 @@ impl RevisionLoader { Ok(_) => {}, Err(e) => tracing::error!("{}", e), }, - RevState::Acked => {}, + RevState::Ack => {}, } } revisions = records.into_iter().map(|record| record.revision).collect::<_>(); diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs index 732f4ae9a9..0723a826c0 100644 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs +++ b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs @@ -50,7 +50,7 @@ impl std::convert::From for RevState { fn from(s: RevTableState) -> Self { match s { RevTableState::Local => RevState::StateLocal, - RevTableState::Acked => RevState::Acked, + RevTableState::Acked => RevState::Ack, } } } @@ -59,7 +59,7 @@ impl std::convert::From for RevTableState { fn from(s: RevState) -> Self { match s { RevState::StateLocal => RevTableState::Local, - RevState::Acked => RevTableState::Acked, + RevState::Ack => RevTableState::Acked, } } } diff --git a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs b/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs index c9acaca020..2fd9ae879e 100644 --- a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs +++ b/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs @@ -2,7 +2,7 @@ use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WSConnectSt use bytes::Bytes; use dashmap::DashMap; use flowy_collaboration::{ - core::sync::{RevisionUser, ServerDocManager, ServerDocPersistence, SyncResponse}, + core::sync::{DocumentPersistence, RevisionUser, ServerDocumentManager, SyncResponse}, entities::{ doc::Doc, ws::{DocumentWSData, DocumentWSDataBuilder, DocumentWSDataType, NewDocumentUser}, @@ -96,13 +96,13 @@ lazy_static! { } struct MockDocServer { - pub manager: Arc, + pub manager: Arc, } impl std::default::Default for MockDocServer { fn default() -> Self { let persistence = Arc::new(MockDocServerPersistence::default()); - let manager = Arc::new(ServerDocManager::new(persistence)); + let manager = Arc::new(ServerDocumentManager::new(persistence)); MockDocServer { manager } } } @@ -160,7 +160,7 @@ impl std::default::Default for MockDocServerPersistence { } } -impl ServerDocPersistence for MockDocServerPersistence { +impl DocumentPersistence for MockDocServerPersistence { fn update_doc(&self, _doc_id: &str, _rev_id: i64, _delta: RichTextDelta) -> FutureResultSend<(), CollaborateError> { unimplemented!() } diff --git a/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs b/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs index 8d8bce6917..304ead351c 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs @@ -17,40 +17,19 @@ use tokio::{ task::spawn_blocking, }; -pub trait ServerDocPersistence: Send + Sync { +pub trait DocumentPersistence: Send + Sync { fn update_doc(&self, doc_id: &str, rev_id: i64, delta: RichTextDelta) -> FutureResultSend<(), CollaborateError>; fn read_doc(&self, doc_id: &str) -> FutureResultSend; fn create_doc(&self, revision: Revision) -> FutureResultSend; } -#[rustfmt::skip] -// ┌─────────────────┐ -// │ServerDocManager │ -// └─────────────────┘ -// │ 1 -// ▼ n -// ┌───────────────┐ -// │ OpenDocHandle │ -// └───────────────┘ -// │ -// ▼ -// ┌──────────────────┐ -// │ DocCommandQueue │ -// └──────────────────┘ -// │ ┌──────────────────────┐ ┌────────────┐ -// ▼ ┌────▶│ RevisionSynchronizer │────▶│ Document │ -// ┌────────────────┐ │ └──────────────────────┘ └────────────┘ -// │ServerDocEditor │─────┤ -// └────────────────┘ │ ┌────────┐ ┌────────────┐ -// └────▶│ Users │◆──────│RevisionUser│ -// └────────┘ └────────────┘ -pub struct ServerDocManager { +pub struct ServerDocumentManager { open_doc_map: DashMap>, - persistence: Arc, + persistence: Arc, } -impl ServerDocManager { - pub fn new(persistence: Arc) -> Self { +impl ServerDocumentManager { + pub fn new(persistence: Arc) -> Self { Self { open_doc_map: DashMap::new(), persistence, diff --git a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs index 5f7896e8b9..b06057421c 100644 --- a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -75,6 +75,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "WSError" | "WSMessage" | "Revision" + | "RepeatedRevision" | "RevId" | "RevisionRange" => TypeCategory::Protobuf, diff --git a/shared-lib/lib-ot/src/protobuf/model/model.rs b/shared-lib/lib-ot/src/protobuf/model/model.rs index 7453f9470c..ad9680bade 100644 --- a/shared-lib/lib-ot/src/protobuf/model/model.rs +++ b/shared-lib/lib-ot/src/protobuf/model/model.rs @@ -409,6 +409,172 @@ impl ::protobuf::reflect::ProtobufValue for Revision { } } +#[derive(PartialEq,Clone,Default)] +pub struct RepeatedRevision { + // message fields + pub items: ::protobuf::RepeatedField, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a RepeatedRevision { + fn default() -> &'a RepeatedRevision { + ::default_instance() + } +} + +impl RepeatedRevision { + pub fn new() -> RepeatedRevision { + ::std::default::Default::default() + } + + // repeated .Revision items = 1; + + + pub fn get_items(&self) -> &[Revision] { + &self.items + } + pub fn clear_items(&mut self) { + self.items.clear(); + } + + // Param is passed by value, moved + pub fn set_items(&mut self, v: ::protobuf::RepeatedField) { + self.items = v; + } + + // Mutable pointer to the field. + pub fn mut_items(&mut self) -> &mut ::protobuf::RepeatedField { + &mut self.items + } + + // Take field + pub fn take_items(&mut self) -> ::protobuf::RepeatedField { + ::std::mem::replace(&mut self.items, ::protobuf::RepeatedField::new()) + } +} + +impl ::protobuf::Message for RepeatedRevision { + fn is_initialized(&self) -> bool { + for v in &self.items { + if !v.is_initialized() { + return false; + } + }; + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_repeated_message_into(wire_type, is, &mut self.items)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + for value in &self.items { + let len = value.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; + }; + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + for v in &self.items { + os.write_tag(1, ::protobuf::wire_format::WireTypeLengthDelimited)?; + os.write_raw_varint32(v.get_cached_size())?; + v.write_to_with_cached_sizes(os)?; + }; + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> RepeatedRevision { + RepeatedRevision::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage>( + "items", + |m: &RepeatedRevision| { &m.items }, + |m: &mut RepeatedRevision| { &mut m.items }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "RepeatedRevision", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static RepeatedRevision { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(RepeatedRevision::new) + } +} + +impl ::protobuf::Clear for RepeatedRevision { + fn clear(&mut self) { + self.items.clear(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for RepeatedRevision { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for RepeatedRevision { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + #[derive(PartialEq,Clone,Default)] pub struct RevId { // message fields @@ -843,7 +1009,7 @@ impl ::protobuf::reflect::ProtobufValue for RevType { #[derive(Clone,PartialEq,Eq,Debug,Hash)] pub enum RevState { StateLocal = 0, - Acked = 1, + Ack = 1, } impl ::protobuf::ProtobufEnum for RevState { @@ -854,7 +1020,7 @@ impl ::protobuf::ProtobufEnum for RevState { fn from_i32(value: i32) -> ::std::option::Option { match value { 0 => ::std::option::Option::Some(RevState::StateLocal), - 1 => ::std::option::Option::Some(RevState::Acked), + 1 => ::std::option::Option::Some(RevState::Ack), _ => ::std::option::Option::None } } @@ -862,7 +1028,7 @@ impl ::protobuf::ProtobufEnum for RevState { fn values() -> &'static [Self] { static values: &'static [RevState] = &[ RevState::StateLocal, - RevState::Acked, + RevState::Ack, ]; values } @@ -896,57 +1062,63 @@ static file_descriptor_proto_data: &'static [u8] = b"\ \x05revId\x12\x1d\n\ndelta_data\x18\x03\x20\x01(\x0cR\tdeltaData\x12\x10\ \n\x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01\ (\tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\ - \x12\x17\n\x07user_id\x18\x07\x20\x01(\tR\x06userId\"\x1d\n\x05RevId\x12\ - \x14\n\x05value\x18\x01\x20\x01(\x03R\x05value\"N\n\rRevisionRange\x12\ - \x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x14\n\x05start\x18\x02\ - \x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\x20\x01(\x03R\x03end*\ - \x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\x06Remote\x10\x01*%\n\ - \x08RevState\x12\x0e\n\nStateLocal\x10\0\x12\t\n\x05Acked\x10\x01J\x8b\ - \x07\n\x06\x12\x04\0\0\x1a\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\ - \x04\0\x12\x04\x02\0\n\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\ - \n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\ - \x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\ - \x04\0\x02\0\x03\x12\x03\x03\x18\x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\ - \x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\ - \x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\ - \x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x19\n\x0c\n\ - \x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\ - \x12\x03\x05\n\x14\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x17\x18\n\ - \x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\x13\n\x0c\n\x05\x04\0\x02\x03\ - \x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06\x0b\x0e\ - \n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x06\x11\x12\n\x0b\n\x04\x04\0\x02\ - \x04\x12\x03\x07\x04\x16\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x07\x04\n\ - \n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07\x0b\x11\n\x0c\n\x05\x04\0\x02\ - \x04\x03\x12\x03\x07\x14\x15\n\x0b\n\x04\x04\0\x02\x05\x12\x03\x08\x04\ - \x13\n\x0c\n\x05\x04\0\x02\x05\x06\x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\ - \x02\x05\x01\x12\x03\x08\x0c\x0e\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03\ - \x08\x11\x12\n\x0b\n\x04\x04\0\x02\x06\x12\x03\t\x04\x17\n\x0c\n\x05\x04\ - \0\x02\x06\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\0\x02\x06\x01\x12\x03\t\ - \x0b\x12\n\x0c\n\x05\x04\0\x02\x06\x03\x12\x03\t\x15\x16\n\n\n\x02\x04\ - \x01\x12\x04\x0b\0\r\x01\n\n\n\x03\x04\x01\x01\x12\x03\x0b\x08\r\n\x0b\n\ - \x04\x04\x01\x02\0\x12\x03\x0c\x04\x14\n\x0c\n\x05\x04\x01\x02\0\x05\x12\ - \x03\x0c\x04\t\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x0c\n\x0f\n\x0c\n\ - \x05\x04\x01\x02\0\x03\x12\x03\x0c\x12\x13\n\n\n\x02\x04\x02\x12\x04\x0e\ - \0\x12\x01\n\n\n\x03\x04\x02\x01\x12\x03\x0e\x08\x15\n\x0b\n\x04\x04\x02\ - \x02\0\x12\x03\x0f\x04\x16\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0f\x04\ - \n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0f\x0b\x11\n\x0c\n\x05\x04\x02\ - \x02\0\x03\x12\x03\x0f\x14\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x10\ - \x04\x14\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\x10\x04\t\n\x0c\n\x05\ - \x04\x02\x02\x01\x01\x12\x03\x10\n\x0f\n\x0c\n\x05\x04\x02\x02\x01\x03\ - \x12\x03\x10\x12\x13\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x11\x04\x12\n\ - \x0c\n\x05\x04\x02\x02\x02\x05\x12\x03\x11\x04\t\n\x0c\n\x05\x04\x02\x02\ - \x02\x01\x12\x03\x11\n\r\n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x11\x10\ - \x11\n\n\n\x02\x05\0\x12\x04\x13\0\x16\x01\n\n\n\x03\x05\0\x01\x12\x03\ - \x13\x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x14\x04\x0e\n\x0c\n\x05\x05\ - \0\x02\0\x01\x12\x03\x14\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x14\ - \x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x15\x04\x0f\n\x0c\n\x05\x05\0\ - \x02\x01\x01\x12\x03\x15\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x15\ - \r\x0e\n\n\n\x02\x05\x01\x12\x04\x17\0\x1a\x01\n\n\n\x03\x05\x01\x01\x12\ - \x03\x17\x05\r\n\x0b\n\x04\x05\x01\x02\0\x12\x03\x18\x04\x13\n\x0c\n\x05\ - \x05\x01\x02\0\x01\x12\x03\x18\x04\x0e\n\x0c\n\x05\x05\x01\x02\0\x02\x12\ - \x03\x18\x11\x12\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x19\x04\x0e\n\x0c\n\ - \x05\x05\x01\x02\x01\x01\x12\x03\x19\x04\t\n\x0c\n\x05\x05\x01\x02\x01\ - \x02\x12\x03\x19\x0c\rb\x06proto3\ + \x12\x17\n\x07user_id\x18\x07\x20\x01(\tR\x06userId\"3\n\x10RepeatedRevi\ + sion\x12\x1f\n\x05items\x18\x01\x20\x03(\x0b2\t.RevisionR\x05items\"\x1d\ + \n\x05RevId\x12\x14\n\x05value\x18\x01\x20\x01(\x03R\x05value\"N\n\rRevi\ + sionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x14\n\x05\ + start\x18\x02\x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\x20\x01(\ + \x03R\x03end*\x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\x06Remote\ + \x10\x01*#\n\x08RevState\x12\x0e\n\nStateLocal\x10\0\x12\x07\n\x03Ack\ + \x10\x01J\xe8\x07\n\x06\x12\x04\0\0\x1d\x01\n\x08\n\x01\x0c\x12\x03\0\0\ + \x12\n\n\n\x02\x04\0\x12\x04\x02\0\n\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\ + \x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x04\0\ + \x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\n\ + \x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x18\x19\n\x0b\n\x04\x04\0\ + \x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\ + \x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\x0c\n\x05\x04\0\ + \x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\ + \x04\x19\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\ + \0\x02\x02\x01\x12\x03\x05\n\x14\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\ + \x05\x17\x18\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\x13\n\x0c\n\x05\ + \x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\ + \x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x06\x11\x12\n\x0b\ + \n\x04\x04\0\x02\x04\x12\x03\x07\x04\x16\n\x0c\n\x05\x04\0\x02\x04\x05\ + \x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07\x0b\x11\n\ + \x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x07\x14\x15\n\x0b\n\x04\x04\0\x02\ + \x05\x12\x03\x08\x04\x13\n\x0c\n\x05\x04\0\x02\x05\x06\x12\x03\x08\x04\ + \x0b\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x08\x0c\x0e\n\x0c\n\x05\x04\0\ + \x02\x05\x03\x12\x03\x08\x11\x12\n\x0b\n\x04\x04\0\x02\x06\x12\x03\t\x04\ + \x17\n\x0c\n\x05\x04\0\x02\x06\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\0\x02\ + \x06\x01\x12\x03\t\x0b\x12\n\x0c\n\x05\x04\0\x02\x06\x03\x12\x03\t\x15\ + \x16\n\n\n\x02\x04\x01\x12\x04\x0b\0\r\x01\n\n\n\x03\x04\x01\x01\x12\x03\ + \x0b\x08\x18\n\x0b\n\x04\x04\x01\x02\0\x12\x03\x0c\x04\x20\n\x0c\n\x05\ + \x04\x01\x02\0\x04\x12\x03\x0c\x04\x0c\n\x0c\n\x05\x04\x01\x02\0\x06\x12\ + \x03\x0c\r\x15\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x0c\x16\x1b\n\x0c\n\ + \x05\x04\x01\x02\0\x03\x12\x03\x0c\x1e\x1f\n\n\n\x02\x04\x02\x12\x04\x0e\ + \0\x10\x01\n\n\n\x03\x04\x02\x01\x12\x03\x0e\x08\r\n\x0b\n\x04\x04\x02\ + \x02\0\x12\x03\x0f\x04\x14\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0f\x04\ + \t\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0f\n\x0f\n\x0c\n\x05\x04\x02\ + \x02\0\x03\x12\x03\x0f\x12\x13\n\n\n\x02\x04\x03\x12\x04\x11\0\x15\x01\n\ + \n\n\x03\x04\x03\x01\x12\x03\x11\x08\x15\n\x0b\n\x04\x04\x03\x02\0\x12\ + \x03\x12\x04\x16\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\x12\x04\n\n\x0c\n\ + \x05\x04\x03\x02\0\x01\x12\x03\x12\x0b\x11\n\x0c\n\x05\x04\x03\x02\0\x03\ + \x12\x03\x12\x14\x15\n\x0b\n\x04\x04\x03\x02\x01\x12\x03\x13\x04\x14\n\ + \x0c\n\x05\x04\x03\x02\x01\x05\x12\x03\x13\x04\t\n\x0c\n\x05\x04\x03\x02\ + \x01\x01\x12\x03\x13\n\x0f\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03\x13\ + \x12\x13\n\x0b\n\x04\x04\x03\x02\x02\x12\x03\x14\x04\x12\n\x0c\n\x05\x04\ + \x03\x02\x02\x05\x12\x03\x14\x04\t\n\x0c\n\x05\x04\x03\x02\x02\x01\x12\ + \x03\x14\n\r\n\x0c\n\x05\x04\x03\x02\x02\x03\x12\x03\x14\x10\x11\n\n\n\ + \x02\x05\0\x12\x04\x16\0\x19\x01\n\n\n\x03\x05\0\x01\x12\x03\x16\x05\x0c\ + \n\x0b\n\x04\x05\0\x02\0\x12\x03\x17\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\ + \x12\x03\x17\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x17\x0c\r\n\x0b\n\ + \x04\x05\0\x02\x01\x12\x03\x18\x04\x0f\n\x0c\n\x05\x05\0\x02\x01\x01\x12\ + \x03\x18\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x18\r\x0e\n\n\n\x02\ + \x05\x01\x12\x04\x1a\0\x1d\x01\n\n\n\x03\x05\x01\x01\x12\x03\x1a\x05\r\n\ + \x0b\n\x04\x05\x01\x02\0\x12\x03\x1b\x04\x13\n\x0c\n\x05\x05\x01\x02\0\ + \x01\x12\x03\x1b\x04\x0e\n\x0c\n\x05\x05\x01\x02\0\x02\x12\x03\x1b\x11\ + \x12\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x1c\x04\x0c\n\x0c\n\x05\x05\x01\ + \x02\x01\x01\x12\x03\x1c\x04\x07\n\x0c\n\x05\x05\x01\x02\x01\x02\x12\x03\ + \x1c\n\x0bb\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/shared-lib/lib-ot/src/protobuf/proto/model.proto b/shared-lib/lib-ot/src/protobuf/proto/model.proto index cc9e9da777..86a5494700 100644 --- a/shared-lib/lib-ot/src/protobuf/proto/model.proto +++ b/shared-lib/lib-ot/src/protobuf/proto/model.proto @@ -9,6 +9,9 @@ message Revision { RevType ty = 6; string user_id = 7; } +message RepeatedRevision { + repeated Revision items = 1; +} message RevId { int64 value = 1; } @@ -23,5 +26,5 @@ enum RevType { } enum RevState { StateLocal = 0; - Acked = 1; + Ack = 1; } diff --git a/shared-lib/lib-ot/src/revision/model.rs b/shared-lib/lib-ot/src/revision/model.rs index 5ca284cc63..de50649b41 100644 --- a/shared-lib/lib-ot/src/revision/model.rs +++ b/shared-lib/lib-ot/src/revision/model.rs @@ -92,6 +92,26 @@ impl Revision { } } +#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)] +pub struct RepeatedRevision { + #[pb(index = 1)] + pub items: Vec, +} + +impl std::ops::Deref for RepeatedRevision { + type Target = Vec; + + fn deref(&self) -> &Self::Target { &self.items } +} + +impl std::ops::DerefMut for RepeatedRevision { + fn deref_mut(&mut self) -> &mut Self::Target { &mut self.items } +} + +impl RepeatedRevision { + pub fn into_inner(self) -> Vec { self.items } +} + #[derive(Clone, Debug, ProtoBuf, Default)] pub struct RevId { #[pb(index = 1)] @@ -167,5 +187,5 @@ pub fn md5>(data: T) -> String { #[derive(Debug, Clone, Eq, PartialEq)] pub enum RevState { StateLocal = 0, - Acked = 1, + Ack = 1, }