mirror of
https://github.com/AppFlowy-IO/AppFlowy.git
synced 2025-04-24 22:57:12 -04:00
feat: run rustfmt with custom defined fmt configuration (#1848)
* chore: update rustfmt * chore: apply rustfmt format
This commit is contained in:
parent
e2496e734c
commit
6bb1c4e89c
459 changed files with 50554 additions and 46600 deletions
|
@ -3,25 +3,27 @@ use bytes::Bytes;
|
|||
|
||||
// To bytes
|
||||
pub trait ToBytes {
|
||||
fn into_bytes(self) -> Result<Bytes, DispatchError>;
|
||||
fn into_bytes(self) -> Result<Bytes, DispatchError>;
|
||||
}
|
||||
|
||||
#[cfg(feature = "use_protobuf")]
|
||||
impl<T> ToBytes for T
|
||||
where
|
||||
T: std::convert::TryInto<Bytes, Error = protobuf::ProtobufError>,
|
||||
T: std::convert::TryInto<Bytes, Error = protobuf::ProtobufError>,
|
||||
{
|
||||
fn into_bytes(self) -> Result<Bytes, DispatchError> {
|
||||
match self.try_into() {
|
||||
Ok(data) => Ok(data),
|
||||
Err(e) => Err(InternalError::ProtobufError(format!(
|
||||
"Serial {:?} to bytes failed:{:?}",
|
||||
std::any::type_name::<T>(),
|
||||
e
|
||||
))
|
||||
.into()),
|
||||
}
|
||||
fn into_bytes(self) -> Result<Bytes, DispatchError> {
|
||||
match self.try_into() {
|
||||
Ok(data) => Ok(data),
|
||||
Err(e) => Err(
|
||||
InternalError::ProtobufError(format!(
|
||||
"Serial {:?} to bytes failed:{:?}",
|
||||
std::any::type_name::<T>(),
|
||||
e
|
||||
))
|
||||
.into(),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// #[cfg(feature = "use_serde")]
|
||||
|
@ -40,30 +42,30 @@ where
|
|||
// From bytes
|
||||
|
||||
pub trait AFPluginFromBytes: Sized {
|
||||
fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError>;
|
||||
fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError>;
|
||||
}
|
||||
|
||||
#[cfg(feature = "use_protobuf")]
|
||||
impl<T> AFPluginFromBytes for T
|
||||
where
|
||||
// // https://stackoverflow.com/questions/62871045/tryfromu8-trait-bound-in-trait
|
||||
// T: for<'a> std::convert::TryFrom<&'a Bytes, Error =
|
||||
// protobuf::ProtobufError>,
|
||||
T: std::convert::TryFrom<Bytes, Error = protobuf::ProtobufError>,
|
||||
// // https://stackoverflow.com/questions/62871045/tryfromu8-trait-bound-in-trait
|
||||
// T: for<'a> std::convert::TryFrom<&'a Bytes, Error =
|
||||
// protobuf::ProtobufError>,
|
||||
T: std::convert::TryFrom<Bytes, Error = protobuf::ProtobufError>,
|
||||
{
|
||||
fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError> {
|
||||
match T::try_from(bytes) {
|
||||
Ok(data) => Ok(data),
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Parse payload to {} failed with error: {:?}",
|
||||
std::any::type_name::<T>(),
|
||||
e
|
||||
);
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError> {
|
||||
match T::try_from(bytes) {
|
||||
Ok(data) => Ok(data),
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Parse payload to {} failed with error: {:?}",
|
||||
std::any::type_name::<T>(),
|
||||
e
|
||||
);
|
||||
Err(e.into())
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
//
|
||||
// #[cfg(feature = "use_serde")]
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
use crate::{
|
||||
byte_trait::*,
|
||||
errors::{DispatchError, InternalError},
|
||||
request::{unexpected_none_payload, AFPluginEventRequest, FromAFPluginRequest, Payload},
|
||||
response::{AFPluginEventResponse, AFPluginResponder, ResponseBuilder},
|
||||
util::ready::{ready, Ready},
|
||||
byte_trait::*,
|
||||
errors::{DispatchError, InternalError},
|
||||
request::{unexpected_none_payload, AFPluginEventRequest, FromAFPluginRequest, Payload},
|
||||
response::{AFPluginEventResponse, AFPluginResponder, ResponseBuilder},
|
||||
util::ready::{ready, Ready},
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use std::ops;
|
||||
|
@ -11,111 +11,118 @@ use std::ops;
|
|||
pub struct AFPluginData<T>(pub T);
|
||||
|
||||
impl<T> AFPluginData<T> {
|
||||
pub fn into_inner(self) -> T {
|
||||
self.0
|
||||
}
|
||||
pub fn into_inner(self) -> T {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ops::Deref for AFPluginData<T> {
|
||||
type Target = T;
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &T {
|
||||
&self.0
|
||||
}
|
||||
fn deref(&self) -> &T {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ops::DerefMut for AFPluginData<T> {
|
||||
fn deref_mut(&mut self) -> &mut T {
|
||||
&mut self.0
|
||||
}
|
||||
fn deref_mut(&mut self) -> &mut T {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> FromAFPluginRequest for AFPluginData<T>
|
||||
where
|
||||
T: AFPluginFromBytes + 'static,
|
||||
T: AFPluginFromBytes + 'static,
|
||||
{
|
||||
type Error = DispatchError;
|
||||
type Future = Ready<Result<Self, DispatchError>>;
|
||||
type Error = DispatchError;
|
||||
type Future = Ready<Result<Self, DispatchError>>;
|
||||
|
||||
#[inline]
|
||||
fn from_request(req: &AFPluginEventRequest, payload: &mut Payload) -> Self::Future {
|
||||
match payload {
|
||||
Payload::None => ready(Err(unexpected_none_payload(req))),
|
||||
Payload::Bytes(bytes) => match T::parse_from_bytes(bytes.clone()) {
|
||||
Ok(data) => ready(Ok(AFPluginData(data))),
|
||||
Err(e) => ready(Err(InternalError::DeserializeFromBytes(format!("{}", e)).into())),
|
||||
},
|
||||
}
|
||||
#[inline]
|
||||
fn from_request(req: &AFPluginEventRequest, payload: &mut Payload) -> Self::Future {
|
||||
match payload {
|
||||
Payload::None => ready(Err(unexpected_none_payload(req))),
|
||||
Payload::Bytes(bytes) => match T::parse_from_bytes(bytes.clone()) {
|
||||
Ok(data) => ready(Ok(AFPluginData(data))),
|
||||
Err(e) => ready(Err(
|
||||
InternalError::DeserializeFromBytes(format!("{}", e)).into(),
|
||||
)),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AFPluginResponder for AFPluginData<T>
|
||||
where
|
||||
T: ToBytes,
|
||||
T: ToBytes,
|
||||
{
|
||||
fn respond_to(self, _request: &AFPluginEventRequest) -> AFPluginEventResponse {
|
||||
match self.into_inner().into_bytes() {
|
||||
Ok(bytes) => {
|
||||
log::trace!("Serialize Data: {:?} to event response", std::any::type_name::<T>());
|
||||
ResponseBuilder::Ok().data(bytes).build()
|
||||
}
|
||||
Err(e) => e.into(),
|
||||
}
|
||||
fn respond_to(self, _request: &AFPluginEventRequest) -> AFPluginEventResponse {
|
||||
match self.into_inner().into_bytes() {
|
||||
Ok(bytes) => {
|
||||
log::trace!(
|
||||
"Serialize Data: {:?} to event response",
|
||||
std::any::type_name::<T>()
|
||||
);
|
||||
ResponseBuilder::Ok().data(bytes).build()
|
||||
},
|
||||
Err(e) => e.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> std::convert::TryFrom<&Payload> for AFPluginData<T>
|
||||
where
|
||||
T: AFPluginFromBytes,
|
||||
T: AFPluginFromBytes,
|
||||
{
|
||||
type Error = DispatchError;
|
||||
fn try_from(payload: &Payload) -> Result<AFPluginData<T>, Self::Error> {
|
||||
parse_payload(payload)
|
||||
}
|
||||
type Error = DispatchError;
|
||||
fn try_from(payload: &Payload) -> Result<AFPluginData<T>, Self::Error> {
|
||||
parse_payload(payload)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> std::convert::TryFrom<Payload> for AFPluginData<T>
|
||||
where
|
||||
T: AFPluginFromBytes,
|
||||
T: AFPluginFromBytes,
|
||||
{
|
||||
type Error = DispatchError;
|
||||
fn try_from(payload: Payload) -> Result<AFPluginData<T>, Self::Error> {
|
||||
parse_payload(&payload)
|
||||
}
|
||||
type Error = DispatchError;
|
||||
fn try_from(payload: Payload) -> Result<AFPluginData<T>, Self::Error> {
|
||||
parse_payload(&payload)
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_payload<T>(payload: &Payload) -> Result<AFPluginData<T>, DispatchError>
|
||||
where
|
||||
T: AFPluginFromBytes,
|
||||
T: AFPluginFromBytes,
|
||||
{
|
||||
match payload {
|
||||
Payload::None => Err(InternalError::UnexpectedNone(format!(
|
||||
"Parse fail, expected payload:{:?}",
|
||||
std::any::type_name::<T>()
|
||||
))
|
||||
.into()),
|
||||
Payload::Bytes(bytes) => {
|
||||
let data = T::parse_from_bytes(bytes.clone())?;
|
||||
Ok(AFPluginData(data))
|
||||
}
|
||||
}
|
||||
match payload {
|
||||
Payload::None => Err(
|
||||
InternalError::UnexpectedNone(format!(
|
||||
"Parse fail, expected payload:{:?}",
|
||||
std::any::type_name::<T>()
|
||||
))
|
||||
.into(),
|
||||
),
|
||||
Payload::Bytes(bytes) => {
|
||||
let data = T::parse_from_bytes(bytes.clone())?;
|
||||
Ok(AFPluginData(data))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> std::convert::TryInto<Payload> for AFPluginData<T>
|
||||
where
|
||||
T: ToBytes,
|
||||
T: ToBytes,
|
||||
{
|
||||
type Error = DispatchError;
|
||||
type Error = DispatchError;
|
||||
|
||||
fn try_into(self) -> Result<Payload, Self::Error> {
|
||||
let inner = self.into_inner();
|
||||
let bytes = inner.into_bytes()?;
|
||||
Ok(Payload::Bytes(bytes))
|
||||
}
|
||||
fn try_into(self) -> Result<Payload, Self::Error> {
|
||||
let inner = self.into_inner();
|
||||
let bytes = inner.into_bytes()?;
|
||||
Ok(Payload::Bytes(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
impl ToBytes for AFPluginData<String> {
|
||||
fn into_bytes(self) -> Result<Bytes, DispatchError> {
|
||||
Ok(Bytes::from(self.0))
|
||||
}
|
||||
fn into_bytes(self) -> Result<Bytes, DispatchError> {
|
||||
Ok(Bytes::from(self.0))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
use crate::runtime::AFPluginRuntime;
|
||||
use crate::{
|
||||
errors::{DispatchError, Error, InternalError},
|
||||
module::{as_plugin_map, AFPlugin, AFPluginMap, AFPluginRequest},
|
||||
response::AFPluginEventResponse,
|
||||
service::{AFPluginServiceFactory, Service},
|
||||
errors::{DispatchError, Error, InternalError},
|
||||
module::{as_plugin_map, AFPlugin, AFPluginMap, AFPluginRequest},
|
||||
response::AFPluginEventResponse,
|
||||
service::{AFPluginServiceFactory, Service},
|
||||
};
|
||||
use derivative::*;
|
||||
use futures_core::future::BoxFuture;
|
||||
|
@ -13,173 +13,180 @@ use std::{future::Future, sync::Arc};
|
|||
use tokio::macros::support::{Pin, Poll};
|
||||
|
||||
pub struct AFPluginDispatcher {
|
||||
plugins: AFPluginMap,
|
||||
runtime: AFPluginRuntime,
|
||||
plugins: AFPluginMap,
|
||||
runtime: AFPluginRuntime,
|
||||
}
|
||||
|
||||
impl AFPluginDispatcher {
|
||||
pub fn construct<F>(runtime: AFPluginRuntime, module_factory: F) -> AFPluginDispatcher
|
||||
where
|
||||
F: FnOnce() -> Vec<AFPlugin>,
|
||||
{
|
||||
let plugins = module_factory();
|
||||
tracing::trace!("{}", plugin_info(&plugins));
|
||||
AFPluginDispatcher {
|
||||
plugins: as_plugin_map(plugins),
|
||||
runtime,
|
||||
}
|
||||
pub fn construct<F>(runtime: AFPluginRuntime, module_factory: F) -> AFPluginDispatcher
|
||||
where
|
||||
F: FnOnce() -> Vec<AFPlugin>,
|
||||
{
|
||||
let plugins = module_factory();
|
||||
tracing::trace!("{}", plugin_info(&plugins));
|
||||
AFPluginDispatcher {
|
||||
plugins: as_plugin_map(plugins),
|
||||
runtime,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn async_send<Req>(dispatch: Arc<AFPluginDispatcher>, request: Req) -> DispatchFuture<AFPluginEventResponse>
|
||||
where
|
||||
Req: std::convert::Into<AFPluginRequest>,
|
||||
{
|
||||
AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {}))
|
||||
}
|
||||
pub fn async_send<Req>(
|
||||
dispatch: Arc<AFPluginDispatcher>,
|
||||
request: Req,
|
||||
) -> DispatchFuture<AFPluginEventResponse>
|
||||
where
|
||||
Req: std::convert::Into<AFPluginRequest>,
|
||||
{
|
||||
AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {}))
|
||||
}
|
||||
|
||||
pub fn async_send_with_callback<Req, Callback>(
|
||||
dispatch: Arc<AFPluginDispatcher>,
|
||||
request: Req,
|
||||
callback: Callback,
|
||||
) -> DispatchFuture<AFPluginEventResponse>
|
||||
where
|
||||
Req: std::convert::Into<AFPluginRequest>,
|
||||
Callback: FnOnce(AFPluginEventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync,
|
||||
{
|
||||
let request: AFPluginRequest = request.into();
|
||||
let plugins = dispatch.plugins.clone();
|
||||
let service = Box::new(DispatchService { plugins });
|
||||
tracing::trace!("Async event: {:?}", &request.event);
|
||||
let service_ctx = DispatchContext {
|
||||
request,
|
||||
callback: Some(Box::new(callback)),
|
||||
};
|
||||
let join_handle = dispatch.runtime.spawn(async move {
|
||||
service.call(service_ctx).await.unwrap_or_else(|e| {
|
||||
tracing::error!("Dispatch runtime error: {:?}", e);
|
||||
InternalError::Other(format!("{:?}", e)).as_response()
|
||||
})
|
||||
});
|
||||
pub fn async_send_with_callback<Req, Callback>(
|
||||
dispatch: Arc<AFPluginDispatcher>,
|
||||
request: Req,
|
||||
callback: Callback,
|
||||
) -> DispatchFuture<AFPluginEventResponse>
|
||||
where
|
||||
Req: std::convert::Into<AFPluginRequest>,
|
||||
Callback: FnOnce(AFPluginEventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync,
|
||||
{
|
||||
let request: AFPluginRequest = request.into();
|
||||
let plugins = dispatch.plugins.clone();
|
||||
let service = Box::new(DispatchService { plugins });
|
||||
tracing::trace!("Async event: {:?}", &request.event);
|
||||
let service_ctx = DispatchContext {
|
||||
request,
|
||||
callback: Some(Box::new(callback)),
|
||||
};
|
||||
let join_handle = dispatch.runtime.spawn(async move {
|
||||
service.call(service_ctx).await.unwrap_or_else(|e| {
|
||||
tracing::error!("Dispatch runtime error: {:?}", e);
|
||||
InternalError::Other(format!("{:?}", e)).as_response()
|
||||
})
|
||||
});
|
||||
|
||||
DispatchFuture {
|
||||
fut: Box::pin(async move {
|
||||
join_handle.await.unwrap_or_else(|e| {
|
||||
let msg = format!("EVENT_DISPATCH join error: {:?}", e);
|
||||
tracing::error!("{}", msg);
|
||||
let error = InternalError::JoinError(msg);
|
||||
error.as_response()
|
||||
})
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sync_send(dispatch: Arc<AFPluginDispatcher>, request: AFPluginRequest) -> AFPluginEventResponse {
|
||||
futures::executor::block_on(async {
|
||||
AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await
|
||||
DispatchFuture {
|
||||
fut: Box::pin(async move {
|
||||
join_handle.await.unwrap_or_else(|e| {
|
||||
let msg = format!("EVENT_DISPATCH join error: {:?}", e);
|
||||
tracing::error!("{}", msg);
|
||||
let error = InternalError::JoinError(msg);
|
||||
error.as_response()
|
||||
})
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn<F>(&self, f: F)
|
||||
where
|
||||
F: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
self.runtime.spawn(f);
|
||||
}
|
||||
pub fn sync_send(
|
||||
dispatch: Arc<AFPluginDispatcher>,
|
||||
request: AFPluginRequest,
|
||||
) -> AFPluginEventResponse {
|
||||
futures::executor::block_on(async {
|
||||
AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await
|
||||
})
|
||||
}
|
||||
|
||||
pub fn spawn<F>(&self, f: F)
|
||||
where
|
||||
F: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
self.runtime.spawn(f);
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
pub struct DispatchFuture<T: Send + Sync> {
|
||||
#[pin]
|
||||
pub fut: Pin<Box<dyn Future<Output = T> + Sync + Send>>,
|
||||
#[pin]
|
||||
pub fut: Pin<Box<dyn Future<Output = T> + Sync + Send>>,
|
||||
}
|
||||
|
||||
impl<T> Future for DispatchFuture<T>
|
||||
where
|
||||
T: Send + Sync,
|
||||
T: Send + Sync,
|
||||
{
|
||||
type Output = T;
|
||||
type Output = T;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.as_mut().project();
|
||||
Poll::Ready(futures_core::ready!(this.fut.poll(cx)))
|
||||
}
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.as_mut().project();
|
||||
Poll::Ready(futures_core::ready!(this.fut.poll(cx)))
|
||||
}
|
||||
}
|
||||
|
||||
pub type BoxFutureCallback = Box<dyn FnOnce(AFPluginEventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
|
||||
pub type BoxFutureCallback =
|
||||
Box<dyn FnOnce(AFPluginEventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
|
||||
|
||||
#[derive(Derivative)]
|
||||
#[derivative(Debug)]
|
||||
pub struct DispatchContext {
|
||||
pub request: AFPluginRequest,
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub callback: Option<BoxFutureCallback>,
|
||||
pub request: AFPluginRequest,
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub callback: Option<BoxFutureCallback>,
|
||||
}
|
||||
|
||||
impl DispatchContext {
|
||||
pub(crate) fn into_parts(self) -> (AFPluginRequest, Option<BoxFutureCallback>) {
|
||||
let DispatchContext { request, callback } = self;
|
||||
(request, callback)
|
||||
}
|
||||
pub(crate) fn into_parts(self) -> (AFPluginRequest, Option<BoxFutureCallback>) {
|
||||
let DispatchContext { request, callback } = self;
|
||||
(request, callback)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct DispatchService {
|
||||
pub(crate) plugins: AFPluginMap,
|
||||
pub(crate) plugins: AFPluginMap,
|
||||
}
|
||||
|
||||
impl Service<DispatchContext> for DispatchService {
|
||||
type Response = AFPluginEventResponse;
|
||||
type Error = DispatchError;
|
||||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
type Response = AFPluginEventResponse;
|
||||
type Error = DispatchError;
|
||||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
|
||||
#[cfg_attr(
|
||||
feature = "use_tracing",
|
||||
tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))
|
||||
)]
|
||||
fn call(&self, ctx: DispatchContext) -> Self::Future {
|
||||
let module_map = self.plugins.clone();
|
||||
let (request, callback) = ctx.into_parts();
|
||||
#[cfg_attr(
|
||||
feature = "use_tracing",
|
||||
tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))
|
||||
)]
|
||||
fn call(&self, ctx: DispatchContext) -> Self::Future {
|
||||
let module_map = self.plugins.clone();
|
||||
let (request, callback) = ctx.into_parts();
|
||||
|
||||
Box::pin(async move {
|
||||
let result = {
|
||||
// print_module_map_info(&module_map);
|
||||
match module_map.get(&request.event) {
|
||||
Some(module) => {
|
||||
tracing::trace!("Handle event: {:?} by {:?}", &request.event, module.name);
|
||||
let fut = module.new_service(());
|
||||
let service_fut = fut.await?.call(request);
|
||||
service_fut.await
|
||||
}
|
||||
None => {
|
||||
let msg = format!("Can not find the event handler. {:?}", request);
|
||||
tracing::error!("{}", msg);
|
||||
Err(InternalError::HandleNotFound(msg).into())
|
||||
}
|
||||
}
|
||||
};
|
||||
Box::pin(async move {
|
||||
let result = {
|
||||
// print_module_map_info(&module_map);
|
||||
match module_map.get(&request.event) {
|
||||
Some(module) => {
|
||||
tracing::trace!("Handle event: {:?} by {:?}", &request.event, module.name);
|
||||
let fut = module.new_service(());
|
||||
let service_fut = fut.await?.call(request);
|
||||
service_fut.await
|
||||
},
|
||||
None => {
|
||||
let msg = format!("Can not find the event handler. {:?}", request);
|
||||
tracing::error!("{}", msg);
|
||||
Err(InternalError::HandleNotFound(msg).into())
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
let response = result.unwrap_or_else(|e| e.into());
|
||||
tracing::trace!("Dispatch result: {:?}", response);
|
||||
if let Some(callback) = callback {
|
||||
callback(response.clone()).await;
|
||||
}
|
||||
let response = result.unwrap_or_else(|e| e.into());
|
||||
tracing::trace!("Dispatch result: {:?}", response);
|
||||
if let Some(callback) = callback {
|
||||
callback(response.clone()).await;
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
})
|
||||
}
|
||||
Ok(response)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn plugin_info(plugins: &[AFPlugin]) -> String {
|
||||
let mut info = format!("{} plugins loaded\n", plugins.len());
|
||||
for module in plugins {
|
||||
info.push_str(&format!("-> {} loaded \n", module.name));
|
||||
}
|
||||
info
|
||||
let mut info = format!("{} plugins loaded\n", plugins.len());
|
||||
for module in plugins {
|
||||
info.push_str(&format!("-> {} loaded \n", module.name));
|
||||
}
|
||||
info
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn print_plugins(plugins: &AFPluginMap) {
|
||||
plugins.iter().for_each(|(k, v)| {
|
||||
tracing::info!("Event: {:?} plugin : {:?}", k, v.name);
|
||||
})
|
||||
plugins.iter().for_each(|(k, v)| {
|
||||
tracing::info!("Event: {:?} plugin : {:?}", k, v.name);
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use crate::{
|
||||
byte_trait::AFPluginFromBytes,
|
||||
request::AFPluginEventRequest,
|
||||
response::{AFPluginEventResponse, ResponseBuilder},
|
||||
byte_trait::AFPluginFromBytes,
|
||||
request::AFPluginEventRequest,
|
||||
response::{AFPluginEventResponse, ResponseBuilder},
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use dyn_clone::DynClone;
|
||||
|
@ -10,124 +10,129 @@ use std::fmt;
|
|||
use tokio::{sync::mpsc::error::SendError, task::JoinError};
|
||||
|
||||
pub trait Error: fmt::Debug + DynClone + Send + Sync {
|
||||
fn as_response(&self) -> AFPluginEventResponse;
|
||||
fn as_response(&self) -> AFPluginEventResponse;
|
||||
}
|
||||
|
||||
dyn_clone::clone_trait_object!(Error);
|
||||
|
||||
impl<T: Error + 'static> From<T> for DispatchError {
|
||||
fn from(err: T) -> DispatchError {
|
||||
DispatchError { inner: Box::new(err) }
|
||||
fn from(err: T) -> DispatchError {
|
||||
DispatchError {
|
||||
inner: Box::new(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DispatchError {
|
||||
inner: Box<dyn Error>,
|
||||
inner: Box<dyn Error>,
|
||||
}
|
||||
|
||||
impl DispatchError {
|
||||
pub fn inner_error(&self) -> &dyn Error {
|
||||
self.inner.as_ref()
|
||||
}
|
||||
pub fn inner_error(&self) -> &dyn Error {
|
||||
self.inner.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for DispatchError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{:?}", &self.inner)
|
||||
}
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{:?}", &self.inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for DispatchError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{:?}", &self.inner)
|
||||
}
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{:?}", &self.inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for DispatchError {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
None
|
||||
}
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
None
|
||||
}
|
||||
|
||||
fn cause(&self) -> Option<&dyn std::error::Error> {
|
||||
None
|
||||
}
|
||||
fn cause(&self) -> Option<&dyn std::error::Error> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SendError<AFPluginEventRequest>> for DispatchError {
|
||||
fn from(err: SendError<AFPluginEventRequest>) -> Self {
|
||||
InternalError::Other(format!("{}", err)).into()
|
||||
}
|
||||
fn from(err: SendError<AFPluginEventRequest>) -> Self {
|
||||
InternalError::Other(format!("{}", err)).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for DispatchError {
|
||||
fn from(s: String) -> Self {
|
||||
InternalError::Other(s).into()
|
||||
}
|
||||
fn from(s: String) -> Self {
|
||||
InternalError::Other(s).into()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "use_protobuf")]
|
||||
impl From<protobuf::ProtobufError> for DispatchError {
|
||||
fn from(e: protobuf::ProtobufError) -> Self {
|
||||
InternalError::ProtobufError(format!("{:?}", e)).into()
|
||||
}
|
||||
fn from(e: protobuf::ProtobufError) -> Self {
|
||||
InternalError::ProtobufError(format!("{:?}", e)).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl AFPluginFromBytes for DispatchError {
|
||||
fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError> {
|
||||
let s = String::from_utf8(bytes.to_vec()).unwrap();
|
||||
Ok(InternalError::DeserializeFromBytes(s).into())
|
||||
}
|
||||
fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError> {
|
||||
let s = String::from_utf8(bytes.to_vec()).unwrap();
|
||||
Ok(InternalError::DeserializeFromBytes(s).into())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DispatchError> for AFPluginEventResponse {
|
||||
fn from(err: DispatchError) -> Self {
|
||||
err.inner_error().as_response()
|
||||
}
|
||||
fn from(err: DispatchError) -> Self {
|
||||
err.inner_error().as_response()
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "use_serde")]
|
||||
impl serde::Serialize for DispatchError {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
serializer.serialize_str(&format!("{}", self))
|
||||
}
|
||||
fn serialize<S>(
|
||||
&self,
|
||||
serializer: S,
|
||||
) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
serializer.serialize_str(&format!("{}", self))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) enum InternalError {
|
||||
ProtobufError(String),
|
||||
UnexpectedNone(String),
|
||||
DeserializeFromBytes(String),
|
||||
JoinError(String),
|
||||
ServiceNotFound(String),
|
||||
HandleNotFound(String),
|
||||
Other(String),
|
||||
ProtobufError(String),
|
||||
UnexpectedNone(String),
|
||||
DeserializeFromBytes(String),
|
||||
JoinError(String),
|
||||
ServiceNotFound(String),
|
||||
HandleNotFound(String),
|
||||
Other(String),
|
||||
}
|
||||
|
||||
impl fmt::Display for InternalError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
InternalError::ProtobufError(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::UnexpectedNone(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::DeserializeFromBytes(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::JoinError(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::ServiceNotFound(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::HandleNotFound(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::Other(s) => fmt::Display::fmt(&s, f),
|
||||
}
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
InternalError::ProtobufError(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::UnexpectedNone(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::DeserializeFromBytes(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::JoinError(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::ServiceNotFound(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::HandleNotFound(s) => fmt::Display::fmt(&s, f),
|
||||
InternalError::Other(s) => fmt::Display::fmt(&s, f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for InternalError {
|
||||
fn as_response(&self) -> AFPluginEventResponse {
|
||||
ResponseBuilder::Err().data(self.to_string()).build()
|
||||
}
|
||||
fn as_response(&self) -> AFPluginEventResponse {
|
||||
ResponseBuilder::Err().data(self.to_string()).build()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::convert::From<JoinError> for InternalError {
|
||||
fn from(e: JoinError) -> Self {
|
||||
InternalError::JoinError(format!("{}", e))
|
||||
}
|
||||
fn from(e: JoinError) -> Self {
|
||||
InternalError::JoinError(format!("{}", e))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,5 +16,7 @@ pub mod runtime;
|
|||
pub use errors::Error;
|
||||
|
||||
pub mod prelude {
|
||||
pub use crate::{byte_trait::*, data::*, dispatcher::*, errors::*, module::*, request::*, response::*};
|
||||
pub use crate::{
|
||||
byte_trait::*, data::*, dispatcher::*, errors::*, module::*, request::*, response::*,
|
||||
};
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
#[macro_export]
|
||||
macro_rules! dispatch_future {
|
||||
($fut:expr) => {
|
||||
DispatchFuture {
|
||||
fut: Box::pin(async move { $fut.await }),
|
||||
}
|
||||
};
|
||||
($fut:expr) => {
|
||||
DispatchFuture {
|
||||
fut: Box::pin(async move { $fut.await }),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -1,59 +1,66 @@
|
|||
use std::{
|
||||
any::{Any, TypeId},
|
||||
collections::HashMap,
|
||||
any::{Any, TypeId},
|
||||
collections::HashMap,
|
||||
};
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct AFPluginStateMap(HashMap<TypeId, Box<dyn Any + Sync + Send>>);
|
||||
|
||||
impl AFPluginStateMap {
|
||||
#[inline]
|
||||
pub fn new() -> AFPluginStateMap {
|
||||
AFPluginStateMap(HashMap::default())
|
||||
}
|
||||
#[inline]
|
||||
pub fn new() -> AFPluginStateMap {
|
||||
AFPluginStateMap(HashMap::default())
|
||||
}
|
||||
|
||||
pub fn insert<T>(&mut self, val: T) -> Option<T>
|
||||
where
|
||||
T: 'static + Send + Sync,
|
||||
{
|
||||
self.0.insert(TypeId::of::<T>(), Box::new(val)).and_then(downcast_owned)
|
||||
}
|
||||
pub fn insert<T>(&mut self, val: T) -> Option<T>
|
||||
where
|
||||
T: 'static + Send + Sync,
|
||||
{
|
||||
self
|
||||
.0
|
||||
.insert(TypeId::of::<T>(), Box::new(val))
|
||||
.and_then(downcast_owned)
|
||||
}
|
||||
|
||||
pub fn remove<T>(&mut self) -> Option<T>
|
||||
where
|
||||
T: 'static + Send + Sync,
|
||||
{
|
||||
self.0.remove(&TypeId::of::<T>()).and_then(downcast_owned)
|
||||
}
|
||||
pub fn remove<T>(&mut self) -> Option<T>
|
||||
where
|
||||
T: 'static + Send + Sync,
|
||||
{
|
||||
self.0.remove(&TypeId::of::<T>()).and_then(downcast_owned)
|
||||
}
|
||||
|
||||
pub fn get<T>(&self) -> Option<&T>
|
||||
where
|
||||
T: 'static + Send + Sync,
|
||||
{
|
||||
self.0.get(&TypeId::of::<T>()).and_then(|boxed| boxed.downcast_ref())
|
||||
}
|
||||
pub fn get<T>(&self) -> Option<&T>
|
||||
where
|
||||
T: 'static + Send + Sync,
|
||||
{
|
||||
self
|
||||
.0
|
||||
.get(&TypeId::of::<T>())
|
||||
.and_then(|boxed| boxed.downcast_ref())
|
||||
}
|
||||
|
||||
pub fn get_mut<T>(&mut self) -> Option<&mut T>
|
||||
where
|
||||
T: 'static + Send + Sync,
|
||||
{
|
||||
self.0
|
||||
.get_mut(&TypeId::of::<T>())
|
||||
.and_then(|boxed| boxed.downcast_mut())
|
||||
}
|
||||
pub fn get_mut<T>(&mut self) -> Option<&mut T>
|
||||
where
|
||||
T: 'static + Send + Sync,
|
||||
{
|
||||
self
|
||||
.0
|
||||
.get_mut(&TypeId::of::<T>())
|
||||
.and_then(|boxed| boxed.downcast_mut())
|
||||
}
|
||||
|
||||
pub fn contains<T>(&self) -> bool
|
||||
where
|
||||
T: 'static + Send + Sync,
|
||||
{
|
||||
self.0.contains_key(&TypeId::of::<T>())
|
||||
}
|
||||
pub fn contains<T>(&self) -> bool
|
||||
where
|
||||
T: 'static + Send + Sync,
|
||||
{
|
||||
self.0.contains_key(&TypeId::of::<T>())
|
||||
}
|
||||
|
||||
pub fn extend(&mut self, other: AFPluginStateMap) {
|
||||
self.0.extend(other.0);
|
||||
}
|
||||
pub fn extend(&mut self, other: AFPluginStateMap) {
|
||||
self.0.extend(other.0);
|
||||
}
|
||||
}
|
||||
|
||||
fn downcast_owned<T: 'static + Send + Sync>(boxed: Box<dyn Any + Send + Sync>) -> Option<T> {
|
||||
boxed.downcast().ok().map(|boxed| *boxed)
|
||||
boxed.downcast().ok().map(|boxed| *boxed)
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use crate::{
|
||||
errors::{DispatchError, InternalError},
|
||||
request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
|
||||
util::ready::{ready, Ready},
|
||||
errors::{DispatchError, InternalError},
|
||||
request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
|
||||
util::ready::{ready, Ready},
|
||||
};
|
||||
use std::{any::type_name, ops::Deref, sync::Arc};
|
||||
|
||||
|
@ -9,61 +9,64 @@ pub struct AFPluginState<T: ?Sized + Send + Sync>(Arc<T>);
|
|||
|
||||
impl<T> AFPluginState<T>
|
||||
where
|
||||
T: Send + Sync,
|
||||
T: Send + Sync,
|
||||
{
|
||||
pub fn new(data: T) -> Self {
|
||||
AFPluginState(Arc::new(data))
|
||||
}
|
||||
pub fn new(data: T) -> Self {
|
||||
AFPluginState(Arc::new(data))
|
||||
}
|
||||
|
||||
pub fn get_ref(&self) -> &T {
|
||||
self.0.as_ref()
|
||||
}
|
||||
pub fn get_ref(&self) -> &T {
|
||||
self.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for AFPluginState<T>
|
||||
where
|
||||
T: ?Sized + Send + Sync,
|
||||
T: ?Sized + Send + Sync,
|
||||
{
|
||||
type Target = Arc<T>;
|
||||
type Target = Arc<T>;
|
||||
|
||||
fn deref(&self) -> &Arc<T> {
|
||||
&self.0
|
||||
}
|
||||
fn deref(&self) -> &Arc<T> {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for AFPluginState<T>
|
||||
where
|
||||
T: ?Sized + Send + Sync,
|
||||
T: ?Sized + Send + Sync,
|
||||
{
|
||||
fn clone(&self) -> AFPluginState<T> {
|
||||
AFPluginState(self.0.clone())
|
||||
}
|
||||
fn clone(&self) -> AFPluginState<T> {
|
||||
AFPluginState(self.0.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<Arc<T>> for AFPluginState<T>
|
||||
where
|
||||
T: ?Sized + Send + Sync,
|
||||
T: ?Sized + Send + Sync,
|
||||
{
|
||||
fn from(arc: Arc<T>) -> Self {
|
||||
AFPluginState(arc)
|
||||
}
|
||||
fn from(arc: Arc<T>) -> Self {
|
||||
AFPluginState(arc)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> FromAFPluginRequest for AFPluginState<T>
|
||||
where
|
||||
T: ?Sized + Send + Sync + 'static,
|
||||
T: ?Sized + Send + Sync + 'static,
|
||||
{
|
||||
type Error = DispatchError;
|
||||
type Future = Ready<Result<Self, DispatchError>>;
|
||||
type Error = DispatchError;
|
||||
type Future = Ready<Result<Self, DispatchError>>;
|
||||
|
||||
#[inline]
|
||||
fn from_request(req: &AFPluginEventRequest, _: &mut Payload) -> Self::Future {
|
||||
if let Some(state) = req.get_state::<AFPluginState<T>>() {
|
||||
ready(Ok(state.clone()))
|
||||
} else {
|
||||
let msg = format!("Failed to get the plugin state of type: {}", type_name::<T>());
|
||||
log::error!("{}", msg,);
|
||||
ready(Err(InternalError::Other(msg).into()))
|
||||
}
|
||||
#[inline]
|
||||
fn from_request(req: &AFPluginEventRequest, _: &mut Payload) -> Self::Future {
|
||||
if let Some(state) = req.get_state::<AFPluginState<T>>() {
|
||||
ready(Ok(state.clone()))
|
||||
} else {
|
||||
let msg = format!(
|
||||
"Failed to get the plugin state of type: {}",
|
||||
type_name::<T>()
|
||||
);
|
||||
log::error!("{}", msg,);
|
||||
ready(Err(InternalError::Other(msg).into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
use crate::{
|
||||
errors::{DispatchError, InternalError},
|
||||
module::{container::AFPluginStateMap, AFPluginState},
|
||||
request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
|
||||
response::{AFPluginEventResponse, AFPluginResponder},
|
||||
service::{
|
||||
factory, AFPluginHandler, AFPluginHandlerService, AFPluginServiceFactory, BoxService, BoxServiceFactory,
|
||||
Service, ServiceRequest, ServiceResponse,
|
||||
},
|
||||
errors::{DispatchError, InternalError},
|
||||
module::{container::AFPluginStateMap, AFPluginState},
|
||||
request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
|
||||
response::{AFPluginEventResponse, AFPluginResponder},
|
||||
service::{
|
||||
factory, AFPluginHandler, AFPluginHandlerService, AFPluginServiceFactory, BoxService,
|
||||
BoxServiceFactory, Service, ServiceRequest, ServiceResponse,
|
||||
},
|
||||
};
|
||||
use futures_core::future::BoxFuture;
|
||||
use futures_core::ready;
|
||||
|
@ -14,35 +14,35 @@ use nanoid::nanoid;
|
|||
use pin_project::pin_project;
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt,
|
||||
fmt::{Debug, Display},
|
||||
future::Future,
|
||||
hash::Hash,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
collections::HashMap,
|
||||
fmt,
|
||||
fmt::{Debug, Display},
|
||||
future::Future,
|
||||
hash::Hash,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
pub type AFPluginMap = Arc<HashMap<AFPluginEvent, Arc<AFPlugin>>>;
|
||||
pub(crate) fn as_plugin_map(plugins: Vec<AFPlugin>) -> AFPluginMap {
|
||||
let mut plugin_map = HashMap::new();
|
||||
plugins.into_iter().for_each(|m| {
|
||||
let events = m.events();
|
||||
let plugins = Arc::new(m);
|
||||
events.into_iter().for_each(|e| {
|
||||
plugin_map.insert(e, plugins.clone());
|
||||
});
|
||||
let mut plugin_map = HashMap::new();
|
||||
plugins.into_iter().for_each(|m| {
|
||||
let events = m.events();
|
||||
let plugins = Arc::new(m);
|
||||
events.into_iter().for_each(|e| {
|
||||
plugin_map.insert(e, plugins.clone());
|
||||
});
|
||||
Arc::new(plugin_map)
|
||||
});
|
||||
Arc::new(plugin_map)
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
|
||||
pub struct AFPluginEvent(pub String);
|
||||
|
||||
impl<T: Display + Eq + Hash + Debug + Clone> std::convert::From<T> for AFPluginEvent {
|
||||
fn from(t: T) -> Self {
|
||||
AFPluginEvent(format!("{}", t))
|
||||
}
|
||||
fn from(t: T) -> Self {
|
||||
AFPluginEvent(format!("{}", t))
|
||||
}
|
||||
}
|
||||
|
||||
/// A plugin is used to handle the events that the plugin can handle.
|
||||
|
@ -52,67 +52,74 @@ impl<T: Display + Eq + Hash + Debug + Clone> std::convert::From<T> for AFPluginE
|
|||
/// which means only one handler will get called.
|
||||
///
|
||||
pub struct AFPlugin {
|
||||
pub name: String,
|
||||
pub name: String,
|
||||
|
||||
/// a list of `AFPluginState` that the plugin registers. The state can be read by the plugin's handler.
|
||||
states: Arc<AFPluginStateMap>,
|
||||
/// a list of `AFPluginState` that the plugin registers. The state can be read by the plugin's handler.
|
||||
states: Arc<AFPluginStateMap>,
|
||||
|
||||
/// Contains a list of factories that are used to generate the services used to handle the passed-in
|
||||
/// `ServiceRequest`.
|
||||
///
|
||||
event_service_factory:
|
||||
Arc<HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>>,
|
||||
/// Contains a list of factories that are used to generate the services used to handle the passed-in
|
||||
/// `ServiceRequest`.
|
||||
///
|
||||
event_service_factory: Arc<
|
||||
HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>,
|
||||
>,
|
||||
}
|
||||
|
||||
impl std::default::Default for AFPlugin {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
name: "".to_owned(),
|
||||
states: Arc::new(AFPluginStateMap::new()),
|
||||
event_service_factory: Arc::new(HashMap::new()),
|
||||
}
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
name: "".to_owned(),
|
||||
states: Arc::new(AFPluginStateMap::new()),
|
||||
event_service_factory: Arc::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AFPlugin {
|
||||
pub fn new() -> Self {
|
||||
AFPlugin::default()
|
||||
}
|
||||
pub fn new() -> Self {
|
||||
AFPlugin::default()
|
||||
}
|
||||
|
||||
pub fn name(mut self, s: &str) -> Self {
|
||||
self.name = s.to_owned();
|
||||
self
|
||||
}
|
||||
pub fn name(mut self, s: &str) -> Self {
|
||||
self.name = s.to_owned();
|
||||
self
|
||||
}
|
||||
|
||||
pub fn state<D: 'static + Send + Sync>(mut self, data: D) -> Self {
|
||||
Arc::get_mut(&mut self.states).unwrap().insert(AFPluginState::new(data));
|
||||
pub fn state<D: 'static + Send + Sync>(mut self, data: D) -> Self {
|
||||
Arc::get_mut(&mut self.states)
|
||||
.unwrap()
|
||||
.insert(AFPluginState::new(data));
|
||||
|
||||
self
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn event<E, H, T, R>(mut self, event: E, handler: H) -> Self
|
||||
where
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest + 'static + Send + Sync,
|
||||
<T as FromAFPluginRequest>::Future: Sync + Send,
|
||||
R: Future + 'static + Send + Sync,
|
||||
R::Output: AFPluginResponder + 'static,
|
||||
E: Eq + Hash + Debug + Clone + Display,
|
||||
{
|
||||
let event: AFPluginEvent = event.into();
|
||||
if self.event_service_factory.contains_key(&event) {
|
||||
panic!("Register duplicate Event: {:?}", &event);
|
||||
} else {
|
||||
Arc::get_mut(&mut self.event_service_factory)
|
||||
.unwrap()
|
||||
.insert(event, factory(AFPluginHandlerService::new(handler)));
|
||||
}
|
||||
self
|
||||
pub fn event<E, H, T, R>(mut self, event: E, handler: H) -> Self
|
||||
where
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest + 'static + Send + Sync,
|
||||
<T as FromAFPluginRequest>::Future: Sync + Send,
|
||||
R: Future + 'static + Send + Sync,
|
||||
R::Output: AFPluginResponder + 'static,
|
||||
E: Eq + Hash + Debug + Clone + Display,
|
||||
{
|
||||
let event: AFPluginEvent = event.into();
|
||||
if self.event_service_factory.contains_key(&event) {
|
||||
panic!("Register duplicate Event: {:?}", &event);
|
||||
} else {
|
||||
Arc::get_mut(&mut self.event_service_factory)
|
||||
.unwrap()
|
||||
.insert(event, factory(AFPluginHandlerService::new(handler)));
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn events(&self) -> Vec<AFPluginEvent> {
|
||||
self.event_service_factory.keys().cloned().collect::<Vec<_>>()
|
||||
}
|
||||
pub fn events(&self) -> Vec<AFPluginEvent> {
|
||||
self
|
||||
.event_service_factory
|
||||
.keys()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
}
|
||||
|
||||
/// A request that will be passed to the corresponding plugin.
|
||||
|
@ -121,101 +128,106 @@ impl AFPlugin {
|
|||
///
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AFPluginRequest {
|
||||
pub id: String,
|
||||
pub event: AFPluginEvent,
|
||||
pub(crate) payload: Payload,
|
||||
pub id: String,
|
||||
pub event: AFPluginEvent,
|
||||
pub(crate) payload: Payload,
|
||||
}
|
||||
|
||||
impl AFPluginRequest {
|
||||
pub fn new<E>(event: E) -> Self
|
||||
where
|
||||
E: Into<AFPluginEvent>,
|
||||
{
|
||||
Self {
|
||||
id: nanoid!(6),
|
||||
event: event.into(),
|
||||
payload: Payload::None,
|
||||
}
|
||||
pub fn new<E>(event: E) -> Self
|
||||
where
|
||||
E: Into<AFPluginEvent>,
|
||||
{
|
||||
Self {
|
||||
id: nanoid!(6),
|
||||
event: event.into(),
|
||||
payload: Payload::None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn payload<P>(mut self, payload: P) -> Self
|
||||
where
|
||||
P: Into<Payload>,
|
||||
{
|
||||
self.payload = payload.into();
|
||||
self
|
||||
}
|
||||
pub fn payload<P>(mut self, payload: P) -> Self
|
||||
where
|
||||
P: Into<Payload>,
|
||||
{
|
||||
self.payload = payload.into();
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for AFPluginRequest {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}:{:?}", self.id, self.event)
|
||||
}
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}:{:?}", self.id, self.event)
|
||||
}
|
||||
}
|
||||
|
||||
impl AFPluginServiceFactory<AFPluginRequest> for AFPlugin {
|
||||
type Response = AFPluginEventResponse;
|
||||
type Error = DispatchError;
|
||||
type Service = BoxService<AFPluginRequest, Self::Response, Self::Error>;
|
||||
type Context = ();
|
||||
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
|
||||
type Response = AFPluginEventResponse;
|
||||
type Error = DispatchError;
|
||||
type Service = BoxService<AFPluginRequest, Self::Response, Self::Error>;
|
||||
type Context = ();
|
||||
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
|
||||
|
||||
fn new_service(&self, _cfg: Self::Context) -> Self::Future {
|
||||
let services = self.event_service_factory.clone();
|
||||
let states = self.states.clone();
|
||||
Box::pin(async move {
|
||||
let service = AFPluginService { services, states };
|
||||
Ok(Box::new(service) as Self::Service)
|
||||
})
|
||||
}
|
||||
fn new_service(&self, _cfg: Self::Context) -> Self::Future {
|
||||
let services = self.event_service_factory.clone();
|
||||
let states = self.states.clone();
|
||||
Box::pin(async move {
|
||||
let service = AFPluginService { services, states };
|
||||
Ok(Box::new(service) as Self::Service)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AFPluginService {
|
||||
services: Arc<HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>>,
|
||||
states: Arc<AFPluginStateMap>,
|
||||
services: Arc<
|
||||
HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>,
|
||||
>,
|
||||
states: Arc<AFPluginStateMap>,
|
||||
}
|
||||
|
||||
impl Service<AFPluginRequest> for AFPluginService {
|
||||
type Response = AFPluginEventResponse;
|
||||
type Error = DispatchError;
|
||||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
type Response = AFPluginEventResponse;
|
||||
type Error = DispatchError;
|
||||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn call(&self, request: AFPluginRequest) -> Self::Future {
|
||||
let AFPluginRequest { id, event, payload } = request;
|
||||
let states = self.states.clone();
|
||||
let request = AFPluginEventRequest::new(id, event, states);
|
||||
fn call(&self, request: AFPluginRequest) -> Self::Future {
|
||||
let AFPluginRequest { id, event, payload } = request;
|
||||
let states = self.states.clone();
|
||||
let request = AFPluginEventRequest::new(id, event, states);
|
||||
|
||||
match self.services.get(&request.event) {
|
||||
Some(factory) => {
|
||||
let service_fut = factory.new_service(());
|
||||
let fut = AFPluginServiceFuture {
|
||||
fut: Box::pin(async {
|
||||
let service = service_fut.await?;
|
||||
let service_req = ServiceRequest::new(request, payload);
|
||||
service.call(service_req).await
|
||||
}),
|
||||
};
|
||||
Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
|
||||
}
|
||||
None => {
|
||||
let msg = format!("Can not find service factory for event: {:?}", request.event);
|
||||
Box::pin(async { Err(InternalError::ServiceNotFound(msg).into()) })
|
||||
}
|
||||
}
|
||||
match self.services.get(&request.event) {
|
||||
Some(factory) => {
|
||||
let service_fut = factory.new_service(());
|
||||
let fut = AFPluginServiceFuture {
|
||||
fut: Box::pin(async {
|
||||
let service = service_fut.await?;
|
||||
let service_req = ServiceRequest::new(request, payload);
|
||||
service.call(service_req).await
|
||||
}),
|
||||
};
|
||||
Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
|
||||
},
|
||||
None => {
|
||||
let msg = format!(
|
||||
"Can not find service factory for event: {:?}",
|
||||
request.event
|
||||
);
|
||||
Box::pin(async { Err(InternalError::ServiceNotFound(msg).into()) })
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
pub struct AFPluginServiceFuture {
|
||||
#[pin]
|
||||
fut: BoxFuture<'static, Result<ServiceResponse, DispatchError>>,
|
||||
#[pin]
|
||||
fut: BoxFuture<'static, Result<ServiceResponse, DispatchError>>,
|
||||
}
|
||||
|
||||
impl Future for AFPluginServiceFuture {
|
||||
type Output = Result<AFPluginEventResponse, DispatchError>;
|
||||
type Output = Result<AFPluginEventResponse, DispatchError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let (_, response) = ready!(self.as_mut().project().fut.poll(cx))?.into_parts();
|
||||
Poll::Ready(Ok(response))
|
||||
}
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let (_, response) = ready!(self.as_mut().project().fut.poll(cx))?.into_parts();
|
||||
Poll::Ready(Ok(response))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,69 +7,69 @@ pub enum PayloadError {}
|
|||
#[derive(Clone)]
|
||||
#[cfg_attr(feature = "use_serde", derive(serde::Serialize))]
|
||||
pub enum Payload {
|
||||
None,
|
||||
Bytes(Bytes),
|
||||
None,
|
||||
Bytes(Bytes),
|
||||
}
|
||||
|
||||
impl Payload {
|
||||
pub fn to_vec(self) -> Vec<u8> {
|
||||
match self {
|
||||
Payload::None => vec![],
|
||||
Payload::Bytes(bytes) => bytes.to_vec(),
|
||||
}
|
||||
pub fn to_vec(self) -> Vec<u8> {
|
||||
match self {
|
||||
Payload::None => vec![],
|
||||
Payload::Bytes(bytes) => bytes.to_vec(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Payload {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
format_payload_print(self, f)
|
||||
}
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
format_payload_print(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Payload {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
format_payload_print(self, f)
|
||||
}
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
format_payload_print(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
fn format_payload_print(payload: &Payload, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
match payload {
|
||||
Payload::Bytes(bytes) => f.write_fmt(format_args!("{} bytes", bytes.len())),
|
||||
Payload::None => f.write_str("Empty"),
|
||||
}
|
||||
match payload {
|
||||
Payload::Bytes(bytes) => f.write_fmt(format_args!("{} bytes", bytes.len())),
|
||||
Payload::None => f.write_str("Empty"),
|
||||
}
|
||||
}
|
||||
|
||||
impl std::convert::From<String> for Payload {
|
||||
fn from(s: String) -> Self {
|
||||
Payload::Bytes(Bytes::from(s))
|
||||
}
|
||||
fn from(s: String) -> Self {
|
||||
Payload::Bytes(Bytes::from(s))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::convert::From<&'_ String> for Payload {
|
||||
fn from(s: &String) -> Self {
|
||||
Payload::Bytes(Bytes::from(s.to_owned()))
|
||||
}
|
||||
fn from(s: &String) -> Self {
|
||||
Payload::Bytes(Bytes::from(s.to_owned()))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::convert::From<Bytes> for Payload {
|
||||
fn from(bytes: Bytes) -> Self {
|
||||
Payload::Bytes(bytes)
|
||||
}
|
||||
fn from(bytes: Bytes) -> Self {
|
||||
Payload::Bytes(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::convert::From<()> for Payload {
|
||||
fn from(_: ()) -> Self {
|
||||
Payload::None
|
||||
}
|
||||
fn from(_: ()) -> Self {
|
||||
Payload::None
|
||||
}
|
||||
}
|
||||
impl std::convert::From<Vec<u8>> for Payload {
|
||||
fn from(bytes: Vec<u8>) -> Self {
|
||||
Payload::Bytes(Bytes::from(bytes))
|
||||
}
|
||||
fn from(bytes: Vec<u8>) -> Self {
|
||||
Payload::Bytes(Bytes::from(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::convert::From<&str> for Payload {
|
||||
fn from(s: &str) -> Self {
|
||||
s.to_string().into()
|
||||
}
|
||||
fn from(s: &str) -> Self {
|
||||
s.to_string().into()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,118 +1,118 @@
|
|||
use std::future::Future;
|
||||
|
||||
use crate::{
|
||||
errors::{DispatchError, InternalError},
|
||||
module::{AFPluginEvent, AFPluginStateMap},
|
||||
request::payload::Payload,
|
||||
util::ready::{ready, Ready},
|
||||
errors::{DispatchError, InternalError},
|
||||
module::{AFPluginEvent, AFPluginStateMap},
|
||||
request::payload::Payload,
|
||||
util::ready::{ready, Ready},
|
||||
};
|
||||
use derivative::*;
|
||||
use futures_core::ready;
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
fmt::Debug,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug, Derivative)]
|
||||
pub struct AFPluginEventRequest {
|
||||
#[allow(dead_code)]
|
||||
pub(crate) id: String,
|
||||
pub(crate) event: AFPluginEvent,
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub(crate) states: Arc<AFPluginStateMap>,
|
||||
#[allow(dead_code)]
|
||||
pub(crate) id: String,
|
||||
pub(crate) event: AFPluginEvent,
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub(crate) states: Arc<AFPluginStateMap>,
|
||||
}
|
||||
|
||||
impl AFPluginEventRequest {
|
||||
pub fn new<E>(id: String, event: E, module_data: Arc<AFPluginStateMap>) -> AFPluginEventRequest
|
||||
where
|
||||
E: Into<AFPluginEvent>,
|
||||
{
|
||||
Self {
|
||||
id,
|
||||
event: event.into(),
|
||||
states: module_data,
|
||||
}
|
||||
pub fn new<E>(id: String, event: E, module_data: Arc<AFPluginStateMap>) -> AFPluginEventRequest
|
||||
where
|
||||
E: Into<AFPluginEvent>,
|
||||
{
|
||||
Self {
|
||||
id,
|
||||
event: event.into(),
|
||||
states: module_data,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_state<T: 'static>(&self) -> Option<&T>
|
||||
where
|
||||
T: Send + Sync,
|
||||
{
|
||||
if let Some(data) = self.states.get::<T>() {
|
||||
return Some(data);
|
||||
}
|
||||
|
||||
pub fn get_state<T: 'static>(&self) -> Option<&T>
|
||||
where
|
||||
T: Send + Sync,
|
||||
{
|
||||
if let Some(data) = self.states.get::<T>() {
|
||||
return Some(data);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub trait FromAFPluginRequest: Sized {
|
||||
type Error: Into<DispatchError>;
|
||||
type Future: Future<Output = Result<Self, Self::Error>>;
|
||||
type Error: Into<DispatchError>;
|
||||
type Future: Future<Output = Result<Self, Self::Error>>;
|
||||
|
||||
fn from_request(req: &AFPluginEventRequest, payload: &mut Payload) -> Self::Future;
|
||||
fn from_request(req: &AFPluginEventRequest, payload: &mut Payload) -> Self::Future;
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
impl FromAFPluginRequest for () {
|
||||
type Error = DispatchError;
|
||||
type Future = Ready<Result<(), DispatchError>>;
|
||||
type Error = DispatchError;
|
||||
type Future = Ready<Result<(), DispatchError>>;
|
||||
|
||||
fn from_request(_req: &AFPluginEventRequest, _payload: &mut Payload) -> Self::Future {
|
||||
ready(Ok(()))
|
||||
}
|
||||
fn from_request(_req: &AFPluginEventRequest, _payload: &mut Payload) -> Self::Future {
|
||||
ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
impl FromAFPluginRequest for String {
|
||||
type Error = DispatchError;
|
||||
type Future = Ready<Result<Self, Self::Error>>;
|
||||
type Error = DispatchError;
|
||||
type Future = Ready<Result<Self, Self::Error>>;
|
||||
|
||||
fn from_request(req: &AFPluginEventRequest, payload: &mut Payload) -> Self::Future {
|
||||
match &payload {
|
||||
Payload::None => ready(Err(unexpected_none_payload(req))),
|
||||
Payload::Bytes(buf) => ready(Ok(String::from_utf8_lossy(buf).into_owned())),
|
||||
}
|
||||
fn from_request(req: &AFPluginEventRequest, payload: &mut Payload) -> Self::Future {
|
||||
match &payload {
|
||||
Payload::None => ready(Err(unexpected_none_payload(req))),
|
||||
Payload::Bytes(buf) => ready(Ok(String::from_utf8_lossy(buf).into_owned())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unexpected_none_payload(request: &AFPluginEventRequest) -> DispatchError {
|
||||
log::warn!("{:?} expected payload", &request.event);
|
||||
InternalError::UnexpectedNone("Expected payload".to_string()).into()
|
||||
log::warn!("{:?} expected payload", &request.event);
|
||||
InternalError::UnexpectedNone("Expected payload".to_string()).into()
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
impl<T> FromAFPluginRequest for Result<T, T::Error>
|
||||
where
|
||||
T: FromAFPluginRequest,
|
||||
T: FromAFPluginRequest,
|
||||
{
|
||||
type Error = DispatchError;
|
||||
type Future = FromRequestFuture<T::Future>;
|
||||
type Error = DispatchError;
|
||||
type Future = FromRequestFuture<T::Future>;
|
||||
|
||||
fn from_request(req: &AFPluginEventRequest, payload: &mut Payload) -> Self::Future {
|
||||
FromRequestFuture {
|
||||
fut: T::from_request(req, payload),
|
||||
}
|
||||
fn from_request(req: &AFPluginEventRequest, payload: &mut Payload) -> Self::Future {
|
||||
FromRequestFuture {
|
||||
fut: T::from_request(req, payload),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub struct FromRequestFuture<Fut> {
|
||||
#[pin]
|
||||
fut: Fut,
|
||||
#[pin]
|
||||
fut: Fut,
|
||||
}
|
||||
|
||||
impl<Fut, T, E> Future for FromRequestFuture<Fut>
|
||||
where
|
||||
Fut: Future<Output = Result<T, E>>,
|
||||
Fut: Future<Output = Result<T, E>>,
|
||||
{
|
||||
type Output = Result<Result<T, E>, DispatchError>;
|
||||
type Output = Result<Result<T, E>, DispatchError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
let res = ready!(this.fut.poll(cx));
|
||||
Poll::Ready(Ok(res))
|
||||
}
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
let res = ready!(this.fut.poll(cx));
|
||||
Poll::Ready(Ok(res))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,42 +1,42 @@
|
|||
use crate::{
|
||||
request::Payload,
|
||||
response::{AFPluginEventResponse, StatusCode},
|
||||
request::Payload,
|
||||
response::{AFPluginEventResponse, StatusCode},
|
||||
};
|
||||
|
||||
macro_rules! static_response {
|
||||
($name:ident, $status:expr) => {
|
||||
#[allow(non_snake_case, missing_docs)]
|
||||
pub fn $name() -> ResponseBuilder {
|
||||
ResponseBuilder::new($status)
|
||||
}
|
||||
};
|
||||
($name:ident, $status:expr) => {
|
||||
#[allow(non_snake_case, missing_docs)]
|
||||
pub fn $name() -> ResponseBuilder {
|
||||
ResponseBuilder::new($status)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub struct ResponseBuilder<T = Payload> {
|
||||
pub payload: T,
|
||||
pub status: StatusCode,
|
||||
pub payload: T,
|
||||
pub status: StatusCode,
|
||||
}
|
||||
|
||||
impl ResponseBuilder {
|
||||
pub fn new(status: StatusCode) -> Self {
|
||||
ResponseBuilder {
|
||||
payload: Payload::None,
|
||||
status,
|
||||
}
|
||||
pub fn new(status: StatusCode) -> Self {
|
||||
ResponseBuilder {
|
||||
payload: Payload::None,
|
||||
status,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn data<D: std::convert::Into<Payload>>(mut self, data: D) -> Self {
|
||||
self.payload = data.into();
|
||||
self
|
||||
pub fn data<D: std::convert::Into<Payload>>(mut self, data: D) -> Self {
|
||||
self.payload = data.into();
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> AFPluginEventResponse {
|
||||
AFPluginEventResponse {
|
||||
payload: self.payload,
|
||||
status_code: self.status,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build(self) -> AFPluginEventResponse {
|
||||
AFPluginEventResponse {
|
||||
payload: self.payload,
|
||||
status_code: self.status,
|
||||
}
|
||||
}
|
||||
|
||||
static_response!(Ok, StatusCode::Ok);
|
||||
static_response!(Err, StatusCode::Err);
|
||||
static_response!(Ok, StatusCode::Ok);
|
||||
static_response!(Err, StatusCode::Err);
|
||||
}
|
||||
|
|
|
@ -1,23 +1,23 @@
|
|||
#[allow(unused_imports)]
|
||||
use crate::errors::{DispatchError, InternalError};
|
||||
use crate::{
|
||||
request::AFPluginEventRequest,
|
||||
response::{AFPluginEventResponse, ResponseBuilder},
|
||||
request::AFPluginEventRequest,
|
||||
response::{AFPluginEventResponse, ResponseBuilder},
|
||||
};
|
||||
use bytes::Bytes;
|
||||
|
||||
pub trait AFPluginResponder {
|
||||
fn respond_to(self, req: &AFPluginEventRequest) -> AFPluginEventResponse;
|
||||
fn respond_to(self, req: &AFPluginEventRequest) -> AFPluginEventResponse;
|
||||
}
|
||||
|
||||
macro_rules! impl_responder {
|
||||
($res: ty) => {
|
||||
impl AFPluginResponder for $res {
|
||||
fn respond_to(self, _: &AFPluginEventRequest) -> AFPluginEventResponse {
|
||||
ResponseBuilder::Ok().data(self).build()
|
||||
}
|
||||
}
|
||||
};
|
||||
($res: ty) => {
|
||||
impl AFPluginResponder for $res {
|
||||
fn respond_to(self, _: &AFPluginEventRequest) -> AFPluginEventResponse {
|
||||
ResponseBuilder::Ok().data(self).build()
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_responder!(&'static str);
|
||||
|
@ -29,13 +29,13 @@ impl_responder!(Vec<u8>);
|
|||
|
||||
impl<T, E> AFPluginResponder for Result<T, E>
|
||||
where
|
||||
T: AFPluginResponder,
|
||||
E: Into<DispatchError>,
|
||||
T: AFPluginResponder,
|
||||
E: Into<DispatchError>,
|
||||
{
|
||||
fn respond_to(self, request: &AFPluginEventRequest) -> AFPluginEventResponse {
|
||||
match self {
|
||||
Ok(val) => val.respond_to(request),
|
||||
Err(e) => e.into().into(),
|
||||
}
|
||||
fn respond_to(self, request: &AFPluginEventRequest) -> AFPluginEventResponse {
|
||||
match self {
|
||||
Ok(val) => val.respond_to(request),
|
||||
Err(e) => e.into().into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
use crate::{
|
||||
byte_trait::AFPluginFromBytes,
|
||||
data::AFPluginData,
|
||||
errors::DispatchError,
|
||||
request::{AFPluginEventRequest, Payload},
|
||||
response::AFPluginResponder,
|
||||
byte_trait::AFPluginFromBytes,
|
||||
data::AFPluginData,
|
||||
errors::DispatchError,
|
||||
request::{AFPluginEventRequest, Payload},
|
||||
response::AFPluginResponder,
|
||||
};
|
||||
use derivative::*;
|
||||
use std::{convert::TryFrom, fmt, fmt::Formatter};
|
||||
|
@ -12,70 +12,70 @@ use std::{convert::TryFrom, fmt, fmt::Formatter};
|
|||
#[cfg_attr(feature = "use_serde", derive(serde_repr::Serialize_repr))]
|
||||
#[repr(u8)]
|
||||
pub enum StatusCode {
|
||||
Ok = 0,
|
||||
Err = 1,
|
||||
Ok = 0,
|
||||
Err = 1,
|
||||
}
|
||||
|
||||
// serde user guide: https://serde.rs/field-attrs.html
|
||||
#[derive(Debug, Clone, Derivative)]
|
||||
#[cfg_attr(feature = "use_serde", derive(serde::Serialize))]
|
||||
pub struct AFPluginEventResponse {
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub payload: Payload,
|
||||
pub status_code: StatusCode,
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub payload: Payload,
|
||||
pub status_code: StatusCode,
|
||||
}
|
||||
|
||||
impl AFPluginEventResponse {
|
||||
pub fn new(status_code: StatusCode) -> Self {
|
||||
AFPluginEventResponse {
|
||||
payload: Payload::None,
|
||||
status_code,
|
||||
}
|
||||
pub fn new(status_code: StatusCode) -> Self {
|
||||
AFPluginEventResponse {
|
||||
payload: Payload::None,
|
||||
status_code,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse<T, E>(self) -> Result<Result<T, E>, DispatchError>
|
||||
where
|
||||
T: AFPluginFromBytes,
|
||||
E: AFPluginFromBytes,
|
||||
{
|
||||
match self.status_code {
|
||||
StatusCode::Ok => {
|
||||
let data = <AFPluginData<T>>::try_from(self.payload)?;
|
||||
Ok(Ok(data.into_inner()))
|
||||
}
|
||||
StatusCode::Err => {
|
||||
let err = <AFPluginData<E>>::try_from(self.payload)?;
|
||||
Ok(Err(err.into_inner()))
|
||||
}
|
||||
}
|
||||
pub fn parse<T, E>(self) -> Result<Result<T, E>, DispatchError>
|
||||
where
|
||||
T: AFPluginFromBytes,
|
||||
E: AFPluginFromBytes,
|
||||
{
|
||||
match self.status_code {
|
||||
StatusCode::Ok => {
|
||||
let data = <AFPluginData<T>>::try_from(self.payload)?;
|
||||
Ok(Ok(data.into_inner()))
|
||||
},
|
||||
StatusCode::Err => {
|
||||
let err = <AFPluginData<E>>::try_from(self.payload)?;
|
||||
Ok(Err(err.into_inner()))
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for AFPluginEventResponse {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
f.write_fmt(format_args!("Status_Code: {:?}", self.status_code))?;
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
f.write_fmt(format_args!("Status_Code: {:?}", self.status_code))?;
|
||||
|
||||
match &self.payload {
|
||||
Payload::Bytes(b) => f.write_fmt(format_args!("Data: {} bytes", b.len()))?,
|
||||
Payload::None => f.write_fmt(format_args!("Data: Empty"))?,
|
||||
}
|
||||
|
||||
Ok(())
|
||||
match &self.payload {
|
||||
Payload::Bytes(b) => f.write_fmt(format_args!("Data: {} bytes", b.len()))?,
|
||||
Payload::None => f.write_fmt(format_args!("Data: Empty"))?,
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl AFPluginResponder for AFPluginEventResponse {
|
||||
#[inline]
|
||||
fn respond_to(self, _: &AFPluginEventRequest) -> AFPluginEventResponse {
|
||||
self
|
||||
}
|
||||
#[inline]
|
||||
fn respond_to(self, _: &AFPluginEventRequest) -> AFPluginEventResponse {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub type DataResult<T, E> = std::result::Result<AFPluginData<T>, E>;
|
||||
|
||||
pub fn data_result<T, E>(data: T) -> Result<AFPluginData<T>, E>
|
||||
where
|
||||
E: Into<DispatchError>,
|
||||
E: Into<DispatchError>,
|
||||
{
|
||||
Ok(AFPluginData(data))
|
||||
Ok(AFPluginData(data))
|
||||
}
|
||||
|
|
|
@ -4,23 +4,23 @@ use tokio::runtime;
|
|||
pub type AFPluginRuntime = tokio::runtime::Runtime;
|
||||
|
||||
pub fn tokio_default_runtime() -> io::Result<AFPluginRuntime> {
|
||||
runtime::Builder::new_multi_thread()
|
||||
.thread_name("dispatch-rt")
|
||||
.enable_io()
|
||||
.enable_time()
|
||||
.on_thread_start(move || {
|
||||
tracing::trace!(
|
||||
"{:?} thread started: thread_id= {}",
|
||||
thread::current(),
|
||||
thread_id::get()
|
||||
);
|
||||
})
|
||||
.on_thread_stop(move || {
|
||||
tracing::trace!(
|
||||
"{:?} thread stopping: thread_id= {}",
|
||||
thread::current(),
|
||||
thread_id::get(),
|
||||
);
|
||||
})
|
||||
.build()
|
||||
runtime::Builder::new_multi_thread()
|
||||
.thread_name("dispatch-rt")
|
||||
.enable_io()
|
||||
.enable_time()
|
||||
.on_thread_start(move || {
|
||||
tracing::trace!(
|
||||
"{:?} thread started: thread_id= {}",
|
||||
thread::current(),
|
||||
thread_id::get()
|
||||
);
|
||||
})
|
||||
.on_thread_stop(move || {
|
||||
tracing::trace!(
|
||||
"{:?} thread stopping: thread_id= {}",
|
||||
thread::current(),
|
||||
thread_id::get(),
|
||||
);
|
||||
})
|
||||
.build()
|
||||
}
|
||||
|
|
|
@ -3,51 +3,54 @@ use futures_core::future::BoxFuture;
|
|||
|
||||
pub fn factory<SF, Req>(factory: SF) -> BoxServiceFactory<SF::Context, Req, SF::Response, SF::Error>
|
||||
where
|
||||
SF: AFPluginServiceFactory<Req> + 'static + Sync + Send,
|
||||
Req: 'static,
|
||||
SF::Response: 'static,
|
||||
SF::Service: 'static,
|
||||
SF::Future: 'static,
|
||||
SF::Error: 'static + Send + Sync,
|
||||
<SF as AFPluginServiceFactory<Req>>::Service: Sync + Send,
|
||||
<<SF as AFPluginServiceFactory<Req>>::Service as Service<Req>>::Future: Send + Sync,
|
||||
<SF as AFPluginServiceFactory<Req>>::Future: Send + Sync,
|
||||
SF: AFPluginServiceFactory<Req> + 'static + Sync + Send,
|
||||
Req: 'static,
|
||||
SF::Response: 'static,
|
||||
SF::Service: 'static,
|
||||
SF::Future: 'static,
|
||||
SF::Error: 'static + Send + Sync,
|
||||
<SF as AFPluginServiceFactory<Req>>::Service: Sync + Send,
|
||||
<<SF as AFPluginServiceFactory<Req>>::Service as Service<Req>>::Future: Send + Sync,
|
||||
<SF as AFPluginServiceFactory<Req>>::Future: Send + Sync,
|
||||
{
|
||||
BoxServiceFactory(Box::new(FactoryWrapper(factory)))
|
||||
BoxServiceFactory(Box::new(FactoryWrapper(factory)))
|
||||
}
|
||||
|
||||
type Inner<Cfg, Req, Res, Err> = Box<
|
||||
dyn AFPluginServiceFactory<
|
||||
Req,
|
||||
Context = Cfg,
|
||||
Response = Res,
|
||||
Error = Err,
|
||||
Service = BoxService<Req, Res, Err>,
|
||||
Future = BoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>,
|
||||
> + Sync
|
||||
+ Send,
|
||||
dyn AFPluginServiceFactory<
|
||||
Req,
|
||||
Context = Cfg,
|
||||
Response = Res,
|
||||
Error = Err,
|
||||
Service = BoxService<Req, Res, Err>,
|
||||
Future = BoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>,
|
||||
> + Sync
|
||||
+ Send,
|
||||
>;
|
||||
|
||||
pub struct BoxServiceFactory<Cfg, Req, Res, Err>(Inner<Cfg, Req, Res, Err>);
|
||||
impl<Cfg, Req, Res, Err> AFPluginServiceFactory<Req> for BoxServiceFactory<Cfg, Req, Res, Err>
|
||||
where
|
||||
Req: 'static,
|
||||
Res: 'static,
|
||||
Err: 'static,
|
||||
Req: 'static,
|
||||
Res: 'static,
|
||||
Err: 'static,
|
||||
{
|
||||
type Response = Res;
|
||||
type Error = Err;
|
||||
type Service = BoxService<Req, Res, Err>;
|
||||
type Context = Cfg;
|
||||
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
|
||||
type Response = Res;
|
||||
type Error = Err;
|
||||
type Service = BoxService<Req, Res, Err>;
|
||||
type Context = Cfg;
|
||||
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
|
||||
|
||||
fn new_service(&self, cfg: Cfg) -> Self::Future {
|
||||
self.0.new_service(cfg)
|
||||
}
|
||||
fn new_service(&self, cfg: Cfg) -> Self::Future {
|
||||
self.0.new_service(cfg)
|
||||
}
|
||||
}
|
||||
|
||||
pub type BoxService<Req, Res, Err> =
|
||||
Box<dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<'static, Result<Res, Err>>> + Sync + Send>;
|
||||
pub type BoxService<Req, Res, Err> = Box<
|
||||
dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<'static, Result<Res, Err>>>
|
||||
+ Sync
|
||||
+ Send,
|
||||
>;
|
||||
|
||||
// #[allow(dead_code)]
|
||||
// pub fn service<S, Req>(service: S) -> BoxService<Req, S::Response, S::Error>
|
||||
|
@ -61,62 +64,65 @@ pub type BoxService<Req, Res, Err> =
|
|||
|
||||
impl<S, Req> Service<Req> for Box<S>
|
||||
where
|
||||
S: Service<Req> + ?Sized,
|
||||
S: Service<Req> + ?Sized,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn call(&self, request: Req) -> S::Future {
|
||||
(**self).call(request)
|
||||
}
|
||||
fn call(&self, request: Req) -> S::Future {
|
||||
(**self).call(request)
|
||||
}
|
||||
}
|
||||
|
||||
struct ServiceWrapper<S> {
|
||||
inner: S,
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S> ServiceWrapper<S> {
|
||||
fn new(inner: S) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
fn new(inner: S) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Req, Res, Err> Service<Req> for ServiceWrapper<S>
|
||||
where
|
||||
S: Service<Req, Response = Res, Error = Err>,
|
||||
S::Future: 'static + Send + Sync,
|
||||
S: Service<Req, Response = Res, Error = Err>,
|
||||
S::Future: 'static + Send + Sync,
|
||||
{
|
||||
type Response = Res;
|
||||
type Error = Err;
|
||||
type Future = BoxFuture<'static, Result<Res, Err>>;
|
||||
type Response = Res;
|
||||
type Error = Err;
|
||||
type Future = BoxFuture<'static, Result<Res, Err>>;
|
||||
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
Box::pin(self.inner.call(req))
|
||||
}
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
Box::pin(self.inner.call(req))
|
||||
}
|
||||
}
|
||||
|
||||
struct FactoryWrapper<SF>(SF);
|
||||
|
||||
impl<SF, Req, Cfg, Res, Err> AFPluginServiceFactory<Req> for FactoryWrapper<SF>
|
||||
where
|
||||
Req: 'static,
|
||||
Res: 'static,
|
||||
Err: 'static,
|
||||
SF: AFPluginServiceFactory<Req, Context = Cfg, Response = Res, Error = Err>,
|
||||
SF::Future: 'static,
|
||||
SF::Service: 'static + Send + Sync,
|
||||
<<SF as AFPluginServiceFactory<Req>>::Service as Service<Req>>::Future: Send + Sync + 'static,
|
||||
<SF as AFPluginServiceFactory<Req>>::Future: Send + Sync,
|
||||
Req: 'static,
|
||||
Res: 'static,
|
||||
Err: 'static,
|
||||
SF: AFPluginServiceFactory<Req, Context = Cfg, Response = Res, Error = Err>,
|
||||
SF::Future: 'static,
|
||||
SF::Service: 'static + Send + Sync,
|
||||
<<SF as AFPluginServiceFactory<Req>>::Service as Service<Req>>::Future: Send + Sync + 'static,
|
||||
<SF as AFPluginServiceFactory<Req>>::Future: Send + Sync,
|
||||
{
|
||||
type Response = Res;
|
||||
type Error = Err;
|
||||
type Service = BoxService<Req, Res, Err>;
|
||||
type Context = Cfg;
|
||||
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
|
||||
type Response = Res;
|
||||
type Error = Err;
|
||||
type Service = BoxService<Req, Res, Err>;
|
||||
type Context = Cfg;
|
||||
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
|
||||
|
||||
fn new_service(&self, cfg: Cfg) -> Self::Future {
|
||||
let f = self.0.new_service(cfg);
|
||||
Box::pin(async { f.await.map(|s| Box::new(ServiceWrapper::new(s)) as Self::Service) })
|
||||
}
|
||||
fn new_service(&self, cfg: Cfg) -> Self::Future {
|
||||
let f = self.0.new_service(cfg);
|
||||
Box::pin(async {
|
||||
f.await
|
||||
.map(|s| Box::new(ServiceWrapper::new(s)) as Self::Service)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,155 +1,155 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures_core::ready;
|
||||
use pin_project::pin_project;
|
||||
|
||||
use crate::{
|
||||
errors::DispatchError,
|
||||
request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
|
||||
response::{AFPluginEventResponse, AFPluginResponder},
|
||||
service::{AFPluginServiceFactory, Service, ServiceRequest, ServiceResponse},
|
||||
util::ready::*,
|
||||
errors::DispatchError,
|
||||
request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
|
||||
response::{AFPluginEventResponse, AFPluginResponder},
|
||||
service::{AFPluginServiceFactory, Service, ServiceRequest, ServiceResponse},
|
||||
util::ready::*,
|
||||
};
|
||||
|
||||
/// A closure that is run every time for the specified plugin event
|
||||
pub trait AFPluginHandler<T, R>: Clone + 'static + Sync + Send
|
||||
where
|
||||
R: Future + Send + Sync,
|
||||
R::Output: AFPluginResponder,
|
||||
R: Future + Send + Sync,
|
||||
R::Output: AFPluginResponder,
|
||||
{
|
||||
fn call(&self, param: T) -> R;
|
||||
fn call(&self, param: T) -> R;
|
||||
}
|
||||
|
||||
pub struct AFPluginHandlerService<H, T, R>
|
||||
where
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
{
|
||||
handler: H,
|
||||
_phantom: PhantomData<(T, R)>,
|
||||
handler: H,
|
||||
_phantom: PhantomData<(T, R)>,
|
||||
}
|
||||
|
||||
impl<H, T, R> AFPluginHandlerService<H, T, R>
|
||||
where
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
{
|
||||
pub fn new(handler: H) -> Self {
|
||||
Self {
|
||||
handler,
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
pub fn new(handler: H) -> Self {
|
||||
Self {
|
||||
handler,
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<H, T, R> Clone for AFPluginHandlerService<H, T, R>
|
||||
where
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
handler: self.handler.clone(),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
handler: self.handler.clone(),
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, T, R> AFPluginServiceFactory<ServiceRequest> for AFPluginHandlerService<F, T, R>
|
||||
where
|
||||
F: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Send + Sync,
|
||||
R::Output: AFPluginResponder,
|
||||
F: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Send + Sync,
|
||||
R::Output: AFPluginResponder,
|
||||
{
|
||||
type Response = ServiceResponse;
|
||||
type Error = DispatchError;
|
||||
type Service = Self;
|
||||
type Context = ();
|
||||
type Future = Ready<Result<Self::Service, Self::Error>>;
|
||||
type Response = ServiceResponse;
|
||||
type Error = DispatchError;
|
||||
type Service = Self;
|
||||
type Context = ();
|
||||
type Future = Ready<Result<Self::Service, Self::Error>>;
|
||||
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
ready(Ok(self.clone()))
|
||||
}
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
ready(Ok(self.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<H, T, R> Service<ServiceRequest> for AFPluginHandlerService<H, T, R>
|
||||
where
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
{
|
||||
type Response = ServiceResponse;
|
||||
type Error = DispatchError;
|
||||
type Future = HandlerServiceFuture<H, T, R>;
|
||||
type Response = ServiceResponse;
|
||||
type Error = DispatchError;
|
||||
type Future = HandlerServiceFuture<H, T, R>;
|
||||
|
||||
fn call(&self, req: ServiceRequest) -> Self::Future {
|
||||
let (req, mut payload) = req.into_parts();
|
||||
let fut = T::from_request(&req, &mut payload);
|
||||
HandlerServiceFuture::Extract(fut, Some(req), self.handler.clone())
|
||||
}
|
||||
fn call(&self, req: ServiceRequest) -> Self::Future {
|
||||
let (req, mut payload) = req.into_parts();
|
||||
let fut = T::from_request(&req, &mut payload);
|
||||
HandlerServiceFuture::Extract(fut, Some(req), self.handler.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project(project = HandlerServiceProj)]
|
||||
pub enum HandlerServiceFuture<H, T, R>
|
||||
where
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
H: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
{
|
||||
Extract(#[pin] T::Future, Option<AFPluginEventRequest>, H),
|
||||
Handle(#[pin] R, Option<AFPluginEventRequest>),
|
||||
Extract(#[pin] T::Future, Option<AFPluginEventRequest>, H),
|
||||
Handle(#[pin] R, Option<AFPluginEventRequest>),
|
||||
}
|
||||
|
||||
impl<F, T, R> Future for HandlerServiceFuture<F, T, R>
|
||||
where
|
||||
F: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
F: AFPluginHandler<T, R>,
|
||||
T: FromAFPluginRequest,
|
||||
R: Future + Sync + Send,
|
||||
R::Output: AFPluginResponder,
|
||||
{
|
||||
type Output = Result<ServiceResponse, DispatchError>;
|
||||
type Output = Result<ServiceResponse, DispatchError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
match self.as_mut().project() {
|
||||
HandlerServiceProj::Extract(fut, req, handle) => {
|
||||
match ready!(fut.poll(cx)) {
|
||||
Ok(params) => {
|
||||
let fut = handle.call(params);
|
||||
let state = HandlerServiceFuture::Handle(fut, req.take());
|
||||
self.as_mut().set(state);
|
||||
}
|
||||
Err(err) => {
|
||||
let req = req.take().unwrap();
|
||||
let system_err: DispatchError = err.into();
|
||||
let res: AFPluginEventResponse = system_err.into();
|
||||
return Poll::Ready(Ok(ServiceResponse::new(req, res)));
|
||||
}
|
||||
};
|
||||
}
|
||||
HandlerServiceProj::Handle(fut, req) => {
|
||||
let result = ready!(fut.poll(cx));
|
||||
let req = req.take().unwrap();
|
||||
let resp = result.respond_to(&req);
|
||||
return Poll::Ready(Ok(ServiceResponse::new(req, resp)));
|
||||
}
|
||||
}
|
||||
}
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
match self.as_mut().project() {
|
||||
HandlerServiceProj::Extract(fut, req, handle) => {
|
||||
match ready!(fut.poll(cx)) {
|
||||
Ok(params) => {
|
||||
let fut = handle.call(params);
|
||||
let state = HandlerServiceFuture::Handle(fut, req.take());
|
||||
self.as_mut().set(state);
|
||||
},
|
||||
Err(err) => {
|
||||
let req = req.take().unwrap();
|
||||
let system_err: DispatchError = err.into();
|
||||
let res: AFPluginEventResponse = system_err.into();
|
||||
return Poll::Ready(Ok(ServiceResponse::new(req, res)));
|
||||
},
|
||||
};
|
||||
},
|
||||
HandlerServiceProj::Handle(fut, req) => {
|
||||
let result = ready!(fut.poll(cx));
|
||||
let req = req.take().unwrap();
|
||||
let resp = result.respond_to(&req);
|
||||
return Poll::Ready(Ok(ServiceResponse::new(req, resp)));
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! factory_tuple ({ $($param:ident)* } => {
|
||||
|
|
|
@ -1,57 +1,60 @@
|
|||
use std::future::Future;
|
||||
|
||||
use crate::{
|
||||
request::{payload::Payload, AFPluginEventRequest},
|
||||
response::AFPluginEventResponse,
|
||||
request::{payload::Payload, AFPluginEventRequest},
|
||||
response::AFPluginEventResponse,
|
||||
};
|
||||
|
||||
pub trait Service<Request> {
|
||||
type Response;
|
||||
type Error;
|
||||
type Future: Future<Output = Result<Self::Response, Self::Error>>;
|
||||
type Response;
|
||||
type Error;
|
||||
type Future: Future<Output = Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn call(&self, req: Request) -> Self::Future;
|
||||
fn call(&self, req: Request) -> Self::Future;
|
||||
}
|
||||
|
||||
/// Returns a future that can handle the request. For the moment, the request will be the
|
||||
/// `AFPluginRequest`
|
||||
pub trait AFPluginServiceFactory<Request> {
|
||||
type Response;
|
||||
type Error;
|
||||
type Service: Service<Request, Response = Self::Response, Error = Self::Error>;
|
||||
type Context;
|
||||
type Future: Future<Output = Result<Self::Service, Self::Error>>;
|
||||
type Response;
|
||||
type Error;
|
||||
type Service: Service<Request, Response = Self::Response, Error = Self::Error>;
|
||||
type Context;
|
||||
type Future: Future<Output = Result<Self::Service, Self::Error>>;
|
||||
|
||||
fn new_service(&self, cfg: Self::Context) -> Self::Future;
|
||||
fn new_service(&self, cfg: Self::Context) -> Self::Future;
|
||||
}
|
||||
|
||||
pub(crate) struct ServiceRequest {
|
||||
event_state: AFPluginEventRequest,
|
||||
payload: Payload,
|
||||
event_state: AFPluginEventRequest,
|
||||
payload: Payload,
|
||||
}
|
||||
|
||||
impl ServiceRequest {
|
||||
pub(crate) fn new(event_state: AFPluginEventRequest, payload: Payload) -> Self {
|
||||
Self { event_state, payload }
|
||||
pub(crate) fn new(event_state: AFPluginEventRequest, payload: Payload) -> Self {
|
||||
Self {
|
||||
event_state,
|
||||
payload,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn into_parts(self) -> (AFPluginEventRequest, Payload) {
|
||||
(self.event_state, self.payload)
|
||||
}
|
||||
#[inline]
|
||||
pub(crate) fn into_parts(self) -> (AFPluginEventRequest, Payload) {
|
||||
(self.event_state, self.payload)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ServiceResponse {
|
||||
request: AFPluginEventRequest,
|
||||
response: AFPluginEventResponse,
|
||||
request: AFPluginEventRequest,
|
||||
response: AFPluginEventResponse,
|
||||
}
|
||||
|
||||
impl ServiceResponse {
|
||||
pub fn new(request: AFPluginEventRequest, response: AFPluginEventResponse) -> Self {
|
||||
ServiceResponse { request, response }
|
||||
}
|
||||
pub fn new(request: AFPluginEventRequest, response: AFPluginEventResponse) -> Self {
|
||||
ServiceResponse { request, response }
|
||||
}
|
||||
|
||||
pub fn into_parts(self) -> (AFPluginEventRequest, AFPluginEventResponse) {
|
||||
(self.request, self.response)
|
||||
}
|
||||
pub fn into_parts(self) -> (AFPluginEventRequest, AFPluginEventResponse) {
|
||||
(self.request, self.response)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,32 +1,32 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
pub struct Ready<T> {
|
||||
val: Option<T>,
|
||||
val: Option<T>,
|
||||
}
|
||||
|
||||
impl<T> Ready<T> {
|
||||
#[inline]
|
||||
pub fn into_inner(mut self) -> T {
|
||||
self.val.take().unwrap()
|
||||
}
|
||||
#[inline]
|
||||
pub fn into_inner(mut self) -> T {
|
||||
self.val.take().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Unpin for Ready<T> {}
|
||||
|
||||
impl<T> Future for Ready<T> {
|
||||
type Output = T;
|
||||
type Output = T;
|
||||
|
||||
#[inline]
|
||||
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
|
||||
let val = self.val.take().expect("Ready polled after completion");
|
||||
Poll::Ready(val)
|
||||
}
|
||||
#[inline]
|
||||
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
|
||||
let val = self.val.take().expect("Ready polled after completion");
|
||||
Poll::Ready(val)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ready<T>(val: T) -> Ready<T> {
|
||||
Ready { val: Some(val) }
|
||||
Ready { val: Some(val) }
|
||||
}
|
||||
|
|
|
@ -3,23 +3,23 @@ use lib_dispatch::runtime::tokio_default_runtime;
|
|||
use std::sync::Arc;
|
||||
|
||||
pub async fn hello() -> String {
|
||||
"say hello".to_string()
|
||||
"say hello".to_string()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test() {
|
||||
let event = "1";
|
||||
let runtime = tokio_default_runtime().unwrap();
|
||||
let dispatch = Arc::new(AFPluginDispatcher::construct(runtime, || {
|
||||
vec![AFPlugin::new().event(event, hello)]
|
||||
}));
|
||||
let request = AFPluginRequest::new(event);
|
||||
let _ = AFPluginDispatcher::async_send_with_callback(dispatch.clone(), request, |resp| {
|
||||
Box::pin(async move {
|
||||
dbg!(&resp);
|
||||
})
|
||||
let event = "1";
|
||||
let runtime = tokio_default_runtime().unwrap();
|
||||
let dispatch = Arc::new(AFPluginDispatcher::construct(runtime, || {
|
||||
vec![AFPlugin::new().event(event, hello)]
|
||||
}));
|
||||
let request = AFPluginRequest::new(event);
|
||||
let _ = AFPluginDispatcher::async_send_with_callback(dispatch.clone(), request, |resp| {
|
||||
Box::pin(async move {
|
||||
dbg!(&resp);
|
||||
})
|
||||
.await;
|
||||
})
|
||||
.await;
|
||||
|
||||
std::mem::forget(dispatch);
|
||||
std::mem::forget(dispatch);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue