From f84fb6f87fdee10a35a47ae66d131e6d33a15d83 Mon Sep 17 00:00:00 2001 From: appflowy Date: Wed, 6 Oct 2021 23:21:57 +0800 Subject: [PATCH] [server]: fix pull rev id range bug --- .../protobuf/flowy-document/revision.pb.dart | 32 ++-- .../flowy-document/revision.pbjson.dart | 6 +- backend/src/service/doc/crud.rs | 4 +- backend/src/service/doc/edit/edit_doc.rs | 34 ++-- .../src/entities/doc/revision.rs | 41 +++-- .../src/protobuf/model/revision.rs | 150 +++++++++--------- .../src/protobuf/proto/revision.proto | 4 +- .../src/services/doc/edit/edit_doc.rs | 48 +++--- .../src/services/doc/edit/model.rs | 6 +- .../src/services/doc/revision/manager.rs | 4 +- .../src/services/doc/revision/store_actor.rs | 15 +- .../src/sql_tables/doc/rev_sql.rs | 4 +- 12 files changed, 185 insertions(+), 163 deletions(-) diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart index c6eeaed0fd..979820558d 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart @@ -181,26 +181,26 @@ class Revision extends $pb.GeneratedMessage { class RevisionRange extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevisionRange', createEmptyInstance: create) ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') - ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'fromRevId') - ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'toRevId') + ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'start') + ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'end') ..hasRequiredFields = false ; RevisionRange._() : super(); factory RevisionRange({ $core.String? docId, - $fixnum.Int64? fromRevId, - $fixnum.Int64? toRevId, + $fixnum.Int64? start, + $fixnum.Int64? end, }) { final _result = create(); if (docId != null) { _result.docId = docId; } - if (fromRevId != null) { - _result.fromRevId = fromRevId; + if (start != null) { + _result.start = start; } - if (toRevId != null) { - _result.toRevId = toRevId; + if (end != null) { + _result.end = end; } return _result; } @@ -235,21 +235,21 @@ class RevisionRange extends $pb.GeneratedMessage { void clearDocId() => clearField(1); @$pb.TagNumber(2) - $fixnum.Int64 get fromRevId => $_getI64(1); + $fixnum.Int64 get start => $_getI64(1); @$pb.TagNumber(2) - set fromRevId($fixnum.Int64 v) { $_setInt64(1, v); } + set start($fixnum.Int64 v) { $_setInt64(1, v); } @$pb.TagNumber(2) - $core.bool hasFromRevId() => $_has(1); + $core.bool hasStart() => $_has(1); @$pb.TagNumber(2) - void clearFromRevId() => clearField(2); + void clearStart() => clearField(2); @$pb.TagNumber(3) - $fixnum.Int64 get toRevId => $_getI64(2); + $fixnum.Int64 get end => $_getI64(2); @$pb.TagNumber(3) - set toRevId($fixnum.Int64 v) { $_setInt64(2, v); } + set end($fixnum.Int64 v) { $_setInt64(2, v); } @$pb.TagNumber(3) - $core.bool hasToRevId() => $_has(2); + $core.bool hasEnd() => $_has(2); @$pb.TagNumber(3) - void clearToRevId() => clearField(3); + void clearEnd() => clearField(3); } diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart index 38fd8c0ccf..08933d02c7 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart @@ -49,10 +49,10 @@ const RevisionRange$json = const { '1': 'RevisionRange', '2': const [ const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'}, - const {'1': 'from_rev_id', '3': 2, '4': 1, '5': 3, '10': 'fromRevId'}, - const {'1': 'to_rev_id', '3': 3, '4': 1, '5': 3, '10': 'toRevId'}, + const {'1': 'start', '3': 2, '4': 1, '5': 3, '10': 'start'}, + const {'1': 'end', '3': 3, '4': 1, '5': 3, '10': 'end'}, ], }; /// Descriptor for `RevisionRange`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List revisionRangeDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblJhbmdlEhUKBmRvY19pZBgBIAEoCVIFZG9jSWQSHgoLZnJvbV9yZXZfaWQYAiABKANSCWZyb21SZXZJZBIaCgl0b19yZXZfaWQYAyABKANSB3RvUmV2SWQ='); +final $typed_data.Uint8List revisionRangeDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblJhbmdlEhUKBmRvY19pZBgBIAEoCVIFZG9jSWQSFAoFc3RhcnQYAiABKANSBXN0YXJ0EhAKA2VuZBgDIAEoA1IDZW5k'); diff --git a/backend/src/service/doc/crud.rs b/backend/src/service/doc/crud.rs index 96a2ccb55f..04e2dafd7b 100644 --- a/backend/src/service/doc/crud.rs +++ b/backend/src/service/doc/crud.rs @@ -49,7 +49,7 @@ pub(crate) async fn read_doc(pool: &PgPool, params: QueryDocParams) -> Result Result<(), ServerError> { let doc_id = Uuid::parse_str(¶ms.doc_id)?; let mut transaction = pool @@ -59,6 +59,8 @@ pub async fn update_doc(pool: &PgPool, mut params: UpdateDocParams) -> Result<() let data = Some(params.take_data()); + tracing::Span::current().record("result", &data.as_ref().unwrap_or(&"".to_owned()).as_str()); + let (sql, args) = SqlBuilder::update(DOC_TABLE) .add_some_arg("data", data) .add_arg("rev_id", params.rev_id) diff --git a/backend/src/service/doc/edit/edit_doc.rs b/backend/src/service/doc/edit/edit_doc.rs index eb8080293c..755a85c550 100644 --- a/backend/src/service/doc/edit/edit_doc.rs +++ b/backend/src/service/doc/edit/edit_doc.rs @@ -61,11 +61,10 @@ impl ServerEditDoc { pub async fn new_doc_user(&self, user: EditUser, rev_id: i64) -> Result<(), ServerError> { self.users.insert(user.id(), user.clone()); let cur_rev_id = self.rev_id.load(SeqCst); - match cur_rev_id.cmp(&rev_id) { Ordering::Less => { user.socket - .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, rev_id)) + .do_send(mk_pull_rev_ws_message(&self.doc_id, next(cur_rev_id), rev_id)) .map_err(internal_error)?; }, Ordering::Equal => {}, @@ -84,9 +83,9 @@ impl ServerEditDoc { level = "debug", skip(self, user, pg_pool, revision), fields( - rev_id = %self.rev_id.load(SeqCst), - revision_rev_id = %revision.rev_id, - revision_base_rev_id = %revision.base_rev_id + cur_rev_id = %self.rev_id.load(SeqCst), + base_rev_id = %revision.base_rev_id, + rev_id = %revision.rev_id, ) )] pub async fn apply_revision( @@ -100,18 +99,19 @@ impl ServerEditDoc { let cur_rev_id = self.rev_id.load(SeqCst); match cur_rev_id.cmp(&revision.rev_id) { Ordering::Less => { - if cur_rev_id != revision.base_rev_id { - // The server document is outdated, try to get the missing revision from the - // client. - user.socket - .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id)) - .map_err(internal_error)?; - } else { + let next_rev_id = next(cur_rev_id); + if next_rev_id == revision.base_rev_id || cur_rev_id == revision.base_rev_id { let _ = self.compose_revision(&revision, pg_pool).await?; user.socket .do_send(mk_acked_ws_message(&revision)) .map_err(internal_error)?; } + + // The server document is outdated, try to get the missing revision from the + // client. + user.socket + .do_send(mk_pull_rev_ws_message(&self.doc_id, next_rev_id, revision.rev_id)) + .map_err(internal_error)?; }, Ordering::Equal => {}, Ordering::Greater => { @@ -168,12 +168,11 @@ impl ServerEditDoc { level = "debug", skip(self, delta), fields( - delta = %delta.to_json(), + revision_delta = %delta.to_json(), result, ) )] fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> { - // Opti: push each revision into queue and process it one by one. match self.document.try_write_for(Duration::from_millis(300)) { None => { log::error!("Failed to acquire write lock of document"); @@ -211,8 +210,8 @@ fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor { let range = RevisionRange { doc_id: doc_id.to_string(), - from_rev_id, - to_rev_id, + start: from_rev_id, + end: to_rev_id, ..Default::default() }; @@ -247,3 +246,6 @@ fn mk_ws_message>(data: T) -> WsMessageAdaptor { let bytes: Bytes = msg.try_into().unwrap(); WsMessageAdaptor(bytes) } + +#[inline] +fn next(rev_id: i64) -> i64 { rev_id + 1 } diff --git a/rust-lib/flowy-document/src/entities/doc/revision.rs b/rust-lib/flowy-document/src/entities/doc/revision.rs index 4fc8fcc983..f0bc89a9eb 100644 --- a/rust-lib/flowy-document/src/entities/doc/revision.rs +++ b/rust-lib/flowy-document/src/entities/doc/revision.rs @@ -3,7 +3,7 @@ use crate::services::util::md5; use crate::entities::doc::Doc; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; use flowy_ot::core::Delta; -use std::fmt::Formatter; +use std::{fmt::Formatter, ops::RangeInclusive}; #[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)] pub enum RevType { @@ -97,18 +97,22 @@ impl std::fmt::Debug for Revision { } impl Revision { - pub fn new, T2: Into>( - base_rev_id: T1, - rev_id: T2, - delta_data: Vec, - doc_id: &str, - ty: RevType, - ) -> Revision { - let md5 = md5(&delta_data); + pub fn new(base_rev_id: T1, rev_id: T2, delta: D, doc_id: &str, ty: RevType) -> Revision + where + T1: Into, + T2: Into, + D: AsRef<[u8]>, + { + let md5 = md5(&delta); let doc_id = doc_id.to_owned(); + let delta_data = delta.as_ref().to_vec(); + let base_rev_id = base_rev_id.into(); + let rev_id = rev_id.into(); + debug_assert!(base_rev_id != rev_id); + Self { - base_rev_id: base_rev_id.into(), - rev_id: rev_id.into(), + base_rev_id, + rev_id, delta_data, md5, doc_id, @@ -135,19 +139,24 @@ pub struct RevisionRange { pub doc_id: String, #[pb(index = 2)] - pub from_rev_id: i64, + pub start: i64, #[pb(index = 3)] - pub to_rev_id: i64, + pub end: i64, } impl RevisionRange { pub fn len(&self) -> i64 { - debug_assert!(self.to_rev_id >= self.from_rev_id); - if self.to_rev_id >= self.from_rev_id { - self.to_rev_id - self.from_rev_id + debug_assert!(self.end >= self.start); + if self.end >= self.start { + self.end - self.start } else { 0 } } + + pub fn iter(&self) -> RangeInclusive { + debug_assert!(self.start != self.end); + RangeInclusive::new(self.start, self.end) + } } diff --git a/rust-lib/flowy-document/src/protobuf/model/revision.rs b/rust-lib/flowy-document/src/protobuf/model/revision.rs index a6378c2c4f..83bd5e7ad0 100644 --- a/rust-lib/flowy-document/src/protobuf/model/revision.rs +++ b/rust-lib/flowy-document/src/protobuf/model/revision.rs @@ -523,8 +523,8 @@ impl ::protobuf::reflect::ProtobufValue for Revision { pub struct RevisionRange { // message fields pub doc_id: ::std::string::String, - pub from_rev_id: i64, - pub to_rev_id: i64, + pub start: i64, + pub end: i64, // special fields pub unknown_fields: ::protobuf::UnknownFields, pub cached_size: ::protobuf::CachedSize, @@ -567,34 +567,34 @@ impl RevisionRange { ::std::mem::replace(&mut self.doc_id, ::std::string::String::new()) } - // int64 from_rev_id = 2; + // int64 start = 2; - pub fn get_from_rev_id(&self) -> i64 { - self.from_rev_id + pub fn get_start(&self) -> i64 { + self.start } - pub fn clear_from_rev_id(&mut self) { - self.from_rev_id = 0; + pub fn clear_start(&mut self) { + self.start = 0; } // Param is passed by value, moved - pub fn set_from_rev_id(&mut self, v: i64) { - self.from_rev_id = v; + pub fn set_start(&mut self, v: i64) { + self.start = v; } - // int64 to_rev_id = 3; + // int64 end = 3; - pub fn get_to_rev_id(&self) -> i64 { - self.to_rev_id + pub fn get_end(&self) -> i64 { + self.end } - pub fn clear_to_rev_id(&mut self) { - self.to_rev_id = 0; + pub fn clear_end(&mut self) { + self.end = 0; } // Param is passed by value, moved - pub fn set_to_rev_id(&mut self, v: i64) { - self.to_rev_id = v; + pub fn set_end(&mut self, v: i64) { + self.end = v; } } @@ -615,14 +615,14 @@ impl ::protobuf::Message for RevisionRange { return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); } let tmp = is.read_int64()?; - self.from_rev_id = tmp; + self.start = tmp; }, 3 => { if wire_type != ::protobuf::wire_format::WireTypeVarint { return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); } let tmp = is.read_int64()?; - self.to_rev_id = tmp; + self.end = tmp; }, _ => { ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; @@ -639,11 +639,11 @@ impl ::protobuf::Message for RevisionRange { if !self.doc_id.is_empty() { my_size += ::protobuf::rt::string_size(1, &self.doc_id); } - if self.from_rev_id != 0 { - my_size += ::protobuf::rt::value_size(2, self.from_rev_id, ::protobuf::wire_format::WireTypeVarint); + if self.start != 0 { + my_size += ::protobuf::rt::value_size(2, self.start, ::protobuf::wire_format::WireTypeVarint); } - if self.to_rev_id != 0 { - my_size += ::protobuf::rt::value_size(3, self.to_rev_id, ::protobuf::wire_format::WireTypeVarint); + if self.end != 0 { + my_size += ::protobuf::rt::value_size(3, self.end, ::protobuf::wire_format::WireTypeVarint); } my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); self.cached_size.set(my_size); @@ -654,11 +654,11 @@ impl ::protobuf::Message for RevisionRange { if !self.doc_id.is_empty() { os.write_string(1, &self.doc_id)?; } - if self.from_rev_id != 0 { - os.write_int64(2, self.from_rev_id)?; + if self.start != 0 { + os.write_int64(2, self.start)?; } - if self.to_rev_id != 0 { - os.write_int64(3, self.to_rev_id)?; + if self.end != 0 { + os.write_int64(3, self.end)?; } os.write_unknown_fields(self.get_unknown_fields())?; ::std::result::Result::Ok(()) @@ -704,14 +704,14 @@ impl ::protobuf::Message for RevisionRange { |m: &mut RevisionRange| { &mut m.doc_id }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( - "from_rev_id", - |m: &RevisionRange| { &m.from_rev_id }, - |m: &mut RevisionRange| { &mut m.from_rev_id }, + "start", + |m: &RevisionRange| { &m.start }, + |m: &mut RevisionRange| { &mut m.start }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( - "to_rev_id", - |m: &RevisionRange| { &m.to_rev_id }, - |m: &mut RevisionRange| { &mut m.to_rev_id }, + "end", + |m: &RevisionRange| { &m.end }, + |m: &mut RevisionRange| { &mut m.end }, )); ::protobuf::reflect::MessageDescriptor::new_pb_name::( "RevisionRange", @@ -730,8 +730,8 @@ impl ::protobuf::Message for RevisionRange { impl ::protobuf::Clear for RevisionRange { fn clear(&mut self) { self.doc_id.clear(); - self.from_rev_id = 0; - self.to_rev_id = 0; + self.start = 0; + self.end = 0; self.unknown_fields.clear(); } } @@ -804,48 +804,48 @@ static file_descriptor_proto_data: &'static [u8] = b"\ \x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05r\ evId\x12\x1d\n\ndelta_data\x18\x03\x20\x01(\x0cR\tdeltaData\x12\x10\n\ \x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01(\ - \tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\"b\ + \tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\"N\ \n\rRevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\ - \x1e\n\x0bfrom_rev_id\x18\x02\x20\x01(\x03R\tfromRevId\x12\x1a\n\tto_rev\ - _id\x18\x03\x20\x01(\x03R\x07toRevId*\x20\n\x07RevType\x12\t\n\x05Local\ - \x10\0\x12\n\n\x06Remote\x10\x01J\xea\x05\n\x06\x12\x04\0\0\x15\x01\n\ - \x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x04\x01\n\n\ - \n\x03\x04\0\x01\x12\x03\x02\x08\r\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\ - \x04\x14\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\ - \x02\0\x01\x12\x03\x03\n\x0f\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x12\ - \x13\n\n\n\x02\x04\x01\x12\x04\x05\0\x0c\x01\n\n\n\x03\x04\x01\x01\x12\ - \x03\x05\x08\x10\n\x0b\n\x04\x04\x01\x02\0\x12\x03\x06\x04\x1a\n\x0c\n\ - \x05\x04\x01\x02\0\x05\x12\x03\x06\x04\t\n\x0c\n\x05\x04\x01\x02\0\x01\ - \x12\x03\x06\n\x15\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x06\x18\x19\n\ - \x0b\n\x04\x04\x01\x02\x01\x12\x03\x07\x04\x15\n\x0c\n\x05\x04\x01\x02\ - \x01\x05\x12\x03\x07\x04\t\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\x07\n\ - \x10\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\x07\x13\x14\n\x0b\n\x04\x04\ - \x01\x02\x02\x12\x03\x08\x04\x19\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\ - \x08\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\x08\n\x14\n\x0c\n\x05\ - \x04\x01\x02\x02\x03\x12\x03\x08\x17\x18\n\x0b\n\x04\x04\x01\x02\x03\x12\ - \x03\t\x04\x13\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\x03\t\x04\n\n\x0c\n\ - \x05\x04\x01\x02\x03\x01\x12\x03\t\x0b\x0e\n\x0c\n\x05\x04\x01\x02\x03\ - \x03\x12\x03\t\x11\x12\n\x0b\n\x04\x04\x01\x02\x04\x12\x03\n\x04\x16\n\ - \x0c\n\x05\x04\x01\x02\x04\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\ - \x04\x01\x12\x03\n\x0b\x11\n\x0c\n\x05\x04\x01\x02\x04\x03\x12\x03\n\x14\ - \x15\n\x0b\n\x04\x04\x01\x02\x05\x12\x03\x0b\x04\x13\n\x0c\n\x05\x04\x01\ - \x02\x05\x06\x12\x03\x0b\x04\x0b\n\x0c\n\x05\x04\x01\x02\x05\x01\x12\x03\ - \x0b\x0c\x0e\n\x0c\n\x05\x04\x01\x02\x05\x03\x12\x03\x0b\x11\x12\n\n\n\ - \x02\x04\x02\x12\x04\r\0\x11\x01\n\n\n\x03\x04\x02\x01\x12\x03\r\x08\x15\ - \n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0e\x04\x16\n\x0c\n\x05\x04\x02\x02\0\ - \x05\x12\x03\x0e\x04\n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0e\x0b\x11\ - \n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0e\x14\x15\n\x0b\n\x04\x04\x02\ - \x02\x01\x12\x03\x0f\x04\x1a\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\x0f\ - \x04\t\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\x0f\n\x15\n\x0c\n\x05\x04\ - \x02\x02\x01\x03\x12\x03\x0f\x18\x19\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\ - \x10\x04\x18\n\x0c\n\x05\x04\x02\x02\x02\x05\x12\x03\x10\x04\t\n\x0c\n\ - \x05\x04\x02\x02\x02\x01\x12\x03\x10\n\x13\n\x0c\n\x05\x04\x02\x02\x02\ - \x03\x12\x03\x10\x16\x17\n\n\n\x02\x05\0\x12\x04\x12\0\x15\x01\n\n\n\x03\ - \x05\0\x01\x12\x03\x12\x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x13\x04\ - \x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x13\x04\t\n\x0c\n\x05\x05\0\x02\ - \0\x02\x12\x03\x13\x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x14\x04\x0f\n\ - \x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x14\x04\n\n\x0c\n\x05\x05\0\x02\x01\ - \x02\x12\x03\x14\r\x0eb\x06proto3\ + \x14\n\x05start\x18\x02\x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\ + \x20\x01(\x03R\x03end*\x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\ + \x06Remote\x10\x01J\xea\x05\n\x06\x12\x04\0\0\x15\x01\n\x08\n\x01\x0c\ + \x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x04\x01\n\n\n\x03\x04\0\ + \x01\x12\x03\x02\x08\r\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x14\n\x0c\ + \n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\ + \x03\x03\n\x0f\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x12\x13\n\n\n\x02\ + \x04\x01\x12\x04\x05\0\x0c\x01\n\n\n\x03\x04\x01\x01\x12\x03\x05\x08\x10\ + \n\x0b\n\x04\x04\x01\x02\0\x12\x03\x06\x04\x1a\n\x0c\n\x05\x04\x01\x02\0\ + \x05\x12\x03\x06\x04\t\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x06\n\x15\n\ + \x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x06\x18\x19\n\x0b\n\x04\x04\x01\x02\ + \x01\x12\x03\x07\x04\x15\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x07\x04\ + \t\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\x07\n\x10\n\x0c\n\x05\x04\x01\ + \x02\x01\x03\x12\x03\x07\x13\x14\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x08\ + \x04\x19\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x08\x04\t\n\x0c\n\x05\ + \x04\x01\x02\x02\x01\x12\x03\x08\n\x14\n\x0c\n\x05\x04\x01\x02\x02\x03\ + \x12\x03\x08\x17\x18\n\x0b\n\x04\x04\x01\x02\x03\x12\x03\t\x04\x13\n\x0c\ + \n\x05\x04\x01\x02\x03\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\x03\ + \x01\x12\x03\t\x0b\x0e\n\x0c\n\x05\x04\x01\x02\x03\x03\x12\x03\t\x11\x12\ + \n\x0b\n\x04\x04\x01\x02\x04\x12\x03\n\x04\x16\n\x0c\n\x05\x04\x01\x02\ + \x04\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\x04\x01\x12\x03\n\x0b\ + \x11\n\x0c\n\x05\x04\x01\x02\x04\x03\x12\x03\n\x14\x15\n\x0b\n\x04\x04\ + \x01\x02\x05\x12\x03\x0b\x04\x13\n\x0c\n\x05\x04\x01\x02\x05\x06\x12\x03\ + \x0b\x04\x0b\n\x0c\n\x05\x04\x01\x02\x05\x01\x12\x03\x0b\x0c\x0e\n\x0c\n\ + \x05\x04\x01\x02\x05\x03\x12\x03\x0b\x11\x12\n\n\n\x02\x04\x02\x12\x04\r\ + \0\x11\x01\n\n\n\x03\x04\x02\x01\x12\x03\r\x08\x15\n\x0b\n\x04\x04\x02\ + \x02\0\x12\x03\x0e\x04\x16\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0e\x04\ + \n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0e\x0b\x11\n\x0c\n\x05\x04\x02\ + \x02\0\x03\x12\x03\x0e\x14\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x0f\ + \x04\x14\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\ + \x04\x02\x02\x01\x01\x12\x03\x0f\n\x0f\n\x0c\n\x05\x04\x02\x02\x01\x03\ + \x12\x03\x0f\x12\x13\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x10\x04\x12\n\ + \x0c\n\x05\x04\x02\x02\x02\x05\x12\x03\x10\x04\t\n\x0c\n\x05\x04\x02\x02\ + \x02\x01\x12\x03\x10\n\r\n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x10\x10\ + \x11\n\n\n\x02\x05\0\x12\x04\x12\0\x15\x01\n\n\n\x03\x05\0\x01\x12\x03\ + \x12\x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x13\x04\x0e\n\x0c\n\x05\x05\ + \0\x02\0\x01\x12\x03\x13\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x13\ + \x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x14\x04\x0f\n\x0c\n\x05\x05\0\ + \x02\x01\x01\x12\x03\x14\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x14\ + \r\x0eb\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-document/src/protobuf/proto/revision.proto b/rust-lib/flowy-document/src/protobuf/proto/revision.proto index 800e5de955..44a3137bc1 100644 --- a/rust-lib/flowy-document/src/protobuf/proto/revision.proto +++ b/rust-lib/flowy-document/src/protobuf/proto/revision.proto @@ -13,8 +13,8 @@ message Revision { } message RevisionRange { string doc_id = 1; - int64 from_rev_id = 2; - int64 to_rev_id = 3; + int64 start = 2; + int64 end = 3; } enum RevType { Local = 0; diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index 3c248eed5d..0731f97f98 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -10,7 +10,7 @@ use crate::{ edit::{ doc_actor::DocumentActor, message::{DocumentMsg, TransformDeltas}, - model::NotifyOpenDocAction, + model::OpenDocAction, }, revision::{DocRevision, RevisionCmd, RevisionManager, RevisionServer, RevisionStoreActor}, UndoResult, @@ -71,8 +71,8 @@ impl ClientEditDoc { ret, }; let _ = self.document.send(msg); - let delta_data = rx.await.map_err(internal_error)??.to_bytes(); - let rev_id = self.mk_revision(&delta_data).await?; + let delta = rx.await.map_err(internal_error)??; + let rev_id = self.mk_revision(delta).await?; save_document(self.document.clone(), rev_id.into()).await } @@ -80,8 +80,8 @@ impl ClientEditDoc { let (ret, rx) = oneshot::channel::>(); let msg = DocumentMsg::Delete { interval, ret }; let _ = self.document.send(msg); - let delta_data = rx.await.map_err(internal_error)??.to_bytes(); - let _ = self.mk_revision(&delta_data).await?; + let delta = rx.await.map_err(internal_error)??; + let _ = self.mk_revision(delta).await?; Ok(()) } @@ -93,8 +93,8 @@ impl ClientEditDoc { ret, }; let _ = self.document.send(msg); - let delta_data = rx.await.map_err(internal_error)??.to_bytes(); - let _ = self.mk_revision(&delta_data).await?; + let delta = rx.await.map_err(internal_error)??; + let _ = self.mk_revision(delta).await?; Ok(()) } @@ -106,8 +106,8 @@ impl ClientEditDoc { ret, }; let _ = self.document.send(msg); - let delta_data = rx.await.map_err(internal_error)??.to_bytes(); - let _ = self.mk_revision(&delta_data).await?; + let delta = rx.await.map_err(internal_error)??; + let _ = self.mk_revision(delta).await?; Ok(()) } @@ -151,14 +151,23 @@ impl ClientEditDoc { }) } - async fn mk_revision(&self, delta_data: &Bytes) -> Result { + #[tracing::instrument(level = "debug", skip(self, delta), fields(revision_delta = %delta.to_json(), send_state, base_rev_id, rev_id))] + async fn mk_revision(&self, delta: Delta) -> Result { + let delta_data = delta.to_bytes(); let (base_rev_id, rev_id) = self.rev_manager.next_rev_id(); + tracing::Span::current().record("base_rev_id", &base_rev_id); + tracing::Span::current().record("rev_id", &rev_id); + let delta_data = delta_data.to_vec(); let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local); let _ = self.rev_manager.add_revision(&revision).await?; match self.ws.send(revision.into()) { - Ok(_) => {}, - Err(e) => log::error!("Send delta failed: {:?}", e), + Ok(_) => { + tracing::Span::current().record("send_state", &"success"); + }, + Err(e) => { + tracing::Span::current().record("send_state", &format!("failed: {:?}", e).as_str()); + }, }; Ok(rev_id.into()) @@ -168,11 +177,14 @@ impl ClientEditDoc { pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), DocError> { let delta = Delta::from_bytes(&data)?; let (ret, rx) = oneshot::channel::>(); - let msg = DocumentMsg::Delta { delta, ret }; + let msg = DocumentMsg::Delta { + delta: delta.clone(), + ret, + }; let _ = self.document.send(msg); let _ = rx.await.map_err(internal_error)??; - let rev_id = self.mk_revision(&data).await?; + let rev_id = self.mk_revision(delta).await?; save_document(self.document.clone(), rev_id).await } @@ -189,7 +201,7 @@ impl ClientEditDoc { let rev_id: RevId = self.rev_manager.rev_id().into(); if let Ok(user_id) = self.user.user_id() { - let action = NotifyOpenDocAction::new(&user_id, &self.doc_id, &rev_id, &self.ws); + let action = OpenDocAction::new(&user_id, &self.doc_id, &rev_id, &self.ws); let strategy = ExponentialBackoff::from_millis(50).take(3); let retry = Retry::spawn(strategy, action); tokio::spawn(async move { @@ -228,11 +240,11 @@ impl ClientEditDoc { // update rev id self.rev_manager.set_rev_id(server_rev_id.clone().into()); - let (_, local_rev_id) = self.rev_manager.next_rev_id(); + let (local_base_rev_id, local_rev_id) = self.rev_manager.next_rev_id(); // save the revision let revision = Revision::new( - server_rev_id.value, + local_base_rev_id, local_rev_id, client_prime.to_bytes().to_vec(), &self.doc_id, @@ -242,7 +254,7 @@ impl ClientEditDoc { // send the server_prime delta let revision = Revision::new( - server_rev_id.value, + local_base_rev_id, local_rev_id, server_prime.to_bytes().to_vec(), &self.doc_id, diff --git a/rust-lib/flowy-document/src/services/doc/edit/model.rs b/rust-lib/flowy-document/src/services/doc/edit/model.rs index cd760628d1..c869c37a4b 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/model.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/model.rs @@ -7,14 +7,14 @@ use flowy_infra::retry::Action; use futures::future::BoxFuture; use std::{future, sync::Arc}; -pub(crate) struct NotifyOpenDocAction { +pub(crate) struct OpenDocAction { user_id: String, rev_id: RevId, doc_id: String, ws: Arc, } -impl NotifyOpenDocAction { +impl OpenDocAction { pub(crate) fn new(user_id: &str, doc_id: &str, rev_id: &RevId, ws: &Arc) -> Self { Self { user_id: user_id.to_owned(), @@ -25,7 +25,7 @@ impl NotifyOpenDocAction { } } -impl Action for NotifyOpenDocAction { +impl Action for OpenDocAction { type Future = BoxFuture<'static, Result>; type Item = (); type Error = DocError; diff --git a/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs index a28ca7d19d..714d2cacb7 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -84,8 +84,8 @@ impl RevisionManager { let delta_data = new_delta.to_bytes(); let revision = Revision::new( - range.from_rev_id, - range.to_rev_id, + range.start, + range.end, delta_data.to_vec(), &self.doc_id, RevType::Remote, diff --git a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs index 7bb4593673..a8b9003bc2 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs @@ -153,14 +153,11 @@ impl RevisionStoreActor { } async fn revs_in_range(&self, range: RevisionRange) -> DocResult> { - let iter_range = range.from_rev_id..=range.to_rev_id; - let revs = iter_range - .flat_map(|rev_id| { - // - match self.revs.get(&rev_id) { - None => None, - Some(rev) => Some((&*(*rev)).clone()), - } + let revs = range + .iter() + .flat_map(|rev_id| match self.revs.get(&rev_id) { + None => None, + Some(rev) => Some((&*(*rev)).clone()), }) .collect::>(); @@ -189,7 +186,7 @@ impl RevisionStoreActor { let doc = self.server.fetch_document_from_remote(&self.doc_id).await?; let revision = revision_from_doc(doc.clone(), RevType::Remote); - self.handle_new_revision(revision); + let _ = self.handle_new_revision(revision).await?; Ok(doc) } } diff --git a/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs b/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs index a6b193ba94..128c44b7b1 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs @@ -80,8 +80,8 @@ impl RevTableSql { conn: &SqliteConnection, ) -> Result, DocError> { let rev_tables = dsl::rev_table - .filter(rev_id.ge(range.from_rev_id)) - .filter(rev_id.le(range.to_rev_id)) + .filter(rev_id.ge(range.start)) + .filter(rev_id.le(range.end)) .filter(doc_id.eq(doc_id_s)) .order(rev_id.asc()) .load::(conn)?;