From 3e3e10b3163fbd928284d5a956f1ea2e3a3fd57e Mon Sep 17 00:00:00 2001 From: appflowy Date: Thu, 23 Sep 2021 17:50:28 +0800 Subject: [PATCH] receive revision from client --- .../lib/protobuf/flowy-ws/msg.pb.dart | 12 +- .../lib/protobuf/flowy-ws/msg.pbenum.dart | 17 +++ .../lib/protobuf/flowy-ws/msg.pbjson.dart | 14 ++- backend/src/application.rs | 5 +- backend/src/service/doc/mod.rs | 1 + backend/src/service/doc/ws_handler.rs | 17 +++ backend/src/service/mod.rs | 18 +++ backend/src/service/ws/biz_handler.rs | 34 ++++++ backend/src/service/ws/mod.rs | 2 + backend/src/service/ws/router.rs | 5 +- backend/src/service/ws/ws_client.rs | 115 ++++++++++-------- .../src/protobuf/model/revision.rs | 28 ++--- .../src/protobuf/proto/revision.proto | 1 + rust-lib/flowy-document/src/services/cache.rs | 14 +-- .../src/services/ws/ws_manager.rs | 4 - .../src/deps_resolve/document_deps.rs | 8 +- rust-lib/flowy-ws/Cargo.toml | 2 +- rust-lib/flowy-ws/src/msg.rs | 37 ++++-- rust-lib/flowy-ws/src/protobuf/model/msg.rs | 108 ++++++++++------ .../flowy-ws/src/protobuf/proto/msg.proto | 5 +- rust-lib/flowy-ws/src/ws.rs | 22 ++-- 21 files changed, 320 insertions(+), 149 deletions(-) create mode 100644 backend/src/service/doc/ws_handler.rs create mode 100644 backend/src/service/ws/biz_handler.rs diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pb.dart index 1067674ec2..1b198075a5 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pb.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pb.dart @@ -9,16 +9,20 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; +import 'msg.pbenum.dart'; + +export 'msg.pbenum.dart'; + class WsMessage extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsMessage', createEmptyInstance: create) - ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'source') + ..e(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'source', $pb.PbFieldType.OE, defaultOrMaker: WsSource.Doc, valueOf: WsSource.valueOf, enumValues: WsSource.values) ..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) ..hasRequiredFields = false ; WsMessage._() : super(); factory WsMessage({ - $core.String? source, + WsSource? source, $core.List<$core.int>? data, }) { final _result = create(); @@ -52,9 +56,9 @@ class WsMessage extends $pb.GeneratedMessage { static WsMessage? _defaultInstance; @$pb.TagNumber(1) - $core.String get source => $_getSZ(0); + WsSource get source => $_getN(0); @$pb.TagNumber(1) - set source($core.String v) { $_setString(0, v); } + set source(WsSource v) { setField(1, v); } @$pb.TagNumber(1) $core.bool hasSource() => $_has(0); @$pb.TagNumber(1) diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbenum.dart index 59dcf67a9f..bd39d1cbd7 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbenum.dart @@ -5,3 +5,20 @@ // @dart = 2.12 // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields +// ignore_for_file: UNDEFINED_SHOWN_NAME +import 'dart:core' as $core; +import 'package:protobuf/protobuf.dart' as $pb; + +class WsSource extends $pb.ProtobufEnum { + static const WsSource Doc = WsSource._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Doc'); + + static const $core.List values = [ + Doc, + ]; + + static final $core.Map<$core.int, WsSource> _byValue = $pb.ProtobufEnum.initByValue(values); + static WsSource? valueOf($core.int value) => _byValue[value]; + + const WsSource._($core.int v, $core.String n) : super(v, n); +} + diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbjson.dart index 8c19e9c91c..6f0c3e285e 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/msg.pbjson.dart @@ -8,14 +8,24 @@ import 'dart:core' as $core; import 'dart:convert' as $convert; import 'dart:typed_data' as $typed_data; +@$core.Deprecated('Use wsSourceDescriptor instead') +const WsSource$json = const { + '1': 'WsSource', + '2': const [ + const {'1': 'Doc', '2': 0}, + ], +}; + +/// Descriptor for `WsSource`. Decode as a `google.protobuf.EnumDescriptorProto`. +final $typed_data.Uint8List wsSourceDescriptor = $convert.base64Decode('CghXc1NvdXJjZRIHCgNEb2MQAA=='); @$core.Deprecated('Use wsMessageDescriptor instead') const WsMessage$json = const { '1': 'WsMessage', '2': const [ - const {'1': 'source', '3': 1, '4': 1, '5': 9, '10': 'source'}, + const {'1': 'source', '3': 1, '4': 1, '5': 14, '6': '.WsSource', '10': 'source'}, const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'}, ], }; /// Descriptor for `WsMessage`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List wsMessageDescriptor = $convert.base64Decode('CglXc01lc3NhZ2USFgoGc291cmNlGAEgASgJUgZzb3VyY2USEgoEZGF0YRgCIAEoDFIEZGF0YQ=='); +final $typed_data.Uint8List wsMessageDescriptor = $convert.base64Decode('CglXc01lc3NhZ2USIQoGc291cmNlGAEgASgOMgkuV3NTb3VyY2VSBnNvdXJjZRISCgRkYXRhGAIgASgMUgRkYXRh'); diff --git a/backend/src/application.rs b/backend/src/application.rs index 5efbcea88d..ad1ea4e15c 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -16,6 +16,7 @@ use crate::{ service::{ app::router as app, doc::router as doc, + make_ws_biz_handlers, user::router as user, view::router as view, workspace::router as workspace, @@ -23,6 +24,7 @@ use crate::{ ws::WSServer, }, }; +use flowy_ws::WsSource; pub struct Application { port: u16, @@ -53,7 +55,7 @@ pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result Result Self { Self {} } +} + +impl WsBizHandler for DocWsBizHandler { + fn receive_data(&self, data: Bytes) { + let revision: Revision = parse_from_bytes(&data).unwrap(); + log::warn!("{:?}", revision); + } +} diff --git a/backend/src/service/mod.rs b/backend/src/service/mod.rs index 3a52636333..2d45c748e6 100644 --- a/backend/src/service/mod.rs +++ b/backend/src/service/mod.rs @@ -1,3 +1,8 @@ +use crate::service::{doc::ws_handler::DocWsBizHandler, ws::WsBizHandlers}; +use flowy_ws::WsSource; +use std::sync::Arc; +use tokio::sync::RwLock; + pub mod app; pub mod doc; pub(crate) mod log; @@ -6,3 +11,16 @@ pub(crate) mod util; pub mod view; pub mod workspace; pub mod ws; + +pub fn make_ws_biz_handlers() -> WsBizHandlers { + let mut ws_biz_handlers = WsBizHandlers::new(); + + // doc + let doc_biz_handler = DocWsBizHandler::new(); + ws_biz_handlers.register(WsSource::Doc, wrap(doc_biz_handler)); + + // + ws_biz_handlers +} + +fn wrap(val: T) -> Arc> { Arc::new(RwLock::new(val)) } diff --git a/backend/src/service/ws/biz_handler.rs b/backend/src/service/ws/biz_handler.rs new file mode 100644 index 0000000000..1252d2f2c7 --- /dev/null +++ b/backend/src/service/ws/biz_handler.rs @@ -0,0 +1,34 @@ +use bytes::Bytes; +use dashmap::{mapref::one::Ref, DashMap}; +use flowy_ws::WsSource; +use std::sync::Arc; +use tokio::sync::RwLock; + +pub trait WsBizHandler: Send + Sync { + fn receive_data(&self, data: Bytes); +} + +pub type BizHandler = Arc>; + +pub struct WsBizHandlers { + inner: DashMap, +} + +impl WsBizHandlers { + pub fn new() -> Self { + Self { + inner: DashMap::new(), + } + } + + pub fn register(&self, source: WsSource, handler: BizHandler) { + self.inner.insert(source, handler); + } + + pub fn get(&self, source: &WsSource) -> Option { + match self.inner.get(source) { + None => None, + Some(handler) => Some(handler.clone()), + } + } +} diff --git a/backend/src/service/ws/mod.rs b/backend/src/service/ws/mod.rs index da27353f58..f1e8684aa4 100644 --- a/backend/src/service/ws/mod.rs +++ b/backend/src/service/ws/mod.rs @@ -1,7 +1,9 @@ +pub use biz_handler::*; pub use entities::message::*; pub use ws_client::*; pub use ws_server::*; +mod biz_handler; pub(crate) mod entities; pub mod router; mod ws_client; diff --git a/backend/src/service/ws/router.rs b/backend/src/service/ws/router.rs index 6a59952b38..4f3522927a 100644 --- a/backend/src/service/ws/router.rs +++ b/backend/src/service/ws/router.rs @@ -1,4 +1,4 @@ -use crate::service::ws::{WSClient, WSServer}; +use crate::service::ws::{WSClient, WSServer, WsBizHandlers}; use actix::Addr; use crate::service::user::LoggedUser; @@ -17,10 +17,11 @@ pub async fn establish_ws_connection( payload: Payload, token: Path, server: Data>, + biz_handlers: Data, ) -> Result { match LoggedUser::from_token(token.clone()) { Ok(user) => { - let client = WSClient::new(&user.user_id, server.get_ref().clone()); + let client = WSClient::new(&user.user_id, server.get_ref().clone(), biz_handlers); let result = ws::start(client, &request, payload); match result { Ok(response) => Ok(response.into()), diff --git a/backend/src/service/ws/ws_client.rs b/backend/src/service/ws/ws_client.rs index da2389209f..d63f87569f 100644 --- a/backend/src/service/ws/ws_client.rs +++ b/backend/src/service/ws/ws_client.rs @@ -5,41 +5,36 @@ use crate::{ ClientMessage, MessageData, WSServer, + WsBizHandler, + WsBizHandlers, }, }; -use actix::*; +use actix::{fut::wrap_future, *}; +use actix_web::web::Data; use actix_web_actors::{ws, ws::Message::Text}; -use std::time::Instant; +use bytes::Bytes; +use flowy_ws::{WsMessage, WsSource}; +use std::{convert::TryFrom, pin::Pin, time::Instant}; +use tokio::sync::RwLock; -// Frontend │ Backend -// -// │ -// ┌──────────┐ WsMessage ┌───────────┐ ClientMessage ┌──────────┐ -// │ user 1 │─────────┼────▶│ws_client_1│──────────────────▶│ws_server │ -// └──────────┘ └───────────┘ └──────────┘ -// │ │ -// WsMessage ▼ -// ┌──────────┐ │ ┌───────────┐ ClientMessage Group -// │ user 2 │◀──────────────│ws_client_2│◀───────┐ ┌───────────────┐ -// └──────────┘ │ └───────────┘ │ │ ws_user_1 │ -// │ │ │ -// │ └────────│ ws_user_2 │ -// ┌──────────┐ ┌───────────┐ │ │ -// │ user 3 │─────────┼────▶│ws_client_3│ └───────────────┘ -// └──────────┘ └───────────┘ -// │ pub struct WSClient { session_id: SessionId, server: Addr, + biz_handlers: Data, hb: Instant, } impl WSClient { - pub fn new>(session_id: T, server: Addr) -> Self { + pub fn new>( + session_id: T, + server: Addr, + biz_handlers: Data, + ) -> Self { Self { session_id: session_id.into(), - hb: Instant::now(), server, + biz_handlers, + hb: Instant::now(), } } @@ -62,36 +57,16 @@ impl WSClient { } } -impl Actor for WSClient { - type Context = ws::WebsocketContext; - - fn started(&mut self, ctx: &mut Self::Context) { - self.hb(ctx); - let socket = ctx.address().recipient(); - let connect = Connect { - socket, - sid: self.session_id.clone(), - }; - self.server - .send(connect) - .into_actor(self) - .then(|res, _client, _ctx| { - match res { - Ok(Ok(_)) => log::trace!("Send connect message to server success"), - Ok(Err(e)) => log::error!("Send connect message to server failed: {:?}", e), - Err(e) => log::error!("Send connect message to server failed: {:?}", e), - } - fut::ready(()) - }) - .wait(ctx); - } - - fn stopping(&mut self, _: &mut Self::Context) -> Running { - self.server.do_send(Disconnect { - sid: self.session_id.clone(), - }); - - Running::Stop +async fn handle_binary_message(biz_handlers: Data, bytes: Bytes) { + let message: WsMessage = WsMessage::try_from(bytes).unwrap(); + match biz_handlers.get(&message.source) { + None => { + log::error!("Can't find the handler for {:?}", message.source); + }, + Some(handler) => handler + .write() + .await + .receive_data(Bytes::from(message.data)), } } @@ -106,9 +81,10 @@ impl StreamHandler> for WSClient { // log::debug!("Receive {} pong {:?}", &self.session_id, &msg); self.hb = Instant::now(); }, - Ok(ws::Message::Binary(bin)) => { + Ok(ws::Message::Binary(bytes)) => { log::debug!(" Receive {} binary", &self.session_id); - self.send(MessageData::Binary(bin)); + let biz_handlers = self.biz_handlers.clone(); + ctx.spawn(wrap_future(handle_binary_message(biz_handlers, bytes))); }, Ok(Text(_)) => { log::warn!("Receive unexpected text message"); @@ -145,3 +121,36 @@ impl Handler for WSClient { } } } + +impl Actor for WSClient { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + self.hb(ctx); + let socket = ctx.address().recipient(); + let connect = Connect { + socket, + sid: self.session_id.clone(), + }; + self.server + .send(connect) + .into_actor(self) + .then(|res, _client, _ctx| { + match res { + Ok(Ok(_)) => log::trace!("Send connect message to server success"), + Ok(Err(e)) => log::error!("Send connect message to server failed: {:?}", e), + Err(e) => log::error!("Send connect message to server failed: {:?}", e), + } + fut::ready(()) + }) + .wait(ctx); + } + + fn stopping(&mut self, _: &mut Self::Context) -> Running { + self.server.do_send(Disconnect { + sid: self.session_id.clone(), + }); + + Running::Stop + } +} diff --git a/rust-lib/flowy-document/src/protobuf/model/revision.rs b/rust-lib/flowy-document/src/protobuf/model/revision.rs index e6bbf22838..5e18f843dd 100644 --- a/rust-lib/flowy-document/src/protobuf/model/revision.rs +++ b/rust-lib/flowy-document/src/protobuf/model/revision.rs @@ -298,20 +298,20 @@ static file_descriptor_proto_data: &'static [u8] = b"\ \n\x0erevision.proto\"i\n\x08Revision\x12\x1e\n\x0bbase_rev_id\x18\x01\ \x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05r\ evId\x12\x14\n\x05delta\x18\x03\x20\x01(\x0cR\x05delta\x12\x10\n\x03md5\ - \x18\x04\x20\x01(\tR\x03md5J\x86\x02\n\x06\x12\x04\0\0\x06\x01\n\x08\n\ - \x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x01\0\x06\x01\n\n\n\x03\ - \x04\0\x01\x12\x03\x01\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x02\x04\ - \x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x02\x04\t\n\x0c\n\x05\x04\0\x02\ - \0\x01\x12\x03\x02\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x02\x18\x19\ - \n\x0b\n\x04\x04\0\x02\x01\x12\x03\x03\x04\x15\n\x0c\n\x05\x04\0\x02\x01\ - \x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x03\n\x10\n\ - \x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x03\x13\x14\n\x0b\n\x04\x04\0\x02\ - \x02\x12\x03\x04\x04\x14\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x04\x04\t\ - \n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x04\n\x0f\n\x0c\n\x05\x04\0\x02\ - \x02\x03\x12\x03\x04\x12\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x05\x04\ - \x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x05\x04\n\n\x0c\n\x05\x04\0\ - \x02\x03\x01\x12\x03\x05\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\ - \x05\x11\x12b\x06proto3\ + \x18\x04\x20\x01(\tR\x03md5J\x86\x02\n\x06\x12\x04\0\0\x07\x01\n\x08\n\ + \x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x07\x01\n\n\n\x03\ + \x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\ + \x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\ + \0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x18\x19\ + \n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\ + \x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\ + \x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\ + \x02\x12\x03\x05\x04\x14\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\ + \n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0f\n\x0c\n\x05\x04\0\x02\ + \x02\x03\x12\x03\x05\x12\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\ + \x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\ + \x02\x03\x01\x12\x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\ + \x06\x11\x12b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-document/src/protobuf/proto/revision.proto b/rust-lib/flowy-document/src/protobuf/proto/revision.proto index 1b88b1bf0d..1c8434d5ef 100644 --- a/rust-lib/flowy-document/src/protobuf/proto/revision.proto +++ b/rust-lib/flowy-document/src/protobuf/proto/revision.proto @@ -1,4 +1,5 @@ syntax = "proto3"; + message Revision { int64 base_rev_id = 1; int64 rev_id = 2; diff --git a/rust-lib/flowy-document/src/services/cache.rs b/rust-lib/flowy-document/src/services/cache.rs index abf5abb612..8e7880f1c3 100644 --- a/rust-lib/flowy-document/src/services/cache.rs +++ b/rust-lib/flowy-document/src/services/cache.rs @@ -8,23 +8,23 @@ use crate::{ }; pub(crate) struct DocCache { - doc_map: DashMap>, + inner: DashMap>, } impl DocCache { - pub(crate) fn new() -> Self { Self { doc_map: DashMap::new() } } + pub(crate) fn new() -> Self { Self { inner: DashMap::new() } } pub(crate) fn set(&self, doc: Arc) { let doc_id = doc.id.clone(); - if self.doc_map.contains_key(&doc_id) { + if self.inner.contains_key(&doc_id) { log::warn!("Doc:{} already exists in cache", doc_id.as_ref()); } - self.doc_map.insert(doc.id.clone(), doc); + self.inner.insert(doc.id.clone(), doc); } pub(crate) fn is_opened(&self, doc_id: &str) -> bool { let doc_id: DocId = doc_id.into(); - self.doc_map.get(&doc_id).is_some() + self.inner.get(&doc_id).is_some() } pub(crate) fn get(&self, doc_id: &str) -> Result, DocError> { @@ -32,13 +32,13 @@ impl DocCache { return Err(doc_not_found()); } let doc_id: DocId = doc_id.into(); - let opened_doc = self.doc_map.get(&doc_id).unwrap(); + let opened_doc = self.inner.get(&doc_id).unwrap(); Ok(opened_doc.clone()) } pub(crate) fn remove(&self, id: &str) { let doc_id: DocId = id.into(); - self.doc_map.remove(&doc_id); + self.inner.remove(&doc_id); } } diff --git a/rust-lib/flowy-document/src/services/ws/ws_manager.rs b/rust-lib/flowy-document/src/services/ws/ws_manager.rs index 1af17bb1fe..c25a3af8f1 100644 --- a/rust-lib/flowy-document/src/services/ws/ws_manager.rs +++ b/rust-lib/flowy-document/src/services/ws/ws_manager.rs @@ -7,10 +7,6 @@ pub trait WsSender: Send + Sync { fn send_data(&self, data: Bytes) -> Result<(), DocError>; } -lazy_static! { - pub static ref WS_ID: String = "Document".to_string(); -} - pub struct WsManager { pub(crate) sender: Arc, doc_handlers: HashMap>, diff --git a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index ecc97972fe..2e88794d24 100644 --- a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -2,11 +2,11 @@ use bytes::Bytes; use flowy_document::{ errors::DocError, module::DocumentUser, - prelude::{WsManager, WsSender, WS_ID}, + prelude::{WsManager, WsSender}, }; use flowy_user::{errors::ErrorCode, services::user::UserSession}; -use flowy_ws::{WsMessage, WsMessageHandler}; +use flowy_ws::{WsMessage, WsMessageHandler, WsSource}; use parking_lot::RwLock; use std::{path::Path, sync::Arc}; @@ -73,7 +73,7 @@ struct WsSenderImpl { impl WsSender for WsSenderImpl { fn send_data(&self, data: Bytes) -> Result<(), DocError> { let msg = WsMessage { - source: WS_ID.clone(), + source: WsSource::Doc, data: data.to_vec(), }; let _ = self.user.send_ws_msg(msg).map_err(|e| DocError::internal().context(e))?; @@ -86,7 +86,7 @@ struct WsDocumentResolver { } impl WsMessageHandler for WsDocumentResolver { - fn source(&self) -> String { WS_ID.clone() } + fn source(&self) -> WsSource { WsSource::Doc } fn receive_message(&self, msg: WsMessage) { let data = Bytes::from(msg.data); diff --git a/rust-lib/flowy-ws/Cargo.toml b/rust-lib/flowy-ws/Cargo.toml index 557ff047d3..c0731d3d9c 100644 --- a/rust-lib/flowy-ws/Cargo.toml +++ b/rust-lib/flowy-ws/Cargo.toml @@ -14,7 +14,7 @@ futures-util = "0.3.17" futures-channel = "0.3.17" tokio = {version = "1", features = ["full"]} futures = "0.3.17" -bytes = "0.5" +bytes = "1.0" pin-project = "1.0.0" futures-core = { version = "0.3", default-features = false } paste = "1" diff --git a/rust-lib/flowy-ws/src/msg.rs b/rust-lib/flowy-ws/src/msg.rs index d95455286c..50b2db0981 100644 --- a/rust-lib/flowy-ws/src/msg.rs +++ b/rust-lib/flowy-ws/src/msg.rs @@ -1,34 +1,51 @@ use bytes::Bytes; -use flowy_derive::ProtoBuf; +use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; use std::convert::{TryFrom, TryInto}; -use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::tungstenite::Message as TokioMessage; #[derive(ProtoBuf, Debug, Clone, Default)] pub struct WsMessage { #[pb(index = 1)] - pub source: String, + pub source: WsSource, #[pb(index = 2)] pub data: Vec, } -impl std::convert::Into for WsMessage { - fn into(self) -> Message { +#[derive(ProtoBuf_Enum, Debug, Clone, Eq, PartialEq, Hash)] +pub enum WsSource { + Doc = 0, +} + +impl std::default::Default for WsSource { + fn default() -> Self { WsSource::Doc } +} + +impl ToString for WsSource { + fn to_string(&self) -> String { + match self { + WsSource::Doc => "0".to_string(), + } + } +} + +impl std::convert::Into for WsMessage { + fn into(self) -> TokioMessage { let result: Result = self.try_into(); match result { - Ok(bytes) => Message::Binary(bytes.to_vec()), + Ok(bytes) => TokioMessage::Binary(bytes.to_vec()), Err(e) => { log::error!("WsMessage serialize error: {:?}", e); - Message::Binary(vec![]) + TokioMessage::Binary(vec![]) }, } } } -impl std::convert::From for WsMessage { - fn from(value: Message) -> Self { +impl std::convert::From for WsMessage { + fn from(value: TokioMessage) -> Self { match value { - Message::Binary(bytes) => WsMessage::try_from(Bytes::from(bytes)).unwrap(), + TokioMessage::Binary(bytes) => WsMessage::try_from(Bytes::from(bytes)).unwrap(), _ => { log::error!("WsMessage deserialize failed. Unsupported message"); WsMessage::default() diff --git a/rust-lib/flowy-ws/src/protobuf/model/msg.rs b/rust-lib/flowy-ws/src/protobuf/model/msg.rs index 8437ba0c20..153a955bab 100644 --- a/rust-lib/flowy-ws/src/protobuf/model/msg.rs +++ b/rust-lib/flowy-ws/src/protobuf/model/msg.rs @@ -26,7 +26,7 @@ #[derive(PartialEq,Clone,Default)] pub struct WsMessage { // message fields - pub source: ::std::string::String, + pub source: WsSource, pub data: ::std::vec::Vec, // special fields pub unknown_fields: ::protobuf::UnknownFields, @@ -44,32 +44,21 @@ impl WsMessage { ::std::default::Default::default() } - // string source = 1; + // .WsSource source = 1; - pub fn get_source(&self) -> &str { - &self.source + pub fn get_source(&self) -> WsSource { + self.source } pub fn clear_source(&mut self) { - self.source.clear(); + self.source = WsSource::Doc; } // Param is passed by value, moved - pub fn set_source(&mut self, v: ::std::string::String) { + pub fn set_source(&mut self, v: WsSource) { self.source = v; } - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_source(&mut self) -> &mut ::std::string::String { - &mut self.source - } - - // Take field - pub fn take_source(&mut self) -> ::std::string::String { - ::std::mem::replace(&mut self.source, ::std::string::String::new()) - } - // bytes data = 2; @@ -107,7 +96,7 @@ impl ::protobuf::Message for WsMessage { let (field_number, wire_type) = is.read_tag_unpack()?; match field_number { 1 => { - ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.source)?; + ::protobuf::rt::read_proto3_enum_with_unknown_fields_into(wire_type, is, &mut self.source, 1, &mut self.unknown_fields)? }, 2 => { ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?; @@ -124,8 +113,8 @@ impl ::protobuf::Message for WsMessage { #[allow(unused_variables)] fn compute_size(&self) -> u32 { let mut my_size = 0; - if !self.source.is_empty() { - my_size += ::protobuf::rt::string_size(1, &self.source); + if self.source != WsSource::Doc { + my_size += ::protobuf::rt::enum_size(1, self.source); } if !self.data.is_empty() { my_size += ::protobuf::rt::bytes_size(2, &self.data); @@ -136,8 +125,8 @@ impl ::protobuf::Message for WsMessage { } fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if !self.source.is_empty() { - os.write_string(1, &self.source)?; + if self.source != WsSource::Doc { + os.write_enum(1, ::protobuf::ProtobufEnum::value(&self.source))?; } if !self.data.is_empty() { os.write_bytes(2, &self.data)?; @@ -180,7 +169,7 @@ impl ::protobuf::Message for WsMessage { static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; descriptor.get(|| { let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( "source", |m: &WsMessage| { &m.source }, |m: &mut WsMessage| { &mut m.source }, @@ -206,7 +195,7 @@ impl ::protobuf::Message for WsMessage { impl ::protobuf::Clear for WsMessage { fn clear(&mut self) { - self.source.clear(); + self.source = WsSource::Doc; self.data.clear(); self.unknown_fields.clear(); } @@ -224,17 +213,68 @@ impl ::protobuf::reflect::ProtobufValue for WsMessage { } } +#[derive(Clone,PartialEq,Eq,Debug,Hash)] +pub enum WsSource { + Doc = 0, +} + +impl ::protobuf::ProtobufEnum for WsSource { + fn value(&self) -> i32 { + *self as i32 + } + + fn from_i32(value: i32) -> ::std::option::Option { + match value { + 0 => ::std::option::Option::Some(WsSource::Doc), + _ => ::std::option::Option::None + } + } + + fn values() -> &'static [Self] { + static values: &'static [WsSource] = &[ + WsSource::Doc, + ]; + values + } + + fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + ::protobuf::reflect::EnumDescriptor::new_pb_name::("WsSource", file_descriptor_proto()) + }) + } +} + +impl ::std::marker::Copy for WsSource { +} + +impl ::std::default::Default for WsSource { + fn default() -> Self { + WsSource::Doc + } +} + +impl ::protobuf::reflect::ProtobufValue for WsSource { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self)) + } +} + static file_descriptor_proto_data: &'static [u8] = b"\ - \n\tmsg.proto\"7\n\tWsMessage\x12\x16\n\x06source\x18\x01\x20\x01(\tR\ - \x06source\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04dataJ\x98\x01\n\ - \x06\x12\x04\0\0\x05\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\ - \x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x11\n\x0b\n\ - \x04\x04\0\x02\0\x12\x03\x03\x04\x16\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\ - \x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\x11\n\x0c\n\x05\ - \x04\0\x02\0\x03\x12\x03\x03\x14\x15\n\x0b\n\x04\x04\0\x02\x01\x12\x03\ - \x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\ - \x04\0\x02\x01\x01\x12\x03\x04\n\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\ - \x03\x04\x11\x12b\x06proto3\ + \n\tmsg.proto\"B\n\tWsMessage\x12!\n\x06source\x18\x01\x20\x01(\x0e2\t.W\ + sSourceR\x06source\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data*\x13\ + \n\x08WsSource\x12\x07\n\x03Doc\x10\0J\xd9\x01\n\x06\x12\x04\0\0\x08\x01\ + \n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\ + \n\n\x03\x04\0\x01\x12\x03\x02\x08\x11\n\x0b\n\x04\x04\0\x02\0\x12\x03\ + \x03\x04\x18\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\x0c\n\x0c\n\x05\ + \x04\0\x02\0\x01\x12\x03\x03\r\x13\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\ + \x03\x16\x17\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\ + \x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\ + \x03\x04\n\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\ + \x02\x05\0\x12\x04\x06\0\x08\x01\n\n\n\x03\x05\0\x01\x12\x03\x06\x05\r\n\ + \x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x0c\n\x0c\n\x05\x05\0\x02\0\x01\ + \x12\x03\x07\x04\x07\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\n\x0bb\x06p\ + roto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-ws/src/protobuf/proto/msg.proto b/rust-lib/flowy-ws/src/protobuf/proto/msg.proto index ce54b34bcd..36b405fefe 100644 --- a/rust-lib/flowy-ws/src/protobuf/proto/msg.proto +++ b/rust-lib/flowy-ws/src/protobuf/proto/msg.proto @@ -1,6 +1,9 @@ syntax = "proto3"; message WsMessage { - string source = 1; + WsSource source = 1; bytes data = 2; } +enum WsSource { + Doc = 0; +} diff --git a/rust-lib/flowy-ws/src/ws.rs b/rust-lib/flowy-ws/src/ws.rs index b33e77d043..595958a1c3 100644 --- a/rust-lib/flowy-ws/src/ws.rs +++ b/rust-lib/flowy-ws/src/ws.rs @@ -2,6 +2,7 @@ use crate::{ connect::{Retry, WsConnectionFuture}, errors::WsError, WsMessage, + WsSource, }; use flowy_net::errors::ServerError; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; @@ -23,7 +24,7 @@ use tokio_tungstenite::tungstenite::{ pub type MsgReceiver = UnboundedReceiver; pub type MsgSender = UnboundedSender; pub trait WsMessageHandler: Sync + Send + 'static { - fn source(&self) -> String; + fn source(&self) -> WsSource; fn receive_message(&self, msg: WsMessage); } @@ -50,7 +51,7 @@ pub enum WsState { } pub struct WsController { - handlers: HashMap>, + handlers: HashMap>, state_notify: Arc>, #[allow(dead_code)] addr: Option, @@ -83,7 +84,7 @@ impl WsController { pub fn add_handler(&mut self, handler: Arc) -> Result<(), WsError> { let source = handler.source(); if self.handlers.contains_key(&source) { - log::error!("{} source is already registered", source); + log::error!("WsSource's {:?} is already registered", source); } self.handlers.insert(source, handler); Ok(()) @@ -163,11 +164,11 @@ impl WsController { pub struct WsHandlerFuture { #[pin] msg_rx: MsgReceiver, - handlers: HashMap>, + handlers: HashMap>, } impl WsHandlerFuture { - fn new(handlers: HashMap>, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } } + fn new(handlers: HashMap>, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } } } impl Future for WsHandlerFuture { @@ -203,19 +204,16 @@ impl WsSender { Ok(()) } - pub fn send_text(&self, source: &str, text: &str) -> Result<(), WsError> { + pub fn send_text(&self, source: WsSource, text: &str) -> Result<(), WsError> { let msg = WsMessage { - source: source.to_string(), + source, data: text.as_bytes().to_vec(), }; self.send_msg(msg) } - pub fn send_binary(&self, source: &str, bytes: Vec) -> Result<(), WsError> { - let msg = WsMessage { - source: source.to_string(), - data: bytes, - }; + pub fn send_binary(&self, source: WsSource, bytes: Vec) -> Result<(), WsError> { + let msg = WsMessage { source, data: bytes }; self.send_msg(msg) }