From 7e1cf1222f1f6a65bdea1a1b8a530096dbc8ab36 Mon Sep 17 00:00:00 2001 From: appflowy Date: Fri, 2 Jul 2021 20:45:51 +0800 Subject: [PATCH] [refactor]: 1. replace unbounded sender with directory call using static runtime 2. sync + send for handler --- rust-lib/flowy-sys/Cargo.toml | 2 +- rust-lib/flowy-sys/src/dispatch.rs | 174 +++++++++++++++++++ rust-lib/flowy-sys/src/error/error.rs | 8 +- rust-lib/flowy-sys/src/lib.rs | 4 +- rust-lib/flowy-sys/src/module/container.rs | 103 ++++++++++- rust-lib/flowy-sys/src/module/data.rs | 27 ++- rust-lib/flowy-sys/src/module/module.rs | 86 ++++----- rust-lib/flowy-sys/src/request/request.rs | 11 +- rust-lib/flowy-sys/src/response/data.rs | 9 +- rust-lib/flowy-sys/src/response/responder.rs | 2 +- rust-lib/flowy-sys/src/response/response.rs | 2 +- rust-lib/flowy-sys/src/rt/runtime.rs | 25 +-- rust-lib/flowy-sys/src/sender/data.rs | 99 ----------- rust-lib/flowy-sys/src/sender/mod.rs | 5 - rust-lib/flowy-sys/src/sender/sender.rs | 167 ------------------ rust-lib/flowy-sys/src/service/boxed.rs | 67 ++++--- rust-lib/flowy-sys/src/service/handler.rs | 22 +-- rust-lib/flowy-sys/src/system.rs | 27 ++- rust-lib/flowy-sys/src/util/mod.rs | 26 +++ rust-lib/flowy-sys/tests/api/helper.rs | 44 +---- rust-lib/flowy-sys/tests/api/module.rs | 20 +-- 21 files changed, 449 insertions(+), 481 deletions(-) create mode 100644 rust-lib/flowy-sys/src/dispatch.rs delete mode 100644 rust-lib/flowy-sys/src/sender/data.rs delete mode 100644 rust-lib/flowy-sys/src/sender/mod.rs delete mode 100644 rust-lib/flowy-sys/src/sender/sender.rs diff --git a/rust-lib/flowy-sys/Cargo.toml b/rust-lib/flowy-sys/Cargo.toml index c108b0e74a..77635ae1f2 100644 --- a/rust-lib/flowy-sys/Cargo.toml +++ b/rust-lib/flowy-sys/Cargo.toml @@ -12,7 +12,7 @@ paste = "1" futures-channel = "0.3.15" futures = "0.3.15" futures-util = "0.3.15" -bytes = "0.5" +bytes = "1.0" tokio = { version = "1", features = ["full"] } uuid = { version = "0.8", features = ["serde", "v4"] } log = "0.4.14" diff --git a/rust-lib/flowy-sys/src/dispatch.rs b/rust-lib/flowy-sys/src/dispatch.rs new file mode 100644 index 0000000000..bc47b939c9 --- /dev/null +++ b/rust-lib/flowy-sys/src/dispatch.rs @@ -0,0 +1,174 @@ +use crate::{ + error::{Error, InternalError, SystemError}, + module::{as_module_map, Event, Module, ModuleMap, ModuleRequest}, + request::Payload, + response::EventResponse, + service::{Service, ServiceFactory}, + util::tokio_default_runtime, +}; +use derivative::*; +use futures_core::future::BoxFuture; +use lazy_static::lazy_static; +use std::{ + fmt::{Debug, Display}, + hash::Hash, + sync::RwLock, +}; + +lazy_static! { + pub static ref EVENT_DISPATCH: RwLock> = RwLock::new(None); +} + +pub struct EventDispatch { + module_map: ModuleMap, + runtime: tokio::runtime::Runtime, +} + +impl EventDispatch { + pub fn construct(module_factory: F) + where + F: FnOnce() -> Vec, + { + let modules = module_factory(); + let module_map = as_module_map(modules); + let runtime = tokio_default_runtime().unwrap(); + + let dispatch = EventDispatch { + module_map, + runtime, + }; + + *(EVENT_DISPATCH.write().unwrap()) = Some(dispatch); + } + + pub async fn async_send(request: DispatchRequest) -> Result + where + T: 'static + Debug + Send + Sync, + { + match EVENT_DISPATCH.read() { + Ok(dispatch) => { + let dispatch = dispatch.as_ref().unwrap(); + let module_map = dispatch.module_map.clone(); + let service = Box::new(DispatchService { module_map }); + dispatch + .runtime + .spawn(async move { service.call(request).await }) + .await + .unwrap_or_else(|e| { + let msg = format!("{:?}", e); + Ok(InternalError::new(msg).as_response()) + }) + }, + + Err(e) => { + let msg = format!("{:?}", e); + Err(InternalError::new(msg).into()) + }, + } + } + + pub fn sync_send(request: DispatchRequest) -> Result + where + T: 'static + Debug + Send + Sync, + { + futures::executor::block_on(async { EventDispatch::async_send(request).await }) + } +} + +pub type BoxStreamCallback = Box; + +#[derive(Derivative)] +#[derivative(Debug)] +pub struct DispatchRequest +where + T: 'static + Debug, +{ + pub config: T, + pub event: Event, + pub payload: Option, + #[derivative(Debug = "ignore")] + pub callback: Option>, +} + +impl DispatchRequest +where + T: 'static + Debug, +{ + pub fn new(config: T, event: E) -> Self + where + E: Eq + Hash + Debug + Clone + Display, + { + Self { + config, + payload: None, + event: event.into(), + callback: None, + } + } + + pub fn payload(mut self, payload: Payload) -> Self { + self.payload = Some(payload); + self + } + + pub fn callback(mut self, callback: F) -> Self + where + F: FnOnce(T, EventResponse) + 'static + Send + Sync, + { + self.callback = Some(Box::new(callback)); + self + } +} + +pub(crate) struct DispatchService { + pub(crate) module_map: ModuleMap, +} + +impl Service> for DispatchService +where + T: 'static + Debug + Send + Sync, +{ + type Response = EventResponse; + type Error = SystemError; + type Future = BoxFuture<'static, Result>; + + fn call(&self, dispatch_request: DispatchRequest) -> Self::Future { + let module_map = self.module_map.clone(); + let DispatchRequest { + config, + event, + payload, + callback, + } = dispatch_request; + + let mut request = ModuleRequest::new(event.clone()); + if let Some(payload) = payload { + request = request.payload(payload); + }; + Box::pin(async move { + let result = { + match module_map.get(&event) { + Some(module) => { + let fut = module.new_service(()); + let service_fut = fut.await?.call(request); + service_fut.await + }, + None => { + let msg = format!( + "Can not find the module to handle the request:{:?}", + request + ); + Err(InternalError::new(msg).into()) + }, + } + }; + + let response = result.unwrap_or_else(|e| e.into()); + if let Some(callback) = callback { + callback(config, response.clone()); + } + + Ok(response) + }) + } +} diff --git a/rust-lib/flowy-sys/src/error/error.rs b/rust-lib/flowy-sys/src/error/error.rs index 671ab7c982..a0fa6c6e7b 100644 --- a/rust-lib/flowy-sys/src/error/error.rs +++ b/rust-lib/flowy-sys/src/error/error.rs @@ -7,7 +7,7 @@ use serde::{Serialize, Serializer}; use std::{fmt, option::NoneError}; use tokio::sync::mpsc::error::SendError; -pub trait Error: fmt::Debug + fmt::Display + DynClone { +pub trait Error: fmt::Debug + fmt::Display + DynClone + Send + Sync { fn status_code(&self) -> StatusCode; fn as_response(&self) -> EventResponse { EventResponse::new(self.status_code()) } @@ -83,21 +83,21 @@ impl InternalError { impl fmt::Debug for InternalError where - T: fmt::Debug + 'static + Clone, + T: fmt::Debug + 'static + Clone + Send + Sync, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&self.inner, f) } } impl fmt::Display for InternalError where - T: fmt::Debug + fmt::Display + 'static + Clone, + T: fmt::Debug + fmt::Display + 'static + Clone + Send + Sync, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Display::fmt(&self.inner, f) } } impl Error for InternalError where - T: fmt::Debug + fmt::Display + 'static + Clone, + T: fmt::Debug + fmt::Display + 'static + Clone + Send + Sync, { fn status_code(&self) -> StatusCode { StatusCode::Err } diff --git a/rust-lib/flowy-sys/src/lib.rs b/rust-lib/flowy-sys/src/lib.rs index 9479f5308c..a89876bb3e 100644 --- a/rust-lib/flowy-sys/src/lib.rs +++ b/rust-lib/flowy-sys/src/lib.rs @@ -8,9 +8,9 @@ mod rt; mod service; mod util; -mod sender; +mod dispatch; mod system; pub mod prelude { - pub use crate::{error::*, module::*, request::*, response::*, rt::*, sender::*}; + pub use crate::{dispatch::*, error::*, module::*, request::*, response::*, rt::*}; } diff --git a/rust-lib/flowy-sys/src/module/container.rs b/rust-lib/flowy-sys/src/module/container.rs index 2d512c0586..118dea0455 100644 --- a/rust-lib/flowy-sys/src/module/container.rs +++ b/rust-lib/flowy-sys/src/module/container.rs @@ -5,7 +5,7 @@ use std::{ #[derive(Default)] pub struct DataContainer { - map: HashMap>, + map: HashMap>, } impl DataContainer { @@ -16,27 +16,112 @@ impl DataContainer { } } - pub fn insert(&mut self, val: T) -> Option { + pub fn insert(&mut self, val: T) -> Option + where + T: 'static + Send + Sync, + { self.map .insert(TypeId::of::(), Box::new(val)) .and_then(downcast_owned) } - pub fn remove(&mut self) -> Option { self.map.remove(&TypeId::of::()).and_then(downcast_owned) } - - pub fn get(&self) -> Option<&T> { - self.map.get(&TypeId::of::()).and_then(|boxed| boxed.downcast_ref()) + pub fn remove(&mut self) -> Option + where + T: 'static + Send + Sync, + { + self.map.remove(&TypeId::of::()).and_then(downcast_owned) } - pub fn get_mut(&mut self) -> Option<&mut T> { + pub fn get(&self) -> Option<&T> + where + T: 'static + Send + Sync, + { + self.map + .get(&TypeId::of::()) + .and_then(|boxed| boxed.downcast_ref()) + } + + pub fn get_mut(&mut self) -> Option<&mut T> + where + T: 'static + Send + Sync, + { self.map .get_mut(&TypeId::of::()) .and_then(|boxed| boxed.downcast_mut()) } - pub fn contains(&self) -> bool { self.map.contains_key(&TypeId::of::()) } + pub fn contains(&self) -> bool + where + T: 'static + Send + Sync, + { + self.map.contains_key(&TypeId::of::()) + } pub fn extend(&mut self, other: DataContainer) { self.map.extend(other.map); } } -fn downcast_owned(boxed: Box) -> Option { boxed.downcast().ok().map(|boxed| *boxed) } +fn downcast_owned(boxed: Box) -> Option { + boxed.downcast().ok().map(|boxed| *boxed) +} + +// use std::{ +// any::{Any, TypeId}, +// collections::HashMap, +// sync::RwLock, +// }; +// +// #[derive(Default)] +// pub struct DataContainer { +// map: RwLock>>, +// } +// +// impl DataContainer { +// #[inline] +// pub fn new() -> DataContainer { +// DataContainer { +// map: RwLock::new(HashMap::default()), +// } +// } +// +// pub fn insert(&mut self, val: T) -> Option { +// self.map +// .write() +// .unwrap() +// .insert(TypeId::of::(), Box::new(val)) +// .and_then(downcast_owned) +// } +// +// pub fn remove(&mut self) -> Option { +// self.map +// .write() +// .unwrap() +// .remove(&TypeId::of::()) +// .and_then(downcast_owned) +// } +// +// pub fn get(&self) -> Option<&T> { +// self.map +// .read() +// .unwrap() +// .get(&TypeId::of::()) +// .and_then(|boxed| boxed.downcast_ref()) +// } +// +// pub fn get_mut(&mut self) -> Option<&mut T> { +// self.map +// .write() +// .unwrap() +// .get_mut(&TypeId::of::()) +// .and_then(|boxed| boxed.downcast_mut()) +// } +// +// pub fn contains(&self) -> bool { +// self.map.read().unwrap().contains_key(&TypeId::of::()) +// } +// +// pub fn extend(&mut self, other: DataContainer) { +// self.map.write().unwrap().extend(other.map); } } +// +// fn downcast_owned(boxed: Box) -> Option { +// boxed.downcast().ok().map(|boxed| *boxed) +// } diff --git a/rust-lib/flowy-sys/src/module/data.rs b/rust-lib/flowy-sys/src/module/data.rs index adb01bf781..d46e75aded 100644 --- a/rust-lib/flowy-sys/src/module/data.rs +++ b/rust-lib/flowy-sys/src/module/data.rs @@ -5,29 +5,44 @@ use crate::{ }; use std::{ops::Deref, sync::Arc}; -pub struct ModuleData(Arc); +pub struct ModuleData(Arc); -impl ModuleData { +impl ModuleData +where + T: Send + Sync, +{ pub fn new(data: T) -> Self { ModuleData(Arc::new(data)) } pub fn get_ref(&self) -> &T { self.0.as_ref() } } -impl Deref for ModuleData { +impl Deref for ModuleData +where + T: ?Sized + Send + Sync, +{ type Target = Arc; fn deref(&self) -> &Arc { &self.0 } } -impl Clone for ModuleData { +impl Clone for ModuleData +where + T: ?Sized + Send + Sync, +{ fn clone(&self) -> ModuleData { ModuleData(self.0.clone()) } } -impl From> for ModuleData { +impl From> for ModuleData +where + T: ?Sized + Send + Sync, +{ fn from(arc: Arc) -> Self { ModuleData(arc) } } -impl FromRequest for ModuleData { +impl FromRequest for ModuleData +where + T: ?Sized + Send + Sync + 'static, +{ type Error = SystemError; type Future = Ready>; diff --git a/rust-lib/flowy-sys/src/module/module.rs b/rust-lib/flowy-sys/src/module/module.rs index edd8f9d6e5..68b7edb951 100644 --- a/rust-lib/flowy-sys/src/module/module.rs +++ b/rust-lib/flowy-sys/src/module/module.rs @@ -4,13 +4,11 @@ use std::{ future::Future, hash::Hash, pin::Pin, - rc::Rc, task::{Context, Poll}, }; -use futures_core::{future::LocalBoxFuture, ready}; +use futures_core::ready; use pin_project::pin_project; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::{ error::{InternalError, SystemError}, @@ -29,6 +27,21 @@ use crate::{ ServiceResponse, }, }; +use futures_core::future::BoxFuture; +use std::sync::Arc; + +pub type ModuleMap = Arc>>; +pub(crate) fn as_module_map(modules: Vec) -> ModuleMap { + let mut module_map = HashMap::new(); + modules.into_iter().for_each(|m| { + let events = m.events(); + let module = Arc::new(m); + events.into_iter().for_each(|e| { + module_map.insert(e, module.clone()); + }); + }); + Arc::new(module_map) +} #[derive(PartialEq, Eq, Hash, Debug, Clone)] pub struct Event(String); @@ -42,20 +55,15 @@ pub type EventServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResp pub struct Module { name: String, data: DataContainer, - service_map: Rc>, - req_tx: UnboundedSender, - req_rx: UnboundedReceiver, + service_map: Arc>, } impl Module { pub fn new() -> Self { - let (req_tx, req_rx) = unbounded_channel::(); Self { name: "".to_owned(), data: DataContainer::new(), - service_map: Rc::new(HashMap::new()), - req_tx, - req_rx, + service_map: Arc::new(HashMap::new()), } } @@ -64,7 +72,7 @@ impl Module { self } - pub fn data(mut self, data: D) -> Self { + pub fn data(mut self, data: D) -> Self { self.data.insert(ModuleData::new(data)); self } @@ -72,8 +80,9 @@ impl Module { pub fn event(mut self, event: E, handler: H) -> Self where H: Handler, - T: FromRequest + 'static, - R: Future + 'static, + T: FromRequest + 'static + Send + Sync, + ::Future: Sync + Send, + R: Future + 'static + Send + Sync, R::Output: Responder + 'static, E: Eq + Hash + Debug + Clone + Display, { @@ -82,39 +91,17 @@ impl Module { log::error!("Duplicate Event: {:?}", &event); } - Rc::get_mut(&mut self.service_map) + Arc::get_mut(&mut self.service_map) .unwrap() .insert(event, factory(HandlerService::new(handler))); self } - pub fn forward_map(&self) -> HashMap> { + pub fn events(&self) -> Vec { self.service_map .keys() - .map(|key| (key.clone(), self.req_tx.clone())) - .collect::>() - } - - pub fn events(&self) -> Vec { self.service_map.keys().map(|key| key.clone()).collect::>() } -} - -impl Future for Module { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) { - None => return Poll::Ready(()), - Some(request) => { - let mut service = self.new_service(()); - if let Ok(service) = ready!(Pin::new(&mut service).poll(cx)) { - log::trace!("Spawn module service for request {}", request.id()); - tokio::task::spawn_local(async move { - let _ = service.call(request).await; - }); - } - }, - } - } + .map(|key| key.clone()) + .collect::>() } } @@ -140,23 +127,23 @@ impl ModuleRequest { self } - pub(crate) fn into_parts(self) -> (EventRequest, Payload) { (self.inner, self.payload) } - - pub(crate) fn into_service_request(self) -> ServiceRequest { ServiceRequest::new(self.inner, self.payload) } - pub(crate) fn id(&self) -> &str { &self.inner.id } pub(crate) fn event(&self) -> &Event { &self.inner.event } } +impl std::convert::Into for ModuleRequest { + fn into(self) -> ServiceRequest { ServiceRequest::new(self.inner, self.payload) } +} + impl ServiceFactory for Module { type Response = EventResponse; type Error = SystemError; type Service = BoxService; type Context = (); - type Future = LocalBoxFuture<'static, Result>; + type Future = BoxFuture<'static, Result>; - fn new_service(&self, cfg: Self::Context) -> Self::Future { + fn new_service(&self, _cfg: Self::Context) -> Self::Future { let service_map = self.service_map.clone(); Box::pin(async move { let service = ModuleService { service_map }; @@ -167,13 +154,13 @@ impl ServiceFactory for Module { } pub struct ModuleService { - service_map: Rc>, + service_map: Arc>, } impl Service for ModuleService { type Response = EventResponse; type Error = SystemError; - type Future = LocalBoxFuture<'static, Result>; + type Future = BoxFuture<'static, Result>; fn call(&self, request: ModuleRequest) -> Self::Future { log::trace!("Call module service for request {}", &request.id()); @@ -183,8 +170,7 @@ impl Service for ModuleService { let fut = ModuleServiceFuture { fut: Box::pin(async { let service = service_fut.await?; - let request = request.into_service_request(); - service.call(request).await + service.call(request.into()).await }), }; Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) }) @@ -200,7 +186,7 @@ impl Service for ModuleService { #[pin_project] pub struct ModuleServiceFuture { #[pin] - fut: LocalBoxFuture<'static, Result>, + fut: BoxFuture<'static, Result>, } impl Future for ModuleServiceFuture { diff --git a/rust-lib/flowy-sys/src/request/request.rs b/rust-lib/flowy-sys/src/request/request.rs index dac95ade6d..ae3d16a20d 100644 --- a/rust-lib/flowy-sys/src/request/request.rs +++ b/rust-lib/flowy-sys/src/request/request.rs @@ -4,15 +4,13 @@ use crate::{ error::{InternalError, SystemError}, module::Event, request::payload::Payload, - response::Responder, util::ready::{ready, Ready}, }; -use crate::response::{EventResponse, ResponseBuilder}; + use futures_core::ready; use std::{ fmt::Debug, - hash::Hash, ops, pin::Pin, task::{Context, Poll}, @@ -135,8 +133,11 @@ where match payload { Payload::None => ready(Err(unexpected_none_payload(req))), Payload::Bytes(bytes) => { - let data: T = bincode::deserialize(bytes).unwrap(); - ready(Ok(Data(data))) + let s = String::from_utf8_lossy(bytes); + match serde_json::from_str(s.as_ref()) { + Ok(data) => ready(Ok(Data(data))), + Err(e) => ready(Err(InternalError::new(format!("{:?}", e)).into())), + } }, } } diff --git a/rust-lib/flowy-sys/src/response/data.rs b/rust-lib/flowy-sys/src/response/data.rs index 9115174204..1d748f25b7 100644 --- a/rust-lib/flowy-sys/src/response/data.rs +++ b/rust-lib/flowy-sys/src/response/data.rs @@ -1,5 +1,5 @@ -use bytes::{Buf, Bytes}; -use serde::{Deserialize, Serialize}; +use bytes::{Bytes}; + use std::{fmt, fmt::Formatter}; #[derive(Debug, Clone)] @@ -27,7 +27,10 @@ impl std::convert::Into for &'_ String { } impl std::convert::Into for Bytes { - fn into(self) -> ResponseData { ResponseData::Bytes(self.bytes().to_vec()) } + fn into(self) -> ResponseData { + // Opti(nathan): do not copy the bytes? + ResponseData::Bytes(self.as_ref().to_vec()) + } } impl std::convert::Into for Vec { diff --git a/rust-lib/flowy-sys/src/response/responder.rs b/rust-lib/flowy-sys/src/response/responder.rs index fb889778db..55b97cf50f 100644 --- a/rust-lib/flowy-sys/src/response/responder.rs +++ b/rust-lib/flowy-sys/src/response/responder.rs @@ -4,7 +4,7 @@ use crate::{ response::{EventResponse, ResponseBuilder}, }; use bytes::Bytes; -use std::ops; + pub trait Responder { fn respond_to(self, req: &EventRequest) -> EventResponse; diff --git a/rust-lib/flowy-sys/src/response/response.rs b/rust-lib/flowy-sys/src/response/response.rs index 6c0324feee..60766f71e2 100644 --- a/rust-lib/flowy-sys/src/response/response.rs +++ b/rust-lib/flowy-sys/src/response/response.rs @@ -5,7 +5,7 @@ use crate::{ }; use crate::request::Data; -use serde::{Deserialize, Serialize, Serializer}; + use std::{fmt, fmt::Formatter}; #[derive(Clone, Debug, Eq, PartialEq)] diff --git a/rust-lib/flowy-sys/src/rt/runtime.rs b/rust-lib/flowy-sys/src/rt/runtime.rs index 2c24213bb9..428f107e09 100644 --- a/rust-lib/flowy-sys/src/rt/runtime.rs +++ b/rust-lib/flowy-sys/src/rt/runtime.rs @@ -1,5 +1,5 @@ -use std::{future::Future, io, thread}; -use thread_id; +use crate::util::tokio_default_runtime; +use std::{future::Future, io}; use tokio::{runtime, task::LocalSet}; #[derive(Debug)] @@ -10,26 +10,7 @@ pub struct Runtime { impl Runtime { pub fn new() -> io::Result { - let rt = runtime::Builder::new_multi_thread() - .thread_name("flowy-sys") - .enable_io() - .enable_time() - .on_thread_start(move || { - log::trace!( - "{:?} thread started: thread_id= {}", - thread::current(), - thread_id::get() - ); - }) - .on_thread_stop(move || { - log::trace!( - "{:?} thread stopping: thread_id= {}", - thread::current(), - thread_id::get(), - ); - }) - .build()?; - + let rt = tokio_default_runtime()?; Ok(Runtime { rt, local: LocalSet::new(), diff --git a/rust-lib/flowy-sys/src/sender/data.rs b/rust-lib/flowy-sys/src/sender/data.rs deleted file mode 100644 index 31c86c9898..0000000000 --- a/rust-lib/flowy-sys/src/sender/data.rs +++ /dev/null @@ -1,99 +0,0 @@ -use crate::{module::Event, request::Payload, response::EventResponse}; -use derivative::*; -use std::{ - fmt::{Debug, Display}, - hash::Hash, -}; -// #[derive(Debug)] -// pub struct SenderPayload { -// pub(crate) payload: Payload, -// pub(crate) event: Event, -// } -// -// impl SenderPayload { -// pub fn new(event: E) -> SenderPayload -// where -// E: Eq + Hash + Debug + Clone + Display, -// { -// Self { -// event: event.into(), -// payload: Payload::None, -// } -// } -// -// pub fn payload(mut self, payload: Payload) -> Self { -// self.payload = payload; -// self -// } -// -// pub fn from_bytes(bytes: Vec) -> Self { unimplemented!() } -// } -// -// impl std::convert::Into for SenderPayload { -// fn into(self) -> ModuleRequest { -// ModuleRequest::new(self.event).payload(self.payload) } } -// -// impl std::default::Default for SenderPayload { -// fn default() -> Self { SenderPayload::new("").payload(Payload::None) } -// } -// -// impl std::convert::Into for SenderPayload { -// fn into(self) -> EventRequest { unimplemented!() } -// } - -pub type BoxStreamCallback = Box; - -// #[derive(Debug)] -// pub struct SenderRequest2 -// where -// T: 'static + Debug, -// C: FnOnce(T, EventResponse) + 'static, -// { -// pub config: T, -// pub event: Event, -// pub payload: Option, -// pub callback: Box, -// } - -#[derive(Derivative)] -#[derivative(Debug)] -pub struct SenderRequest -where - T: 'static + Debug, -{ - pub config: T, - pub event: Event, - pub payload: Option, - #[derivative(Debug = "ignore")] - pub callback: Option>, -} - -impl SenderRequest -where - T: 'static + Debug, -{ - pub fn new(config: T, event: E) -> Self - where - E: Eq + Hash + Debug + Clone + Display, - { - Self { - config, - payload: None, - event: event.into(), - callback: None, - } - } - - pub fn payload(mut self, payload: Payload) -> Self { - self.payload = Some(payload); - self - } - - pub fn callback(mut self, callback: F) -> Self - where - F: FnOnce(T, EventResponse) + 'static + Send + Sync, - { - self.callback = Some(Box::new(callback)); - self - } -} diff --git a/rust-lib/flowy-sys/src/sender/mod.rs b/rust-lib/flowy-sys/src/sender/mod.rs deleted file mode 100644 index 9ddbdec49f..0000000000 --- a/rust-lib/flowy-sys/src/sender/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod data; -mod sender; - -pub use data::*; -pub use sender::*; diff --git a/rust-lib/flowy-sys/src/sender/sender.rs b/rust-lib/flowy-sys/src/sender/sender.rs deleted file mode 100644 index 7391e76c55..0000000000 --- a/rust-lib/flowy-sys/src/sender/sender.rs +++ /dev/null @@ -1,167 +0,0 @@ -use crate::{ - error::{InternalError, SystemError}, - module::ModuleRequest, - request::{EventRequest, Payload}, - response::EventResponse, - sender::SenderRequest, - service::{BoxService, Service, ServiceFactory}, - system::ModuleMap, -}; -use futures_core::{future::LocalBoxFuture, ready, task::Context}; -use std::{fmt::Debug, future::Future}; -use tokio::{ - macros::support::{Pin, Poll}, - sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, -}; - -macro_rules! service_factor_impl { - ($name:ident) => { - #[allow(non_snake_case, missing_docs)] - impl ServiceFactory> for $name - where - T: 'static + Debug, - { - type Response = EventResponse; - type Error = SystemError; - type Service = BoxService, Self::Response, Self::Error>; - type Context = (); - type Future = LocalBoxFuture<'static, Result>; - - fn new_service(&self, _cfg: Self::Context) -> Self::Future { - let module_map = self.module_map.clone(); - let service = Box::new(SenderService { module_map }); - Box::pin(async move { Ok(service as Self::Service) }) - } - } - }; -} - -struct SenderService { - module_map: ModuleMap, -} - -impl Service> for SenderService -where - T: 'static + Debug, -{ - type Response = EventResponse; - type Error = SystemError; - type Future = LocalBoxFuture<'static, Result>; - - fn call(&self, data: SenderRequest) -> Self::Future { - let module_map = self.module_map.clone(); - let SenderRequest { - config, - event, - payload, - callback, - } = data; - - let mut request = ModuleRequest::new(event.clone()); - if let Some(payload) = payload { - request = request.payload(payload); - } - - let fut = async move { - let result = { - match module_map.get(&event) { - Some(module) => { - let fut = module.new_service(()); - let service_fut = fut.await?.call(request); - service_fut.await - }, - None => { - let msg = format!("Can not find the module to handle the request:{:?}", request); - Err(InternalError::new(msg).into()) - }, - } - }; - - let response = result.unwrap_or_else(|e| e.into()); - if let Some(callback) = callback { - callback(config, response.clone()); - } - - Ok(response) - }; - Box::pin(fut) - } -} - -pub struct Sender -where - T: 'static + Debug, -{ - module_map: ModuleMap, - data_tx: UnboundedSender>, - data_rx: Option>>, -} - -service_factor_impl!(Sender); - -impl Sender -where - T: 'static + Debug, -{ - pub fn new(module_map: ModuleMap) -> Self { - let (data_tx, data_rx) = unbounded_channel::>(); - Self { - module_map, - data_tx, - data_rx: Some(data_rx), - } - } - - pub fn async_send(&self, data: SenderRequest) { let _ = self.data_tx.send(data); } - - pub fn sync_send(&self, data: SenderRequest) -> EventResponse { - let factory = self.new_service(()); - - futures::executor::block_on(async { - let service = factory.await.unwrap(); - service.call(data).await.unwrap() - }) - } - - pub fn take_rx(&mut self) -> UnboundedReceiver> { self.data_rx.take().unwrap() } -} - -pub struct SenderRunner -where - T: 'static + Debug, -{ - module_map: ModuleMap, - data_rx: UnboundedReceiver>, -} - -service_factor_impl!(SenderRunner); - -impl SenderRunner -where - T: 'static + Debug, -{ - pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver>) -> Self { - Self { module_map, data_rx } - } -} - -impl Future for SenderRunner -where - T: 'static + Debug, -{ - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match ready!(Pin::new(&mut self.data_rx).poll_recv(cx)) { - None => return Poll::Ready(()), - Some(ctx) => { - let factory = self.new_service(()); - tokio::task::spawn_local(async move { - let service = factory.await.unwrap(); - let _ = service.call(ctx).await; - }); - }, - } - } - } -} diff --git a/rust-lib/flowy-sys/src/service/boxed.rs b/rust-lib/flowy-sys/src/service/boxed.rs index 5de36f2de1..7fcf904070 100644 --- a/rust-lib/flowy-sys/src/service/boxed.rs +++ b/rust-lib/flowy-sys/src/service/boxed.rs @@ -1,27 +1,31 @@ use crate::service::{Service, ServiceFactory}; -use futures_core::future::LocalBoxFuture; +use futures_core::future::{BoxFuture}; pub fn factory(factory: SF) -> BoxServiceFactory where - SF: ServiceFactory + 'static, + SF: ServiceFactory + 'static + Sync + Send, Req: 'static, SF::Response: 'static, SF::Service: 'static, SF::Future: 'static, - SF::Error: 'static, + SF::Error: 'static + Send + Sync, + >::Service: Sync + Send, + <>::Service as Service>::Future: Send + Sync, + >::Future: Send + Sync, { BoxServiceFactory(Box::new(FactoryWrapper(factory))) } type Inner = Box< dyn ServiceFactory< - Req, - Context = Cfg, - Response = Res, - Error = Err, - Service = BoxService, - Future = LocalBoxFuture<'static, Result, Err>>, - >, + Req, + Context = Cfg, + Response = Res, + Error = Err, + Service = BoxService, + Future = BoxFuture<'static, Result, Err>>, + > + Sync + + Send, >; pub struct BoxServiceFactory(Inner); @@ -35,23 +39,26 @@ where type Error = Err; type Service = BoxService; type Context = Cfg; - type Future = LocalBoxFuture<'static, Result>; + type Future = BoxFuture<'static, Result>; fn new_service(&self, cfg: Cfg) -> Self::Future { self.0.new_service(cfg) } } -pub type BoxService = - Box>>>; +pub type BoxService = Box< + dyn Service>> + + Sync + + Send, +>; -#[allow(dead_code)] -pub fn service(service: S) -> BoxService -where - S: Service + 'static, - Req: 'static, - S::Future: 'static, -{ - Box::new(ServiceWrapper::new(service)) -} +// #[allow(dead_code)] +// pub fn service(service: S) -> BoxService +// where +// S: Service + 'static, +// Req: 'static, +// S::Future: 'static, +// { +// Box::new(ServiceWrapper::new(service)) +// } impl Service for Box where @@ -75,11 +82,11 @@ impl ServiceWrapper { impl Service for ServiceWrapper where S: Service, - S::Future: 'static, + S::Future: 'static + Send + Sync, { type Response = Res; type Error = Err; - type Future = LocalBoxFuture<'static, Result>; + type Future = BoxFuture<'static, Result>; fn call(&self, req: Req) -> Self::Future { Box::pin(self.inner.call(req)) } } @@ -93,17 +100,21 @@ where Err: 'static, SF: ServiceFactory, SF::Future: 'static, - SF::Service: 'static, - >::Future: 'static, + SF::Service: 'static + Send + Sync, + <>::Service as Service>::Future: Send + Sync + 'static, + >::Future: Send + Sync, { type Response = Res; type Error = Err; type Service = BoxService; type Context = Cfg; - type Future = LocalBoxFuture<'static, Result>; + type Future = BoxFuture<'static, Result>; fn new_service(&self, cfg: Cfg) -> Self::Future { let f = self.0.new_service(cfg); - Box::pin(async { f.await.map(|s| Box::new(ServiceWrapper::new(s)) as Self::Service) }) + Box::pin(async { + f.await + .map(|s| Box::new(ServiceWrapper::new(s)) as Self::Service) + }) } } diff --git a/rust-lib/flowy-sys/src/service/handler.rs b/rust-lib/flowy-sys/src/service/handler.rs index 9954b3984f..9524ef4469 100644 --- a/rust-lib/flowy-sys/src/service/handler.rs +++ b/rust-lib/flowy-sys/src/service/handler.rs @@ -16,9 +16,9 @@ use crate::{ util::ready::*, }; -pub trait Handler: Clone + 'static +pub trait Handler: Clone + 'static + Sync + Send where - R: Future, + R: Future + Send + Sync, R::Output: Responder, { fn call(&self, param: T) -> R; @@ -28,7 +28,7 @@ pub struct HandlerService where H: Handler, T: FromRequest, - R: Future, + R: Future + Sync + Send, R::Output: Responder, { handler: H, @@ -39,7 +39,7 @@ impl HandlerService where H: Handler, T: FromRequest, - R: Future, + R: Future + Sync + Send, R::Output: Responder, { pub fn new(handler: H) -> Self { @@ -54,7 +54,7 @@ impl Clone for HandlerService where H: Handler, T: FromRequest, - R: Future, + R: Future + Sync + Send, R::Output: Responder, { fn clone(&self) -> Self { @@ -69,7 +69,7 @@ impl ServiceFactory for HandlerService where F: Handler, T: FromRequest, - R: Future, + R: Future + Send + Sync, R::Output: Responder, { type Response = ServiceResponse; @@ -85,7 +85,7 @@ impl Service for HandlerService where H: Handler, T: FromRequest, - R: Future, + R: Future + Sync + Send, R::Output: Responder, { type Response = ServiceResponse; @@ -104,7 +104,7 @@ pub enum HandlerServiceFuture where H: Handler, T: FromRequest, - R: Future, + R: Future + Sync + Send, R::Output: Responder, { Extract(#[pin] T::Future, Option, H), @@ -115,7 +115,7 @@ impl Future for HandlerServiceFuture where F: Handler, T: FromRequest, - R: Future, + R: Future + Sync + Send, R::Output: Responder, { type Output = Result; @@ -151,8 +151,8 @@ where macro_rules! factory_tuple ({ $($param:ident)* } => { impl Handler<($($param,)*), Res> for Func - where Func: Fn($($param),*) -> Res + Clone + 'static, - Res: Future, + where Func: Fn($($param),*) -> Res + Clone + 'static + Sync + Send, + Res: Future + Sync + Send, Res::Output: Responder, { #[allow(non_snake_case)] diff --git a/rust-lib/flowy-sys/src/system.rs b/rust-lib/flowy-sys/src/system.rs index 3965bafa1f..e8292497f1 100644 --- a/rust-lib/flowy-sys/src/system.rs +++ b/rust-lib/flowy-sys/src/system.rs @@ -1,9 +1,9 @@ use crate::{ - module::{Event, Module}, + module::{as_module_map, Module, ModuleMap}, rt::Runtime, }; use futures_core::{ready, task::Context}; -use std::{cell::RefCell, collections::HashMap, fmt::Debug, future::Future, io, rc::Rc, sync::Arc}; +use std::{cell::RefCell, fmt::Debug, future::Future, io, sync::Arc}; use tokio::{ macros::support::{Pin, Poll}, sync::{ @@ -21,7 +21,6 @@ pub enum SystemCommand { Exit(i8), } -pub type ModuleMap = Rc>>; pub struct FlowySystem { sys_cmd_tx: UnboundedSender, } @@ -32,7 +31,7 @@ impl FlowySystem { F: FnOnce() -> Vec, S: FnOnce(ModuleMap, &Runtime), { - let runtime = Runtime::new().unwrap(); + let runtime = Arc::new(Runtime::new().unwrap()); let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::(); let (stop_tx, stop_rx) = oneshot::channel(); @@ -41,21 +40,15 @@ impl FlowySystem { sys_cmd_rx, }); - let factory = module_factory(); - let mut module_map = HashMap::new(); - factory.into_iter().for_each(|m| { - let events = m.events(); - let rc_module = Rc::new(m); - events.into_iter().for_each(|e| { - module_map.insert(e, rc_module.clone()); - }); - }); + let module_map = as_module_map(module_factory()); + sender_factory(module_map.clone(), &runtime); let system = Self { sys_cmd_tx }; - sender_factory(Rc::new(module_map), &runtime); - FlowySystem::set_current(system); - let runner = SystemRunner { rt: runtime, stop_rx }; + let runner = SystemRunner { + rt: runtime, + stop_rx, + }; runner } @@ -107,7 +100,7 @@ impl Future for SystemController { } pub struct SystemRunner { - rt: Runtime, + rt: Arc, stop_rx: oneshot::Receiver, } diff --git a/rust-lib/flowy-sys/src/util/mod.rs b/rust-lib/flowy-sys/src/util/mod.rs index 849debcc97..a7960f90c9 100644 --- a/rust-lib/flowy-sys/src/util/mod.rs +++ b/rust-lib/flowy-sys/src/util/mod.rs @@ -1 +1,27 @@ +use std::{io, thread}; +use thread_id; +use tokio::runtime; + pub mod ready; + +pub(crate) fn tokio_default_runtime() -> io::Result { + runtime::Builder::new_multi_thread() + .thread_name("flowy-sys") + .enable_io() + .enable_time() + .on_thread_start(move || { + log::trace!( + "{:?} thread started: thread_id= {}", + thread::current(), + thread_id::get() + ); + }) + .on_thread_stop(move || { + log::trace!( + "{:?} thread stopping: thread_id= {}", + thread::current(), + thread_id::get(), + ); + }) + .build() +} diff --git a/rust-lib/flowy-sys/tests/api/helper.rs b/rust-lib/flowy-sys/tests/api/helper.rs index f29cd73339..723f16793e 100644 --- a/rust-lib/flowy-sys/tests/api/helper.rs +++ b/rust-lib/flowy-sys/tests/api/helper.rs @@ -1,4 +1,4 @@ -use flowy_sys::prelude::{EventResponse, FlowySystem, Module, Sender, SenderRequest, SenderRunner}; +use flowy_sys::prelude::*; use std::{cell::RefCell, sync::Once}; #[allow(dead_code)] @@ -10,44 +10,14 @@ pub fn setup_env() { }); } -thread_local!( - static SENDER: RefCell>> = RefCell::new(None); -); - -pub fn sync_send(data: SenderRequest) -> EventResponse { - SENDER.with(|cell| match &*cell.borrow() { - Some(stream) => stream.sync_send(data), - None => panic!(""), - }) +pub async fn async_send(data: DispatchRequest) -> Result { + EventDispatch::async_send(data).await } -pub fn async_send(data: SenderRequest) { - SENDER.with(|cell| match &*cell.borrow() { - Some(stream) => { - stream.async_send(data); - }, - None => panic!(""), - }); -} - -pub fn init_system(modules: Vec, f: F) +pub fn init_system(module_factory: F) where - F: FnOnce() + 'static, + F: FnOnce() -> Vec, { - FlowySystem::construct( - || modules, - |module_map, runtime| { - let mut sender = Sender::::new(module_map.clone()); - runtime.spawn(SenderRunner::new(module_map, sender.take_rx())); - - SENDER.with(|cell| { - *cell.borrow_mut() = Some(sender); - }); - }, - ) - .spawn(async { f() }) - .run() - .unwrap(); + let system = EventDispatch::new(module_factory); + EventDispatch::set_current(system); } - -pub fn stop_system() { FlowySystem::current().stop(); } diff --git a/rust-lib/flowy-sys/tests/api/module.rs b/rust-lib/flowy-sys/tests/api/module.rs index f28f615536..6003133bb0 100644 --- a/rust-lib/flowy-sys/tests/api/module.rs +++ b/rust-lib/flowy-sys/tests/api/module.rs @@ -2,20 +2,14 @@ use crate::helper::*; use flowy_sys::prelude::*; pub async fn hello() -> String { "say hello".to_string() } -#[test] -fn test_init() { + +#[tokio::test] +async fn test_init() { setup_env(); - let event = "1"; - let modules = vec![Module::new().event(event, hello)]; + init_system(|| vec![Module::new().event(event, hello)]); - init_system(modules, move || { - let request = SenderRequest::new(1, event).callback(|_config, response| { - log::info!("async resp: {:?}", response); - }); - - let resp = sync_send(request); - log::info!("sync resp: {:?}", resp); - stop_system(); - }); + let request = DispatchRequest::new(1, event); + let resp = async_send(request).await.unwrap(); + log::info!("sync resp: {:?}", resp); }