feat: ws reconnect (#5)

* feat: ws reconnect

* chore: update collab rev
This commit is contained in:
Nathan.fooo 2023-05-10 20:54:10 +08:00 committed by GitHub
parent 90ae1d5fb6
commit 8b9e6584d2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 83 additions and 40 deletions

5
Cargo.lock generated
View file

@ -824,7 +824,6 @@ dependencies = [
[[package]]
name = "collab"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3"
dependencies = [
"anyhow",
"bytes",
@ -841,7 +840,6 @@ dependencies = [
[[package]]
name = "collab-client-ws"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3"
dependencies = [
"bytes",
"collab-sync",
@ -859,7 +857,6 @@ dependencies = [
[[package]]
name = "collab-persistence"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3"
dependencies = [
"bincode",
"chrono",
@ -879,7 +876,6 @@ dependencies = [
[[package]]
name = "collab-plugins"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3"
dependencies = [
"collab",
"collab-client-ws",
@ -895,7 +891,6 @@ dependencies = [
[[package]]
name = "collab-sync"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3"
dependencies = [
"bytes",
"collab",

View file

@ -88,11 +88,11 @@ members = [
]
[patch.crates-io]
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" }
collab-client-ws = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" }
collab-sync= { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" }
collab-persistence = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" }
collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ad656c" }
collab-client-ws = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ad656c" }
collab-sync= { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ad656c" }
collab-persistence = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ad656c" }
collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ad656c" }
#collab = { path = "./crates/AppFlowy-Collab/collab" }
#collab-client-ws = { path = "./crates/AppFlowy-Collab/collab-client-ws" }

View file

@ -10,4 +10,7 @@ database:
username: "postgres"
password: "password"
database_name: "appflowy"
websocket:
heartbeat_interval: 8
client_timeout: 10
redis_uri: "redis://127.0.0.1:6379"

View file

@ -10,32 +10,36 @@ use bytes::Bytes;
use std::ops::Deref;
use collab_plugins::sync::msg::CollabMessage;
use std::sync::Arc;
use std::time::{Duration, Instant};
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
pub struct CollabSession {
user: Arc<WSUser>,
hb: Instant,
pub server: Addr<CollabServer>,
heartbeat_interval: Duration,
client_timeout: Duration,
}
impl CollabSession {
pub fn new(user: WSUser, server: Addr<CollabServer>) -> Self {
pub fn new(
user: WSUser,
server: Addr<CollabServer>,
heartbeat_interval: Duration,
client_timeout: Duration,
) -> Self {
Self {
user: Arc::new(user),
hb: Instant::now(),
server,
heartbeat_interval,
client_timeout,
}
}
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
ctx.run_interval(self.heartbeat_interval, |act, ctx| {
if Instant::now().duration_since(act.hb) > act.client_timeout {
act.server.do_send(Disconnect {
user: act.user.clone(),
});

View file

@ -30,7 +30,9 @@ pub struct CollabServer {
collab_id_gen: Arc<Mutex<CollabIDGen>>,
/// Memory cache for fast lookup of collab_id from object_id
collab_id_by_object_id: Arc<DashMap<String, CollabId>>,
/// Keep track of all collab groups
collab_groups: Arc<RwLock<HashMap<CollabId, CollabGroup>>>,
/// Keep track of all client streams
client_streams: Arc<RwLock<HashMap<Arc<WSUser>, WSClientStream>>>,
}

View file

@ -5,6 +5,7 @@ use actix_web::web::{Data, Path, Payload};
use actix_web::{get, web, HttpRequest, HttpResponse, Result, Scope};
use actix_web_actors::ws;
use secrecy::Secret;
use std::time::Duration;
use websocket::entities::WSUser;
use websocket::{CollabServer, CollabSession};
@ -23,7 +24,12 @@ pub async fn establish_ws_connection(
) -> Result<HttpResponse> {
tracing::trace!("{:?}", request);
let user = LoggedUser::from_token(&state.config.application.server_key, token.as_str())?;
let client = CollabSession::new(user.into(), server.get_ref().clone());
let client = CollabSession::new(
user.into(),
server.get_ref().clone(),
Duration::from_secs(state.config.websocket.heartbeat_interval as u64),
Duration::from_secs(state.config.websocket.client_timeout as u64),
);
match ws::start(client, &request, payload) {
Ok(response) => Ok(response),
Err(e) => {

View file

@ -20,6 +20,7 @@ use snowflake::Snowflake;
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::net::TcpListener;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing_actix_web::TracingLogger;
@ -69,7 +70,7 @@ pub async fn run(
.map(|(_, server_key)| Key::from(server_key.expose_secret().as_bytes()))
.unwrap_or_else(Key::generate);
let collab_server = CollabServer::new(state.rocksdb.clone()).unwrap().start();
let collab_server_addr = CollabServer::new(state.rocksdb.clone()).unwrap().start();
let mut server = HttpServer::new(move || {
App::new()
.wrap(
@ -84,7 +85,7 @@ pub async fn run(
.app_data(web::JsonConfig::default().limit(4096))
.service(user_scope())
.service(ws_scope())
.app_data(Data::new(collab_server.clone()))
.app_data(Data::new(collab_server_addr.clone()))
.app_data(Data::new(state.clone()))
});

View file

@ -8,7 +8,8 @@ use std::path::PathBuf;
#[derive(serde::Deserialize, Clone, Debug)]
pub struct Config {
pub database: DatabaseSetting,
pub application: ApplicationSettings,
pub application: ApplicationSetting,
pub websocket: WebsocketSetting,
pub redis_uri: Secret<String>,
}
@ -21,7 +22,7 @@ pub struct Config {
// it to 0.0.0.0 in our Docker images.
//
#[derive(serde::Deserialize, Clone, Debug)]
pub struct ApplicationSettings {
pub struct ApplicationSetting {
#[serde(deserialize_with = "deserialize_number_from_string")]
pub port: u16,
pub host: String,
@ -30,7 +31,7 @@ pub struct ApplicationSettings {
pub tls_config: Option<TlsConfig>,
}
impl ApplicationSettings {
impl ApplicationSetting {
pub fn use_https(&self) -> bool {
match &self.tls_config {
None => false,
@ -89,7 +90,7 @@ pub fn get_configuration() -> Result<Config, config::ConfigError> {
let configuration_dir = base_path.join("configuration");
let environment: Environment = std::env::var("APP_ENVIRONMENT")
.unwrap_or_else(|_| "local".into())
.unwrap_or_else(|_| "local".to_string())
.try_into()
.expect("Failed to parse APP_ENVIRONMENT.");
@ -143,3 +144,8 @@ impl TryFrom<String> for Environment {
}
}
}
#[derive(serde::Deserialize, Clone, Debug)]
pub struct WebsocketSetting {
pub heartbeat_interval: u8,
pub client_timeout: u8,
}

View file

@ -1,5 +1,5 @@
use crate::util::{spawn_server, TestUser};
use collab_client_ws::WSClient;
use collab_client_ws::{WSClient, WSClientConfig};
#[actix_rt::test]
async fn ws_conn_test() {
@ -8,6 +8,6 @@ async fn ws_conn_test() {
let token = test_user.register(&server).await;
let address = format!("{}/{}", server.ws_addr, token);
let client = WSClient::new(address, 100);
let client = WSClient::new(address, WSClientConfig::default());
let _ = client.connect().await;
}

View file

@ -20,7 +20,7 @@ use sqlx::{Connection, Executor, PgConnection, PgPool};
// Ensure that the `tracing` stack is only initialised once using `once_cell`
static TRACING: Lazy<()> = Lazy::new(|| {
let level = "debug".to_string();
let level = "trace".to_string();
let mut filters = vec![];
filters.push(format!("appflowy_server={}", level));
filters.push(format!("collab_client_ws={}", level));

View file

@ -1,7 +1,7 @@
use collab::core::collab::MutexCollab;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab_client_ws::{WSClient, WSMessageHandler};
use collab_client_ws::{WSBusinessHandler, WSClient, WSClientConfig};
use collab_plugins::disk::kv::rocks_kv::RocksCollabDB;
use collab_plugins::disk::rocksdb::RocksdbDiskPlugin;
use collab_plugins::sync::SyncPlugin;
@ -9,6 +9,7 @@ use std::net::SocketAddr;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
pub async fn spawn_client(
@ -16,10 +17,13 @@ pub async fn spawn_client(
object_id: &str,
address: String,
) -> std::io::Result<TestClient> {
let ws_client = WSClient::new(address, 100);
let ws_client = WSClient::new(address, WSClientConfig::default());
let addr = ws_client.connect().await.unwrap().unwrap();
let origin = origin_from_tcp_stream(&addr);
let handler = ws_client.subscribe("collab".to_string()).await.unwrap();
let handler = ws_client
.subscribe_business("collab".to_string())
.await
.unwrap();
//
let (sink, stream) = (handler.sink(), handler.stream());
@ -60,7 +64,7 @@ pub struct TestClient {
cleaner: Cleaner,
#[allow(dead_code)]
handlers: Vec<Arc<WSMessageHandler>>,
handlers: Vec<Arc<WSBusinessHandler>>,
}
struct Cleaner(PathBuf);
@ -88,3 +92,7 @@ impl Deref for TestClient {
&self.collab
}
}
pub async fn wait(secs: u64) {
tokio::time::sleep(Duration::from_secs(secs)).await;
}

View file

@ -1,2 +1,3 @@
mod client;
mod test;
mod ws_reconnect;

View file

@ -1,7 +1,6 @@
use crate::util::{spawn_server, TestUser};
use crate::ws::client::spawn_client;
use crate::ws::client::{spawn_client, wait};
use serde_json::json;
use std::time::Duration;
#[actix_rt::test]
async fn ws_conn_test() {
@ -11,12 +10,12 @@ async fn ws_conn_test() {
let address = format!("{}/{}", server.ws_addr, token);
let client = spawn_client(1, "1", address).await.unwrap();
wait_a_sec().await;
wait(1).await;
{
let collab = client.lock();
collab.insert("1", "a");
}
wait_a_sec().await;
wait(1).await;
let value = server.get_doc("1");
assert_json_diff::assert_json_eq!(
@ -26,7 +25,3 @@ async fn ws_conn_test() {
})
);
}
async fn wait_a_sec() {
tokio::time::sleep(Duration::from_secs(2)).await;
}

22
tests/ws/ws_reconnect.rs Normal file
View file

@ -0,0 +1,22 @@
use crate::util::{spawn_server, TestUser};
use collab_client_ws::{WSClient, WSClientConfig};
#[actix_rt::test]
async fn ws_retry_connect() {
let server = spawn_server().await;
let test_user = TestUser::generate();
let token = test_user.register(&server).await;
let address = format!("{}/{}", server.ws_addr, token);
let ws_client = WSClient::new(
address,
WSClientConfig {
buffer_capacity: 100,
ping_per_secs: 2,
retry_connect_per_pings: 5,
},
);
let _addr = ws_client.connect().await.unwrap().unwrap();
// wait(20).await;
}