[server]: fix pull rev id range bug

This commit is contained in:
appflowy 2021-10-06 23:21:57 +08:00
parent 04f8fc38a8
commit f84fb6f87f
12 changed files with 185 additions and 163 deletions

View file

@ -181,26 +181,26 @@ class Revision extends $pb.GeneratedMessage {
class RevisionRange extends $pb.GeneratedMessage { class RevisionRange extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevisionRange', createEmptyInstance: create) static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevisionRange', createEmptyInstance: create)
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId')
..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'fromRevId') ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'start')
..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'toRevId') ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'end')
..hasRequiredFields = false ..hasRequiredFields = false
; ;
RevisionRange._() : super(); RevisionRange._() : super();
factory RevisionRange({ factory RevisionRange({
$core.String? docId, $core.String? docId,
$fixnum.Int64? fromRevId, $fixnum.Int64? start,
$fixnum.Int64? toRevId, $fixnum.Int64? end,
}) { }) {
final _result = create(); final _result = create();
if (docId != null) { if (docId != null) {
_result.docId = docId; _result.docId = docId;
} }
if (fromRevId != null) { if (start != null) {
_result.fromRevId = fromRevId; _result.start = start;
} }
if (toRevId != null) { if (end != null) {
_result.toRevId = toRevId; _result.end = end;
} }
return _result; return _result;
} }
@ -235,21 +235,21 @@ class RevisionRange extends $pb.GeneratedMessage {
void clearDocId() => clearField(1); void clearDocId() => clearField(1);
@$pb.TagNumber(2) @$pb.TagNumber(2)
$fixnum.Int64 get fromRevId => $_getI64(1); $fixnum.Int64 get start => $_getI64(1);
@$pb.TagNumber(2) @$pb.TagNumber(2)
set fromRevId($fixnum.Int64 v) { $_setInt64(1, v); } set start($fixnum.Int64 v) { $_setInt64(1, v); }
@$pb.TagNumber(2) @$pb.TagNumber(2)
$core.bool hasFromRevId() => $_has(1); $core.bool hasStart() => $_has(1);
@$pb.TagNumber(2) @$pb.TagNumber(2)
void clearFromRevId() => clearField(2); void clearStart() => clearField(2);
@$pb.TagNumber(3) @$pb.TagNumber(3)
$fixnum.Int64 get toRevId => $_getI64(2); $fixnum.Int64 get end => $_getI64(2);
@$pb.TagNumber(3) @$pb.TagNumber(3)
set toRevId($fixnum.Int64 v) { $_setInt64(2, v); } set end($fixnum.Int64 v) { $_setInt64(2, v); }
@$pb.TagNumber(3) @$pb.TagNumber(3)
$core.bool hasToRevId() => $_has(2); $core.bool hasEnd() => $_has(2);
@$pb.TagNumber(3) @$pb.TagNumber(3)
void clearToRevId() => clearField(3); void clearEnd() => clearField(3);
} }

View file

@ -49,10 +49,10 @@ const RevisionRange$json = const {
'1': 'RevisionRange', '1': 'RevisionRange',
'2': const [ '2': const [
const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'}, const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'},
const {'1': 'from_rev_id', '3': 2, '4': 1, '5': 3, '10': 'fromRevId'}, const {'1': 'start', '3': 2, '4': 1, '5': 3, '10': 'start'},
const {'1': 'to_rev_id', '3': 3, '4': 1, '5': 3, '10': 'toRevId'}, const {'1': 'end', '3': 3, '4': 1, '5': 3, '10': 'end'},
], ],
}; };
/// Descriptor for `RevisionRange`. Decode as a `google.protobuf.DescriptorProto`. /// Descriptor for `RevisionRange`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List revisionRangeDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblJhbmdlEhUKBmRvY19pZBgBIAEoCVIFZG9jSWQSHgoLZnJvbV9yZXZfaWQYAiABKANSCWZyb21SZXZJZBIaCgl0b19yZXZfaWQYAyABKANSB3RvUmV2SWQ='); final $typed_data.Uint8List revisionRangeDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblJhbmdlEhUKBmRvY19pZBgBIAEoCVIFZG9jSWQSFAoFc3RhcnQYAiABKANSBXN0YXJ0EhAKA2VuZBgDIAEoA1IDZW5k');

View file

@ -49,7 +49,7 @@ pub(crate) async fn read_doc(pool: &PgPool, params: QueryDocParams) -> Result<Do
Ok(doc) Ok(doc)
} }
#[tracing::instrument(level = "debug", skip(pool, params), err)] #[tracing::instrument(level = "debug", skip(pool, params), fields(delta), err)]
pub async fn update_doc(pool: &PgPool, mut params: UpdateDocParams) -> Result<(), ServerError> { pub async fn update_doc(pool: &PgPool, mut params: UpdateDocParams) -> Result<(), ServerError> {
let doc_id = Uuid::parse_str(&params.doc_id)?; let doc_id = Uuid::parse_str(&params.doc_id)?;
let mut transaction = pool let mut transaction = pool
@ -59,6 +59,8 @@ pub async fn update_doc(pool: &PgPool, mut params: UpdateDocParams) -> Result<()
let data = Some(params.take_data()); let data = Some(params.take_data());
tracing::Span::current().record("result", &data.as_ref().unwrap_or(&"".to_owned()).as_str());
let (sql, args) = SqlBuilder::update(DOC_TABLE) let (sql, args) = SqlBuilder::update(DOC_TABLE)
.add_some_arg("data", data) .add_some_arg("data", data)
.add_arg("rev_id", params.rev_id) .add_arg("rev_id", params.rev_id)

View file

@ -61,11 +61,10 @@ impl ServerEditDoc {
pub async fn new_doc_user(&self, user: EditUser, rev_id: i64) -> Result<(), ServerError> { pub async fn new_doc_user(&self, user: EditUser, rev_id: i64) -> Result<(), ServerError> {
self.users.insert(user.id(), user.clone()); self.users.insert(user.id(), user.clone());
let cur_rev_id = self.rev_id.load(SeqCst); let cur_rev_id = self.rev_id.load(SeqCst);
match cur_rev_id.cmp(&rev_id) { match cur_rev_id.cmp(&rev_id) {
Ordering::Less => { Ordering::Less => {
user.socket user.socket
.do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, rev_id)) .do_send(mk_pull_rev_ws_message(&self.doc_id, next(cur_rev_id), rev_id))
.map_err(internal_error)?; .map_err(internal_error)?;
}, },
Ordering::Equal => {}, Ordering::Equal => {},
@ -84,9 +83,9 @@ impl ServerEditDoc {
level = "debug", level = "debug",
skip(self, user, pg_pool, revision), skip(self, user, pg_pool, revision),
fields( fields(
rev_id = %self.rev_id.load(SeqCst), cur_rev_id = %self.rev_id.load(SeqCst),
revision_rev_id = %revision.rev_id, base_rev_id = %revision.base_rev_id,
revision_base_rev_id = %revision.base_rev_id rev_id = %revision.rev_id,
) )
)] )]
pub async fn apply_revision( pub async fn apply_revision(
@ -100,18 +99,19 @@ impl ServerEditDoc {
let cur_rev_id = self.rev_id.load(SeqCst); let cur_rev_id = self.rev_id.load(SeqCst);
match cur_rev_id.cmp(&revision.rev_id) { match cur_rev_id.cmp(&revision.rev_id) {
Ordering::Less => { Ordering::Less => {
if cur_rev_id != revision.base_rev_id { let next_rev_id = next(cur_rev_id);
// The server document is outdated, try to get the missing revision from the if next_rev_id == revision.base_rev_id || cur_rev_id == revision.base_rev_id {
// client.
user.socket
.do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id))
.map_err(internal_error)?;
} else {
let _ = self.compose_revision(&revision, pg_pool).await?; let _ = self.compose_revision(&revision, pg_pool).await?;
user.socket user.socket
.do_send(mk_acked_ws_message(&revision)) .do_send(mk_acked_ws_message(&revision))
.map_err(internal_error)?; .map_err(internal_error)?;
} }
// The server document is outdated, try to get the missing revision from the
// client.
user.socket
.do_send(mk_pull_rev_ws_message(&self.doc_id, next_rev_id, revision.rev_id))
.map_err(internal_error)?;
}, },
Ordering::Equal => {}, Ordering::Equal => {},
Ordering::Greater => { Ordering::Greater => {
@ -168,12 +168,11 @@ impl ServerEditDoc {
level = "debug", level = "debug",
skip(self, delta), skip(self, delta),
fields( fields(
delta = %delta.to_json(), revision_delta = %delta.to_json(),
result, result,
) )
)] )]
fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> { fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> {
// Opti: push each revision into queue and process it one by one.
match self.document.try_write_for(Duration::from_millis(300)) { match self.document.try_write_for(Duration::from_millis(300)) {
None => { None => {
log::error!("Failed to acquire write lock of document"); log::error!("Failed to acquire write lock of document");
@ -211,8 +210,8 @@ fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor
fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor { fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor {
let range = RevisionRange { let range = RevisionRange {
doc_id: doc_id.to_string(), doc_id: doc_id.to_string(),
from_rev_id, start: from_rev_id,
to_rev_id, end: to_rev_id,
..Default::default() ..Default::default()
}; };
@ -247,3 +246,6 @@ fn mk_ws_message<T: Into<WsMessage>>(data: T) -> WsMessageAdaptor {
let bytes: Bytes = msg.try_into().unwrap(); let bytes: Bytes = msg.try_into().unwrap();
WsMessageAdaptor(bytes) WsMessageAdaptor(bytes)
} }
#[inline]
fn next(rev_id: i64) -> i64 { rev_id + 1 }

View file

@ -3,7 +3,7 @@ use crate::services::util::md5;
use crate::entities::doc::Doc; use crate::entities::doc::Doc;
use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use flowy_ot::core::Delta; use flowy_ot::core::Delta;
use std::fmt::Formatter; use std::{fmt::Formatter, ops::RangeInclusive};
#[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)] #[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)]
pub enum RevType { pub enum RevType {
@ -97,18 +97,22 @@ impl std::fmt::Debug for Revision {
} }
impl Revision { impl Revision {
pub fn new<T1: Into<i64>, T2: Into<i64>>( pub fn new<T1, T2, D>(base_rev_id: T1, rev_id: T2, delta: D, doc_id: &str, ty: RevType) -> Revision
base_rev_id: T1, where
rev_id: T2, T1: Into<i64>,
delta_data: Vec<u8>, T2: Into<i64>,
doc_id: &str, D: AsRef<[u8]>,
ty: RevType, {
) -> Revision { let md5 = md5(&delta);
let md5 = md5(&delta_data);
let doc_id = doc_id.to_owned(); let doc_id = doc_id.to_owned();
let delta_data = delta.as_ref().to_vec();
let base_rev_id = base_rev_id.into();
let rev_id = rev_id.into();
debug_assert!(base_rev_id != rev_id);
Self { Self {
base_rev_id: base_rev_id.into(), base_rev_id,
rev_id: rev_id.into(), rev_id,
delta_data, delta_data,
md5, md5,
doc_id, doc_id,
@ -135,19 +139,24 @@ pub struct RevisionRange {
pub doc_id: String, pub doc_id: String,
#[pb(index = 2)] #[pb(index = 2)]
pub from_rev_id: i64, pub start: i64,
#[pb(index = 3)] #[pb(index = 3)]
pub to_rev_id: i64, pub end: i64,
} }
impl RevisionRange { impl RevisionRange {
pub fn len(&self) -> i64 { pub fn len(&self) -> i64 {
debug_assert!(self.to_rev_id >= self.from_rev_id); debug_assert!(self.end >= self.start);
if self.to_rev_id >= self.from_rev_id { if self.end >= self.start {
self.to_rev_id - self.from_rev_id self.end - self.start
} else { } else {
0 0
} }
} }
pub fn iter(&self) -> RangeInclusive<i64> {
debug_assert!(self.start != self.end);
RangeInclusive::new(self.start, self.end)
}
} }

View file

@ -523,8 +523,8 @@ impl ::protobuf::reflect::ProtobufValue for Revision {
pub struct RevisionRange { pub struct RevisionRange {
// message fields // message fields
pub doc_id: ::std::string::String, pub doc_id: ::std::string::String,
pub from_rev_id: i64, pub start: i64,
pub to_rev_id: i64, pub end: i64,
// special fields // special fields
pub unknown_fields: ::protobuf::UnknownFields, pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize, pub cached_size: ::protobuf::CachedSize,
@ -567,34 +567,34 @@ impl RevisionRange {
::std::mem::replace(&mut self.doc_id, ::std::string::String::new()) ::std::mem::replace(&mut self.doc_id, ::std::string::String::new())
} }
// int64 from_rev_id = 2; // int64 start = 2;
pub fn get_from_rev_id(&self) -> i64 { pub fn get_start(&self) -> i64 {
self.from_rev_id self.start
} }
pub fn clear_from_rev_id(&mut self) { pub fn clear_start(&mut self) {
self.from_rev_id = 0; self.start = 0;
} }
// Param is passed by value, moved // Param is passed by value, moved
pub fn set_from_rev_id(&mut self, v: i64) { pub fn set_start(&mut self, v: i64) {
self.from_rev_id = v; self.start = v;
} }
// int64 to_rev_id = 3; // int64 end = 3;
pub fn get_to_rev_id(&self) -> i64 { pub fn get_end(&self) -> i64 {
self.to_rev_id self.end
} }
pub fn clear_to_rev_id(&mut self) { pub fn clear_end(&mut self) {
self.to_rev_id = 0; self.end = 0;
} }
// Param is passed by value, moved // Param is passed by value, moved
pub fn set_to_rev_id(&mut self, v: i64) { pub fn set_end(&mut self, v: i64) {
self.to_rev_id = v; self.end = v;
} }
} }
@ -615,14 +615,14 @@ impl ::protobuf::Message for RevisionRange {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
} }
let tmp = is.read_int64()?; let tmp = is.read_int64()?;
self.from_rev_id = tmp; self.start = tmp;
}, },
3 => { 3 => {
if wire_type != ::protobuf::wire_format::WireTypeVarint { if wire_type != ::protobuf::wire_format::WireTypeVarint {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
} }
let tmp = is.read_int64()?; let tmp = is.read_int64()?;
self.to_rev_id = tmp; self.end = tmp;
}, },
_ => { _ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
@ -639,11 +639,11 @@ impl ::protobuf::Message for RevisionRange {
if !self.doc_id.is_empty() { if !self.doc_id.is_empty() {
my_size += ::protobuf::rt::string_size(1, &self.doc_id); my_size += ::protobuf::rt::string_size(1, &self.doc_id);
} }
if self.from_rev_id != 0 { if self.start != 0 {
my_size += ::protobuf::rt::value_size(2, self.from_rev_id, ::protobuf::wire_format::WireTypeVarint); my_size += ::protobuf::rt::value_size(2, self.start, ::protobuf::wire_format::WireTypeVarint);
} }
if self.to_rev_id != 0 { if self.end != 0 {
my_size += ::protobuf::rt::value_size(3, self.to_rev_id, ::protobuf::wire_format::WireTypeVarint); my_size += ::protobuf::rt::value_size(3, self.end, ::protobuf::wire_format::WireTypeVarint);
} }
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size); self.cached_size.set(my_size);
@ -654,11 +654,11 @@ impl ::protobuf::Message for RevisionRange {
if !self.doc_id.is_empty() { if !self.doc_id.is_empty() {
os.write_string(1, &self.doc_id)?; os.write_string(1, &self.doc_id)?;
} }
if self.from_rev_id != 0 { if self.start != 0 {
os.write_int64(2, self.from_rev_id)?; os.write_int64(2, self.start)?;
} }
if self.to_rev_id != 0 { if self.end != 0 {
os.write_int64(3, self.to_rev_id)?; os.write_int64(3, self.end)?;
} }
os.write_unknown_fields(self.get_unknown_fields())?; os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(()) ::std::result::Result::Ok(())
@ -704,14 +704,14 @@ impl ::protobuf::Message for RevisionRange {
|m: &mut RevisionRange| { &mut m.doc_id }, |m: &mut RevisionRange| { &mut m.doc_id },
)); ));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
"from_rev_id", "start",
|m: &RevisionRange| { &m.from_rev_id }, |m: &RevisionRange| { &m.start },
|m: &mut RevisionRange| { &mut m.from_rev_id }, |m: &mut RevisionRange| { &mut m.start },
)); ));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
"to_rev_id", "end",
|m: &RevisionRange| { &m.to_rev_id }, |m: &RevisionRange| { &m.end },
|m: &mut RevisionRange| { &mut m.to_rev_id }, |m: &mut RevisionRange| { &mut m.end },
)); ));
::protobuf::reflect::MessageDescriptor::new_pb_name::<RevisionRange>( ::protobuf::reflect::MessageDescriptor::new_pb_name::<RevisionRange>(
"RevisionRange", "RevisionRange",
@ -730,8 +730,8 @@ impl ::protobuf::Message for RevisionRange {
impl ::protobuf::Clear for RevisionRange { impl ::protobuf::Clear for RevisionRange {
fn clear(&mut self) { fn clear(&mut self) {
self.doc_id.clear(); self.doc_id.clear();
self.from_rev_id = 0; self.start = 0;
self.to_rev_id = 0; self.end = 0;
self.unknown_fields.clear(); self.unknown_fields.clear();
} }
} }
@ -804,48 +804,48 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05r\ \x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05r\
evId\x12\x1d\n\ndelta_data\x18\x03\x20\x01(\x0cR\tdeltaData\x12\x10\n\ evId\x12\x1d\n\ndelta_data\x18\x03\x20\x01(\x0cR\tdeltaData\x12\x10\n\
\x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01(\ \x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01(\
\tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\"b\ \tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\"N\
\n\rRevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\ \n\rRevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\
\x1e\n\x0bfrom_rev_id\x18\x02\x20\x01(\x03R\tfromRevId\x12\x1a\n\tto_rev\ \x14\n\x05start\x18\x02\x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\
_id\x18\x03\x20\x01(\x03R\x07toRevId*\x20\n\x07RevType\x12\t\n\x05Local\ \x20\x01(\x03R\x03end*\x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\
\x10\0\x12\n\n\x06Remote\x10\x01J\xea\x05\n\x06\x12\x04\0\0\x15\x01\n\ \x06Remote\x10\x01J\xea\x05\n\x06\x12\x04\0\0\x15\x01\n\x08\n\x01\x0c\
\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x04\x01\n\n\ \x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x04\x01\n\n\n\x03\x04\0\
\n\x03\x04\0\x01\x12\x03\x02\x08\r\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\ \x01\x12\x03\x02\x08\r\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x14\n\x0c\
\x04\x14\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\ \n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\
\x02\0\x01\x12\x03\x03\n\x0f\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x12\ \x03\x03\n\x0f\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x12\x13\n\n\n\x02\
\x13\n\n\n\x02\x04\x01\x12\x04\x05\0\x0c\x01\n\n\n\x03\x04\x01\x01\x12\ \x04\x01\x12\x04\x05\0\x0c\x01\n\n\n\x03\x04\x01\x01\x12\x03\x05\x08\x10\
\x03\x05\x08\x10\n\x0b\n\x04\x04\x01\x02\0\x12\x03\x06\x04\x1a\n\x0c\n\ \n\x0b\n\x04\x04\x01\x02\0\x12\x03\x06\x04\x1a\n\x0c\n\x05\x04\x01\x02\0\
\x05\x04\x01\x02\0\x05\x12\x03\x06\x04\t\n\x0c\n\x05\x04\x01\x02\0\x01\ \x05\x12\x03\x06\x04\t\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x06\n\x15\n\
\x12\x03\x06\n\x15\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x06\x18\x19\n\ \x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x06\x18\x19\n\x0b\n\x04\x04\x01\x02\
\x0b\n\x04\x04\x01\x02\x01\x12\x03\x07\x04\x15\n\x0c\n\x05\x04\x01\x02\ \x01\x12\x03\x07\x04\x15\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x07\x04\
\x01\x05\x12\x03\x07\x04\t\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\x07\n\ \t\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\x07\n\x10\n\x0c\n\x05\x04\x01\
\x10\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\x07\x13\x14\n\x0b\n\x04\x04\ \x02\x01\x03\x12\x03\x07\x13\x14\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x08\
\x01\x02\x02\x12\x03\x08\x04\x19\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\ \x04\x19\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x08\x04\t\n\x0c\n\x05\
\x08\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\x08\n\x14\n\x0c\n\x05\ \x04\x01\x02\x02\x01\x12\x03\x08\n\x14\n\x0c\n\x05\x04\x01\x02\x02\x03\
\x04\x01\x02\x02\x03\x12\x03\x08\x17\x18\n\x0b\n\x04\x04\x01\x02\x03\x12\ \x12\x03\x08\x17\x18\n\x0b\n\x04\x04\x01\x02\x03\x12\x03\t\x04\x13\n\x0c\
\x03\t\x04\x13\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\x03\t\x04\n\n\x0c\n\ \n\x05\x04\x01\x02\x03\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\x03\
\x05\x04\x01\x02\x03\x01\x12\x03\t\x0b\x0e\n\x0c\n\x05\x04\x01\x02\x03\ \x01\x12\x03\t\x0b\x0e\n\x0c\n\x05\x04\x01\x02\x03\x03\x12\x03\t\x11\x12\
\x03\x12\x03\t\x11\x12\n\x0b\n\x04\x04\x01\x02\x04\x12\x03\n\x04\x16\n\ \n\x0b\n\x04\x04\x01\x02\x04\x12\x03\n\x04\x16\n\x0c\n\x05\x04\x01\x02\
\x0c\n\x05\x04\x01\x02\x04\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\ \x04\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\x04\x01\x12\x03\n\x0b\
\x04\x01\x12\x03\n\x0b\x11\n\x0c\n\x05\x04\x01\x02\x04\x03\x12\x03\n\x14\ \x11\n\x0c\n\x05\x04\x01\x02\x04\x03\x12\x03\n\x14\x15\n\x0b\n\x04\x04\
\x15\n\x0b\n\x04\x04\x01\x02\x05\x12\x03\x0b\x04\x13\n\x0c\n\x05\x04\x01\ \x01\x02\x05\x12\x03\x0b\x04\x13\n\x0c\n\x05\x04\x01\x02\x05\x06\x12\x03\
\x02\x05\x06\x12\x03\x0b\x04\x0b\n\x0c\n\x05\x04\x01\x02\x05\x01\x12\x03\ \x0b\x04\x0b\n\x0c\n\x05\x04\x01\x02\x05\x01\x12\x03\x0b\x0c\x0e\n\x0c\n\
\x0b\x0c\x0e\n\x0c\n\x05\x04\x01\x02\x05\x03\x12\x03\x0b\x11\x12\n\n\n\ \x05\x04\x01\x02\x05\x03\x12\x03\x0b\x11\x12\n\n\n\x02\x04\x02\x12\x04\r\
\x02\x04\x02\x12\x04\r\0\x11\x01\n\n\n\x03\x04\x02\x01\x12\x03\r\x08\x15\ \0\x11\x01\n\n\n\x03\x04\x02\x01\x12\x03\r\x08\x15\n\x0b\n\x04\x04\x02\
\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0e\x04\x16\n\x0c\n\x05\x04\x02\x02\0\ \x02\0\x12\x03\x0e\x04\x16\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0e\x04\
\x05\x12\x03\x0e\x04\n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0e\x0b\x11\ \n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0e\x0b\x11\n\x0c\n\x05\x04\x02\
\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0e\x14\x15\n\x0b\n\x04\x04\x02\ \x02\0\x03\x12\x03\x0e\x14\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x0f\
\x02\x01\x12\x03\x0f\x04\x1a\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\x0f\ \x04\x14\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\
\x04\t\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\x0f\n\x15\n\x0c\n\x05\x04\ \x04\x02\x02\x01\x01\x12\x03\x0f\n\x0f\n\x0c\n\x05\x04\x02\x02\x01\x03\
\x02\x02\x01\x03\x12\x03\x0f\x18\x19\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\ \x12\x03\x0f\x12\x13\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x10\x04\x12\n\
\x10\x04\x18\n\x0c\n\x05\x04\x02\x02\x02\x05\x12\x03\x10\x04\t\n\x0c\n\ \x0c\n\x05\x04\x02\x02\x02\x05\x12\x03\x10\x04\t\n\x0c\n\x05\x04\x02\x02\
\x05\x04\x02\x02\x02\x01\x12\x03\x10\n\x13\n\x0c\n\x05\x04\x02\x02\x02\ \x02\x01\x12\x03\x10\n\r\n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x10\x10\
\x03\x12\x03\x10\x16\x17\n\n\n\x02\x05\0\x12\x04\x12\0\x15\x01\n\n\n\x03\ \x11\n\n\n\x02\x05\0\x12\x04\x12\0\x15\x01\n\n\n\x03\x05\0\x01\x12\x03\
\x05\0\x01\x12\x03\x12\x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x13\x04\ \x12\x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x13\x04\x0e\n\x0c\n\x05\x05\
\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x13\x04\t\n\x0c\n\x05\x05\0\x02\ \0\x02\0\x01\x12\x03\x13\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x13\
\0\x02\x12\x03\x13\x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x14\x04\x0f\n\ \x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x14\x04\x0f\n\x0c\n\x05\x05\0\
\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x14\x04\n\n\x0c\n\x05\x05\0\x02\x01\ \x02\x01\x01\x12\x03\x14\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x14\
\x02\x12\x03\x14\r\x0eb\x06proto3\ \r\x0eb\x06proto3\
"; ";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View file

@ -13,8 +13,8 @@ message Revision {
} }
message RevisionRange { message RevisionRange {
string doc_id = 1; string doc_id = 1;
int64 from_rev_id = 2; int64 start = 2;
int64 to_rev_id = 3; int64 end = 3;
} }
enum RevType { enum RevType {
Local = 0; Local = 0;

View file

@ -10,7 +10,7 @@ use crate::{
edit::{ edit::{
doc_actor::DocumentActor, doc_actor::DocumentActor,
message::{DocumentMsg, TransformDeltas}, message::{DocumentMsg, TransformDeltas},
model::NotifyOpenDocAction, model::OpenDocAction,
}, },
revision::{DocRevision, RevisionCmd, RevisionManager, RevisionServer, RevisionStoreActor}, revision::{DocRevision, RevisionCmd, RevisionManager, RevisionServer, RevisionStoreActor},
UndoResult, UndoResult,
@ -71,8 +71,8 @@ impl ClientEditDoc {
ret, ret,
}; };
let _ = self.document.send(msg); let _ = self.document.send(msg);
let delta_data = rx.await.map_err(internal_error)??.to_bytes(); let delta = rx.await.map_err(internal_error)??;
let rev_id = self.mk_revision(&delta_data).await?; let rev_id = self.mk_revision(delta).await?;
save_document(self.document.clone(), rev_id.into()).await save_document(self.document.clone(), rev_id.into()).await
} }
@ -80,8 +80,8 @@ impl ClientEditDoc {
let (ret, rx) = oneshot::channel::<DocResult<Delta>>(); let (ret, rx) = oneshot::channel::<DocResult<Delta>>();
let msg = DocumentMsg::Delete { interval, ret }; let msg = DocumentMsg::Delete { interval, ret };
let _ = self.document.send(msg); let _ = self.document.send(msg);
let delta_data = rx.await.map_err(internal_error)??.to_bytes(); let delta = rx.await.map_err(internal_error)??;
let _ = self.mk_revision(&delta_data).await?; let _ = self.mk_revision(delta).await?;
Ok(()) Ok(())
} }
@ -93,8 +93,8 @@ impl ClientEditDoc {
ret, ret,
}; };
let _ = self.document.send(msg); let _ = self.document.send(msg);
let delta_data = rx.await.map_err(internal_error)??.to_bytes(); let delta = rx.await.map_err(internal_error)??;
let _ = self.mk_revision(&delta_data).await?; let _ = self.mk_revision(delta).await?;
Ok(()) Ok(())
} }
@ -106,8 +106,8 @@ impl ClientEditDoc {
ret, ret,
}; };
let _ = self.document.send(msg); let _ = self.document.send(msg);
let delta_data = rx.await.map_err(internal_error)??.to_bytes(); let delta = rx.await.map_err(internal_error)??;
let _ = self.mk_revision(&delta_data).await?; let _ = self.mk_revision(delta).await?;
Ok(()) Ok(())
} }
@ -151,14 +151,23 @@ impl ClientEditDoc {
}) })
} }
async fn mk_revision(&self, delta_data: &Bytes) -> Result<RevId, DocError> { #[tracing::instrument(level = "debug", skip(self, delta), fields(revision_delta = %delta.to_json(), send_state, base_rev_id, rev_id))]
async fn mk_revision(&self, delta: Delta) -> Result<RevId, DocError> {
let delta_data = delta.to_bytes();
let (base_rev_id, rev_id) = self.rev_manager.next_rev_id(); let (base_rev_id, rev_id) = self.rev_manager.next_rev_id();
tracing::Span::current().record("base_rev_id", &base_rev_id);
tracing::Span::current().record("rev_id", &rev_id);
let delta_data = delta_data.to_vec(); let delta_data = delta_data.to_vec();
let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local); let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local);
let _ = self.rev_manager.add_revision(&revision).await?; let _ = self.rev_manager.add_revision(&revision).await?;
match self.ws.send(revision.into()) { match self.ws.send(revision.into()) {
Ok(_) => {}, Ok(_) => {
Err(e) => log::error!("Send delta failed: {:?}", e), tracing::Span::current().record("send_state", &"success");
},
Err(e) => {
tracing::Span::current().record("send_state", &format!("failed: {:?}", e).as_str());
},
}; };
Ok(rev_id.into()) Ok(rev_id.into())
@ -168,11 +177,14 @@ impl ClientEditDoc {
pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), DocError> { pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), DocError> {
let delta = Delta::from_bytes(&data)?; let delta = Delta::from_bytes(&data)?;
let (ret, rx) = oneshot::channel::<DocResult<()>>(); let (ret, rx) = oneshot::channel::<DocResult<()>>();
let msg = DocumentMsg::Delta { delta, ret }; let msg = DocumentMsg::Delta {
delta: delta.clone(),
ret,
};
let _ = self.document.send(msg); let _ = self.document.send(msg);
let _ = rx.await.map_err(internal_error)??; let _ = rx.await.map_err(internal_error)??;
let rev_id = self.mk_revision(&data).await?; let rev_id = self.mk_revision(delta).await?;
save_document(self.document.clone(), rev_id).await save_document(self.document.clone(), rev_id).await
} }
@ -189,7 +201,7 @@ impl ClientEditDoc {
let rev_id: RevId = self.rev_manager.rev_id().into(); let rev_id: RevId = self.rev_manager.rev_id().into();
if let Ok(user_id) = self.user.user_id() { if let Ok(user_id) = self.user.user_id() {
let action = NotifyOpenDocAction::new(&user_id, &self.doc_id, &rev_id, &self.ws); let action = OpenDocAction::new(&user_id, &self.doc_id, &rev_id, &self.ws);
let strategy = ExponentialBackoff::from_millis(50).take(3); let strategy = ExponentialBackoff::from_millis(50).take(3);
let retry = Retry::spawn(strategy, action); let retry = Retry::spawn(strategy, action);
tokio::spawn(async move { tokio::spawn(async move {
@ -228,11 +240,11 @@ impl ClientEditDoc {
// update rev id // update rev id
self.rev_manager.set_rev_id(server_rev_id.clone().into()); self.rev_manager.set_rev_id(server_rev_id.clone().into());
let (_, local_rev_id) = self.rev_manager.next_rev_id(); let (local_base_rev_id, local_rev_id) = self.rev_manager.next_rev_id();
// save the revision // save the revision
let revision = Revision::new( let revision = Revision::new(
server_rev_id.value, local_base_rev_id,
local_rev_id, local_rev_id,
client_prime.to_bytes().to_vec(), client_prime.to_bytes().to_vec(),
&self.doc_id, &self.doc_id,
@ -242,7 +254,7 @@ impl ClientEditDoc {
// send the server_prime delta // send the server_prime delta
let revision = Revision::new( let revision = Revision::new(
server_rev_id.value, local_base_rev_id,
local_rev_id, local_rev_id,
server_prime.to_bytes().to_vec(), server_prime.to_bytes().to_vec(),
&self.doc_id, &self.doc_id,

View file

@ -7,14 +7,14 @@ use flowy_infra::retry::Action;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use std::{future, sync::Arc}; use std::{future, sync::Arc};
pub(crate) struct NotifyOpenDocAction { pub(crate) struct OpenDocAction {
user_id: String, user_id: String,
rev_id: RevId, rev_id: RevId,
doc_id: String, doc_id: String,
ws: Arc<dyn DocumentWebSocket>, ws: Arc<dyn DocumentWebSocket>,
} }
impl NotifyOpenDocAction { impl OpenDocAction {
pub(crate) fn new(user_id: &str, doc_id: &str, rev_id: &RevId, ws: &Arc<dyn DocumentWebSocket>) -> Self { pub(crate) fn new(user_id: &str, doc_id: &str, rev_id: &RevId, ws: &Arc<dyn DocumentWebSocket>) -> Self {
Self { Self {
user_id: user_id.to_owned(), user_id: user_id.to_owned(),
@ -25,7 +25,7 @@ impl NotifyOpenDocAction {
} }
} }
impl Action for NotifyOpenDocAction { impl Action for OpenDocAction {
type Future = BoxFuture<'static, Result<Self::Item, Self::Error>>; type Future = BoxFuture<'static, Result<Self::Item, Self::Error>>;
type Item = (); type Item = ();
type Error = DocError; type Error = DocError;

View file

@ -84,8 +84,8 @@ impl RevisionManager {
let delta_data = new_delta.to_bytes(); let delta_data = new_delta.to_bytes();
let revision = Revision::new( let revision = Revision::new(
range.from_rev_id, range.start,
range.to_rev_id, range.end,
delta_data.to_vec(), delta_data.to_vec(),
&self.doc_id, &self.doc_id,
RevType::Remote, RevType::Remote,

View file

@ -153,14 +153,11 @@ impl RevisionStoreActor {
} }
async fn revs_in_range(&self, range: RevisionRange) -> DocResult<Vec<Revision>> { async fn revs_in_range(&self, range: RevisionRange) -> DocResult<Vec<Revision>> {
let iter_range = range.from_rev_id..=range.to_rev_id; let revs = range
let revs = iter_range .iter()
.flat_map(|rev_id| { .flat_map(|rev_id| match self.revs.get(&rev_id) {
//
match self.revs.get(&rev_id) {
None => None, None => None,
Some(rev) => Some((&*(*rev)).clone()), Some(rev) => Some((&*(*rev)).clone()),
}
}) })
.collect::<Vec<Revision>>(); .collect::<Vec<Revision>>();
@ -189,7 +186,7 @@ impl RevisionStoreActor {
let doc = self.server.fetch_document_from_remote(&self.doc_id).await?; let doc = self.server.fetch_document_from_remote(&self.doc_id).await?;
let revision = revision_from_doc(doc.clone(), RevType::Remote); let revision = revision_from_doc(doc.clone(), RevType::Remote);
self.handle_new_revision(revision); let _ = self.handle_new_revision(revision).await?;
Ok(doc) Ok(doc)
} }
} }

View file

@ -80,8 +80,8 @@ impl RevTableSql {
conn: &SqliteConnection, conn: &SqliteConnection,
) -> Result<Vec<Revision>, DocError> { ) -> Result<Vec<Revision>, DocError> {
let rev_tables = dsl::rev_table let rev_tables = dsl::rev_table
.filter(rev_id.ge(range.from_rev_id)) .filter(rev_id.ge(range.start))
.filter(rev_id.le(range.to_rev_id)) .filter(rev_id.le(range.end))
.filter(doc_id.eq(doc_id_s)) .filter(doc_id.eq(doc_id_s))
.order(rev_id.asc()) .order(rev_id.asc())
.load::<RevTable>(conn)?; .load::<RevTable>(conn)?;