diff --git a/rust-lib/flowy-observable/src/dart/stream_sender.rs b/rust-lib/flowy-observable/src/dart/stream_sender.rs index 7f7f77ecb1..05f14cb06d 100644 --- a/rust-lib/flowy-observable/src/dart/stream_sender.rs +++ b/rust-lib/flowy-observable/src/dart/stream_sender.rs @@ -39,7 +39,7 @@ impl RustStreamSender { } } - pub fn post(observable_subject: ObservableSubject) -> Result<(), String> { + pub fn post(_observable_subject: ObservableSubject) -> Result<(), String> { #[cfg(feature = "dart")] match R2F_STREAM_SENDER.read() { Ok(stream) => stream.inner_post(observable_subject), diff --git a/rust-lib/flowy-ot/src/client/document/data.rs b/rust-lib/flowy-ot/src/client/document/data.rs index 046c531cd1..df5e005307 100644 --- a/rust-lib/flowy-ot/src/client/document/data.rs +++ b/rust-lib/flowy-ot/src/client/document/data.rs @@ -1,6 +1,5 @@ use crate::{client::DocumentData, errors::OTError}; use serde::{Deserialize, Serialize}; -use serde_json::Error; impl> DocumentData for T { fn into_string(self) -> Result { Ok(self.as_ref().to_string()) } diff --git a/server/Cargo.toml b/server/Cargo.toml new file mode 100644 index 0000000000..01e1651b4e --- /dev/null +++ b/server/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "server" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +actix = "0.10" +actix-web = "3" +actix-http = "2.2.1" +actix-web-actors = "3" +actix-codec = "0.3" + + +futures = "0.3.15" +bytes = "0.5" +toml = "0.5.8" +dashmap = "4.0" +log = "0.4.14" +serde_json = "1.0" +serde = { version = "1.0", features = ["derive"] } +serde_repr = "0.1" + +[lib] +path = "src/lib.rs" + +[[bin]] +name = "flowy_server" +path = "src/main.rs" \ No newline at end of file diff --git a/server/rustfmt.toml b/server/rustfmt.toml new file mode 100644 index 0000000000..5f2617dc55 --- /dev/null +++ b/server/rustfmt.toml @@ -0,0 +1,18 @@ +# https://rust-lang.github.io/rustfmt/?version=master&search= +max_width = 100 +tab_spaces = 4 +fn_single_line = true +match_block_trailing_comma = true +normalize_comments = true +wrap_comments = true +use_field_init_shorthand = true +use_try_shorthand = true +normalize_doc_attributes = true +report_todo = "Always" +report_fixme = "Always" +imports_layout = "HorizontalVertical" +merge_imports = true +reorder_modules = true +reorder_imports = true +enum_discrim_align_threshold = 20 +edition = "2018" diff --git a/server/src/config/config.rs b/server/src/config/config.rs new file mode 100644 index 0000000000..430cc05de2 --- /dev/null +++ b/server/src/config/config.rs @@ -0,0 +1,41 @@ +use std::convert::TryFrom; + +pub struct Config { + pub http_port: u16, +} + +impl Config { + pub fn new() -> Self { Config { http_port: 3030 } } + + pub fn server_addr(&self) -> String { format!("0.0.0.0:{}", self.http_port) } +} + +pub enum Environment { + Local, + Production, +} + +impl Environment { + #[allow(dead_code)] + pub fn as_str(&self) -> &'static str { + match self { + Environment::Local => "local", + Environment::Production => "production", + } + } +} + +impl TryFrom for Environment { + type Error = String; + + fn try_from(s: String) -> Result { + match s.to_lowercase().as_str() { + "local" => Ok(Self::Local), + "production" => Ok(Self::Production), + other => Err(format!( + "{} is not a supported environment. Use either `local` or `production`.", + other + )), + } + } +} diff --git a/server/src/config/const_define.rs b/server/src/config/const_define.rs new file mode 100644 index 0000000000..b0eb7de9c4 --- /dev/null +++ b/server/src/config/const_define.rs @@ -0,0 +1,4 @@ +use std::time::Duration; + +pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(8); +pub const PING_TIMEOUT: Duration = Duration::from_secs(60); diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs new file mode 100644 index 0000000000..8d0e0324bf --- /dev/null +++ b/server/src/config/mod.rs @@ -0,0 +1,5 @@ +mod config; +mod const_define; + +pub use config::*; +pub use const_define::*; diff --git a/server/src/context.rs b/server/src/context.rs new file mode 100644 index 0000000000..43a94600d0 --- /dev/null +++ b/server/src/context.rs @@ -0,0 +1,19 @@ +use crate::{config::Config, ws::WSServer}; +use actix::Addr; +use std::sync::Arc; + +pub struct AppContext { + pub config: Arc, + pub server: Addr, +} + +impl AppContext { + pub fn new(server: Addr) -> Self { + AppContext { + config: Arc::new(Config::new()), + server, + } + } + + pub fn ws_server(&self) -> Addr { self.server.clone() } +} diff --git a/server/src/errors.rs b/server/src/errors.rs new file mode 100644 index 0000000000..acbf82d2ed --- /dev/null +++ b/server/src/errors.rs @@ -0,0 +1,3 @@ +pub struct ServerError {} + +// pub enum ErrorCode {} diff --git a/server/src/lib.rs b/server/src/lib.rs new file mode 100644 index 0000000000..3124c024ba --- /dev/null +++ b/server/src/lib.rs @@ -0,0 +1,6 @@ +mod config; +mod context; +mod errors; +mod routers; +pub mod startup; +mod ws; diff --git a/server/src/main.rs b/server/src/main.rs new file mode 100644 index 0000000000..46cecc2c9c --- /dev/null +++ b/server/src/main.rs @@ -0,0 +1,10 @@ +use server::startup::{init_app_context, run}; +use std::net::TcpListener; + +#[actix_web::main] +async fn main() -> std::io::Result<()> { + let app_ctx = init_app_context().await; + let listener = + TcpListener::bind(app_ctx.config.server_addr()).expect("Failed to bind server address"); + run(app_ctx, listener)?.await +} diff --git a/server/src/routers/mod.rs b/server/src/routers/mod.rs new file mode 100644 index 0000000000..15edb0affa --- /dev/null +++ b/server/src/routers/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod ws; + +pub use ws::*; diff --git a/server/src/routers/ws.rs b/server/src/routers/ws.rs new file mode 100644 index 0000000000..d33730deb3 --- /dev/null +++ b/server/src/routers/ws.rs @@ -0,0 +1,22 @@ +use crate::ws::{entities::SessionId, WSServer, WSSession}; +use actix::Addr; +use actix_web::{ + get, + web::{Data, Path, Payload}, + Error, + HttpRequest, + HttpResponse, +}; +use actix_web_actors::ws; + +#[get("/{token}")] +pub async fn start_connection( + request: HttpRequest, + payload: Payload, + Path(token): Path, + server: Data>, +) -> Result { + let ws = WSSession::new(SessionId::new(token), server.get_ref().clone()); + let response = ws::start(ws, &request, payload)?; + Ok(response.into()) +} diff --git a/server/src/startup.rs b/server/src/startup.rs new file mode 100644 index 0000000000..4c12a6f427 --- /dev/null +++ b/server/src/startup.rs @@ -0,0 +1,25 @@ +use crate::{context::AppContext, routers::*, ws::WSServer}; +use actix::Actor; +use actix_web::{dev::Server, middleware, web, App, HttpServer, Scope}; +use std::{net::TcpListener, sync::Arc}; + +pub fn run(app_ctx: Arc, listener: TcpListener) -> Result { + let server = HttpServer::new(move || { + App::new() + .wrap(middleware::Logger::default()) + .data(web::JsonConfig::default().limit(4096)) + .service(ws_scope()) + .data(app_ctx.ws_server()) + }) + .listen(listener)? + .run(); + Ok(server) +} + +fn ws_scope() -> Scope { web::scope("/ws").service(ws::start_connection) } + +pub async fn init_app_context() -> Arc { + let ws_server = WSServer::new().start(); + let ctx = AppContext::new(ws_server); + Arc::new(ctx) +} diff --git a/server/src/ws/entities/connect.rs b/server/src/ws/entities/connect.rs new file mode 100644 index 0000000000..0f691dbb61 --- /dev/null +++ b/server/src/ws/entities/connect.rs @@ -0,0 +1,33 @@ +use crate::{errors::ServerError, ws::Packet}; +use actix::{Message, Recipient}; +use serde::{Deserialize, Serialize}; +use std::fmt::Formatter; + +#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)] +pub struct SessionId { + pub id: String, +} + +impl SessionId { + pub fn new(id: String) -> Self { SessionId { id } } +} + +impl std::fmt::Display for SessionId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let desc = format!("{}", &self.id); + f.write_str(&desc) + } +} + +#[derive(Debug, Message, Clone)] +#[rtype(result = "Result<(), ServerError>")] +pub struct Connect { + pub socket: Recipient, + pub sid: SessionId, +} + +#[derive(Debug, Message, Clone)] +#[rtype(result = "Result<(), ServerError>")] +pub struct Disconnect { + pub sid: SessionId, +} diff --git a/server/src/ws/entities/mod.rs b/server/src/ws/entities/mod.rs new file mode 100644 index 0000000000..05703e4a96 --- /dev/null +++ b/server/src/ws/entities/mod.rs @@ -0,0 +1,5 @@ +pub use connect::*; +pub use packet::*; + +mod connect; +pub mod packet; diff --git a/server/src/ws/entities/packet.rs b/server/src/ws/entities/packet.rs new file mode 100644 index 0000000000..03bfe4bf22 --- /dev/null +++ b/server/src/ws/entities/packet.rs @@ -0,0 +1,37 @@ +use crate::ws::entities::SessionId; +use actix::Message; +use bytes::Bytes; +use std::fmt::Formatter; + +#[derive(Debug, Clone)] +pub enum Frame { + Text(String), + Binary(Bytes), + Connect(SessionId), + Disconnect(String), +} + +#[derive(Debug, Message, Clone)] +#[rtype(result = "()")] +pub struct Packet { + pub sid: SessionId, + pub frame: Frame, +} + +impl Packet { + pub fn new(sid: SessionId, frame: Frame) -> Self { Packet { sid, frame } } +} + +impl std::fmt::Display for Packet { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let content = match &self.frame { + Frame::Text(t) => format!("[Text]: {}", t), + Frame::Binary(_) => "[Binary message]".to_owned(), + Frame::Connect(_) => "Connect".to_owned(), + Frame::Disconnect(_) => "Disconnect".to_owned(), + }; + + let desc = format!("{}:{}", &self.sid, content); + f.write_str(&desc) + } +} diff --git a/server/src/ws/mod.rs b/server/src/ws/mod.rs new file mode 100644 index 0000000000..6216359df2 --- /dev/null +++ b/server/src/ws/mod.rs @@ -0,0 +1,7 @@ +pub use entities::packet::*; +pub use ws_server::*; +pub use ws_session::*; + +pub(crate) mod entities; +mod ws_server; +mod ws_session; diff --git a/server/src/ws/ws_server.rs b/server/src/ws/ws_server.rs new file mode 100644 index 0000000000..093094aa18 --- /dev/null +++ b/server/src/ws/ws_server.rs @@ -0,0 +1,57 @@ +use crate::{ + errors::ServerError, + ws::{ + entities::{Connect, Disconnect, SessionId}, + Packet, + WSSession, + }, +}; +use actix::{Actor, Context, Handler}; +use dashmap::DashMap; + +pub struct WSServer { + session_map: DashMap, +} + +impl WSServer { + pub fn new() -> Self { + Self { + session_map: DashMap::new(), + } + } + + pub fn send(&self, _packet: Packet) { unimplemented!() } +} + +impl Actor for WSServer { + type Context = Context; + fn started(&mut self, _ctx: &mut Self::Context) {} +} + +impl Handler for WSServer { + type Result = Result<(), ServerError>; + fn handle(&mut self, _msg: Connect, _ctx: &mut Context) -> Self::Result { + unimplemented!() + } +} + +impl Handler for WSServer { + type Result = Result<(), ServerError>; + fn handle(&mut self, _msg: Disconnect, _: &mut Context) -> Self::Result { + unimplemented!() + } +} + +impl Handler for WSServer { + type Result = (); + + fn handle(&mut self, _packet: Packet, _ctx: &mut Context) -> Self::Result { + unimplemented!() + } +} + +impl actix::Supervised for WSServer { + fn restarting(&mut self, _ctx: &mut Context) { + log::warn!("restarting"); + } +} diff --git a/server/src/ws/ws_session.rs b/server/src/ws/ws_session.rs new file mode 100644 index 0000000000..2e10b8fcd1 --- /dev/null +++ b/server/src/ws/ws_session.rs @@ -0,0 +1,163 @@ +use crate::{ + config::{HEARTBEAT_INTERVAL, PING_TIMEOUT}, + ws::{ + entities::{Connect, Disconnect, SessionId}, + Frame, + Packet, + WSServer, + }, +}; +use actix::{ + fut, + Actor, + ActorContext, + ActorFuture, + Addr, + AsyncContext, + ContextFutureSpawner, + Handler, + Running, + StreamHandler, + WrapFuture, +}; + +use actix_web_actors::{ws, ws::Message::Text}; +use std::time::Instant; + +pub struct WSSession { + sid: SessionId, + server: Addr, + hb: Instant, +} + +impl WSSession { + pub fn new(sid: SessionId, server: Addr) -> Self { + Self { + sid, + hb: Instant::now(), + server, + } + } + + fn hb(&self, ctx: &mut ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |ws_session, ctx| { + if Instant::now().duration_since(ws_session.hb) > PING_TIMEOUT { + ws_session.server.do_send(Disconnect { + sid: ws_session.sid.clone(), + }); + ctx.stop(); + return; + } + ctx.ping(b""); + }); + } + + fn connect(&self, ctx: &mut ws::WebsocketContext) { + self.hb(ctx); + let socket = ctx.address().recipient(); + let connect = Connect { + socket, + sid: self.sid.clone(), + }; + self.server + .send(connect) + .into_actor(self) + .then(|res, _ws_session, _ctx| { + match res { + Ok(Ok(_)) => {}, + Ok(Err(_e)) => { + unimplemented!() + }, + Err(_e) => unimplemented!(), + } + fut::ready(()) + }) + .wait(ctx); + } + + fn send(&self, frame: Frame) { + let msg = Packet::new(self.sid.clone(), frame); + self.server.do_send(msg); + } +} + +impl Actor for WSSession { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { self.connect(ctx); } + + fn stopping(&mut self, _: &mut Self::Context) -> Running { + self.server.do_send(Disconnect { + sid: self.sid.clone(), + }); + + Running::Stop + } +} + +impl StreamHandler> for WSSession { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + match msg { + Ok(ws::Message::Ping(msg)) => { + log::debug!("Receive {} ping {:?}", &self.sid, &msg); + self.hb = Instant::now(); + ctx.pong(&msg); + }, + Ok(ws::Message::Pong(msg)) => { + log::debug!("Receive {} pong {:?}", &self.sid, &msg); + self.send(Frame::Connect(self.sid.clone())); + self.hb = Instant::now(); + }, + Ok(ws::Message::Binary(bin)) => { + log::debug!(" Receive {} binary", &self.sid); + self.send(Frame::Binary(bin)); + }, + Ok(ws::Message::Close(reason)) => { + log::debug!("Receive {} close {:?}", &self.sid, &reason); + ctx.close(reason); + ctx.stop(); + }, + Ok(ws::Message::Continuation(c)) => { + log::debug!("Receive {} continues message {:?}", &self.sid, &c); + }, + Ok(ws::Message::Nop) => { + log::debug!("Receive Nop message"); + }, + Ok(Text(s)) => { + log::debug!("Receive {} text {:?}", &self.sid, &s); + self.send(Frame::Text(s)); + }, + + Err(e) => { + let msg = format!("{} error: {:?}", &self.sid, e); + ctx.text(&msg); + log::error!("stream {}", msg); + ctx.stop(); + }, + } + } +} + +impl Handler for WSSession { + type Result = (); + + fn handle(&mut self, msg: Packet, ctx: &mut Self::Context) { + match msg.frame { + Frame::Text(text) => { + ctx.text(text); + }, + Frame::Binary(binary) => { + ctx.binary(binary); + }, + Frame::Connect(sid) => { + let connect_msg = format!("{} connect", &sid); + ctx.text(connect_msg); + }, + Frame::Disconnect(text) => { + log::debug!("Session start disconnecting {}", self.sid); + ctx.text(text); + ctx.stop(); + }, + } + } +}