config user login

This commit is contained in:
appflowy 2021-08-20 21:09:21 +08:00
parent bda12233a1
commit e709e6bbbc
17 changed files with 175 additions and 52 deletions

View file

@ -1,15 +1,15 @@
use crate::config::MAX_PAYLOAD_SIZE; use crate::config::MAX_PAYLOAD_SIZE;
use actix_web::web; use actix_web::web;
use flowy_net::{errors::ServerError, response::*}; use flowy_net::{errors::NetworkError, response::*};
use futures::StreamExt; use futures::StreamExt;
use protobuf::{Message, ProtobufResult}; use protobuf::{Message, ProtobufResult};
pub async fn parse_from_payload<T: Message>(payload: web::Payload) -> Result<T, ServerError> { pub async fn parse_from_payload<T: Message>(payload: web::Payload) -> Result<T, NetworkError> {
let bytes = poll_payload(payload).await?; let bytes = poll_payload(payload).await?;
parse_from_bytes(&bytes) parse_from_bytes(&bytes)
} }
pub fn parse_from_bytes<T: Message>(bytes: &[u8]) -> Result<T, ServerError> { pub fn parse_from_bytes<T: Message>(bytes: &[u8]) -> Result<T, NetworkError> {
let result: ProtobufResult<T> = Message::parse_from_bytes(&bytes); let result: ProtobufResult<T> = Message::parse_from_bytes(&bytes);
match result { match result {
Ok(data) => Ok(data), Ok(data) => Ok(data),
@ -17,13 +17,13 @@ pub fn parse_from_bytes<T: Message>(bytes: &[u8]) -> Result<T, ServerError> {
} }
} }
pub async fn poll_payload(mut payload: web::Payload) -> Result<web::BytesMut, ServerError> { pub async fn poll_payload(mut payload: web::Payload) -> Result<web::BytesMut, NetworkError> {
let mut body = web::BytesMut::new(); let mut body = web::BytesMut::new();
while let Some(chunk) = payload.next().await { while let Some(chunk) = payload.next().await {
let chunk = chunk.map_err(|e| ServerError::InternalError(format!("{:?}", e)))?; let chunk = chunk.map_err(|e| NetworkError::InternalError(format!("{:?}", e)))?;
if (body.len() + chunk.len()) > MAX_PAYLOAD_SIZE { if (body.len() + chunk.len()) > MAX_PAYLOAD_SIZE {
let resp = ServerResponse::from_msg("Payload overflow", ServerCode::PayloadOverflow); let resp = FlowyResponse::from_msg("Payload overflow", ServerCode::PayloadOverflow);
return Err(ServerError::BadRequest(resp)); return Err(NetworkError::BadRequest(resp));
} }
body.extend_from_slice(&chunk); body.extend_from_slice(&chunk);
} }

View file

@ -1,5 +1,5 @@
mod helper; mod helper;
mod user; pub(crate) mod user;
pub(crate) mod ws; pub(crate) mod ws;
pub use user::*; pub use user::*;

View file

@ -10,7 +10,7 @@ use flowy_user::protobuf::SignUpParams;
use std::sync::Arc; use std::sync::Arc;
pub async fn user_register( pub async fn register(
_request: HttpRequest, _request: HttpRequest,
payload: Payload, payload: Payload,
auth: Data<Arc<Auth>>, auth: Data<Arc<Auth>>,
@ -18,7 +18,7 @@ pub async fn user_register(
let params: SignUpParams = parse_from_payload(payload).await?; let params: SignUpParams = parse_from_payload(payload).await?;
let _ = auth.sign_up(params)?; let _ = auth.sign_up(params)?;
let resp = ServerResponse::success(); let resp = FlowyResponse::success();
Ok(resp.into()) Ok(resp.into())
} }

View file

@ -16,6 +16,7 @@ pub fn run(app_ctx: Arc<AppContext>, listener: TcpListener) -> Result<Server, st
.wrap(middleware::Logger::default()) .wrap(middleware::Logger::default())
.data(web::JsonConfig::default().limit(4096)) .data(web::JsonConfig::default().limit(4096))
.service(ws_scope()) .service(ws_scope())
.service(user_scope())
.data(app_ctx.ws_server.clone()) .data(app_ctx.ws_server.clone())
.data(app_ctx.db_pool.clone()) .data(app_ctx.db_pool.clone())
.data(app_ctx.auth.clone()) .data(app_ctx.auth.clone())
@ -27,6 +28,10 @@ pub fn run(app_ctx: Arc<AppContext>, listener: TcpListener) -> Result<Server, st
fn ws_scope() -> Scope { web::scope("/ws").service(ws::start_connection) } fn ws_scope() -> Scope { web::scope("/ws").service(ws::start_connection) }
fn user_scope() -> Scope {
web::scope("/user").service(web::resource("/register").route(web::post().to(user::register)))
}
pub async fn init_app_context() -> Arc<AppContext> { pub async fn init_app_context() -> Arc<AppContext> {
let _ = flowy_log::Builder::new("flowy").env_filter("Debug").build(); let _ = flowy_log::Builder::new("flowy").env_filter("Debug").build();
let config = Arc::new(Config::new()); let config = Arc::new(Config::new());

View file

@ -1,4 +1,4 @@
use flowy_net::errors::ServerError; use flowy_net::errors::NetworkError;
use flowy_user::protobuf::SignUpParams; use flowy_user::protobuf::SignUpParams;
use sqlx::PgPool; use sqlx::PgPool;
use std::sync::Arc; use std::sync::Arc;
@ -10,5 +10,5 @@ pub struct Auth {
impl Auth { impl Auth {
pub fn new(db_pool: Arc<PgPool>) -> Self { Self { db_pool } } pub fn new(db_pool: Arc<PgPool>) -> Self { Self { db_pool } }
pub fn sign_up(&self, params: SignUpParams) -> Result<(), ServerError> { Ok(()) } pub fn sign_up(&self, params: SignUpParams) -> Result<(), NetworkError> { Ok(()) }
} }

View file

@ -1,6 +1,6 @@
use crate::ws_service::ClientMessage; use crate::ws_service::ClientMessage;
use actix::{Message, Recipient}; use actix::{Message, Recipient};
use flowy_net::errors::ServerError; use flowy_net::errors::NetworkError;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt::Formatter; use std::fmt::Formatter;
@ -37,14 +37,14 @@ impl std::fmt::Display for SessionId {
} }
#[derive(Debug, Message, Clone)] #[derive(Debug, Message, Clone)]
#[rtype(result = "Result<(), ServerError>")] #[rtype(result = "Result<(), NetworkError>")]
pub struct Connect { pub struct Connect {
pub socket: Socket, pub socket: Socket,
pub sid: SessionId, pub sid: SessionId,
} }
#[derive(Debug, Message, Clone)] #[derive(Debug, Message, Clone)]
#[rtype(result = "Result<(), ServerError>")] #[rtype(result = "Result<(), NetworkError>")]
pub struct Disconnect { pub struct Disconnect {
pub sid: SessionId, pub sid: SessionId,
} }

View file

@ -4,7 +4,7 @@ use crate::ws_service::{
}; };
use actix::{Actor, Context, Handler}; use actix::{Actor, Context, Handler};
use dashmap::DashMap; use dashmap::DashMap;
use flowy_net::errors::ServerError; use flowy_net::errors::NetworkError;
pub struct WSServer { pub struct WSServer {
sessions: DashMap<SessionId, Session>, sessions: DashMap<SessionId, Session>,
@ -26,7 +26,7 @@ impl Actor for WSServer {
} }
impl Handler<Connect> for WSServer { impl Handler<Connect> for WSServer {
type Result = Result<(), ServerError>; type Result = Result<(), NetworkError>;
fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result { fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
let session: Session = msg.into(); let session: Session = msg.into();
self.sessions.insert(session.id.clone(), session); self.sessions.insert(session.id.clone(), session);
@ -36,7 +36,7 @@ impl Handler<Connect> for WSServer {
} }
impl Handler<Disconnect> for WSServer { impl Handler<Disconnect> for WSServer {
type Result = Result<(), ServerError>; type Result = Result<(), NetworkError>;
fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) -> Self::Result { fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) -> Self::Result {
self.sessions.remove(&msg.sid); self.sessions.remove(&msg.sid);
Ok(()) Ok(())

View file

@ -14,6 +14,9 @@ serde_repr = "0.1"
actix-web = {version = "3", optional = true} actix-web = {version = "3", optional = true}
pin-project = "1.0.0" pin-project = "1.0.0"
futures-core = { version = "0.3", default-features = false } futures-core = { version = "0.3", default-features = false }
log = "0.4"
bytes = "1.0"
lazy_static = "1.4.0"
[features] [features]
http = ["actix-web"] http = ["actix-web"]

View file

@ -0,0 +1,7 @@
use lazy_static::lazy_static;
pub const HOST: &'static str = "0.0.0.0:3030";
lazy_static! {
pub static ref SIGN_UP_URL: String = format!("{}/user/register", HOST);
}

View file

@ -1,30 +1,38 @@
use crate::response::ServerResponse; use crate::response::FlowyResponse;
use protobuf::ProtobufError; use protobuf::ProtobufError;
use std::fmt::{Formatter, Write}; use std::fmt::{Formatter, Write};
#[derive(Debug)] #[derive(Debug)]
pub enum ServerError { pub enum NetworkError {
InternalError(String), InternalError(String),
BadRequest(ServerResponse<String>), BadRequest(FlowyResponse<String>),
Unauthorized, Unauthorized,
} }
impl std::fmt::Display for ServerError { impl std::fmt::Display for NetworkError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self { match self {
ServerError::InternalError(_) => f.write_str("Internal Server Error"), NetworkError::InternalError(_) => f.write_str("Internal Server Error"),
ServerError::BadRequest(request) => { NetworkError::BadRequest(request) => {
let msg = format!("Bad Request: {:?}", request); let msg = format!("Bad Request: {:?}", request);
f.write_str(&msg) f.write_str(&msg)
}, },
ServerError::Unauthorized => f.write_str("Unauthorized"), NetworkError::Unauthorized => f.write_str("Unauthorized"),
} }
} }
} }
impl std::convert::From<ProtobufError> for ServerError { impl std::convert::From<ProtobufError> for NetworkError {
fn from(err: ProtobufError) -> Self { fn from(err: ProtobufError) -> Self {
let msg = format!("{:?}", err); let msg = format!("{:?}", err);
ServerError::InternalError(msg) NetworkError::InternalError(msg)
}
}
impl std::convert::From<reqwest::Error> for NetworkError {
fn from(error: reqwest::Error) -> Self {
let msg = format!("{:?}", error);
NetworkError::InternalError(msg)
} }
} }

View file

@ -1,4 +1,6 @@
pub mod errors; pub mod errors;
pub mod future; pub mod future;
pub mod config;
pub mod request;
pub mod response; pub mod response;

View file

@ -0,0 +1,3 @@
mod request;
pub use request::*;

View file

@ -0,0 +1,86 @@
use crate::errors::NetworkError;
use bytes::Bytes;
use protobuf::Message;
use reqwest::{Client, Response};
use std::{convert::TryFrom, time::Duration};
pub struct FlowyRequest {
client: Client,
}
impl FlowyRequest {
pub fn new() -> Self {
let client = default_client();
Self { client }
}
pub async fn get<T>(&self, url: &str) -> Result<T, NetworkError>
where
T: Message,
{
let url = url.to_owned();
let response = self.client.get(&url).send().await?;
parse_response(response).await
}
pub async fn post<T>(&self, url: &str, data: T) -> Result<T, NetworkError>
where
T: Message,
{
let url = url.to_owned();
let body = data.write_to_bytes()?;
let response = self.client.post(&url).body(body).send().await?;
parse_response(response).await
}
pub async fn post_data<T>(&self, url: &str, bytes: Vec<u8>) -> Result<T, NetworkError>
where
T: for<'a> TryFrom<&'a Vec<u8>>,
{
let url = url.to_owned();
let response = self.client.post(&url).body(bytes).send().await?;
let bytes = response.bytes().await?.to_vec();
let data = T::try_from(&bytes).map_err(|_e| panic!("")).unwrap();
Ok(data)
}
}
async fn parse_response<T>(response: Response) -> Result<T, NetworkError>
where
T: Message,
{
let bytes = response.bytes().await?;
parse_bytes(bytes)
}
fn parse_bytes<T>(bytes: Bytes) -> Result<T, NetworkError>
where
T: Message,
{
match Message::parse_from_bytes(&bytes) {
Ok(data) => Ok(data),
Err(e) => {
log::error!(
"Parse bytes for {:?} failed: {}",
std::any::type_name::<T>(),
e
);
Err(e.into())
},
}
}
fn default_client() -> Client {
let result = reqwest::Client::builder()
.connect_timeout(Duration::from_millis(500))
.timeout(Duration::from_secs(5))
.build();
match result {
Ok(client) => client,
Err(e) => {
log::error!("Create reqwest client failed: {}", e);
reqwest::Client::new()
},
}
}

View file

@ -14,15 +14,15 @@ pub enum ServerCode {
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
pub struct ServerResponse<T> { pub struct FlowyResponse<T> {
pub msg: String, pub msg: String,
pub data: Option<T>, pub data: Option<T>,
pub code: ServerCode, pub code: ServerCode,
} }
impl<T: Serialize> ServerResponse<T> { impl<T: Serialize> FlowyResponse<T> {
pub fn new(data: Option<T>, msg: &str, code: ServerCode) -> Self { pub fn new(data: Option<T>, msg: &str, code: ServerCode) -> Self {
ServerResponse { FlowyResponse {
msg: msg.to_owned(), msg: msg.to_owned(),
data, data,
code, code,
@ -34,7 +34,7 @@ impl<T: Serialize> ServerResponse<T> {
} }
} }
impl ServerResponse<String> { impl FlowyResponse<String> {
pub fn success() -> Self { Self::from_msg("", ServerCode::Success) } pub fn success() -> Self { Self::from_msg("", ServerCode::Success) }
pub fn from_msg(msg: &str, code: ServerCode) -> Self { pub fn from_msg(msg: &str, code: ServerCode) -> Self {

View file

@ -1,30 +1,30 @@
use crate::{errors::ServerError, response::*}; use crate::{errors::NetworkError, response::*};
use actix_web::{body::Body, error::ResponseError, HttpResponse}; use actix_web::{body::Body, error::ResponseError, HttpResponse};
use serde::Serialize; use serde::Serialize;
impl ResponseError for ServerError { impl ResponseError for NetworkError {
fn error_response(&self) -> HttpResponse { fn error_response(&self) -> HttpResponse {
match self { match self {
ServerError::InternalError(msg) => { NetworkError::InternalError(msg) => {
let resp = ServerResponse::from_msg(&msg, ServerCode::InternalError); let resp = FlowyResponse::from_msg(&msg, ServerCode::InternalError);
HttpResponse::InternalServerError().json(resp) HttpResponse::InternalServerError().json(resp)
}, },
ServerError::BadRequest(ref resp) => HttpResponse::BadRequest().json(resp), NetworkError::BadRequest(ref resp) => HttpResponse::BadRequest().json(resp),
ServerError::Unauthorized => { NetworkError::Unauthorized => {
let resp = ServerResponse::from_msg("Unauthorized", ServerCode::Unauthorized); let resp = FlowyResponse::from_msg("Unauthorized", ServerCode::Unauthorized);
HttpResponse::Unauthorized().json(resp) HttpResponse::Unauthorized().json(resp)
}, },
} }
} }
} }
impl<T: Serialize> std::convert::Into<HttpResponse> for ServerResponse<T> { impl<T: Serialize> std::convert::Into<HttpResponse> for FlowyResponse<T> {
fn into(self) -> HttpResponse { fn into(self) -> HttpResponse {
match serde_json::to_string(&self) { match serde_json::to_string(&self) {
Ok(body) => HttpResponse::Ok().body(Body::from(body)), Ok(body) => HttpResponse::Ok().body(Body::from(body)),
Err(e) => { Err(e) => {
let msg = format!("Serial error: {:?}", e); let msg = format!("Serial error: {:?}", e);
ServerError::InternalError(msg).error_response() NetworkError::InternalError(msg).error_response()
}, },
} }
} }

View file

@ -1,4 +1,4 @@
use crate::response::{ServerCode, ServerResponse}; use crate::response::{FlowyResponse, ServerCode};
use serde::{ use serde::{
de::{self, MapAccess, Visitor}, de::{self, MapAccess, Visitor},
Deserialize, Deserialize,
@ -8,7 +8,7 @@ use serde::{
use std::{fmt, marker::PhantomData, str::FromStr}; use std::{fmt, marker::PhantomData, str::FromStr};
pub trait ServerData<'a>: Serialize + Deserialize<'a> + FromStr<Err = ()> {} pub trait ServerData<'a>: Serialize + Deserialize<'a> + FromStr<Err = ()> {}
impl<'de, T: ServerData<'de>> Deserialize<'de> for ServerResponse<T> { impl<'de, T: ServerData<'de>> Deserialize<'de> for FlowyResponse<T> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where where
D: Deserializer<'de>, D: Deserializer<'de>,
@ -18,7 +18,7 @@ impl<'de, T: ServerData<'de>> Deserialize<'de> for ServerResponse<T> {
where where
T: ServerData<'de>, T: ServerData<'de>,
{ {
type Value = ServerResponse<T>; type Value = FlowyResponse<T>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("struct Duration") formatter.write_str("struct Duration")

View file

@ -3,8 +3,7 @@ use crate::{
errors::{ErrorBuilder, UserErrCode, UserError}, errors::{ErrorBuilder, UserErrCode, UserError},
}; };
use flowy_infra::uuid; use flowy_net::{future::ResultFuture, request::FlowyRequest};
use flowy_net::future::ResultFuture;
use std::sync::Arc; use std::sync::Arc;
pub(crate) trait UserServer { pub(crate) trait UserServer {
@ -27,13 +26,23 @@ impl UserServerImpl {}
impl UserServer for UserServerImpl { impl UserServer for UserServerImpl {
fn sign_up(&self, _params: SignUpParams) -> ResultFuture<SignUpResponse, UserError> { fn sign_up(&self, _params: SignUpParams) -> ResultFuture<SignUpResponse, UserError> {
ResultFuture::new(async { // let bytes: Vec<u8> = params.try_into().unwrap();
Ok(SignUpResponse { // ResultFuture::new(async move {
uid: "".to_string(), // match FlowyRequest::new()
name: "".to_string(), // .post_data::<SignUpResponse>("SIGN_UP_URL.as_ref()", bytes)
email: "".to_string(), // .await
}) // {
}) // Ok(a) => {},
// Err(err) => {},
// }
//
// Ok(SignUpResponse {
// uid: "".to_string(),
// name: "".to_string(),
// email: "".to_string(),
// })
// })
unimplemented!()
} }
fn sign_in(&self, _params: SignInParams) -> ResultFuture<SignInResponse, UserError> { fn sign_in(&self, _params: SignInParams) -> ResultFuture<SignInResponse, UserError> {