diff --git a/backend/src/services/doc/ws_actor.rs b/backend/src/services/doc/ws_actor.rs index edf24ac920..e8384b58ec 100644 --- a/backend/src/services/doc/ws_actor.rs +++ b/backend/src/services/doc/ws_actor.rs @@ -3,16 +3,19 @@ use crate::{ doc::editor::ServerDocUser, util::{md5, parse_from_bytes}, }, - web_socket::{entities::Socket, WsClientData, WsUser}, + web_socket::WsClientData, }; use actix_rt::task::spawn_blocking; use actix_web::web::Data; use async_stream::stream; use backend_service::errors::{internal_error, Result, ServerError}; use flowy_collaboration::{ - core::sync::ServerDocManager, + core::sync::{RevisionUser, ServerDocManager, SyncResponse}, + entities::ws::DocumentWSDataBuilder, protobuf::{DocumentWSData, DocumentWSDataType}, }; + +use flowy_collaboration::protobuf::NewDocumentUser; use futures::stream::StreamExt; use lib_ot::protobuf::Revision; use sqlx::PgPool; @@ -66,7 +69,7 @@ impl DocWsActor { } } - async fn handle_client_data(&self, client_data: WsClientData, pool: Data) -> Result<()> { + async fn handle_client_data(&self, client_data: WsClientData, pg_pool: Data) -> Result<()> { let WsClientData { user, socket, data } = client_data; let document_data = spawn_blocking(move || { let document_data: DocumentWSData = parse_from_bytes(&data)?; @@ -75,23 +78,29 @@ impl DocWsActor { .await .map_err(internal_error)??; - let data = document_data.data; - - match document_data.ty { - DocumentWSDataType::Acked => Ok(()), - DocumentWSDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await, + let user = Arc::new(ServerDocUser { user, socket, pg_pool }); + match &document_data.ty { + DocumentWSDataType::Ack => Ok(()), + DocumentWSDataType::PushRev => self.handle_pushed_rev(user, document_data.data).await, DocumentWSDataType::PullRev => Ok(()), - DocumentWSDataType::UserConnect => Ok(()), + DocumentWSDataType::UserConnect => self.handle_user_connect(user, document_data).await, } } - async fn apply_pushed_rev( - &self, - user: Arc, - socket: Socket, - data: Vec, - pg_pool: Data, - ) -> Result<()> { + async fn handle_user_connect(&self, user: Arc, document_data: DocumentWSData) -> Result<()> { + let id = document_data.id.clone(); + let new_user = spawn_blocking(move || parse_from_bytes::(&document_data.data)) + .await + .map_err(internal_error)??; + + user.recv(SyncResponse::Ack(DocumentWSDataBuilder::build_ack_message( + &new_user.doc_id, + &id, + ))); + Ok(()) + } + + async fn handle_pushed_rev(&self, user: Arc, data: Vec) -> Result<()> { let mut revision_pb = spawn_blocking(move || { let revision: Revision = parse_from_bytes(&data)?; let _ = verify_md5(&revision)?; @@ -110,7 +119,6 @@ impl DocWsActor { Some(handler) => handler, }; - let user = Arc::new(ServerDocUser { user, socket, pg_pool }); handler.apply_revision(user, revision).await.map_err(internal_error)?; Ok(()) } diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart index 53eff22b3b..b38a10b674 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart @@ -14,22 +14,12 @@ import 'ws.pbenum.dart'; export 'ws.pbenum.dart'; -enum DocumentWSData_OneOfId { - id, - notSet -} - class DocumentWSData extends $pb.GeneratedMessage { - static const $core.Map<$core.int, DocumentWSData_OneOfId> _DocumentWSData_OneOfIdByTag = { - 4 : DocumentWSData_OneOfId.id, - 0 : DocumentWSData_OneOfId.notSet - }; static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocumentWSData', createEmptyInstance: create) - ..oo(0, [4]) ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') - ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: DocumentWSDataType.Acked, valueOf: DocumentWSDataType.valueOf, enumValues: DocumentWSDataType.values) + ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: DocumentWSDataType.Ack, valueOf: DocumentWSDataType.valueOf, enumValues: DocumentWSDataType.values) ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) - ..aInt64(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') + ..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') ..hasRequiredFields = false ; @@ -38,7 +28,7 @@ class DocumentWSData extends $pb.GeneratedMessage { $core.String? docId, DocumentWSDataType? ty, $core.List<$core.int>? data, - $fixnum.Int64? id, + $core.String? id, }) { final _result = create(); if (docId != null) { @@ -76,9 +66,6 @@ class DocumentWSData extends $pb.GeneratedMessage { static DocumentWSData getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); static DocumentWSData? _defaultInstance; - DocumentWSData_OneOfId whichOneOfId() => _DocumentWSData_OneOfIdByTag[$_whichOneof(0)]!; - void clearOneOfId() => clearField($_whichOneof(0)); - @$pb.TagNumber(1) $core.String get docId => $_getSZ(0); @$pb.TagNumber(1) @@ -107,25 +94,25 @@ class DocumentWSData extends $pb.GeneratedMessage { void clearData() => clearField(3); @$pb.TagNumber(4) - $fixnum.Int64 get id => $_getI64(3); + $core.String get id => $_getSZ(3); @$pb.TagNumber(4) - set id($fixnum.Int64 v) { $_setInt64(3, v); } + set id($core.String v) { $_setString(3, v); } @$pb.TagNumber(4) $core.bool hasId() => $_has(3); @$pb.TagNumber(4) void clearId() => clearField(4); } -class DocumentConnected extends $pb.GeneratedMessage { - static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocumentConnected', createEmptyInstance: create) +class NewDocumentUser extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'NewDocumentUser', createEmptyInstance: create) ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'userId') ..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId') ..hasRequiredFields = false ; - DocumentConnected._() : super(); - factory DocumentConnected({ + NewDocumentUser._() : super(); + factory NewDocumentUser({ $core.String? userId, $core.String? docId, $fixnum.Int64? revId, @@ -142,26 +129,26 @@ class DocumentConnected extends $pb.GeneratedMessage { } return _result; } - factory DocumentConnected.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); - factory DocumentConnected.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + factory NewDocumentUser.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory NewDocumentUser.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); @$core.Deprecated( 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' 'Will be removed in next major version') - DocumentConnected clone() => DocumentConnected()..mergeFromMessage(this); + NewDocumentUser clone() => NewDocumentUser()..mergeFromMessage(this); @$core.Deprecated( 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' 'Will be removed in next major version') - DocumentConnected copyWith(void Function(DocumentConnected) updates) => super.copyWith((message) => updates(message as DocumentConnected)) as DocumentConnected; // ignore: deprecated_member_use + NewDocumentUser copyWith(void Function(NewDocumentUser) updates) => super.copyWith((message) => updates(message as NewDocumentUser)) as NewDocumentUser; // ignore: deprecated_member_use $pb.BuilderInfo get info_ => _i; @$core.pragma('dart2js:noInline') - static DocumentConnected create() => DocumentConnected._(); - DocumentConnected createEmptyInstance() => create(); - static $pb.PbList createRepeated() => $pb.PbList(); + static NewDocumentUser create() => NewDocumentUser._(); + NewDocumentUser createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); @$core.pragma('dart2js:noInline') - static DocumentConnected getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); - static DocumentConnected? _defaultInstance; + static NewDocumentUser getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static NewDocumentUser? _defaultInstance; @$pb.TagNumber(1) $core.String get userId => $_getSZ(0); diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart index 83be5c1784..43dc8c25ba 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart @@ -10,13 +10,13 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; class DocumentWSDataType extends $pb.ProtobufEnum { - static const DocumentWSDataType Acked = DocumentWSDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked'); + static const DocumentWSDataType Ack = DocumentWSDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Ack'); static const DocumentWSDataType PushRev = DocumentWSDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PushRev'); static const DocumentWSDataType PullRev = DocumentWSDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PullRev'); static const DocumentWSDataType UserConnect = DocumentWSDataType._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UserConnect'); static const $core.List values = [ - Acked, + Ack, PushRev, PullRev, UserConnect, diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart index 2bcc352adf..2ee4eac6a7 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart @@ -12,7 +12,7 @@ import 'dart:typed_data' as $typed_data; const DocumentWSDataType$json = const { '1': 'DocumentWSDataType', '2': const [ - const {'1': 'Acked', '2': 0}, + const {'1': 'Ack', '2': 0}, const {'1': 'PushRev', '2': 1}, const {'1': 'PullRev', '2': 2}, const {'1': 'UserConnect', '2': 3}, @@ -20,7 +20,7 @@ const DocumentWSDataType$json = const { }; /// Descriptor for `DocumentWSDataType`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List documentWSDataTypeDescriptor = $convert.base64Decode('ChJEb2N1bWVudFdTRGF0YVR5cGUSCQoFQWNrZWQQABILCgdQdXNoUmV2EAESCwoHUHVsbFJldhACEg8KC1VzZXJDb25uZWN0EAM='); +final $typed_data.Uint8List documentWSDataTypeDescriptor = $convert.base64Decode('ChJEb2N1bWVudFdTRGF0YVR5cGUSBwoDQWNrEAASCwoHUHVzaFJldhABEgsKB1B1bGxSZXYQAhIPCgtVc2VyQ29ubmVjdBAD'); @$core.Deprecated('Use documentWSDataDescriptor instead') const DocumentWSData$json = const { '1': 'DocumentWSData', @@ -28,18 +28,15 @@ const DocumentWSData$json = const { const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'}, const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.DocumentWSDataType', '10': 'ty'}, const {'1': 'data', '3': 3, '4': 1, '5': 12, '10': 'data'}, - const {'1': 'id', '3': 4, '4': 1, '5': 3, '9': 0, '10': 'id'}, - ], - '8': const [ - const {'1': 'one_of_id'}, + const {'1': 'id', '3': 4, '4': 1, '5': 9, '10': 'id'}, ], }; /// Descriptor for `DocumentWSData`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List documentWSDataDescriptor = $convert.base64Decode('Cg5Eb2N1bWVudFdTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEiMKAnR5GAIgASgOMhMuRG9jdW1lbnRXU0RhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRhEhAKAmlkGAQgASgDSABSAmlkQgsKCW9uZV9vZl9pZA=='); -@$core.Deprecated('Use documentConnectedDescriptor instead') -const DocumentConnected$json = const { - '1': 'DocumentConnected', +final $typed_data.Uint8List documentWSDataDescriptor = $convert.base64Decode('Cg5Eb2N1bWVudFdTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEiMKAnR5GAIgASgOMhMuRG9jdW1lbnRXU0RhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRhEg4KAmlkGAQgASgJUgJpZA=='); +@$core.Deprecated('Use newDocumentUserDescriptor instead') +const NewDocumentUser$json = const { + '1': 'NewDocumentUser', '2': const [ const {'1': 'user_id', '3': 1, '4': 1, '5': 9, '10': 'userId'}, const {'1': 'doc_id', '3': 2, '4': 1, '5': 9, '10': 'docId'}, @@ -47,5 +44,5 @@ const DocumentConnected$json = const { ], }; -/// Descriptor for `DocumentConnected`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List documentConnectedDescriptor = $convert.base64Decode('ChFEb2N1bWVudENvbm5lY3RlZBIXCgd1c2VyX2lkGAEgASgJUgZ1c2VySWQSFQoGZG9jX2lkGAIgASgJUgVkb2NJZBIVCgZyZXZfaWQYAyABKANSBXJldklk'); +/// Descriptor for `NewDocumentUser`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List newDocumentUserDescriptor = $convert.base64Decode('Cg9OZXdEb2N1bWVudFVzZXISFwoHdXNlcl9pZBgBIAEoCVIGdXNlcklkEhUKBmRvY19pZBgCIAEoCVIFZG9jSWQSFQoGcmV2X2lkGAMgASgDUgVyZXZJZA=='); diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index 5aa7698eb9..f179507f58 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -4,7 +4,7 @@ use flowy_collaboration::{ core::document::history::UndoResult, entities::{ doc::DocDelta, - ws::{DocumentConnected, DocumentWSData, DocumentWSDataType, WsDocumentDataBuilder}, + ws::{DocumentWSData, DocumentWSDataBuilder, DocumentWSDataType, NewDocumentUser}, }, errors::CollaborateResult, }; @@ -42,27 +42,32 @@ impl ClientDocEditor { let doc_id = doc_id.to_string(); let user_id = user.user_id()?; let rev_manager = Arc::new(rev_manager); - let sink_data_provider = Arc::new(RwLock::new(VecDeque::new())); - let data_provider = Arc::new(DocumentSinkDataProviderAdapter { - rev_manager: rev_manager.clone(), - data_provider: sink_data_provider.clone(), - }); - let stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { + let combined_sink = Arc::new(CombinedSink::new(rev_manager.clone())); + let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { doc_id: doc_id.clone(), editor_cmd_sender: editor_cmd_sender.clone(), rev_manager: rev_manager.clone(), user: user.clone(), - sink_data_provider: sink_data_provider.clone(), + combined_sink: combined_sink.clone(), }); - let editor_ws = Arc::new(EditorWebSocket::new(&doc_id, ws, data_provider, stream_consumer)); - notify_user_conn(&user_id, &doc_id, rev_manager.clone(), sink_data_provider.clone()).await; + let ws_stream_provider = Arc::new(DocumentWSSinkDataProviderAdapter(combined_sink.clone())); + let editor_ws = Arc::new(EditorWebSocket::new( + &doc_id, + ws, + ws_stream_provider, + ws_stream_consumer, + )); + // + notify_user_conn(&user_id, &doc_id, rev_manager.clone(), combined_sink.clone()).await; + + // listen_document_ws_state( &user_id, &doc_id, editor_ws.scribe_state(), rev_manager.clone(), - sink_data_provider, + combined_sink, ); let editor = Arc::new(Self { @@ -210,78 +215,12 @@ fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc, - rev_manager: Arc, - user: Arc, - sink_data_provider: SinkDataProvider, -} - -impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter { - fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> { - let user = self.user.clone(); - let rev_manager = self.rev_manager.clone(); - let edit_cmd_tx = self.editor_cmd_sender.clone(); - let sink_data_provider = self.sink_data_provider.clone(); - let doc_id = self.doc_id.clone(); - FutureResult::new(async move { - let user_id = user.user_id()?; - if let Some(revision) = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? { - sink_data_provider.write().await.push_back(revision.into()); - } - Ok(()) - }) - } - - fn receive_ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError> { - let rev_manager = self.rev_manager.clone(); - FutureResult::new(async move { - let _ = rev_manager.ack_revision(rev_id).await?; - Ok(()) - }) - } - - fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> { - let rev_manager = self.rev_manager.clone(); - let sink_data_provider = self.sink_data_provider.clone(); - FutureResult::new(async move { - let revision = rev_manager.mk_revisions(range).await?; - sink_data_provider.write().await.push_back(revision.into()); - Ok(()) - }) - } -} - -async fn notify_user_conn( - user_id: &str, - doc_id: &str, - rev_manager: Arc, - sink_data_provider: SinkDataProvider, -) { - let need_notify = match sink_data_provider.read().await.front() { - None => true, - Some(data) => data.ty != DocumentWSDataType::UserConnect, - }; - - if need_notify { - let document_conn = DocumentConnected { - user_id: user_id.to_owned(), - doc_id: doc_id.to_owned(), - rev_id: rev_manager.latest_rev_id(), - }; - - let data = WsDocumentDataBuilder::build_document_conn_message(doc_id, document_conn); - sink_data_provider.write().await.push_front(data); - } -} - fn listen_document_ws_state( user_id: &str, doc_id: &str, mut subscriber: broadcast::Receiver, rev_manager: Arc, - sink_data_provider: SinkDataProvider, + sink_data_provider: Arc, ) { let user_id = user_id.to_owned(); let doc_id = doc_id.to_owned(); @@ -301,38 +240,79 @@ fn listen_document_ws_state( }); } -type SinkDataProvider = Arc>>; - -struct DocumentSinkDataProviderAdapter { +async fn notify_user_conn( + user_id: &str, + doc_id: &str, rev_manager: Arc, - data_provider: SinkDataProvider, + combined_sink: Arc, +) { + let need_notify = match combined_sink.front().await { + None => true, + Some(data) => data.ty != DocumentWSDataType::UserConnect, + }; + + if need_notify { + let new_connect = NewDocumentUser { + user_id: user_id.to_owned(), + doc_id: doc_id.to_owned(), + rev_id: rev_manager.latest_rev_id(), + }; + + let data = DocumentWSDataBuilder::build_new_document_user_message(doc_id, new_connect); + combined_sink.push_front(data).await; + } } -impl DocumentSinkDataProvider for DocumentSinkDataProviderAdapter { - fn next(&self) -> FutureResult, FlowyError> { - let rev_manager = self.rev_manager.clone(); - let data_provider = self.data_provider.clone(); +struct DocumentWebSocketSteamConsumerAdapter { + doc_id: String, + editor_cmd_sender: UnboundedSender, + rev_manager: Arc, + user: Arc, + combined_sink: Arc, +} +impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter { + fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> { + let user = self.user.clone(); + let rev_manager = self.rev_manager.clone(); + let edit_cmd_tx = self.editor_cmd_sender.clone(); + let combined_sink = self.combined_sink.clone(); + let doc_id = self.doc_id.clone(); FutureResult::new(async move { - if data_provider.read().await.is_empty() { - match rev_manager.next_sync_revision().await? { - Some(rev) => { - tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id); - Ok(Some(rev.into())) - }, - None => Ok(None), - } - } else { - match data_provider.read().await.front() { - None => Ok(None), - Some(data) => { - tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", data.doc_id, data.ty); - Ok(Some(data.clone())) - }, - } + let user_id = user.user_id()?; + if let Some(revision) = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? { + combined_sink.push_back(revision.into()).await; } + Ok(()) }) } + + fn receive_ack(&self, id: String, ty: DocumentWSDataType) -> FutureResult<(), FlowyError> { + let combined_sink = self.combined_sink.clone(); + FutureResult::new(async move { combined_sink.ack(id, ty).await }) + } + + fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> { + FutureResult::new(async move { Ok(()) }) + } + + fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> { + let rev_manager = self.rev_manager.clone(); + let combined_sink = self.combined_sink.clone(); + FutureResult::new(async move { + let revision = rev_manager.mk_revisions(range).await?; + combined_sink.push_back(revision.into()).await; + Ok(()) + }) + } +} + +struct DocumentWSSinkDataProviderAdapter(Arc); +impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter { + fn next(&self) -> FutureResult, FlowyError> { + let combined_sink = self.0.clone(); + FutureResult::new(async move { combined_sink.next().await }) + } } #[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))] @@ -396,6 +376,101 @@ pub(crate) async fn handle_push_rev( ))) } +#[derive(Clone)] +enum SourceType { + Shared, + Revision, +} + +#[derive(Clone)] +struct CombinedSink { + shared: Arc>>, + rev_manager: Arc, + source_ty: Arc>, +} + +impl CombinedSink { + fn new(rev_manager: Arc) -> Self { + CombinedSink { + shared: Arc::new(RwLock::new(VecDeque::new())), + rev_manager, + source_ty: Arc::new(RwLock::new(SourceType::Shared)), + } + } + + // FIXME: return Option<&DocumentWSData> would be better + async fn front(&self) -> Option { self.shared.read().await.front().cloned() } + + async fn push_front(&self, data: DocumentWSData) { self.shared.write().await.push_front(data); } + + async fn push_back(&self, data: DocumentWSData) { self.shared.write().await.push_back(data); } + + async fn next(&self) -> FlowyResult> { + let source_ty = self.source_ty.read().await.clone(); + match source_ty { + SourceType::Shared => match self.shared.read().await.front() { + None => { + *self.source_ty.write().await = SourceType::Revision; + Ok(None) + }, + Some(data) => { + tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", data.doc_id, data.ty); + Ok(Some(data.clone())) + }, + }, + SourceType::Revision => { + if !self.shared.read().await.is_empty() { + *self.source_ty.write().await = SourceType::Shared; + return Ok(None); + } + + match self.rev_manager.next_sync_revision().await? { + Some(rev) => { + tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id); + Ok(Some(rev.into())) + }, + None => Ok(None), + } + }, + } + } + + async fn ack(&self, id: String, _ty: DocumentWSDataType) -> FlowyResult<()> { + // let _ = self.rev_manager.ack_revision(id).await?; + let source_ty = self.source_ty.read().await.clone(); + match source_ty { + SourceType::Shared => { + let should_pop = match self.shared.read().await.front() { + None => false, + Some(val) => { + if val.id == id { + true + } else { + tracing::error!("The front element's {} is not equal to the {}", val.id, id); + false + } + }, + }; + if should_pop { + let _ = self.shared.write().await.pop_front(); + } + }, + SourceType::Revision => { + match id.parse::() { + Ok(rev_id) => { + let _ = self.rev_manager.ack_revision(rev_id).await?; + }, + Err(e) => { + tracing::error!("Parse rev_id from {} failed. {}", id, e); + }, + }; + }, + } + + Ok(()) + } +} + #[cfg(feature = "flowy_unit_test")] impl ClientDocEditor { pub async fn doc_json(&self) -> FlowyResult { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_web_socket.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_web_socket.rs index 1f8ba327f1..d4315b230f 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_web_socket.rs @@ -1,11 +1,11 @@ use crate::services::doc::{DocumentWebSocket, DocumentWsHandler, SYNC_INTERVAL_IN_MILLIS}; use async_stream::stream; use bytes::Bytes; -use flowy_collaboration::entities::ws::{DocumentWSData, DocumentWSDataType}; +use flowy_collaboration::entities::ws::{DocumentWSData, DocumentWSDataType, NewDocumentUser}; use flowy_error::{internal_error, FlowyError, FlowyResult}; use futures::stream::StreamExt; use lib_infra::future::FutureResult; -use lib_ot::revision::{RevId, RevisionRange}; +use lib_ot::revision::RevisionRange; use lib_ws::WSConnectState; use std::{convert::TryFrom, sync::Arc}; use tokio::{ @@ -20,8 +20,8 @@ use tokio::{ pub(crate) struct EditorWebSocket { doc_id: String, - data_provider: Arc, - stream_consumer: Arc, + data_provider: Arc, + stream_consumer: Arc, ws: Arc, ws_msg_tx: UnboundedSender, ws_msg_rx: Option>, @@ -33,8 +33,8 @@ impl EditorWebSocket { pub(crate) fn new( doc_id: &str, ws: Arc, - data_provider: Arc, - stream_consumer: Arc, + data_provider: Arc, + stream_consumer: Arc, ) -> Self { let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel(); let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2); @@ -97,15 +97,16 @@ impl DocumentWsHandler for EditorWebSocket { } } -pub trait DocumentWebSocketSteamConsumer: Send + Sync { +pub trait DocumentWSSteamConsumer: Send + Sync { fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>; - fn receive_ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError>; + fn receive_ack(&self, id: String, ty: DocumentWSDataType) -> FutureResult<(), FlowyError>; + fn receive_new_user_connect(&self, new_user: NewDocumentUser) -> FutureResult<(), FlowyError>; fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>; } pub(crate) struct DocumentWebSocketStream { doc_id: String, - consumer: Arc, + consumer: Arc, ws_msg_rx: Option>, stop_rx: Option, } @@ -113,7 +114,7 @@ pub(crate) struct DocumentWebSocketStream { impl DocumentWebSocketStream { pub(crate) fn new( doc_id: &str, - consumer: Arc, + consumer: Arc, ws_msg_rx: mpsc::UnboundedReceiver, stop_rx: SinkStopRx, ) -> Self { @@ -166,7 +167,7 @@ impl DocumentWebSocketStream { doc_id: _, ty, data, - id: _, + id, } = msg; let bytes = spawn_blocking(move || Bytes::from(data)) .await @@ -181,11 +182,13 @@ impl DocumentWebSocketStream { let range = RevisionRange::try_from(bytes)?; let _ = self.consumer.send_revision_in_range(range).await?; }, - DocumentWSDataType::Acked => { - let rev_id = RevId::try_from(bytes)?; - let _ = self.consumer.receive_ack_revision(rev_id.into()).await; + DocumentWSDataType::Ack => { + // let rev_id = RevId::try_from(bytes)?; + let _ = self.consumer.receive_ack(id, ty).await; }, DocumentWSDataType::UserConnect => { + let new_user = NewDocumentUser::try_from(bytes)?; + let _ = self.consumer.receive_new_user_connect(new_user).await; // Notify the user that someone has connected to this document }, } @@ -198,12 +201,12 @@ pub(crate) type Tick = (); pub(crate) type SinkStopRx = broadcast::Receiver<()>; pub(crate) type SinkStopTx = broadcast::Sender<()>; -pub trait DocumentSinkDataProvider: Send + Sync { +pub trait DocumentWSSinkDataProvider: Send + Sync { fn next(&self) -> FutureResult, FlowyError>; } pub(crate) struct DocumentWebSocketSink { - provider: Arc, + provider: Arc, ws_sender: Arc, stop_rx: Option, doc_id: String, @@ -212,7 +215,7 @@ pub(crate) struct DocumentWebSocketSink { impl DocumentWebSocketSink { pub(crate) fn new( doc_id: &str, - provider: Arc, + provider: Arc, ws_sender: Arc, stop_rx: SinkStopRx, ) -> Self { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs index b3d12b56ae..267845f6ab 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs @@ -1,24 +1,27 @@ use crate::{ errors::FlowyError, - services::doc::revision::{ - cache::{ - disk::{Persistence, RevisionDiskCache}, - memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate}, - sync::RevisionSyncSeq, - }, - RevisionRecord, + services::doc::revision::cache::{ + disk::{Persistence, RevisionDiskCache}, + memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate}, }, sql_tables::{RevChangeset, RevTableState}, }; +use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_error::{internal_error, FlowyResult}; use lib_infra::future::FutureResult; -use lib_ot::revision::{RevState, Revision, RevisionRange}; -use std::sync::{ - atomic::{AtomicI64, Ordering::SeqCst}, - Arc, +use lib_ot::{ + errors::OTError, + revision::{RevState, Revision, RevisionRange}, }; -use tokio::task::spawn_blocking; +use std::{ + collections::VecDeque, + sync::{ + atomic::{AtomicI64, Ordering::SeqCst}, + Arc, + }, +}; +use tokio::{sync::RwLock, task::spawn_blocking}; type DocRevisionDiskCache = dyn RevisionDiskCache; @@ -157,9 +160,79 @@ impl RevisionMemoryCacheDelegate for Arc { } } -#[cfg(feature = "flowy_unit_test")] -impl RevisionCache { - pub fn disk_cache(&self) -> Arc { self.disk_cache.clone() } - - pub fn memory_cache(&self) -> Arc { self.sync_seq.clone() } +#[derive(Clone)] +pub struct RevisionRecord { + pub revision: Revision, + pub state: RevState, +} + +impl RevisionRecord { + pub fn ack(&mut self) { self.state = RevState::Acked; } +} + +struct RevisionSyncSeq { + revs_map: Arc>, + local_revs: Arc>>, +} + +impl std::default::Default for RevisionSyncSeq { + fn default() -> Self { + let local_revs = Arc::new(RwLock::new(VecDeque::new())); + RevisionSyncSeq { + revs_map: Arc::new(DashMap::new()), + local_revs, + } + } +} + +impl RevisionSyncSeq { + fn new() -> Self { RevisionSyncSeq::default() } + + async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> { + // The last revision's rev_id must be greater than the new one. + if let Some(rev_id) = self.local_revs.read().await.back() { + if *rev_id >= record.revision.rev_id { + return Err(OTError::revision_id_conflict() + .context(format!("The new revision's id must be greater than {}", rev_id))); + } + } + self.local_revs.write().await.push_back(record.revision.rev_id); + self.revs_map.insert(record.revision.rev_id, record); + Ok(()) + } + + async fn ack_revision(&self, rev_id: &i64) -> FlowyResult<()> { + if let Some(pop_rev_id) = self.next_sync_rev_id().await { + if &pop_rev_id != rev_id { + let desc = format!( + "The ack rev_id:{} is not equal to the current rev_id:{}", + rev_id, pop_rev_id + ); + // tracing::error!("{}", desc); + return Err(FlowyError::internal().context(desc)); + } + + tracing::debug!("pop revision {}", pop_rev_id); + self.revs_map.remove(&pop_rev_id); + let _ = self.local_revs.write().await.pop_front(); + } + Ok(()) + } + + async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> { + match self.local_revs.read().await.front() { + None => None, + Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())), + } + } + + async fn next_sync_rev_id(&self) -> Option { self.local_revs.read().await.front().copied() } +} + +#[cfg(feature = "flowy_unit_test")] +impl RevisionSyncSeq { + #[allow(dead_code)] + pub fn revs_map(&self) -> Arc> { self.revs_map.clone() } + #[allow(dead_code)] + pub fn pending_revs(&self) -> Arc>> { self.local_revs.clone() } } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs index 916b334e4c..431c0af4c7 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs @@ -2,8 +2,5 @@ mod cache; mod disk; mod memory; -mod model; -mod sync; pub use cache::*; -pub use model::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/model.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/model.rs deleted file mode 100644 index f19b351fd9..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/model.rs +++ /dev/null @@ -1,15 +0,0 @@ -use lib_ot::revision::{RevState, Revision}; -use tokio::sync::broadcast; - -pub type RevIdReceiver = broadcast::Receiver; -pub type RevIdSender = broadcast::Sender; - -#[derive(Clone)] -pub struct RevisionRecord { - pub revision: Revision, - pub state: RevState, -} - -impl RevisionRecord { - pub fn ack(&mut self) { self.state = RevState::Acked; } -} diff --git a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs b/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs index ea06008889..45b405b1f9 100644 --- a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs +++ b/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs @@ -5,7 +5,7 @@ use flowy_collaboration::{ core::sync::{RevisionUser, ServerDocManager, ServerDocPersistence, SyncResponse}, entities::{ doc::Doc, - ws::{DocumentWSData, DocumentWSDataType}, + ws::{DocumentWSData, DocumentWSDataBuilder, DocumentWSDataType, NewDocumentUser}, }, errors::CollaborateError, Revision, @@ -111,7 +111,7 @@ impl MockDocServer { async fn handle_ws_data(&self, ws_data: DocumentWSData) -> mpsc::Receiver { let bytes = Bytes::from(ws_data.data); match ws_data.ty { - DocumentWSDataType::Acked => { + DocumentWSDataType::Ack => { unimplemented!() }, DocumentWSDataType::PushRev => { @@ -133,7 +133,16 @@ impl MockDocServer { unimplemented!() }, DocumentWSDataType::UserConnect => { - unimplemented!() + let new_user = NewDocumentUser::try_from(bytes).unwrap(); + let (tx, rx) = mpsc::channel(1); + let data = DocumentWSDataBuilder::build_ack_message(&new_user.doc_id, &ws_data.id); + let user = Arc::new(MockDocUser { + user_id: new_user.user_id, + tx, + }) as Arc; + + user.recv(SyncResponse::Ack(data)); + rx }, } } diff --git a/frontend/rust-lib/flowy-test/src/doc_script.rs b/frontend/rust-lib/flowy-test/src/doc_script.rs index d5cb59825a..984534a5f9 100644 --- a/frontend/rust-lib/flowy-test/src/doc_script.rs +++ b/frontend/rust-lib/flowy-test/src/doc_script.rs @@ -46,8 +46,6 @@ impl EditorTest { async fn run_script(&mut self, script: EditorScript) { let rev_manager = self.editor.rev_manager(); let cache = rev_manager.revision_cache(); - let _memory_cache = cache.memory_cache(); - let _disk_cache = cache.disk_cache(); let doc_id = self.editor.doc_id.clone(); let _user_id = self.sdk.user_session.user_id().unwrap(); let ws_manager = self.sdk.ws_manager.clone(); diff --git a/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs b/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs index 3c01cb3a22..8593fb1c90 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs @@ -1,6 +1,6 @@ use crate::{ core::document::Document, - entities::ws::{DocumentWSData, WsDocumentDataBuilder}, + entities::ws::{DocumentWSData, DocumentWSDataBuilder}, }; use lib_ot::{ core::OperationTransformable, @@ -59,9 +59,9 @@ impl RevisionSynchronizer { if server_base_rev_id == revision.base_rev_id || server_rev_id == revision.rev_id { // The rev is in the right order, just compose it. let _ = self.compose_revision(&revision)?; - user.recv(SyncResponse::Ack(WsDocumentDataBuilder::build_acked_message( + user.recv(SyncResponse::Ack(DocumentWSDataBuilder::build_ack_message( &revision.doc_id, - revision.rev_id, + &revision.rev_id.to_string(), ))); let rev_id = revision.rev_id; let doc_id = self.doc_id.clone(); @@ -78,14 +78,14 @@ impl RevisionSynchronizer { start: server_rev_id, end: revision.rev_id, }; - let msg = WsDocumentDataBuilder::build_push_pull_message(&self.doc_id, range); + let msg = DocumentWSDataBuilder::build_push_pull_message(&self.doc_id, range); user.recv(SyncResponse::Pull(msg)); } }, Ordering::Equal => { // Do nothing log::warn!("Applied revision rev_id is the same as cur_rev_id"); - let data = WsDocumentDataBuilder::build_acked_message(&revision.doc_id, revision.rev_id); + let data = DocumentWSDataBuilder::build_ack_message(&revision.doc_id, &revision.rev_id.to_string()); user.recv(SyncResponse::Ack(data)); }, Ordering::Greater => { @@ -93,7 +93,7 @@ impl RevisionSynchronizer { // send the prime delta to the client. Client should compose the this prime // delta. let cli_revision = self.transform_revision(&revision)?; - let data = WsDocumentDataBuilder::build_push_rev_message(&self.doc_id, cli_revision); + let data = DocumentWSDataBuilder::build_push_rev_message(&self.doc_id, cli_revision); user.recv(SyncResponse::Push(data)); }, } diff --git a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs index 818a07cbf3..edbfd11ba1 100644 --- a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs +++ b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs @@ -1,13 +1,14 @@ use crate::errors::CollaborateError; use bytes::Bytes; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; -use lib_ot::revision::{RevId, Revision, RevisionRange}; +use lib_infra::uuid; +use lib_ot::revision::{Revision, RevisionRange}; use std::convert::{TryFrom, TryInto}; #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] pub enum DocumentWSDataType { // The frontend receives the Acked means the backend has accepted the revision - Acked = 0, + Ack = 0, // The frontend receives the PushRev event means the backend is pushing the new revision to frontend PushRev = 1, // The fronted receives the PullRev event means the backend try to pull the revision from frontend @@ -25,7 +26,7 @@ impl DocumentWSDataType { } impl std::default::Default for DocumentWSDataType { - fn default() -> Self { DocumentWSDataType::Acked } + fn default() -> Self { DocumentWSDataType::Ack } } #[derive(ProtoBuf, Default, Debug, Clone)] @@ -39,8 +40,8 @@ pub struct DocumentWSData { #[pb(index = 3)] pub data: Vec, - #[pb(index = 4, one_of)] - pub id: Option, + #[pb(index = 4)] + pub id: String, } impl std::convert::From for DocumentWSData { @@ -52,13 +53,13 @@ impl std::convert::From for DocumentWSData { doc_id, ty: DocumentWSDataType::PushRev, data: bytes.to_vec(), - id: Some(rev_id), + id: rev_id.to_string(), } } } -pub struct WsDocumentDataBuilder(); -impl WsDocumentDataBuilder { +pub struct DocumentWSDataBuilder(); +impl DocumentWSDataBuilder { // DocumentWSDataType::PushRev -> Revision pub fn build_push_rev_message(doc_id: &str, revision: Revision) -> DocumentWSData { let rev_id = revision.rev_id; @@ -67,7 +68,7 @@ impl WsDocumentDataBuilder { doc_id: doc_id.to_string(), ty: DocumentWSDataType::PushRev, data: bytes.to_vec(), - id: Some(rev_id), + id: rev_id.to_string(), } } @@ -78,39 +79,35 @@ impl WsDocumentDataBuilder { doc_id: doc_id.to_string(), ty: DocumentWSDataType::PullRev, data: bytes.to_vec(), - id: None, + id: uuid(), } } - // DocumentWSDataType::Acked -> RevId - pub fn build_acked_message(doc_id: &str, rev_id: i64) -> DocumentWSData { - let cloned_rev_id = rev_id; - let rev_id: RevId = rev_id.into(); - let bytes: Bytes = rev_id.try_into().unwrap(); - + // DocumentWSDataType::Ack -> RevId + pub fn build_ack_message(doc_id: &str, id: &str) -> DocumentWSData { DocumentWSData { doc_id: doc_id.to_string(), - ty: DocumentWSDataType::Acked, - data: bytes.to_vec(), - id: Some(cloned_rev_id), + ty: DocumentWSDataType::Ack, + data: vec![], + id: id.to_string(), } } // DocumentWSDataType::UserConnect -> DocumentConnected - pub fn build_document_conn_message(doc_id: &str, document_conn: DocumentConnected) -> DocumentWSData { - let rev_id = document_conn.rev_id; - let bytes: Bytes = document_conn.try_into().unwrap(); + pub fn build_new_document_user_message(doc_id: &str, new_document_user: NewDocumentUser) -> DocumentWSData { + let id = new_document_user.user_id.clone(); + let bytes: Bytes = new_document_user.try_into().unwrap(); DocumentWSData { doc_id: doc_id.to_string(), ty: DocumentWSDataType::UserConnect, data: bytes.to_vec(), - id: Some(rev_id), + id, } } } #[derive(ProtoBuf, Default, Debug, Clone)] -pub struct DocumentConnected { +pub struct NewDocumentUser { #[pb(index = 1)] pub user_id: String, diff --git a/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs b/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs index 703c6b305f..878b20fb0e 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs +++ b/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs @@ -29,8 +29,7 @@ pub struct DocumentWSData { pub doc_id: ::std::string::String, pub ty: DocumentWSDataType, pub data: ::std::vec::Vec, - // message oneof groups - pub one_of_id: ::std::option::Option, + pub id: ::std::string::String, // special fields pub unknown_fields: ::protobuf::UnknownFields, pub cached_size: ::protobuf::CachedSize, @@ -42,11 +41,6 @@ impl<'a> ::std::default::Default for &'a DocumentWSData { } } -#[derive(Clone,PartialEq,Debug)] -pub enum DocumentWSData_oneof_one_of_id { - id(i64), -} - impl DocumentWSData { pub fn new() -> DocumentWSData { ::std::default::Default::default() @@ -85,7 +79,7 @@ impl DocumentWSData { self.ty } pub fn clear_ty(&mut self) { - self.ty = DocumentWSDataType::Acked; + self.ty = DocumentWSDataType::Ack; } // Param is passed by value, moved @@ -119,29 +113,30 @@ impl DocumentWSData { ::std::mem::replace(&mut self.data, ::std::vec::Vec::new()) } - // int64 id = 4; + // string id = 4; - pub fn get_id(&self) -> i64 { - match self.one_of_id { - ::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(v)) => v, - _ => 0, - } + pub fn get_id(&self) -> &str { + &self.id } pub fn clear_id(&mut self) { - self.one_of_id = ::std::option::Option::None; - } - - pub fn has_id(&self) -> bool { - match self.one_of_id { - ::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(..)) => true, - _ => false, - } + self.id.clear(); } // Param is passed by value, moved - pub fn set_id(&mut self, v: i64) { - self.one_of_id = ::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(v)) + pub fn set_id(&mut self, v: ::std::string::String) { + self.id = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_id(&mut self) -> &mut ::std::string::String { + &mut self.id + } + + // Take field + pub fn take_id(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.id, ::std::string::String::new()) } } @@ -164,10 +159,7 @@ impl ::protobuf::Message for DocumentWSData { ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?; }, 4 => { - if wire_type != ::protobuf::wire_format::WireTypeVarint { - return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); - } - self.one_of_id = ::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(is.read_int64()?)); + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?; }, _ => { ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; @@ -184,18 +176,14 @@ impl ::protobuf::Message for DocumentWSData { if !self.doc_id.is_empty() { my_size += ::protobuf::rt::string_size(1, &self.doc_id); } - if self.ty != DocumentWSDataType::Acked { + if self.ty != DocumentWSDataType::Ack { my_size += ::protobuf::rt::enum_size(2, self.ty); } if !self.data.is_empty() { my_size += ::protobuf::rt::bytes_size(3, &self.data); } - if let ::std::option::Option::Some(ref v) = self.one_of_id { - match v { - &DocumentWSData_oneof_one_of_id::id(v) => { - my_size += ::protobuf::rt::value_size(4, v, ::protobuf::wire_format::WireTypeVarint); - }, - }; + if !self.id.is_empty() { + my_size += ::protobuf::rt::string_size(4, &self.id); } my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); self.cached_size.set(my_size); @@ -206,18 +194,14 @@ impl ::protobuf::Message for DocumentWSData { if !self.doc_id.is_empty() { os.write_string(1, &self.doc_id)?; } - if self.ty != DocumentWSDataType::Acked { + if self.ty != DocumentWSDataType::Ack { os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.ty))?; } if !self.data.is_empty() { os.write_bytes(3, &self.data)?; } - if let ::std::option::Option::Some(ref v) = self.one_of_id { - match v { - &DocumentWSData_oneof_one_of_id::id(v) => { - os.write_int64(4, v)?; - }, - }; + if !self.id.is_empty() { + os.write_string(4, &self.id)?; } os.write_unknown_fields(self.get_unknown_fields())?; ::std::result::Result::Ok(()) @@ -272,10 +256,10 @@ impl ::protobuf::Message for DocumentWSData { |m: &DocumentWSData| { &m.data }, |m: &mut DocumentWSData| { &mut m.data }, )); - fields.push(::protobuf::reflect::accessor::make_singular_i64_accessor::<_>( + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( "id", - DocumentWSData::has_id, - DocumentWSData::get_id, + |m: &DocumentWSData| { &m.id }, + |m: &mut DocumentWSData| { &mut m.id }, )); ::protobuf::reflect::MessageDescriptor::new_pb_name::( "DocumentWSData", @@ -294,9 +278,9 @@ impl ::protobuf::Message for DocumentWSData { impl ::protobuf::Clear for DocumentWSData { fn clear(&mut self) { self.doc_id.clear(); - self.ty = DocumentWSDataType::Acked; + self.ty = DocumentWSDataType::Ack; self.data.clear(); - self.one_of_id = ::std::option::Option::None; + self.id.clear(); self.unknown_fields.clear(); } } @@ -314,7 +298,7 @@ impl ::protobuf::reflect::ProtobufValue for DocumentWSData { } #[derive(PartialEq,Clone,Default)] -pub struct DocumentConnected { +pub struct NewDocumentUser { // message fields pub user_id: ::std::string::String, pub doc_id: ::std::string::String, @@ -324,14 +308,14 @@ pub struct DocumentConnected { pub cached_size: ::protobuf::CachedSize, } -impl<'a> ::std::default::Default for &'a DocumentConnected { - fn default() -> &'a DocumentConnected { - ::default_instance() +impl<'a> ::std::default::Default for &'a NewDocumentUser { + fn default() -> &'a NewDocumentUser { + ::default_instance() } } -impl DocumentConnected { - pub fn new() -> DocumentConnected { +impl NewDocumentUser { + pub fn new() -> NewDocumentUser { ::std::default::Default::default() } @@ -403,7 +387,7 @@ impl DocumentConnected { } } -impl ::protobuf::Message for DocumentConnected { +impl ::protobuf::Message for NewDocumentUser { fn is_initialized(&self) -> bool { true } @@ -491,8 +475,8 @@ impl ::protobuf::Message for DocumentConnected { Self::descriptor_static() } - fn new() -> DocumentConnected { - DocumentConnected::new() + fn new() -> NewDocumentUser { + NewDocumentUser::new() } fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { @@ -501,34 +485,34 @@ impl ::protobuf::Message for DocumentConnected { let mut fields = ::std::vec::Vec::new(); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( "user_id", - |m: &DocumentConnected| { &m.user_id }, - |m: &mut DocumentConnected| { &mut m.user_id }, + |m: &NewDocumentUser| { &m.user_id }, + |m: &mut NewDocumentUser| { &mut m.user_id }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( "doc_id", - |m: &DocumentConnected| { &m.doc_id }, - |m: &mut DocumentConnected| { &mut m.doc_id }, + |m: &NewDocumentUser| { &m.doc_id }, + |m: &mut NewDocumentUser| { &mut m.doc_id }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( "rev_id", - |m: &DocumentConnected| { &m.rev_id }, - |m: &mut DocumentConnected| { &mut m.rev_id }, + |m: &NewDocumentUser| { &m.rev_id }, + |m: &mut NewDocumentUser| { &mut m.rev_id }, )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "DocumentConnected", + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "NewDocumentUser", fields, file_descriptor_proto() ) }) } - fn default_instance() -> &'static DocumentConnected { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(DocumentConnected::new) + fn default_instance() -> &'static NewDocumentUser { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(NewDocumentUser::new) } } -impl ::protobuf::Clear for DocumentConnected { +impl ::protobuf::Clear for NewDocumentUser { fn clear(&mut self) { self.user_id.clear(); self.doc_id.clear(); @@ -537,13 +521,13 @@ impl ::protobuf::Clear for DocumentConnected { } } -impl ::std::fmt::Debug for DocumentConnected { +impl ::std::fmt::Debug for NewDocumentUser { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { ::protobuf::text_format::fmt(self, f) } } -impl ::protobuf::reflect::ProtobufValue for DocumentConnected { +impl ::protobuf::reflect::ProtobufValue for NewDocumentUser { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { ::protobuf::reflect::ReflectValueRef::Message(self) } @@ -551,7 +535,7 @@ impl ::protobuf::reflect::ProtobufValue for DocumentConnected { #[derive(Clone,PartialEq,Eq,Debug,Hash)] pub enum DocumentWSDataType { - Acked = 0, + Ack = 0, PushRev = 1, PullRev = 2, UserConnect = 3, @@ -564,7 +548,7 @@ impl ::protobuf::ProtobufEnum for DocumentWSDataType { fn from_i32(value: i32) -> ::std::option::Option { match value { - 0 => ::std::option::Option::Some(DocumentWSDataType::Acked), + 0 => ::std::option::Option::Some(DocumentWSDataType::Ack), 1 => ::std::option::Option::Some(DocumentWSDataType::PushRev), 2 => ::std::option::Option::Some(DocumentWSDataType::PullRev), 3 => ::std::option::Option::Some(DocumentWSDataType::UserConnect), @@ -574,7 +558,7 @@ impl ::protobuf::ProtobufEnum for DocumentWSDataType { fn values() -> &'static [Self] { static values: &'static [DocumentWSDataType] = &[ - DocumentWSDataType::Acked, + DocumentWSDataType::Ack, DocumentWSDataType::PushRev, DocumentWSDataType::PullRev, DocumentWSDataType::UserConnect, @@ -595,7 +579,7 @@ impl ::std::marker::Copy for DocumentWSDataType { impl ::std::default::Default for DocumentWSDataType { fn default() -> Self { - DocumentWSDataType::Acked + DocumentWSDataType::Ack } } @@ -606,47 +590,46 @@ impl ::protobuf::reflect::ProtobufValue for DocumentWSDataType { } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x08ws.proto\"\x7f\n\x0eDocumentWSData\x12\x15\n\x06doc_id\x18\x01\x20\ + \n\x08ws.proto\"p\n\x0eDocumentWSData\x12\x15\n\x06doc_id\x18\x01\x20\ \x01(\tR\x05docId\x12#\n\x02ty\x18\x02\x20\x01(\x0e2\x13.DocumentWSDataT\ - ypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data\x12\x10\n\x02\ - id\x18\x04\x20\x01(\x03H\0R\x02idB\x0b\n\tone_of_id\"Z\n\x11DocumentConn\ - ected\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\x06userId\x12\x15\n\x06do\ - c_id\x18\x02\x20\x01(\tR\x05docId\x12\x15\n\x06rev_id\x18\x03\x20\x01(\ - \x03R\x05revId*J\n\x12DocumentWSDataType\x12\t\n\x05Acked\x10\0\x12\x0b\ - \n\x07PushRev\x10\x01\x12\x0b\n\x07PullRev\x10\x02\x12\x0f\n\x0bUserConn\ - ect\x10\x03J\x9a\x05\n\x06\x12\x04\0\0\x12\x01\n\x08\n\x01\x0c\x12\x03\0\ - \0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x07\x01\n\n\n\x03\x04\0\x01\x12\x03\ - \x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x16\n\x0c\n\x05\x04\ - \0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\ - \x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x14\x15\n\x0b\n\x04\x04\ - \0\x02\x01\x12\x03\x04\x04\x1e\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\x04\ - \x04\x16\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x17\x19\n\x0c\n\x05\ - \x04\0\x02\x01\x03\x12\x03\x04\x1c\x1d\n\x0b\n\x04\x04\0\x02\x02\x12\x03\ - \x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\ - \x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\ - \x03\x05\x11\x12\n\x0b\n\x04\x04\0\x08\0\x12\x03\x06\x04%\n\x0c\n\x05\ - \x04\0\x08\0\x01\x12\x03\x06\n\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\ - \x16#\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x16\x1b\n\x0c\n\x05\x04\ - \0\x02\x03\x01\x12\x03\x06\x1c\x1e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\ - \x06!\"\n\n\n\x02\x04\x01\x12\x04\x08\0\x0c\x01\n\n\n\x03\x04\x01\x01\ - \x12\x03\x08\x08\x19\n\x0b\n\x04\x04\x01\x02\0\x12\x03\t\x04\x17\n\x0c\n\ - \x05\x04\x01\x02\0\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\ - \x03\t\x0b\x12\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\t\x15\x16\n\x0b\n\ - \x04\x04\x01\x02\x01\x12\x03\n\x04\x16\n\x0c\n\x05\x04\x01\x02\x01\x05\ - \x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\n\x0b\x11\n\x0c\ - \n\x05\x04\x01\x02\x01\x03\x12\x03\n\x14\x15\n\x0b\n\x04\x04\x01\x02\x02\ - \x12\x03\x0b\x04\x15\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0b\x04\t\n\ - \x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\x0b\n\x10\n\x0c\n\x05\x04\x01\x02\ - \x02\x03\x12\x03\x0b\x13\x14\n\n\n\x02\x05\0\x12\x04\r\0\x12\x01\n\n\n\ - \x03\x05\0\x01\x12\x03\r\x05\x17\n\x0b\n\x04\x05\0\x02\0\x12\x03\x0e\x04\ - \x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x0e\x04\t\n\x0c\n\x05\x05\0\x02\ - \0\x02\x12\x03\x0e\x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x0f\x04\x10\n\ - \x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x0f\x04\x0b\n\x0c\n\x05\x05\0\x02\ - \x01\x02\x12\x03\x0f\x0e\x0f\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x10\x04\ - \x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x10\x04\x0b\n\x0c\n\x05\x05\0\ - \x02\x02\x02\x12\x03\x10\x0e\x0f\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x11\ - \x04\x14\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x11\x04\x0f\n\x0c\n\x05\ - \x05\0\x02\x03\x02\x12\x03\x11\x12\x13b\x06proto3\ + ypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data\x12\x0e\n\x02\ + id\x18\x04\x20\x01(\tR\x02id\"X\n\x0fNewDocumentUser\x12\x17\n\x07user_i\ + d\x18\x01\x20\x01(\tR\x06userId\x12\x15\n\x06doc_id\x18\x02\x20\x01(\tR\ + \x05docId\x12\x15\n\x06rev_id\x18\x03\x20\x01(\x03R\x05revId*H\n\x12Docu\ + mentWSDataType\x12\x07\n\x03Ack\x10\0\x12\x0b\n\x07PushRev\x10\x01\x12\ + \x0b\n\x07PullRev\x10\x02\x12\x0f\n\x0bUserConnect\x10\x03J\xff\x04\n\ + \x06\x12\x04\0\0\x12\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\ + \x12\x04\x02\0\x07\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\ + \x04\x04\0\x02\0\x12\x03\x03\x04\x16\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\ + \x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\x11\n\x0c\n\x05\ + \x04\0\x02\0\x03\x12\x03\x03\x14\x15\n\x0b\n\x04\x04\0\x02\x01\x12\x03\ + \x04\x04\x1e\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x16\n\x0c\n\ + \x05\x04\0\x02\x01\x01\x12\x03\x04\x17\x19\n\x0c\n\x05\x04\0\x02\x01\x03\ + \x12\x03\x04\x1c\x1d\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\x0c\ + \n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\ + \x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\ + \x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\x12\n\x0c\n\x05\x04\0\x02\x03\ + \x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06\x0b\r\n\ + \x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x06\x10\x11\n\n\n\x02\x04\x01\x12\ + \x04\x08\0\x0c\x01\n\n\n\x03\x04\x01\x01\x12\x03\x08\x08\x17\n\x0b\n\x04\ + \x04\x01\x02\0\x12\x03\t\x04\x17\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\t\ + \x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\t\x0b\x12\n\x0c\n\x05\x04\ + \x01\x02\0\x03\x12\x03\t\x15\x16\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\n\ + \x04\x16\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\ + \x01\x02\x01\x01\x12\x03\n\x0b\x11\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\ + \x03\n\x14\x15\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x0b\x04\x15\n\x0c\n\ + \x05\x04\x01\x02\x02\x05\x12\x03\x0b\x04\t\n\x0c\n\x05\x04\x01\x02\x02\ + \x01\x12\x03\x0b\n\x10\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\x0b\x13\ + \x14\n\n\n\x02\x05\0\x12\x04\r\0\x12\x01\n\n\n\x03\x05\0\x01\x12\x03\r\ + \x05\x17\n\x0b\n\x04\x05\0\x02\0\x12\x03\x0e\x04\x0c\n\x0c\n\x05\x05\0\ + \x02\0\x01\x12\x03\x0e\x04\x07\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x0e\n\ + \x0b\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x0f\x04\x10\n\x0c\n\x05\x05\0\x02\ + \x01\x01\x12\x03\x0f\x04\x0b\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x0f\ + \x0e\x0f\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x10\x04\x10\n\x0c\n\x05\x05\0\ + \x02\x02\x01\x12\x03\x10\x04\x0b\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\ + \x10\x0e\x0f\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x11\x04\x14\n\x0c\n\x05\ + \x05\0\x02\x03\x01\x12\x03\x11\x04\x0f\n\x0c\n\x05\x05\0\x02\x03\x02\x12\ + \x03\x11\x12\x13b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto b/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto index 9d0517d576..9101964b8a 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto +++ b/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto @@ -4,15 +4,15 @@ message DocumentWSData { string doc_id = 1; DocumentWSDataType ty = 2; bytes data = 3; - oneof one_of_id { int64 id = 4; }; + string id = 4; } -message DocumentConnected { +message NewDocumentUser { string user_id = 1; string doc_id = 2; int64 rev_id = 3; } enum DocumentWSDataType { - Acked = 0; + Ack = 0; PushRev = 1; PullRev = 2; UserConnect = 3; diff --git a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs index 21677dfc6d..f37c5353c4 100644 --- a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -61,7 +61,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "NewDocUser" | "DocIdentifier" | "DocumentWSData" - | "DocumentConnected" + | "NewDocumentUser" | "WSError" | "WSMessage" | "Revision" diff --git a/shared-lib/lib-ws/src/msg.rs b/shared-lib/lib-ws/src/msg.rs index dc61309e26..bc5f53f697 100644 --- a/shared-lib/lib-ws/src/msg.rs +++ b/shared-lib/lib-ws/src/msg.rs @@ -3,7 +3,6 @@ use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; use std::convert::TryInto; use tokio_tungstenite::tungstenite::Message as TokioMessage; -// Opti: using four bytes of the data to represent the source #[derive(ProtoBuf, Debug, Clone, Default)] pub struct WSMessage { #[pb(index = 1)]