refactor cloud service triat

This commit is contained in:
appflowy 2022-01-10 23:45:59 +08:00
parent 46a3eb57fa
commit 855d396122
55 changed files with 1368 additions and 1204 deletions

11
backend/Cargo.lock generated
View file

@ -1247,7 +1247,6 @@ dependencies = [
name = "flowy-core" name = "flowy-core"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"backend-service",
"bincode", "bincode",
"bytes", "bytes",
"chrono", "chrono",
@ -1263,7 +1262,6 @@ dependencies = [
"flowy-derive", "flowy-derive",
"flowy-document", "flowy-document",
"flowy-error", "flowy-error",
"flowy-net",
"futures", "futures",
"futures-core", "futures-core",
"lazy_static", "lazy_static",
@ -1327,7 +1325,6 @@ name = "flowy-document"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-stream", "async-stream",
"backend-service",
"bytecount", "bytecount",
"byteorder", "byteorder",
"bytes", "bytes",
@ -1342,9 +1339,7 @@ dependencies = [
"flowy-derive", "flowy-derive",
"flowy-error", "flowy-error",
"futures", "futures",
"futures-core",
"futures-util", "futures-util",
"lazy_static",
"lib-dispatch", "lib-dispatch",
"lib-infra", "lib-infra",
"lib-ot", "lib-ot",
@ -1386,11 +1381,15 @@ name = "flowy-net"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"backend-service",
"bytes", "bytes",
"dashmap", "dashmap",
"flowy-collaboration", "flowy-collaboration",
"flowy-core-data-model",
"flowy-derive", "flowy-derive",
"flowy-error", "flowy-error",
"flowy-user-data-model",
"lazy_static",
"lib-dispatch", "lib-dispatch",
"lib-infra", "lib-infra",
"lib-ws", "lib-ws",
@ -1455,7 +1454,6 @@ dependencies = [
name = "flowy-user" name = "flowy-user"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"backend-service",
"bytes", "bytes",
"dart-notify", "dart-notify",
"dashmap", "dashmap",
@ -1465,7 +1463,6 @@ dependencies = [
"flowy-database", "flowy-database",
"flowy-derive", "flowy-derive",
"flowy-error", "flowy-error",
"flowy-net",
"flowy-user-data-model", "flowy-user-data-model",
"futures-core", "futures-core",
"lazy_static", "lazy_static",

View file

@ -18,7 +18,7 @@ use flowy_collaboration::{
RepeatedRevision as RepeatedRevisionPB, RepeatedRevision as RepeatedRevisionPB,
Revision as RevisionPB, Revision as RevisionPB,
}, },
sync::{DocumentPersistence, ServerDocumentManager}, sync::{ServerDocumentManager, ServerDocumentPersistence},
util::repeated_revision_from_repeated_revision_pb, util::repeated_revision_from_repeated_revision_pb,
}; };
use lib_infra::future::BoxResultFuture; use lib_infra::future::BoxResultFuture;
@ -81,7 +81,7 @@ impl Debug for HttpServerDocumentPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") } fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") }
} }
impl DocumentPersistence for HttpServerDocumentPersistence { impl ServerDocumentPersistence for HttpServerDocumentPersistence {
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> { fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let params = DocumentId { let params = DocumentId {
doc_id: doc_id.to_string(), doc_id: doc_id.to_string(),

View file

@ -6,14 +6,17 @@ use backend::{
use backend_service::{ use backend_service::{
configuration::{get_client_server_configuration, ClientServerConfiguration}, configuration::{get_client_server_configuration, ClientServerConfiguration},
errors::ServerError, errors::ServerError,
http_request::*,
}; };
use flowy_collaboration::{ use flowy_collaboration::{
document::default::initial_delta_string, document::default::initial_delta_string,
entities::doc::{CreateDocParams, DocumentId, DocumentInfo}, entities::doc::{CreateDocParams, DocumentId, DocumentInfo},
}; };
use flowy_core_data_model::entities::prelude::*; use flowy_core_data_model::entities::prelude::*;
use flowy_document::server::{create_doc_request, read_doc_request}; use flowy_net::cloud::{
core::*,
document::{create_document_request, read_document_request},
user::*,
};
use flowy_user_data_model::entities::*; use flowy_user_data_model::entities::*;
use lib_infra::uuid_string; use lib_infra::uuid_string;
use sqlx::{Connection, Executor, PgConnection, PgPool}; use sqlx::{Connection, Executor, PgConnection, PgPool};
@ -153,13 +156,13 @@ impl TestUserServer {
pub async fn read_doc(&self, params: DocumentId) -> Option<DocumentInfo> { pub async fn read_doc(&self, params: DocumentId) -> Option<DocumentInfo> {
let url = format!("{}/api/doc", self.http_addr()); let url = format!("{}/api/doc", self.http_addr());
let doc = read_doc_request(self.user_token(), params, &url).await.unwrap(); let doc = read_document_request(self.user_token(), params, &url).await.unwrap();
doc doc
} }
pub async fn create_doc(&self, params: CreateDocParams) { pub async fn create_doc(&self, params: CreateDocParams) {
let url = format!("{}/api/doc", self.http_addr()); let url = format!("{}/api/doc", self.http_addr());
let _ = create_doc_request(self.user_token(), params, &url).await.unwrap(); let _ = create_document_request(self.user_token(), params, &url).await.unwrap();
} }
pub async fn register_user(&self) -> SignUpResponse { pub async fn register_user(&self) -> SignUpResponse {

View file

@ -10,13 +10,11 @@ flowy-core-data-model = { path = "../../../shared-lib/flowy-core-data-model" }
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration" } flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration" }
flowy-derive = { path = "../../../shared-lib/flowy-derive" } flowy-derive = { path = "../../../shared-lib/flowy-derive" }
lib-ot = { path = "../../../shared-lib/lib-ot" } lib-ot = { path = "../../../shared-lib/lib-ot" }
backend-service = { path = "../../../shared-lib/backend-service" }
lib-infra = { path = "../../../shared-lib/lib-infra" } lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-document = { path = "../flowy-document" } flowy-document = { path = "../flowy-document" }
flowy-database = { path = "../flowy-database" } flowy-database = { path = "../flowy-database" }
flowy-error = { path = "../flowy-error", features = ["db", "backend"]} flowy-error = { path = "../flowy-error", features = ["db", "backend"]}
flowy-net = { path = "../flowy-net" }
dart-notify = { path = "../dart-notify" } dart-notify = { path = "../dart-notify" }
lib-dispatch = { path = "../lib-dispatch" } lib-dispatch = { path = "../lib-dispatch" }
lib-sqlite = { path = "../lib-sqlite" } lib-sqlite = { path = "../lib-sqlite" }

View file

@ -1,19 +1,16 @@
use std::{collections::HashMap, sync::Arc};
use chrono::Utc; use chrono::Utc;
use lazy_static::lazy_static;
use parking_lot::RwLock;
use flowy_collaboration::document::default::{initial_delta, initial_read_me}; use flowy_collaboration::document::default::{initial_delta, initial_read_me};
use flowy_core_data_model::{entities::view::CreateViewParams, user_default}; use flowy_core_data_model::{entities::view::CreateViewParams, user_default};
use flowy_net::entities::NetworkType; use lazy_static::lazy_static;
use parking_lot::RwLock;
use std::{collections::HashMap, sync::Arc};
use crate::{ use crate::{
entities::workspace::RepeatedWorkspace, entities::workspace::RepeatedWorkspace,
errors::{FlowyError, FlowyResult}, errors::{FlowyError, FlowyResult},
module::{WorkspaceDatabase, WorkspaceUser}, module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser},
notify::{send_dart_notification, WorkspaceNotification}, notify::{send_dart_notification, WorkspaceNotification},
services::{server::Server, AppController, TrashController, ViewController, WorkspaceController}, services::{AppController, TrashController, ViewController, WorkspaceController},
}; };
lazy_static! { lazy_static! {
@ -22,7 +19,7 @@ lazy_static! {
pub struct CoreContext { pub struct CoreContext {
pub user: Arc<dyn WorkspaceUser>, pub user: Arc<dyn WorkspaceUser>,
pub(crate) server: Server, pub(crate) cloud_service: Arc<dyn CoreCloudService>,
pub(crate) database: Arc<dyn WorkspaceDatabase>, pub(crate) database: Arc<dyn WorkspaceDatabase>,
pub workspace_controller: Arc<WorkspaceController>, pub workspace_controller: Arc<WorkspaceController>,
pub(crate) app_controller: Arc<AppController>, pub(crate) app_controller: Arc<AppController>,
@ -33,7 +30,7 @@ pub struct CoreContext {
impl CoreContext { impl CoreContext {
pub(crate) fn new( pub(crate) fn new(
user: Arc<dyn WorkspaceUser>, user: Arc<dyn WorkspaceUser>,
server: Server, cloud_service: Arc<dyn CoreCloudService>,
database: Arc<dyn WorkspaceDatabase>, database: Arc<dyn WorkspaceDatabase>,
workspace_controller: Arc<WorkspaceController>, workspace_controller: Arc<WorkspaceController>,
app_controller: Arc<AppController>, app_controller: Arc<AppController>,
@ -46,7 +43,7 @@ impl CoreContext {
Self { Self {
user, user,
server, cloud_service,
database, database,
workspace_controller, workspace_controller,
app_controller, app_controller,
@ -55,14 +52,14 @@ impl CoreContext {
} }
} }
pub fn network_state_changed(&self, new_type: NetworkType) { // pub fn network_state_changed(&self, new_type: NetworkType) {
match new_type { // match new_type {
NetworkType::UnknownNetworkType => {}, // NetworkType::UnknownNetworkType => {},
NetworkType::Wifi => {}, // NetworkType::Wifi => {},
NetworkType::Cell => {}, // NetworkType::Cell => {},
NetworkType::Ethernet => {}, // NetworkType::Ethernet => {},
} // }
} // }
pub async fn user_did_sign_in(&self, token: &str) -> FlowyResult<()> { pub async fn user_did_sign_in(&self, token: &str) -> FlowyResult<()> {
log::debug!("workspace initialize after sign in"); log::debug!("workspace initialize after sign in");

View file

@ -66,7 +66,7 @@ fn read_workspaces_on_server(
user_id: String, user_id: String,
params: WorkspaceId, params: WorkspaceId,
) -> Result<(), FlowyError> { ) -> Result<(), FlowyError> {
let (token, server) = (core.user.token()?, core.server.clone()); let (token, server) = (core.user.token()?, core.cloud_service.clone());
let app_ctrl = core.app_controller.clone(); let app_ctrl = core.app_controller.clone();
let view_ctrl = core.view_controller.clone(); let view_ctrl = core.view_controller.clone();
let conn = core.database.db_connection()?; let conn = core.database.db_connection()?;

View file

@ -1,13 +1,16 @@
use std::sync::Arc;
use crate::{ use crate::{
context::CoreContext, context::CoreContext,
entities::{
app::{App, AppId, CreateAppParams, UpdateAppParams},
trash::{RepeatedTrash, RepeatedTrashId},
view::{CreateViewParams, RepeatedViewId, UpdateViewParams, View, ViewId},
workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId},
},
errors::FlowyError, errors::FlowyError,
event::WorkspaceEvent, event::WorkspaceEvent,
event_handler::*, event_handler::*,
services::{ services::{
app::event_handler::*, app::event_handler::*,
server::construct_workspace_server,
trash::event_handler::*, trash::event_handler::*,
view::event_handler::*, view::event_handler::*,
workspace::event_handler::*, workspace::event_handler::*,
@ -17,11 +20,12 @@ use crate::{
WorkspaceController, WorkspaceController,
}, },
}; };
use backend_service::configuration::ClientServerConfiguration;
use flowy_database::DBConnection; use flowy_database::DBConnection;
use flowy_document::context::DocumentContext; use flowy_document::context::DocumentContext;
use lib_dispatch::prelude::*; use lib_dispatch::prelude::*;
use lib_infra::future::FutureResult;
use lib_sqlite::ConnectionPool; use lib_sqlite::ConnectionPool;
use std::sync::Arc;
pub trait WorkspaceDeps: WorkspaceUser + WorkspaceDatabase {} pub trait WorkspaceDeps: WorkspaceUser + WorkspaceDatabase {}
@ -44,16 +48,18 @@ pub fn init_core(
user: Arc<dyn WorkspaceUser>, user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>, database: Arc<dyn WorkspaceDatabase>,
flowy_document: Arc<DocumentContext>, flowy_document: Arc<DocumentContext>,
server_config: &ClientServerConfiguration, cloud_service: Arc<dyn CoreCloudService>,
) -> Arc<CoreContext> { ) -> Arc<CoreContext> {
let server = construct_workspace_server(server_config); let trash_controller = Arc::new(TrashController::new(
database.clone(),
let trash_controller = Arc::new(TrashController::new(database.clone(), server.clone(), user.clone())); cloud_service.clone(),
user.clone(),
));
let view_controller = Arc::new(ViewController::new( let view_controller = Arc::new(ViewController::new(
user.clone(), user.clone(),
database.clone(), database.clone(),
server.clone(), cloud_service.clone(),
trash_controller.clone(), trash_controller.clone(),
flowy_document, flowy_document,
)); ));
@ -62,19 +68,19 @@ pub fn init_core(
user.clone(), user.clone(),
database.clone(), database.clone(),
trash_controller.clone(), trash_controller.clone(),
server.clone(), cloud_service.clone(),
)); ));
let workspace_controller = Arc::new(WorkspaceController::new( let workspace_controller = Arc::new(WorkspaceController::new(
user.clone(), user.clone(),
database.clone(), database.clone(),
trash_controller.clone(), trash_controller.clone(),
server.clone(), cloud_service.clone(),
)); ));
Arc::new(CoreContext::new( Arc::new(CoreContext::new(
user, user,
server, cloud_service,
database, database,
workspace_controller, workspace_controller,
app_controller, app_controller,
@ -126,3 +132,41 @@ pub fn create(core: Arc<CoreContext>) -> Module {
module module
} }
pub trait CoreCloudService: Send + Sync {
fn init(&self);
// Workspace
fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError>;
fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError>;
fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError>;
fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError>;
// View
fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError>;
fn read_view(&self, token: &str, params: ViewId) -> FutureResult<Option<View>, FlowyError>;
fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError>;
fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError>;
// App
fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError>;
fn read_app(&self, token: &str, params: AppId) -> FutureResult<Option<App>, FlowyError>;
fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError>;
fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError>;
// Trash
fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError>;
fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError>;
fn read_trash(&self, token: &str) -> FutureResult<RepeatedTrash, FlowyError>;
}

View file

@ -4,11 +4,10 @@ use crate::{
trash::TrashType, trash::TrashType,
}, },
errors::*, errors::*,
module::{WorkspaceDatabase, WorkspaceUser}, module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser},
notify::*, notify::*,
services::{ services::{
app::sql::{AppTable, AppTableChangeset, AppTableSql}, app::sql::{AppTable, AppTableChangeset, AppTableSql},
server::Server,
TrashController, TrashController,
TrashEvent, TrashEvent,
}, },
@ -21,7 +20,7 @@ pub(crate) struct AppController {
user: Arc<dyn WorkspaceUser>, user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>, database: Arc<dyn WorkspaceDatabase>,
trash_can: Arc<TrashController>, trash_can: Arc<TrashController>,
server: Server, cloud_service: Arc<dyn CoreCloudService>,
} }
impl AppController { impl AppController {
@ -29,13 +28,13 @@ impl AppController {
user: Arc<dyn WorkspaceUser>, user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>, database: Arc<dyn WorkspaceDatabase>,
trash_can: Arc<TrashController>, trash_can: Arc<TrashController>,
server: Server, cloud_service: Arc<dyn CoreCloudService>,
) -> Self { ) -> Self {
Self { Self {
user, user,
database, database,
trash_can, trash_can,
server, cloud_service,
} }
} }
@ -115,14 +114,14 @@ impl AppController {
#[tracing::instrument(level = "debug", skip(self), err)] #[tracing::instrument(level = "debug", skip(self), err)]
async fn create_app_on_server(&self, params: CreateAppParams) -> Result<App, FlowyError> { async fn create_app_on_server(&self, params: CreateAppParams) -> Result<App, FlowyError> {
let token = self.user.token()?; let token = self.user.token()?;
let app = self.server.create_app(&token, params).await?; let app = self.cloud_service.create_app(&token, params).await?;
Ok(app) Ok(app)
} }
#[tracing::instrument(level = "debug", skip(self), err)] #[tracing::instrument(level = "debug", skip(self), err)]
fn update_app_on_server(&self, params: UpdateAppParams) -> Result<(), FlowyError> { fn update_app_on_server(&self, params: UpdateAppParams) -> Result<(), FlowyError> {
let token = self.user.token()?; let token = self.user.token()?;
let server = self.server.clone(); let server = self.cloud_service.clone();
tokio::spawn(async move { tokio::spawn(async move {
match server.update_app(&token, params).await { match server.update_app(&token, params).await {
Ok(_) => {}, Ok(_) => {},
@ -138,7 +137,7 @@ impl AppController {
#[tracing::instrument(level = "debug", skip(self), err)] #[tracing::instrument(level = "debug", skip(self), err)]
fn read_app_on_server(&self, params: AppId) -> Result<(), FlowyError> { fn read_app_on_server(&self, params: AppId) -> Result<(), FlowyError> {
let token = self.user.token()?; let token = self.user.token()?;
let server = self.server.clone(); let server = self.cloud_service.clone();
let pool = self.database.db_pool()?; let pool = self.database.db_pool()?;
tokio::spawn(async move { tokio::spawn(async move {
// Opti: retry? // Opti: retry?

View file

@ -4,7 +4,6 @@ pub(crate) use view::controller::*;
pub(crate) use workspace::controller::*; pub(crate) use workspace::controller::*;
pub(crate) mod app; pub(crate) mod app;
pub(crate) mod server;
pub(crate) mod trash; pub(crate) mod trash;
pub(crate) mod view; pub(crate) mod view;
pub(crate) mod workspace; pub(crate) mod workspace;

View file

@ -1,69 +0,0 @@
mod server_api;
mod server_api_mock;
pub use server_api::*;
// TODO: ignore mock files in production
pub use server_api_mock::*;
use crate::{
entities::{
app::{App, AppId, CreateAppParams, UpdateAppParams},
trash::{RepeatedTrash, RepeatedTrashId},
view::{CreateViewParams, RepeatedViewId, UpdateViewParams, View, ViewId},
workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId},
},
errors::FlowyError,
};
use backend_service::configuration::ClientServerConfiguration;
use lib_infra::future::FutureResult;
use std::sync::Arc;
pub(crate) type Server = Arc<dyn WorkspaceServerAPI + Send + Sync>;
pub trait WorkspaceServerAPI {
fn init(&self);
// Workspace
fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError>;
fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError>;
fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError>;
fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError>;
// View
fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError>;
fn read_view(&self, token: &str, params: ViewId) -> FutureResult<Option<View>, FlowyError>;
fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError>;
fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError>;
// App
fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError>;
fn read_app(&self, token: &str, params: AppId) -> FutureResult<Option<App>, FlowyError>;
fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError>;
fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError>;
// Trash
fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError>;
fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError>;
fn read_trash(&self, token: &str) -> FutureResult<RepeatedTrash, FlowyError>;
}
pub(crate) fn construct_workspace_server(
config: &ClientServerConfiguration,
) -> Arc<dyn WorkspaceServerAPI + Send + Sync> {
if cfg!(feature = "http_server") {
Arc::new(WorkspaceHttpServer::new(config.clone()))
} else {
Arc::new(WorkspaceServerMock {})
}
}

View file

@ -1,170 +0,0 @@
use crate::{
entities::{
app::{App, AppId, CreateAppParams, UpdateAppParams},
trash::{RepeatedTrash, RepeatedTrashId},
view::{CreateViewParams, RepeatedViewId, UpdateViewParams, View, ViewId},
workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId},
},
errors::{ErrorCode, FlowyError},
notify::{send_dart_notification, WorkspaceNotification},
services::server::WorkspaceServerAPI,
};
use backend_service::{configuration::ClientServerConfiguration, http_request::*, middleware::*};
use lib_infra::future::FutureResult;
pub struct WorkspaceHttpServer {
config: ClientServerConfiguration,
}
impl WorkspaceHttpServer {
pub fn new(config: ClientServerConfiguration) -> WorkspaceHttpServer { Self { config } }
}
impl WorkspaceServerAPI for WorkspaceHttpServer {
fn init(&self) {
let mut rx = BACKEND_API_MIDDLEWARE.invalid_token_subscribe();
tokio::spawn(async move {
while let Ok(invalid_token) = rx.recv().await {
let error = FlowyError::new(ErrorCode::UserUnauthorized, "");
send_dart_notification(&invalid_token, WorkspaceNotification::UserUnauthorized)
.error(error)
.send()
}
});
}
fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
let workspace = create_workspace_request(&token, params, &url).await?;
Ok(workspace)
})
}
fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
let repeated_workspace = read_workspaces_request(&token, params, &url).await?;
Ok(repeated_workspace)
})
}
fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
let _ = update_workspace_request(&token, params, &url).await?;
Ok(())
})
}
fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
let _ = delete_workspace_request(&token, params, &url).await?;
Ok(())
})
}
fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
let view = create_view_request(&token, params, &url).await?;
Ok(view)
})
}
fn read_view(&self, token: &str, params: ViewId) -> FutureResult<Option<View>, FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
let view = read_view_request(&token, params, &url).await?;
Ok(view)
})
}
fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
let _ = delete_view_request(&token, params, &url).await?;
Ok(())
})
}
fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
let _ = update_view_request(&token, params, &url).await?;
Ok(())
})
}
fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
let app = create_app_request(&token, params, &url).await?;
Ok(app)
})
}
fn read_app(&self, token: &str, params: AppId) -> FutureResult<Option<App>, FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
let app = read_app_request(&token, params, &url).await?;
Ok(app)
})
}
fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
let _ = update_app_request(&token, params, &url).await?;
Ok(())
})
}
fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
let _ = delete_app_request(&token, params, &url).await?;
Ok(())
})
}
fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.trash_url();
FutureResult::new(async move {
let _ = create_trash_request(&token, params, &url).await?;
Ok(())
})
}
fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.trash_url();
FutureResult::new(async move {
let _ = delete_trash_request(&token, params, &url).await?;
Ok(())
})
}
fn read_trash(&self, token: &str) -> FutureResult<RepeatedTrash, FlowyError> {
let token = token.to_owned();
let url = self.config.trash_url();
FutureResult::new(async move {
let repeated_trash = read_trash_request(&token, &url).await?;
Ok(repeated_trash)
})
}
}

View file

@ -1,116 +0,0 @@
use crate::{
entities::{
app::{App, AppId, CreateAppParams, RepeatedApp, UpdateAppParams},
trash::{RepeatedTrash, RepeatedTrashId},
view::{CreateViewParams, RepeatedView, RepeatedViewId, UpdateViewParams, View, ViewId},
workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId},
},
errors::FlowyError,
services::server::WorkspaceServerAPI,
};
use lib_infra::{future::FutureResult, timestamp, uuid_string};
pub struct WorkspaceServerMock {}
impl WorkspaceServerAPI for WorkspaceServerMock {
fn init(&self) {}
fn create_workspace(&self, _token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
let time = timestamp();
let workspace = Workspace {
id: uuid_string(),
name: params.name,
desc: params.desc,
apps: RepeatedApp::default(),
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(workspace) })
}
fn read_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
FutureResult::new(async {
let repeated_workspace = RepeatedWorkspace { items: vec![] };
Ok(repeated_workspace)
})
}
fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn delete_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
let time = timestamp();
let view = View {
id: params.view_id,
belong_to_id: params.belong_to_id,
name: params.name,
desc: params.desc,
view_type: params.view_type,
version: 0,
belongings: RepeatedView::default(),
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(view) })
}
fn read_view(&self, _token: &str, _params: ViewId) -> FutureResult<Option<View>, FlowyError> {
FutureResult::new(async { Ok(None) })
}
fn delete_view(&self, _token: &str, _params: RepeatedViewId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
let time = timestamp();
let app = App {
id: uuid_string(),
workspace_id: params.workspace_id,
name: params.name,
desc: params.desc,
belongings: RepeatedView::default(),
version: 0,
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(app) })
}
fn read_app(&self, _token: &str, _params: AppId) -> FutureResult<Option<App>, FlowyError> {
FutureResult::new(async { Ok(None) })
}
fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn delete_app(&self, _token: &str, _params: AppId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn create_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn delete_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn read_trash(&self, _token: &str) -> FutureResult<RepeatedTrash, FlowyError> {
FutureResult::new(async {
let repeated_trash = RepeatedTrash { items: vec![] };
Ok(repeated_trash)
})
}
}

View file

@ -1,9 +1,9 @@
use crate::{ use crate::{
entities::trash::{RepeatedTrash, RepeatedTrashId, Trash, TrashId, TrashType}, entities::trash::{RepeatedTrash, RepeatedTrashId, Trash, TrashId, TrashType},
errors::{FlowyError, FlowyResult}, errors::{FlowyError, FlowyResult},
module::{WorkspaceDatabase, WorkspaceUser}, module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser},
notify::{send_anonymous_dart_notification, WorkspaceNotification}, notify::{send_anonymous_dart_notification, WorkspaceNotification},
services::{server::Server, trash::sql::TrashTableSql}, services::trash::sql::TrashTableSql,
}; };
use crossbeam_utils::thread; use crossbeam_utils::thread;
use flowy_database::SqliteConnection; use flowy_database::SqliteConnection;
@ -13,18 +13,22 @@ use tokio::sync::{broadcast, mpsc};
pub struct TrashController { pub struct TrashController {
pub database: Arc<dyn WorkspaceDatabase>, pub database: Arc<dyn WorkspaceDatabase>,
notify: broadcast::Sender<TrashEvent>, notify: broadcast::Sender<TrashEvent>,
server: Server, cloud_service: Arc<dyn CoreCloudService>,
user: Arc<dyn WorkspaceUser>, user: Arc<dyn WorkspaceUser>,
} }
impl TrashController { impl TrashController {
pub fn new(database: Arc<dyn WorkspaceDatabase>, server: Server, user: Arc<dyn WorkspaceUser>) -> Self { pub fn new(
database: Arc<dyn WorkspaceDatabase>,
cloud_service: Arc<dyn CoreCloudService>,
user: Arc<dyn WorkspaceUser>,
) -> Self {
let (tx, _) = broadcast::channel(10); let (tx, _) = broadcast::channel(10);
Self { Self {
database, database,
notify: tx, notify: tx,
server, cloud_service,
user, user,
} }
} }
@ -194,7 +198,7 @@ impl TrashController {
fn create_trash_on_server<T: Into<RepeatedTrashId>>(&self, trash: T) -> FlowyResult<()> { fn create_trash_on_server<T: Into<RepeatedTrashId>>(&self, trash: T) -> FlowyResult<()> {
let token = self.user.token()?; let token = self.user.token()?;
let trash_identifiers = trash.into(); let trash_identifiers = trash.into();
let server = self.server.clone(); let server = self.cloud_service.clone();
// TODO: retry? // TODO: retry?
let _ = tokio::spawn(async move { let _ = tokio::spawn(async move {
match server.create_trash(&token, trash_identifiers).await { match server.create_trash(&token, trash_identifiers).await {
@ -209,7 +213,7 @@ impl TrashController {
fn delete_trash_on_server<T: Into<RepeatedTrashId>>(&self, trash: T) -> FlowyResult<()> { fn delete_trash_on_server<T: Into<RepeatedTrashId>>(&self, trash: T) -> FlowyResult<()> {
let token = self.user.token()?; let token = self.user.token()?;
let trash_identifiers = trash.into(); let trash_identifiers = trash.into();
let server = self.server.clone(); let server = self.cloud_service.clone();
let _ = tokio::spawn(async move { let _ = tokio::spawn(async move {
match server.delete_trash(&token, trash_identifiers).await { match server.delete_trash(&token, trash_identifiers).await {
Ok(_) => {}, Ok(_) => {},
@ -222,7 +226,7 @@ impl TrashController {
#[tracing::instrument(level = "debug", skip(self), err)] #[tracing::instrument(level = "debug", skip(self), err)]
fn read_trash_on_server(&self) -> FlowyResult<()> { fn read_trash_on_server(&self) -> FlowyResult<()> {
let token = self.user.token()?; let token = self.user.token()?;
let server = self.server.clone(); let server = self.cloud_service.clone();
let pool = self.database.db_pool()?; let pool = self.database.db_pool()?;
tokio::spawn(async move { tokio::spawn(async move {
@ -255,7 +259,7 @@ impl TrashController {
#[tracing::instrument(level = "debug", skip(self), err)] #[tracing::instrument(level = "debug", skip(self), err)]
async fn delete_all_trash_on_server(&self) -> FlowyResult<()> { async fn delete_all_trash_on_server(&self) -> FlowyResult<()> {
let token = self.user.token()?; let token = self.user.token()?;
let server = self.server.clone(); let server = self.cloud_service.clone();
server.delete_trash(&token, RepeatedTrashId::all()).await server.delete_trash(&token, RepeatedTrashId::all()).await
} }
} }

View file

@ -13,10 +13,9 @@ use crate::{
view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewId}, view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewId},
}, },
errors::{FlowyError, FlowyResult}, errors::{FlowyError, FlowyResult},
module::{WorkspaceDatabase, WorkspaceUser}, module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser},
notify::{send_dart_notification, WorkspaceNotification}, notify::{send_dart_notification, WorkspaceNotification},
services::{ services::{
server::Server,
view::sql::{ViewTable, ViewTableChangeset, ViewTableSql}, view::sql::{ViewTable, ViewTableChangeset, ViewTableSql},
TrashController, TrashController,
TrashEvent, TrashEvent,
@ -31,7 +30,7 @@ const LATEST_VIEW_ID: &str = "latest_view_id";
pub(crate) struct ViewController { pub(crate) struct ViewController {
user: Arc<dyn WorkspaceUser>, user: Arc<dyn WorkspaceUser>,
server: Server, cloud_service: Arc<dyn CoreCloudService>,
database: Arc<dyn WorkspaceDatabase>, database: Arc<dyn WorkspaceDatabase>,
trash_controller: Arc<TrashController>, trash_controller: Arc<TrashController>,
document_ctx: Arc<DocumentContext>, document_ctx: Arc<DocumentContext>,
@ -41,13 +40,13 @@ impl ViewController {
pub(crate) fn new( pub(crate) fn new(
user: Arc<dyn WorkspaceUser>, user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>, database: Arc<dyn WorkspaceDatabase>,
server: Server, cloud_service: Arc<dyn CoreCloudService>,
trash_can: Arc<TrashController>, trash_can: Arc<TrashController>,
document_ctx: Arc<DocumentContext>, document_ctx: Arc<DocumentContext>,
) -> Self { ) -> Self {
Self { Self {
user, user,
server, cloud_service,
database, database,
trash_controller: trash_can, trash_controller: trash_can,
document_ctx, document_ctx,
@ -238,14 +237,14 @@ impl ViewController {
#[tracing::instrument(skip(self), err)] #[tracing::instrument(skip(self), err)]
async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, FlowyError> { async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, FlowyError> {
let token = self.user.token()?; let token = self.user.token()?;
let view = self.server.create_view(&token, params).await?; let view = self.cloud_service.create_view(&token, params).await?;
Ok(view) Ok(view)
} }
#[tracing::instrument(skip(self), err)] #[tracing::instrument(skip(self), err)]
fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> { fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> {
let token = self.user.token()?; let token = self.user.token()?;
let server = self.server.clone(); let server = self.cloud_service.clone();
tokio::spawn(async move { tokio::spawn(async move {
match server.update_view(&token, params).await { match server.update_view(&token, params).await {
Ok(_) => {}, Ok(_) => {},
@ -261,7 +260,7 @@ impl ViewController {
#[tracing::instrument(skip(self), err)] #[tracing::instrument(skip(self), err)]
fn read_view_on_server(&self, params: ViewId) -> Result<(), FlowyError> { fn read_view_on_server(&self, params: ViewId) -> Result<(), FlowyError> {
let token = self.user.token()?; let token = self.user.token()?;
let server = self.server.clone(); let server = self.cloud_service.clone();
let pool = self.database.db_pool()?; let pool = self.database.db_pool()?;
// TODO: Retry with RetryAction? // TODO: Retry with RetryAction?
tokio::spawn(async move { tokio::spawn(async move {

View file

@ -1,10 +1,9 @@
use crate::{ use crate::{
errors::*, errors::*,
module::{WorkspaceDatabase, WorkspaceUser}, module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser},
notify::*, notify::*,
services::{ services::{
read_local_workspace_apps, read_local_workspace_apps,
server::Server,
workspace::sql::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql}, workspace::sql::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql},
TrashController, TrashController,
}, },
@ -17,7 +16,7 @@ pub struct WorkspaceController {
pub user: Arc<dyn WorkspaceUser>, pub user: Arc<dyn WorkspaceUser>,
pub(crate) database: Arc<dyn WorkspaceDatabase>, pub(crate) database: Arc<dyn WorkspaceDatabase>,
pub(crate) trash_controller: Arc<TrashController>, pub(crate) trash_controller: Arc<TrashController>,
server: Server, cloud_service: Arc<dyn CoreCloudService>,
} }
impl WorkspaceController { impl WorkspaceController {
@ -25,13 +24,13 @@ impl WorkspaceController {
user: Arc<dyn WorkspaceUser>, user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>, database: Arc<dyn WorkspaceDatabase>,
trash_can: Arc<TrashController>, trash_can: Arc<TrashController>,
server: Server, cloud_service: Arc<dyn CoreCloudService>,
) -> Self { ) -> Self {
Self { Self {
user, user,
database, database,
trash_controller: trash_can, trash_controller: trash_can,
server, cloud_service,
} }
} }
@ -182,13 +181,13 @@ impl WorkspaceController {
#[tracing::instrument(level = "debug", skip(self), err)] #[tracing::instrument(level = "debug", skip(self), err)]
async fn create_workspace_on_server(&self, params: CreateWorkspaceParams) -> Result<Workspace, FlowyError> { async fn create_workspace_on_server(&self, params: CreateWorkspaceParams) -> Result<Workspace, FlowyError> {
let token = self.user.token()?; let token = self.user.token()?;
let workspace = self.server.create_workspace(&token, params).await?; let workspace = self.cloud_service.create_workspace(&token, params).await?;
Ok(workspace) Ok(workspace)
} }
#[tracing::instrument(level = "debug", skip(self), err)] #[tracing::instrument(level = "debug", skip(self), err)]
fn update_workspace_on_server(&self, params: UpdateWorkspaceParams) -> Result<(), FlowyError> { fn update_workspace_on_server(&self, params: UpdateWorkspaceParams) -> Result<(), FlowyError> {
let (token, server) = (self.user.token()?, self.server.clone()); let (token, server) = (self.user.token()?, self.cloud_service.clone());
tokio::spawn(async move { tokio::spawn(async move {
match server.update_workspace(&token, params).await { match server.update_workspace(&token, params).await {
Ok(_) => {}, Ok(_) => {},
@ -206,7 +205,7 @@ impl WorkspaceController {
let params = WorkspaceId { let params = WorkspaceId {
workspace_id: Some(workspace_id.to_string()), workspace_id: Some(workspace_id.to_string()),
}; };
let (token, server) = (self.user.token()?, self.server.clone()); let (token, server) = (self.user.token()?, self.cloud_service.clone());
tokio::spawn(async move { tokio::spawn(async move {
match server.delete_workspace(&token, params).await { match server.delete_workspace(&token, params).await {
Ok(_) => {}, Ok(_) => {},

View file

@ -1,5 +1,5 @@
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
use crate::{module::WorkspaceUser, services::server::Server}; use crate::module::{CoreCloudService, WorkspaceUser};
use lib_infra::retry::Action; use lib_infra::retry::Action;
use pin_project::pin_project; use pin_project::pin_project;
use std::{ use std::{
@ -10,12 +10,12 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
pub(crate) type Builder<Fut> = Box<dyn Fn(String, Server) -> Fut + Send + Sync>; pub(crate) type Builder<Fut> = Box<dyn Fn(String, Arc<dyn CoreCloudService>) -> Fut + Send + Sync>;
#[allow(dead_code)] #[allow(dead_code)]
pub(crate) struct RetryAction<Fut, T, E> { pub(crate) struct RetryAction<Fut, T, E> {
token: String, token: String,
server: Server, cloud_service: Arc<dyn CoreCloudService>,
user: Arc<dyn WorkspaceUser>, user: Arc<dyn WorkspaceUser>,
builder: Builder<Fut>, builder: Builder<Fut>,
phantom: PhantomData<(T, E)>, phantom: PhantomData<(T, E)>,
@ -23,15 +23,15 @@ pub(crate) struct RetryAction<Fut, T, E> {
impl<Fut, T, E> RetryAction<Fut, T, E> { impl<Fut, T, E> RetryAction<Fut, T, E> {
#[allow(dead_code)] #[allow(dead_code)]
pub(crate) fn new<F>(server: Server, user: Arc<dyn WorkspaceUser>, builder: F) -> Self pub(crate) fn new<F>(cloud_service: Arc<dyn CoreCloudService>, user: Arc<dyn WorkspaceUser>, builder: F) -> Self
where where
Fut: Future<Output = Result<T, E>> + Send + Sync + 'static, Fut: Future<Output = Result<T, E>> + Send + Sync + 'static,
F: Fn(String, Server) -> Fut + Send + Sync + 'static, F: Fn(String, Arc<dyn CoreCloudService>) -> Fut + Send + Sync + 'static,
{ {
let token = user.token().unwrap_or_else(|_| "".to_owned()); let token = user.token().unwrap_or_else(|_| "".to_owned());
Self { Self {
token, token,
server, cloud_service,
user, user,
builder: Box::new(builder), builder: Box::new(builder),
phantom: PhantomData, phantom: PhantomData,
@ -50,7 +50,7 @@ where
type Error = E; type Error = E;
fn run(&mut self) -> Self::Future { fn run(&mut self) -> Self::Future {
let fut = (self.builder)(self.token.clone(), self.server.clone()); let fut = (self.builder)(self.token.clone(), self.cloud_service.clone());
Box::pin(RetryActionFut { fut: Box::pin(fut) }) Box::pin(RetryActionFut { fut: Box::pin(fut) })
} }
} }

View file

@ -9,7 +9,6 @@ edition = "2018"
[dependencies] [dependencies]
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration" } flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration" }
flowy-derive = { path = "../../../shared-lib/flowy-derive" } flowy-derive = { path = "../../../shared-lib/flowy-derive" }
backend-service = { path = "../../../shared-lib/backend-service" }
lib-ot = { path = "../../../shared-lib/lib-ot" } lib-ot = { path = "../../../shared-lib/lib-ot" }
lib-ws = { path = "../../../shared-lib/lib-ws" } lib-ws = { path = "../../../shared-lib/lib-ws" }
lib-infra = { path = "../../../shared-lib/lib-infra" } lib-infra = { path = "../../../shared-lib/lib-infra" }
@ -24,7 +23,6 @@ diesel = {version = "1.4.8", features = ["sqlite"]}
diesel_derives = {version = "1.4.1", features = ["sqlite"]} diesel_derives = {version = "1.4.1", features = ["sqlite"]}
protobuf = {version = "2.18.0"} protobuf = {version = "2.18.0"}
unicode-segmentation = "1.8" unicode-segmentation = "1.8"
lazy_static = "1.4.0"
log = "0.4.14" log = "0.4.14"
tokio = {version = "1", features = ["sync"]} tokio = {version = "1", features = ["sync"]}
tracing = { version = "0.1", features = ["log"] } tracing = { version = "0.1", features = ["log"] }
@ -38,7 +36,6 @@ url = "2.2"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = {version = "1.0"} serde_json = {version = "1.0"}
chrono = "0.4.19" chrono = "0.4.19"
futures-core = { version = "0.3", default-features = false }
futures-util = "0.3.15" futures-util = "0.3.15"
byteorder = {version = "1.3.4"} byteorder = {version = "1.3.4"}
async-stream = "0.3.2" async-stream = "0.3.2"
@ -52,7 +49,6 @@ flowy-net = { path = "../flowy-net" }
color-eyre = { version = "0.5", default-features = false } color-eyre = { version = "0.5", default-features = false }
criterion = "0.3" criterion = "0.3"
rand = "0.7.3" rand = "0.7.3"
env_logger = "0.8.2"
[features] [features]

View file

@ -1,10 +1,8 @@
use crate::errors::FlowyError;
use backend_service::configuration::ClientServerConfiguration;
use crate::{ use crate::{
controller::DocumentController, controller::DocumentController,
core::{DocumentWSReceivers, DocumentWebSocket}, core::{DocumentWSReceivers, DocumentWebSocket},
server::construct_doc_server, errors::FlowyError,
DocumentCloudService,
}; };
use flowy_database::ConnectionPool; use flowy_database::ConnectionPool;
use std::sync::Arc; use std::sync::Arc;
@ -26,10 +24,14 @@ impl DocumentContext {
user: Arc<dyn DocumentUser>, user: Arc<dyn DocumentUser>,
ws_receivers: Arc<DocumentWSReceivers>, ws_receivers: Arc<DocumentWSReceivers>,
ws_sender: Arc<dyn DocumentWebSocket>, ws_sender: Arc<dyn DocumentWebSocket>,
server_config: &ClientServerConfiguration, cloud_service: Arc<dyn DocumentCloudService>,
) -> DocumentContext { ) -> DocumentContext {
let server = construct_doc_server(server_config); let doc_ctrl = Arc::new(DocumentController::new(
let doc_ctrl = Arc::new(DocumentController::new(server, user.clone(), ws_receivers, ws_sender)); cloud_service,
user.clone(),
ws_receivers,
ws_sender,
));
Self { Self {
controller: doc_ctrl, controller: doc_ctrl,
user, user,

View file

@ -8,7 +8,7 @@ use crate::{
WSStateReceiver, WSStateReceiver,
}, },
errors::FlowyError, errors::FlowyError,
server::Server, DocumentCloudService,
}; };
use bytes::Bytes; use bytes::Bytes;
use dashmap::DashMap; use dashmap::DashMap;
@ -22,7 +22,7 @@ use lib_infra::future::FutureResult;
use std::sync::Arc; use std::sync::Arc;
pub struct DocumentController { pub struct DocumentController {
server: Server, cloud_service: Arc<dyn DocumentCloudService>,
ws_receivers: Arc<DocumentWSReceivers>, ws_receivers: Arc<DocumentWSReceivers>,
ws_sender: Arc<dyn DocumentWebSocket>, ws_sender: Arc<dyn DocumentWebSocket>,
open_cache: Arc<OpenDocCache>, open_cache: Arc<OpenDocCache>,
@ -31,14 +31,14 @@ pub struct DocumentController {
impl DocumentController { impl DocumentController {
pub(crate) fn new( pub(crate) fn new(
server: Server, cloud_service: Arc<dyn DocumentCloudService>,
user: Arc<dyn DocumentUser>, user: Arc<dyn DocumentUser>,
ws_receivers: Arc<DocumentWSReceivers>, ws_receivers: Arc<DocumentWSReceivers>,
ws_sender: Arc<dyn DocumentWebSocket>, ws_sender: Arc<dyn DocumentWebSocket>,
) -> Self { ) -> Self {
let open_cache = Arc::new(OpenDocCache::new()); let open_cache = Arc::new(OpenDocCache::new());
Self { Self {
server, cloud_service,
ws_receivers, ws_receivers,
ws_sender, ws_sender,
open_cache, open_cache,
@ -119,7 +119,7 @@ impl DocumentController {
let rev_manager = self.make_rev_manager(doc_id, pool.clone())?; let rev_manager = self.make_rev_manager(doc_id, pool.clone())?;
let server = Arc::new(RevisionServerImpl { let server = Arc::new(RevisionServerImpl {
token, token,
server: self.server.clone(), server: self.cloud_service.clone(),
}); });
let doc_editor = ClientDocumentEditor::new(doc_id, user, rev_manager, self.ws_sender.clone(), server).await?; let doc_editor = ClientDocumentEditor::new(doc_id, user, rev_manager, self.ws_sender.clone(), server).await?;
self.ws_receivers.add(doc_id, doc_editor.ws_handler()); self.ws_receivers.add(doc_id, doc_editor.ws_handler());
@ -136,7 +136,7 @@ impl DocumentController {
struct RevisionServerImpl { struct RevisionServerImpl {
token: String, token: String,
server: Server, server: Arc<dyn DocumentCloudService>,
} }
impl RevisionServer for RevisionServerImpl { impl RevisionServer for RevisionServerImpl {
@ -149,7 +149,7 @@ impl RevisionServer for RevisionServerImpl {
let token = self.token.clone(); let token = self.token.clone();
FutureResult::new(async move { FutureResult::new(async move {
match server.read_doc(&token, params).await? { match server.read_document(&token, params).await? {
None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")), None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")),
Some(doc) => Ok(doc), Some(doc) => Ok(doc),
} }

View file

@ -3,7 +3,6 @@ pub(crate) mod controller;
pub mod core; pub mod core;
mod notify; mod notify;
pub mod protobuf; pub mod protobuf;
pub mod server;
mod ws_receivers; mod ws_receivers;
#[macro_use] #[macro_use]
@ -12,3 +11,15 @@ extern crate flowy_database;
pub mod errors { pub mod errors {
pub use flowy_error::{internal_error, ErrorCode, FlowyError}; pub use flowy_error::{internal_error, ErrorCode, FlowyError};
} }
use crate::errors::FlowyError;
use flowy_collaboration::entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams};
use lib_infra::future::FutureResult;
pub trait DocumentCloudService: Send + Sync {
fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError>;
fn read_document(&self, token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError>;
fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError>;
}

View file

@ -1,29 +0,0 @@
use backend_service::{request::ResponseMiddleware, response::FlowyResponse};
use lazy_static::lazy_static;
use std::sync::Arc;
lazy_static! {
pub(crate) static ref MIDDLEWARE: Arc<DocMiddleware> = Arc::new(DocMiddleware {});
}
pub(crate) struct DocMiddleware {}
impl ResponseMiddleware for DocMiddleware {
fn receive_response(&self, token: &Option<String>, response: &FlowyResponse) {
if let Some(error) = &response.error {
if error.is_unauthorized() {
log::error!("document user is unauthorized");
match token {
None => {},
Some(_token) => {
// let error =
// FlowyError::new(ErrorCode::UserUnauthorized, "");
// observable(token,
// WorkspaceObservable::UserUnauthorized).error(error).
// build()
},
}
}
}
}
}

View file

@ -1,31 +0,0 @@
mod middleware;
mod server_api;
mod server_api_mock;
pub use server_api::*;
// TODO: ignore mock files in production
use crate::errors::FlowyError;
use backend_service::configuration::ClientServerConfiguration;
use flowy_collaboration::entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams};
use lib_infra::future::FutureResult;
pub use server_api_mock::*;
use std::sync::Arc;
pub(crate) type Server = Arc<dyn DocumentServerAPI + Send + Sync>;
pub trait DocumentServerAPI {
fn create_doc(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError>;
fn read_doc(&self, token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError>;
fn update_doc(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError>;
}
pub(crate) fn construct_doc_server(
server_config: &ClientServerConfiguration,
) -> Arc<dyn DocumentServerAPI + Send + Sync> {
if cfg!(feature = "http_server") {
Arc::new(DocServer::new(server_config.clone()))
} else {
Arc::new(DocServerMock {})
}
}

View file

@ -1,67 +0,0 @@
use crate::{errors::FlowyError, server::DocumentServerAPI};
use backend_service::{configuration::*, request::HttpRequestBuilder};
use flowy_collaboration::entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams};
use lib_infra::future::FutureResult;
pub struct DocServer {
config: ClientServerConfiguration,
}
impl DocServer {
pub fn new(config: ClientServerConfiguration) -> Self { Self { config } }
}
impl DocumentServerAPI for DocServer {
fn create_doc(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { create_doc_request(&token, params, &url).await })
}
fn read_doc(&self, token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { read_doc_request(&token, params, &url).await })
}
fn update_doc(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { reset_doc_request(&token, params, &url).await })
}
}
pub(crate) fn request_builder() -> HttpRequestBuilder {
HttpRequestBuilder::new().middleware(super::middleware::MIDDLEWARE.clone())
}
pub async fn create_doc_request(token: &str, params: CreateDocParams, url: &str) -> Result<(), FlowyError> {
let _ = request_builder()
.post(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn read_doc_request(token: &str, params: DocumentId, url: &str) -> Result<Option<DocumentInfo>, FlowyError> {
let doc = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.option_response()
.await?;
Ok(doc)
}
pub async fn reset_doc_request(token: &str, params: ResetDocumentParams, url: &str) -> Result<(), FlowyError> {
let _ = request_builder()
.patch(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}

View file

@ -1,29 +0,0 @@
use flowy_collaboration::{
document::default::initial_delta_string,
entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams},
};
use lib_infra::future::FutureResult;
use crate::{errors::FlowyError, server::DocumentServerAPI};
pub struct DocServerMock {}
impl DocumentServerAPI for DocServerMock {
fn create_doc(&self, _token: &str, _params: CreateDocParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn read_doc(&self, _token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError> {
let doc = DocumentInfo {
doc_id: params.doc_id,
text: initial_delta_string(),
rev_id: 0,
base_rev_id: 0,
};
FutureResult::new(async { Ok(Some(doc)) })
}
fn update_doc(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
}

View file

@ -91,9 +91,8 @@ impl TestBuilder {
pub fn new() -> Self { pub fn new() -> Self {
static INIT: Once = Once::new(); static INIT: Once = Once::new();
INIT.call_once(|| { INIT.call_once(|| {
color_eyre::install().unwrap(); let _ = color_eyre::install();
std::env::set_var("RUST_LOG", LEVEL); std::env::set_var("RUST_LOG", LEVEL);
env_logger::init();
}); });
Self { Self {

View file

@ -10,6 +10,10 @@ lib-dispatch = { path = "../lib-dispatch" }
flowy-error = { path = "../flowy-error" } flowy-error = { path = "../flowy-error" }
flowy-derive = { path = "../../../shared-lib/flowy-derive" } flowy-derive = { path = "../../../shared-lib/flowy-derive" }
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"} flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"}
backend-service = { path = "../../../shared-lib/backend-service" }
flowy-core-data-model = { path = "../../../shared-lib/flowy-core-data-model" }
flowy-user-data-model = { path = "../../../shared-lib/flowy-user-data-model"}
lazy_static = "1.4.0"
lib-infra = { path = "../../../shared-lib/lib-infra" } lib-infra = { path = "../../../shared-lib/lib-infra" }
protobuf = {version = "2.18.0"} protobuf = {version = "2.18.0"}
lib-ws = { path = "../../../shared-lib/lib-ws" } lib-ws = { path = "../../../shared-lib/lib-ws" }

View file

@ -0,0 +1,476 @@
use backend_service::{
configuration::{ClientServerConfiguration, HEADER_TOKEN},
errors::ServerError,
request::{HttpRequestBuilder, ResponseMiddleware},
response::FlowyResponse,
};
use flowy_core_data_model::entities::{
app::{App, AppId, CreateAppParams, RepeatedApp, UpdateAppParams},
trash::{RepeatedTrash, RepeatedTrashId},
view::{CreateViewParams, RepeatedView, RepeatedViewId, UpdateViewParams, View, ViewId},
workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId},
};
use flowy_error::FlowyError;
use lazy_static::lazy_static;
use lib_infra::{future::FutureResult, timestamp, uuid_string};
use std::sync::Arc;
use tokio::sync::broadcast;
pub struct CoreHttpCloudService {
config: ClientServerConfiguration,
}
impl CoreHttpCloudService {
pub fn new(config: ClientServerConfiguration) -> CoreHttpCloudService { Self { config } }
}
impl CoreHttpCloudService {
pub fn init(&self) {}
pub fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
let workspace = create_workspace_request(&token, params, &url).await?;
Ok(workspace)
})
}
pub fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
let repeated_workspace = read_workspaces_request(&token, params, &url).await?;
Ok(repeated_workspace)
})
}
pub fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
let _ = update_workspace_request(&token, params, &url).await?;
Ok(())
})
}
pub fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.workspace_url();
FutureResult::new(async move {
let _ = delete_workspace_request(&token, params, &url).await?;
Ok(())
})
}
pub fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
let view = create_view_request(&token, params, &url).await?;
Ok(view)
})
}
pub fn read_view(&self, token: &str, params: ViewId) -> FutureResult<Option<View>, FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
let view = read_view_request(&token, params, &url).await?;
Ok(view)
})
}
pub fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
let _ = delete_view_request(&token, params, &url).await?;
Ok(())
})
}
pub fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.view_url();
FutureResult::new(async move {
let _ = update_view_request(&token, params, &url).await?;
Ok(())
})
}
pub fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
let app = create_app_request(&token, params, &url).await?;
Ok(app)
})
}
pub fn read_app(&self, token: &str, params: AppId) -> FutureResult<Option<App>, FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
let app = read_app_request(&token, params, &url).await?;
Ok(app)
})
}
pub fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
let _ = update_app_request(&token, params, &url).await?;
Ok(())
})
}
pub fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.app_url();
FutureResult::new(async move {
let _ = delete_app_request(&token, params, &url).await?;
Ok(())
})
}
pub fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.trash_url();
FutureResult::new(async move {
let _ = create_trash_request(&token, params, &url).await?;
Ok(())
})
}
pub fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.trash_url();
FutureResult::new(async move {
let _ = delete_trash_request(&token, params, &url).await?;
Ok(())
})
}
pub fn read_trash(&self, token: &str) -> FutureResult<RepeatedTrash, FlowyError> {
let token = token.to_owned();
let url = self.config.trash_url();
FutureResult::new(async move {
let repeated_trash = read_trash_request(&token, &url).await?;
Ok(repeated_trash)
})
}
}
pub struct CoreLocalCloudService {}
impl CoreLocalCloudService {
pub fn new(_config: &ClientServerConfiguration) -> Self { CoreLocalCloudService {} }
}
impl CoreLocalCloudService {
pub fn init(&self) {}
pub fn create_workspace(&self, _token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
let time = timestamp();
let workspace = Workspace {
id: uuid_string(),
name: params.name,
desc: params.desc,
apps: RepeatedApp::default(),
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(workspace) })
}
pub fn read_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
FutureResult::new(async {
let repeated_workspace = RepeatedWorkspace { items: vec![] };
Ok(repeated_workspace)
})
}
pub fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn delete_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
let time = timestamp();
let view = View {
id: params.view_id,
belong_to_id: params.belong_to_id,
name: params.name,
desc: params.desc,
view_type: params.view_type,
version: 0,
belongings: RepeatedView::default(),
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(view) })
}
pub fn read_view(&self, _token: &str, _params: ViewId) -> FutureResult<Option<View>, FlowyError> {
FutureResult::new(async { Ok(None) })
}
pub fn delete_view(&self, _token: &str, _params: RepeatedViewId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
let time = timestamp();
let app = App {
id: uuid_string(),
workspace_id: params.workspace_id,
name: params.name,
desc: params.desc,
belongings: RepeatedView::default(),
version: 0,
modified_time: time,
create_time: time,
};
FutureResult::new(async { Ok(app) })
}
pub fn read_app(&self, _token: &str, _params: AppId) -> FutureResult<Option<App>, FlowyError> {
FutureResult::new(async { Ok(None) })
}
pub fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn delete_app(&self, _token: &str, _params: AppId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn create_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn delete_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn read_trash(&self, _token: &str) -> FutureResult<RepeatedTrash, FlowyError> {
FutureResult::new(async {
let repeated_trash = RepeatedTrash { items: vec![] };
Ok(repeated_trash)
})
}
}
lazy_static! {
static ref MIDDLEWARE: Arc<CoreResponseMiddleware> = Arc::new(CoreResponseMiddleware::new());
}
pub struct CoreResponseMiddleware {
invalid_token_sender: broadcast::Sender<String>,
}
impl CoreResponseMiddleware {
fn new() -> Self {
let (sender, _) = broadcast::channel(10);
CoreResponseMiddleware {
invalid_token_sender: sender,
}
}
pub fn invalid_token_subscribe(&self) -> broadcast::Receiver<String> { self.invalid_token_sender.subscribe() }
}
impl ResponseMiddleware for CoreResponseMiddleware {
fn receive_response(&self, token: &Option<String>, response: &FlowyResponse) {
if let Some(error) = &response.error {
if error.is_unauthorized() {
tracing::error!("user is unauthorized");
match token {
None => {},
Some(token) => match self.invalid_token_sender.send(token.clone()) {
Ok(_) => {},
Err(e) => tracing::error!("{:?}", e),
},
}
}
}
}
}
fn request_builder() -> HttpRequestBuilder { HttpRequestBuilder::new().middleware(MIDDLEWARE.clone()) }
pub async fn create_workspace_request(
token: &str,
params: CreateWorkspaceParams,
url: &str,
) -> Result<Workspace, ServerError> {
let workspace = request_builder()
.post(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.response()
.await?;
Ok(workspace)
}
pub async fn read_workspaces_request(
token: &str,
params: WorkspaceId,
url: &str,
) -> Result<RepeatedWorkspace, ServerError> {
let repeated_workspace = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.response::<RepeatedWorkspace>()
.await?;
Ok(repeated_workspace)
}
pub async fn update_workspace_request(
token: &str,
params: UpdateWorkspaceParams,
url: &str,
) -> Result<(), ServerError> {
let _ = request_builder()
.patch(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn delete_workspace_request(token: &str, params: WorkspaceId, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.delete(url)
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
// App
pub async fn create_app_request(token: &str, params: CreateAppParams, url: &str) -> Result<App, ServerError> {
let app = request_builder()
.post(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.response()
.await?;
Ok(app)
}
pub async fn read_app_request(token: &str, params: AppId, url: &str) -> Result<Option<App>, ServerError> {
let app = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.option_response()
.await?;
Ok(app)
}
pub async fn update_app_request(token: &str, params: UpdateAppParams, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.patch(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn delete_app_request(token: &str, params: AppId, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.delete(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
// View
pub async fn create_view_request(token: &str, params: CreateViewParams, url: &str) -> Result<View, ServerError> {
let view = request_builder()
.post(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.response()
.await?;
Ok(view)
}
pub async fn read_view_request(token: &str, params: ViewId, url: &str) -> Result<Option<View>, ServerError> {
let view = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.option_response()
.await?;
Ok(view)
}
pub async fn update_view_request(token: &str, params: UpdateViewParams, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.patch(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn delete_view_request(token: &str, params: RepeatedViewId, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.delete(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn create_trash_request(token: &str, params: RepeatedTrashId, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.post(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn delete_trash_request(token: &str, params: RepeatedTrashId, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.delete(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn read_trash_request(token: &str, url: &str) -> Result<RepeatedTrash, ServerError> {
let repeated_trash = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.response::<RepeatedTrash>()
.await?;
Ok(repeated_trash)
}

View file

@ -0,0 +1,134 @@
use backend_service::{
configuration::*,
request::{HttpRequestBuilder, ResponseMiddleware},
response::FlowyResponse,
};
use flowy_collaboration::{
document::default::initial_delta_string,
entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams},
};
use flowy_error::FlowyError;
use lazy_static::lazy_static;
use lib_infra::future::FutureResult;
use std::sync::Arc;
pub struct DocumentHttpCloudService {
config: ClientServerConfiguration,
}
impl DocumentHttpCloudService {
pub fn new(config: ClientServerConfiguration) -> Self { Self { config } }
}
impl DocumentHttpCloudService {
pub fn create_document_request(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { create_document_request(&token, params, &url).await })
}
pub fn read_document_request(
&self,
token: &str,
params: DocumentId,
) -> FutureResult<Option<DocumentInfo>, FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { read_document_request(&token, params, &url).await })
}
pub fn update_document_request(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { reset_doc_request(&token, params, &url).await })
}
}
pub struct DocumentLocalCloudService {}
impl DocumentLocalCloudService {
pub fn create_document_request(&self, _token: &str, _params: CreateDocParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn read_document_request(
&self,
_token: &str,
params: DocumentId,
) -> FutureResult<Option<DocumentInfo>, FlowyError> {
let doc = DocumentInfo {
doc_id: params.doc_id,
text: initial_delta_string(),
rev_id: 0,
base_rev_id: 0,
};
FutureResult::new(async { Ok(Some(doc)) })
}
pub fn update_document_request(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
}
pub async fn create_document_request(token: &str, params: CreateDocParams, url: &str) -> Result<(), FlowyError> {
let _ = request_builder()
.post(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn read_document_request(
token: &str,
params: DocumentId,
url: &str,
) -> Result<Option<DocumentInfo>, FlowyError> {
let doc = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.option_response()
.await?;
Ok(doc)
}
pub async fn reset_doc_request(token: &str, params: ResetDocumentParams, url: &str) -> Result<(), FlowyError> {
let _ = request_builder()
.patch(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
fn request_builder() -> HttpRequestBuilder { HttpRequestBuilder::new().middleware(MIDDLEWARE.clone()) }
lazy_static! {
pub(crate) static ref MIDDLEWARE: Arc<DocumentResponseMiddleware> = Arc::new(DocumentResponseMiddleware {});
}
pub(crate) struct DocumentResponseMiddleware {}
impl ResponseMiddleware for DocumentResponseMiddleware {
fn receive_response(&self, token: &Option<String>, response: &FlowyResponse) {
if let Some(error) = &response.error {
if error.is_unauthorized() {
tracing::error!("document user is unauthorized");
match token {
None => {},
Some(_token) => {
// let error =
// FlowyError::new(ErrorCode::UserUnauthorized, "");
// observable(token,
// WorkspaceObservable::UserUnauthorized).error(error).
// build()
},
}
}
}
}
}

View file

@ -0,0 +1,3 @@
pub mod core;
pub mod document;
pub mod user;

View file

@ -0,0 +1,155 @@
use backend_service::{configuration::*, errors::ServerError, request::HttpRequestBuilder};
use flowy_error::FlowyError;
use flowy_user_data_model::entities::{
SignInParams,
SignInResponse,
SignUpParams,
SignUpResponse,
UpdateUserParams,
UserProfile,
};
use lib_infra::{future::FutureResult, uuid_string};
pub struct UserHttpCloudService {
config: ClientServerConfiguration,
}
impl UserHttpCloudService {
pub fn new(config: &ClientServerConfiguration) -> Self { Self { config: config.clone() } }
}
impl UserHttpCloudService {
pub fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
let url = self.config.sign_up_url();
FutureResult::new(async move {
let resp = user_sign_up_request(params, &url).await?;
Ok(resp)
})
}
pub fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
let url = self.config.sign_in_url();
FutureResult::new(async move {
let resp = user_sign_in_request(params, &url).await?;
Ok(resp)
})
}
pub fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.sign_out_url();
FutureResult::new(async move {
let _ = user_sign_out_request(&token, &url).await;
Ok(())
})
}
pub fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.user_profile_url();
FutureResult::new(async move {
let _ = update_user_profile_request(&token, params, &url).await?;
Ok(())
})
}
pub fn get_user(&self, token: &str) -> FutureResult<UserProfile, FlowyError> {
let token = token.to_owned();
let url = self.config.user_profile_url();
FutureResult::new(async move {
let profile = get_user_profile_request(&token, &url).await?;
Ok(profile)
})
}
pub fn ws_addr(&self) -> String { self.config.ws_addr() }
}
pub struct UserLocalCloudService();
impl UserLocalCloudService {
pub fn new(_config: &ClientServerConfiguration) -> Self { Self() }
}
impl UserLocalCloudService {
pub fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
let uid = uuid_string();
FutureResult::new(async move {
Ok(SignUpResponse {
user_id: uid.clone(),
name: params.name,
email: params.email,
token: uid,
})
})
}
pub fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
let user_id = uuid_string();
FutureResult::new(async {
Ok(SignInResponse {
user_id: user_id.clone(),
name: params.name,
email: params.email,
token: user_id,
})
})
}
pub fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
pub fn update_user(&self, _token: &str, _params: UpdateUserParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
pub fn get_user(&self, _token: &str) -> FutureResult<UserProfile, FlowyError> {
FutureResult::new(async { Ok(UserProfile::default()) })
}
pub fn ws_addr(&self) -> String { "ws://localhost:8000/ws/".to_owned() }
}
pub async fn user_sign_up_request(params: SignUpParams, url: &str) -> Result<SignUpResponse, ServerError> {
let response = request_builder()
.post(&url.to_owned())
.protobuf(params)?
.response()
.await?;
Ok(response)
}
pub async fn user_sign_in_request(params: SignInParams, url: &str) -> Result<SignInResponse, ServerError> {
let response = request_builder()
.post(&url.to_owned())
.protobuf(params)?
.response()
.await?;
Ok(response)
}
pub async fn user_sign_out_request(token: &str, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.delete(&url.to_owned())
.header(HEADER_TOKEN, token)
.send()
.await?;
Ok(())
}
pub async fn get_user_profile_request(token: &str, url: &str) -> Result<UserProfile, ServerError> {
let user_profile = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.response()
.await?;
Ok(user_profile)
}
pub async fn update_user_profile_request(token: &str, params: UpdateUserParams, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.patch(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
fn request_builder() -> HttpRequestBuilder { HttpRequestBuilder::new() }

View file

@ -1,3 +1,4 @@
pub mod cloud;
pub mod entities; pub mod entities;
mod event; mod event;
mod handlers; mod handlers;

View file

@ -13,7 +13,7 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender};
pub struct LocalDocumentServer { pub struct LocalDocumentServer {
pub doc_manager: Arc<ServerDocumentManager>, pub doc_manager: Arc<ServerDocumentManager>,
sender: mpsc::UnboundedSender<WebSocketRawMessage>, sender: mpsc::UnboundedSender<WebSocketRawMessage>,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn ServerDocumentPersistence>,
} }
impl LocalDocumentServer { impl LocalDocumentServer {
@ -64,7 +64,7 @@ impl LocalDocumentServer {
struct LocalDocumentUser { struct LocalDocumentUser {
user_id: String, user_id: String,
ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>, ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn ServerDocumentPersistence>,
} }
impl RevisionUser for LocalDocumentUser { impl RevisionUser for LocalDocumentUser {

View file

@ -30,7 +30,7 @@ impl std::default::Default for LocalServerDocumentPersistence {
} }
} }
impl DocumentPersistence for LocalServerDocumentPersistence { impl ServerDocumentPersistence for LocalServerDocumentPersistence {
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> { fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let inner = self.inner.clone(); let inner = self.inner.clone();
let doc_id = doc_id.to_owned(); let doc_id = doc_id.to_owned();

View file

@ -0,0 +1,211 @@
use backend_service::configuration::ClientServerConfiguration;
use flowy_core::{
errors::FlowyError,
module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser},
prelude::{
App,
AppId,
CreateAppParams,
CreateViewParams,
CreateWorkspaceParams,
RepeatedTrash,
RepeatedTrashId,
RepeatedViewId,
RepeatedWorkspace,
UpdateAppParams,
UpdateViewParams,
UpdateWorkspaceParams,
View,
ViewId,
Workspace,
WorkspaceId,
},
};
use flowy_database::ConnectionPool;
use flowy_net::cloud::core::{CoreHttpCloudService, CoreLocalCloudService};
use flowy_user::services::user::UserSession;
use lib_infra::future::FutureResult;
use std::sync::Arc;
pub struct CoreDepsResolver();
impl CoreDepsResolver {
pub fn resolve(
user_session: Arc<UserSession>,
server_config: &ClientServerConfiguration,
) -> (
Arc<dyn WorkspaceUser>,
Arc<dyn WorkspaceDatabase>,
Arc<dyn CoreCloudService>,
) {
let user: Arc<dyn WorkspaceUser> = Arc::new(WorkspaceUserImpl(user_session.clone()));
let database: Arc<dyn WorkspaceDatabase> = Arc::new(WorkspaceDatabaseImpl(user_session));
let cloud_service = make_core_cloud_service(server_config);
(user, database, cloud_service)
}
}
struct WorkspaceDatabaseImpl(Arc<UserSession>);
impl WorkspaceDatabase for WorkspaceDatabaseImpl {
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> {
self.0.db_pool().map_err(|e| FlowyError::internal().context(e))
}
}
struct WorkspaceUserImpl(Arc<UserSession>);
impl WorkspaceUser for WorkspaceUserImpl {
fn user_id(&self) -> Result<String, FlowyError> { self.0.user_id().map_err(|e| FlowyError::internal().context(e)) }
fn token(&self) -> Result<String, FlowyError> { self.0.token().map_err(|e| FlowyError::internal().context(e)) }
}
fn make_core_cloud_service(config: &ClientServerConfiguration) -> Arc<dyn CoreCloudService> {
if cfg!(feature = "http_server") {
Arc::new(CoreHttpCloudServiceAdaptor::new(config))
} else {
Arc::new(CoreLocalCloudServiceAdaptor::new(config))
}
}
struct CoreHttpCloudServiceAdaptor(CoreHttpCloudService);
impl CoreHttpCloudServiceAdaptor {
fn new(config: &ClientServerConfiguration) -> Self { Self(CoreHttpCloudService::new(config.clone())) }
}
impl CoreCloudService for CoreHttpCloudServiceAdaptor {
fn init(&self) {
// let mut rx = BACKEND_API_MIDDLEWARE.invalid_token_subscribe();
// tokio::spawn(async move {
// while let Ok(invalid_token) = rx.recv().await {
// let error = FlowyError::new(ErrorCode::UserUnauthorized, "");
// send_dart_notification(&invalid_token,
// WorkspaceNotification::UserUnauthorized) .error(error)
// .send()
// }
// });
self.0.init()
}
fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
self.0.create_workspace(token, params)
}
fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
self.0.read_workspace(token, params)
}
fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
self.0.update_workspace(token, params)
}
fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> {
self.0.delete_workspace(token, params)
}
fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
self.0.create_view(token, params)
}
fn read_view(&self, token: &str, params: ViewId) -> FutureResult<Option<View>, FlowyError> {
self.0.read_view(token, params)
}
fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> {
self.0.delete_view(token, params)
}
fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> {
self.0.update_view(token, params)
}
fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
self.0.create_app(token, params)
}
fn read_app(&self, token: &str, params: AppId) -> FutureResult<Option<App>, FlowyError> {
self.0.read_app(token, params)
}
fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> {
self.0.update_app(token, params)
}
fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> {
self.0.delete_app(token, params)
}
fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
self.0.create_trash(token, params)
}
fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
self.0.delete_trash(token, params)
}
fn read_trash(&self, token: &str) -> FutureResult<RepeatedTrash, FlowyError> { self.0.read_trash(token) }
}
struct CoreLocalCloudServiceAdaptor(CoreLocalCloudService);
impl CoreLocalCloudServiceAdaptor {
fn new(config: &ClientServerConfiguration) -> Self { Self(CoreLocalCloudService::new(config)) }
}
impl CoreCloudService for CoreLocalCloudServiceAdaptor {
fn init(&self) { self.0.init() }
fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult<Workspace, FlowyError> {
self.0.create_workspace(token, params)
}
fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<RepeatedWorkspace, FlowyError> {
self.0.read_workspace(token, params)
}
fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> {
self.0.update_workspace(token, params)
}
fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> {
self.0.delete_workspace(token, params)
}
fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult<View, FlowyError> {
self.0.create_view(token, params)
}
fn read_view(&self, token: &str, params: ViewId) -> FutureResult<Option<View>, FlowyError> {
self.0.read_view(token, params)
}
fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> {
self.0.delete_view(token, params)
}
fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> {
self.0.update_view(token, params)
}
fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult<App, FlowyError> {
self.0.create_app(token, params)
}
fn read_app(&self, token: &str, params: AppId) -> FutureResult<Option<App>, FlowyError> {
self.0.read_app(token, params)
}
fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> {
self.0.update_app(token, params)
}
fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> {
self.0.delete_app(token, params)
}
fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
self.0.create_trash(token, params)
}
fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> {
self.0.delete_trash(token, params)
}
fn read_trash(&self, token: &str) -> FutureResult<RepeatedTrash, FlowyError> { self.0.read_trash(token) }
}

View file

@ -1,50 +1,58 @@
use backend_service::configuration::ClientServerConfiguration;
use bytes::Bytes; use bytes::Bytes;
use flowy_collaboration::entities::ws::DocumentClientWSData; use flowy_collaboration::entities::{
doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams},
ws::DocumentClientWSData,
};
use flowy_database::ConnectionPool; use flowy_database::ConnectionPool;
use flowy_document::{ use flowy_document::{
context::DocumentUser, context::DocumentUser,
core::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver}, core::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver},
errors::{internal_error, FlowyError}, errors::{internal_error, FlowyError},
DocumentCloudService,
};
use flowy_net::{
cloud::document::{DocumentHttpCloudService, DocumentLocalCloudService},
services::ws_conn::FlowyWebSocketConnect,
}; };
use flowy_net::services::ws_conn::FlowyWebSocketConnect;
use flowy_user::services::user::UserSession; use flowy_user::services::user::UserSession;
use lib_infra::future::FutureResult;
use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage}; use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage};
use std::{convert::TryInto, path::Path, sync::Arc}; use std::{convert::TryInto, path::Path, sync::Arc};
pub struct DocumentDependencies {
pub user: Arc<dyn DocumentUser>,
pub ws_receivers: Arc<DocumentWSReceivers>,
pub ws_sender: Arc<dyn DocumentWebSocket>,
pub cloud_service: Arc<dyn DocumentCloudService>,
}
pub struct DocumentDepsResolver(); pub struct DocumentDepsResolver();
impl DocumentDepsResolver { impl DocumentDepsResolver {
pub fn resolve( pub fn resolve(
ws_conn: Arc<FlowyWebSocketConnect>, ws_conn: Arc<FlowyWebSocketConnect>,
user_session: Arc<UserSession>, user_session: Arc<UserSession>,
) -> ( server_config: &ClientServerConfiguration,
Arc<dyn DocumentUser>, ) -> DocumentDependencies {
Arc<DocumentWSReceivers>, let user = Arc::new(DocumentUserImpl(user_session));
Arc<dyn DocumentWebSocket>, let ws_sender = Arc::new(DocumentWebSocketImpl(ws_conn.clone()));
) {
let user = Arc::new(DocumentUserImpl { user: user_session });
let ws_sender = Arc::new(DocumentWebSocketAdapter {
ws_conn: ws_conn.clone(),
});
let ws_receivers = Arc::new(DocumentWSReceivers::new()); let ws_receivers = Arc::new(DocumentWSReceivers::new());
let receiver = Arc::new(WSMessageReceiverAdaptor(ws_receivers.clone())); let receiver = Arc::new(WSMessageReceiverImpl(ws_receivers.clone()));
ws_conn.add_ws_message_receiver(receiver).unwrap(); ws_conn.add_ws_message_receiver(receiver).unwrap();
(user, ws_receivers, ws_sender) let cloud_service = make_document_cloud_service(server_config);
DocumentDependencies {
user,
ws_receivers,
ws_sender,
cloud_service,
}
} }
} }
struct DocumentUserImpl { struct DocumentUserImpl(Arc<UserSession>);
user: Arc<UserSession>,
}
impl DocumentUserImpl {}
impl DocumentUser for DocumentUserImpl { impl DocumentUser for DocumentUserImpl {
fn user_dir(&self) -> Result<String, FlowyError> { fn user_dir(&self) -> Result<String, FlowyError> {
let dir = self let dir = self.0.user_dir().map_err(|e| FlowyError::unauthorized().context(e))?;
.user
.user_dir()
.map_err(|e| FlowyError::unauthorized().context(e))?;
let doc_dir = format!("{}/document", dir); let doc_dir = format!("{}/document", dir);
if !Path::new(&doc_dir).exists() { if !Path::new(&doc_dir).exists() {
@ -53,35 +61,77 @@ impl DocumentUser for DocumentUserImpl {
Ok(doc_dir) Ok(doc_dir)
} }
fn user_id(&self) -> Result<String, FlowyError> { self.user.user_id() } fn user_id(&self) -> Result<String, FlowyError> { self.0.user_id() }
fn token(&self) -> Result<String, FlowyError> { self.user.token() } fn token(&self) -> Result<String, FlowyError> { self.0.token() }
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> { self.user.db_pool() } fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> { self.0.db_pool() }
} }
struct DocumentWebSocketAdapter { struct DocumentWebSocketImpl(Arc<FlowyWebSocketConnect>);
ws_conn: Arc<FlowyWebSocketConnect>, impl DocumentWebSocket for DocumentWebSocketImpl {
}
impl DocumentWebSocket for DocumentWebSocketAdapter {
fn send(&self, data: DocumentClientWSData) -> Result<(), FlowyError> { fn send(&self, data: DocumentClientWSData) -> Result<(), FlowyError> {
let bytes: Bytes = data.try_into().unwrap(); let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage { let msg = WebSocketRawMessage {
module: WSModule::Doc, module: WSModule::Doc,
data: bytes.to_vec(), data: bytes.to_vec(),
}; };
let sender = self.ws_conn.ws_sender()?; let sender = self.0.ws_sender()?;
sender.send(msg).map_err(internal_error)?; sender.send(msg).map_err(internal_error)?;
Ok(()) Ok(())
} }
fn subscribe_state_changed(&self) -> WSStateReceiver { self.ws_conn.subscribe_websocket_state() } fn subscribe_state_changed(&self) -> WSStateReceiver { self.0.subscribe_websocket_state() }
} }
struct WSMessageReceiverAdaptor(Arc<DocumentWSReceivers>); struct WSMessageReceiverImpl(Arc<DocumentWSReceivers>);
impl WSMessageReceiver for WSMessageReceiverImpl {
impl WSMessageReceiver for WSMessageReceiverAdaptor {
fn source(&self) -> WSModule { WSModule::Doc } fn source(&self) -> WSModule { WSModule::Doc }
fn receive_message(&self, msg: WebSocketRawMessage) { self.0.did_receive_data(Bytes::from(msg.data)); } fn receive_message(&self, msg: WebSocketRawMessage) { self.0.did_receive_data(Bytes::from(msg.data)); }
} }
fn make_document_cloud_service(server_config: &ClientServerConfiguration) -> Arc<dyn DocumentCloudService> {
if cfg!(feature = "http_server") {
Arc::new(DocumentHttpCloudServiceAdaptor::new(server_config.clone()))
} else {
Arc::new(DocumentLocalCloudServiceAdaptor::new())
}
}
struct DocumentHttpCloudServiceAdaptor(DocumentHttpCloudService);
impl DocumentHttpCloudServiceAdaptor {
fn new(config: ClientServerConfiguration) -> Self {
DocumentHttpCloudServiceAdaptor(DocumentHttpCloudService::new(config))
}
}
impl DocumentCloudService for DocumentHttpCloudServiceAdaptor {
fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> {
self.0.create_document_request(token, params)
}
fn read_document(&self, token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError> {
self.0.read_document_request(token, params)
}
fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
self.0.update_document_request(token, params)
}
}
struct DocumentLocalCloudServiceAdaptor(DocumentLocalCloudService);
impl DocumentLocalCloudServiceAdaptor {
fn new() -> Self { Self(DocumentLocalCloudService {}) }
}
impl DocumentCloudService for DocumentLocalCloudServiceAdaptor {
fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> {
self.0.create_document_request(token, params)
}
fn read_document(&self, token: &str, params: DocumentId) -> FutureResult<Option<DocumentInfo>, FlowyError> {
self.0.read_document_request(token, params)
}
fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
self.0.update_document_request(token, params)
}
}

View file

@ -1,5 +1,7 @@
mod core_deps;
mod document_deps; mod document_deps;
mod workspace_deps; mod user_deps;
pub use core_deps::*;
pub use document_deps::*; pub use document_deps::*;
pub use workspace_deps::*; pub use user_deps::*;

View file

@ -0,0 +1,58 @@
use crate::FlowyError;
use backend_service::configuration::ClientServerConfiguration;
use flowy_net::cloud::user::{UserHttpCloudService, UserLocalCloudService};
use flowy_user::{
entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile},
module::UserCloudService,
};
use lib_infra::future::FutureResult;
use std::sync::Arc;
pub struct UserDepsResolver();
impl UserDepsResolver {
pub fn resolve(server_config: &ClientServerConfiguration) -> Arc<dyn UserCloudService> {
make_user_cloud_service(server_config)
}
}
fn make_user_cloud_service(config: &ClientServerConfiguration) -> Arc<dyn UserCloudService> {
if cfg!(feature = "http_server") {
Arc::new(UserHttpCloudServiceAdaptor(UserHttpCloudService::new(config)))
} else {
Arc::new(UserLocalCloudServiceAdaptor(UserLocalCloudService::new(config)))
}
}
struct UserHttpCloudServiceAdaptor(UserHttpCloudService);
impl UserCloudService for UserHttpCloudServiceAdaptor {
fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> { self.0.sign_up(params) }
fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> { self.0.sign_in(params) }
fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { self.0.sign_out(token) }
fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> {
self.0.update_user(token, params)
}
fn get_user(&self, token: &str) -> FutureResult<UserProfile, FlowyError> { self.0.get_user(token) }
fn ws_addr(&self) -> String { self.0.ws_addr() }
}
struct UserLocalCloudServiceAdaptor(UserLocalCloudService);
impl UserCloudService for UserLocalCloudServiceAdaptor {
fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> { self.0.sign_up(params) }
fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> { self.0.sign_in(params) }
fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { self.0.sign_out(token) }
fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> {
self.0.update_user(token, params)
}
fn get_user(&self, token: &str) -> FutureResult<UserProfile, FlowyError> { self.0.get_user(token) }
fn ws_addr(&self) -> String { self.0.ws_addr() }
}

View file

@ -1,49 +0,0 @@
use flowy_core::{
errors::FlowyError,
module::{WorkspaceDatabase, WorkspaceUser},
};
use flowy_database::ConnectionPool;
use flowy_user::services::user::UserSession;
use std::sync::Arc;
pub struct WorkspaceDepsResolver {
inner: Arc<Resolver>,
}
struct Resolver {
pub(crate) user_session: Arc<UserSession>,
}
impl WorkspaceDepsResolver {
pub fn new(user_session: Arc<UserSession>) -> Self {
Self {
inner: Arc::new(Resolver { user_session }),
}
}
pub fn split_into(self) -> (Arc<dyn WorkspaceUser>, Arc<dyn WorkspaceDatabase>) {
let user: Arc<dyn WorkspaceUser> = self.inner.clone();
let database: Arc<dyn WorkspaceDatabase> = self.inner;
(user, database)
}
}
impl WorkspaceDatabase for Resolver {
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> {
self.user_session
.db_pool()
.map_err(|e| FlowyError::internal().context(e))
}
}
impl WorkspaceUser for Resolver {
fn user_id(&self) -> Result<String, FlowyError> {
self.user_session
.user_id()
.map_err(|e| FlowyError::internal().context(e))
}
fn token(&self) -> Result<String, FlowyError> {
self.user_session.token().map_err(|e| FlowyError::internal().context(e))
}
}

View file

@ -1,6 +1,6 @@
mod deps_resolve; mod deps_resolve;
pub mod module; pub mod module;
use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver}; use crate::deps_resolve::*;
use backend_service::configuration::ClientServerConfiguration; use backend_service::configuration::ClientServerConfiguration;
use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core}; use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core};
use flowy_document::context::DocumentContext; use flowy_document::context::DocumentContext;
@ -12,7 +12,7 @@ use flowy_net::{
}, },
}; };
use flowy_user::{ use flowy_user::{
prelude::UserStatus, entities::UserStatus,
services::user::{UserSession, UserSessionConfig}, services::user::{UserSession, UserSessionConfig},
}; };
use lib_dispatch::prelude::*; use lib_dispatch::prelude::*;
@ -103,7 +103,7 @@ impl FlowySDK {
config.server_config.ws_addr(), config.server_config.ws_addr(),
default_web_socket(), default_web_socket(),
)); ));
let user_session = mk_user_session(&config); let user_session = mk_user_session(&config, &config.server_config);
let flowy_document = mk_document(&ws_conn, &user_session, &config.server_config); let flowy_document = mk_document(&ws_conn, &user_session, &config.server_config);
let core_ctx = mk_core_context(&user_session, &flowy_document, &config.server_config); let core_ctx = mk_core_context(&user_session, &flowy_document, &config.server_config);
@ -186,9 +186,9 @@ async fn _listen_user_status(
} }
} }
async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>, core: Arc<CoreContext>) { async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>, _core: Arc<CoreContext>) {
while let Ok(new_type) = subscribe.recv().await { while let Ok(_new_type) = subscribe.recv().await {
core.network_state_changed(new_type); // core.network_state_changed(new_type);
} }
} }
@ -209,10 +209,11 @@ fn init_log(config: &FlowySDKConfig) {
} }
} }
fn mk_user_session(config: &FlowySDKConfig) -> Arc<UserSession> { fn mk_user_session(config: &FlowySDKConfig, server_config: &ClientServerConfiguration) -> Arc<UserSession> {
let session_cache_key = format!("{}_session_cache", &config.name); let session_cache_key = format!("{}_session_cache", &config.name);
let user_config = UserSessionConfig::new(&config.root, &config.server_config, &session_cache_key); let user_config = UserSessionConfig::new(&config.root, &session_cache_key);
Arc::new(UserSession::new(user_config)) let cloud_service = UserDepsResolver::resolve(server_config);
Arc::new(UserSession::new(user_config, cloud_service))
} }
fn mk_core_context( fn mk_core_context(
@ -220,9 +221,8 @@ fn mk_core_context(
flowy_document: &Arc<DocumentContext>, flowy_document: &Arc<DocumentContext>,
server_config: &ClientServerConfiguration, server_config: &ClientServerConfiguration,
) -> Arc<CoreContext> { ) -> Arc<CoreContext> {
let workspace_deps = WorkspaceDepsResolver::new(user_session.clone()); let (user, database, cloud_service) = CoreDepsResolver::resolve(user_session.clone(), server_config);
let (user, database) = workspace_deps.split_into(); init_core(user, database, flowy_document.clone(), cloud_service)
init_core(user, database, flowy_document.clone(), server_config)
} }
fn default_web_socket() -> Arc<dyn FlowyRawWebSocket> { fn default_web_socket() -> Arc<dyn FlowyRawWebSocket> {
@ -234,10 +234,15 @@ fn default_web_socket() -> Arc<dyn FlowyRawWebSocket> {
} }
pub fn mk_document( pub fn mk_document(
ws_manager: &Arc<FlowyWebSocketConnect>, ws_conn: &Arc<FlowyWebSocketConnect>,
user_session: &Arc<UserSession>, user_session: &Arc<UserSession>,
server_config: &ClientServerConfiguration, server_config: &ClientServerConfiguration,
) -> Arc<DocumentContext> { ) -> Arc<DocumentContext> {
let (user, ws_receivers, ws_sender) = DocumentDepsResolver::resolve(ws_manager.clone(), user_session.clone()); let dependencies = DocumentDepsResolver::resolve(ws_conn.clone(), user_session.clone(), server_config);
Arc::new(DocumentContext::new(user, ws_receivers, ws_sender, server_config)) Arc::new(DocumentContext::new(
dependencies.user,
dependencies.ws_receivers,
dependencies.ws_sender,
dependencies.cloud_service,
))
} }

View file

@ -7,14 +7,11 @@ edition = "2018"
[dependencies] [dependencies]
flowy-user-data-model = { path = "../../../shared-lib/flowy-user-data-model" } flowy-user-data-model = { path = "../../../shared-lib/flowy-user-data-model" }
backend-service = { path = "../../../shared-lib/backend-service" }
flowy-derive = { path = "../../../shared-lib/flowy-derive" } flowy-derive = { path = "../../../shared-lib/flowy-derive" }
lib-infra = { path = "../../../shared-lib/lib-infra" } lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration", optional = true}
derive_more = {version = "0.99", features = ["display"]} derive_more = {version = "0.99", features = ["display"]}
flowy-database = { path = "../flowy-database" } flowy-database = { path = "../flowy-database" }
flowy-net = { path = "../flowy-net" }
dart-notify = { path = "../dart-notify" } dart-notify = { path = "../dart-notify" }
lib-dispatch = { path = "../lib-dispatch" } lib-dispatch = { path = "../lib-dispatch" }
flowy-error = { path = "../flowy-error", features = ["db", "backend"] } flowy-error = { path = "../flowy-error", features = ["db", "backend"] }

View file

@ -10,10 +10,6 @@ mod sql_tables;
#[macro_use] #[macro_use]
extern crate flowy_database; extern crate flowy_database;
pub mod prelude {
pub use crate::{entities::*, services::server::*};
}
pub mod errors { pub mod errors {
pub use flowy_error::{internal_error, ErrorCode, FlowyError}; pub use flowy_error::{internal_error, ErrorCode, FlowyError};
} }

View file

@ -1,6 +1,13 @@
use lib_dispatch::prelude::*; use lib_dispatch::prelude::*;
use crate::{event::UserEvent, handlers::*, services::user::UserSession}; use crate::{
entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile},
errors::FlowyError,
event::UserEvent,
handlers::*,
services::user::UserSession,
};
use lib_infra::future::FutureResult;
use std::sync::Arc; use std::sync::Arc;
pub fn create(user_session: Arc<UserSession>) -> Module { pub fn create(user_session: Arc<UserSession>) -> Module {
@ -15,3 +22,12 @@ pub fn create(user_session: Arc<UserSession>) -> Module {
.event(UserEvent::UpdateUser, update_user_handler) .event(UserEvent::UpdateUser, update_user_handler)
.event(UserEvent::CheckUser, check_user_handler) .event(UserEvent::CheckUser, check_user_handler)
} }
pub trait UserCloudService: Send + Sync {
fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError>;
fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError>;
fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError>;
fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError>;
fn get_user(&self, token: &str) -> FutureResult<UserProfile, FlowyError>;
fn ws_addr(&self) -> String;
}

View file

@ -1,2 +1 @@
pub mod server;
pub mod user; pub mod user;

View file

@ -1,31 +0,0 @@
mod server_api;
mod server_api_mock;
pub use server_api::*;
pub use server_api_mock::*;
use std::sync::Arc;
pub(crate) type Server = Arc<dyn UserServerAPI + Send + Sync>;
use crate::{
entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile},
errors::FlowyError,
};
use backend_service::configuration::ClientServerConfiguration;
use lib_infra::future::FutureResult;
pub trait UserServerAPI {
fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError>;
fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError>;
fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError>;
fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError>;
fn get_user(&self, token: &str) -> FutureResult<UserProfile, FlowyError>;
fn ws_addr(&self) -> String;
}
pub(crate) fn construct_user_server(config: &ClientServerConfiguration) -> Arc<dyn UserServerAPI + Send + Sync> {
if cfg!(feature = "http_server") {
Arc::new(UserHttpServer::new(config.clone()))
} else {
Arc::new(UserServerMock {})
}
}

View file

@ -1,88 +0,0 @@
use crate::{
entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile},
errors::FlowyError,
services::server::UserServerAPI,
};
use backend_service::{configuration::*, http_request::*};
use lib_infra::future::FutureResult;
pub struct UserHttpServer {
config: ClientServerConfiguration,
}
impl UserHttpServer {
pub fn new(config: ClientServerConfiguration) -> Self { Self { config } }
}
impl UserServerAPI for UserHttpServer {
fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
let url = self.config.sign_up_url();
FutureResult::new(async move {
let resp = user_sign_up_request(params, &url).await?;
Ok(resp)
})
}
fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
let url = self.config.sign_in_url();
FutureResult::new(async move {
let resp = user_sign_in_request(params, &url).await?;
Ok(resp)
})
}
fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.sign_out_url();
FutureResult::new(async move {
let _ = user_sign_out_request(&token, &url).await;
Ok(())
})
}
fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.user_profile_url();
FutureResult::new(async move {
let _ = update_user_profile_request(&token, params, &url).await?;
Ok(())
})
}
fn get_user(&self, token: &str) -> FutureResult<UserProfile, FlowyError> {
let token = token.to_owned();
let url = self.config.user_profile_url();
FutureResult::new(async move {
let profile = get_user_profile_request(&token, &url).await?;
Ok(profile)
})
}
fn ws_addr(&self) -> String { self.config.ws_addr() }
}
// use crate::notify::*;
// use backend_service::response::FlowyResponse;
// use flowy_user_data_model::errors::ErrorCode;
// struct Middleware {}
//
//
//
// impl ResponseMiddleware for Middleware {
// fn receive_response(&self, token: &Option<String>, response:
// &FlowyResponse) { if let Some(error) = &response.error {
// if error.is_unauthorized() {
// log::error!("user unauthorized");
// match token {
// None => {},
// Some(token) => {
// let error =
// FlowyError::new(ErrorCode::UserUnauthorized, "");
// dart_notify(token, UserNotification::UserUnauthorized)
// .error(error) .send()
// },
// }
// }
// }
// }
// }

View file

@ -1,49 +0,0 @@
use crate::{
entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile},
errors::FlowyError,
};
use crate::services::server::UserServerAPI;
use lib_infra::{future::FutureResult, uuid_string};
pub struct UserServerMock {}
impl UserServerMock {}
impl UserServerAPI for UserServerMock {
fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError> {
let uid = uuid_string();
FutureResult::new(async move {
Ok(SignUpResponse {
user_id: uid.clone(),
name: params.name,
email: params.email,
token: uid,
})
})
}
fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError> {
let user_id = uuid_string();
FutureResult::new(async {
Ok(SignInResponse {
user_id: user_id.clone(),
name: params.name,
email: params.email,
token: user_id,
})
})
}
fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn update_user(&self, _token: &str, _params: UpdateUserParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn get_user(&self, _token: &str) -> FutureResult<UserProfile, FlowyError> {
FutureResult::new(async { Ok(UserProfile::default()) })
}
fn ws_addr(&self) -> String { "ws://localhost:8000/ws/".to_owned() }
}

View file

@ -1,9 +1,3 @@
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::mpsc;
use backend_service::configuration::ClientServerConfiguration;
use flowy_database::{ use flowy_database::{
kv::KV, kv::KV,
query_dsl::*, query_dsl::*,
@ -14,29 +8,29 @@ use flowy_database::{
}; };
use flowy_user_data_model::entities::{SignInResponse, SignUpResponse}; use flowy_user_data_model::entities::{SignInResponse, SignUpResponse};
use lib_sqlite::ConnectionPool; use lib_sqlite::ConnectionPool;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::mpsc;
use crate::{ use crate::{
entities::{SignInParams, SignUpParams, UpdateUserParams, UserProfile}, entities::{SignInParams, SignUpParams, UpdateUserParams, UserProfile},
errors::{ErrorCode, FlowyError}, errors::{ErrorCode, FlowyError},
module::UserCloudService,
notify::*, notify::*,
services::{ services::user::{database::UserDB, notifier::UserNotifier},
server::{construct_user_server, Server},
user::{database::UserDB, notifier::UserNotifier},
},
sql_tables::{UserTable, UserTableChangeset}, sql_tables::{UserTable, UserTableChangeset},
}; };
pub struct UserSessionConfig { pub struct UserSessionConfig {
root_dir: String, root_dir: String,
server_config: ClientServerConfiguration,
session_cache_key: String, session_cache_key: String,
} }
impl UserSessionConfig { impl UserSessionConfig {
pub fn new(root_dir: &str, server_config: &ClientServerConfiguration, session_cache_key: &str) -> Self { pub fn new(root_dir: &str, session_cache_key: &str) -> Self {
Self { Self {
root_dir: root_dir.to_owned(), root_dir: root_dir.to_owned(),
server_config: server_config.clone(),
session_cache_key: session_cache_key.to_owned(), session_cache_key: session_cache_key.to_owned(),
} }
} }
@ -45,21 +39,19 @@ impl UserSessionConfig {
pub struct UserSession { pub struct UserSession {
database: UserDB, database: UserDB,
config: UserSessionConfig, config: UserSessionConfig,
#[allow(dead_code)] cloud_service: Arc<dyn UserCloudService>,
server: Server,
session: RwLock<Option<Session>>, session: RwLock<Option<Session>>,
pub notifier: UserNotifier, pub notifier: UserNotifier,
} }
impl UserSession { impl UserSession {
pub fn new(config: UserSessionConfig) -> Self { pub fn new(config: UserSessionConfig, cloud_service: Arc<dyn UserCloudService>) -> Self {
let db = UserDB::new(&config.root_dir); let db = UserDB::new(&config.root_dir);
let server = construct_user_server(&config.server_config);
let notifier = UserNotifier::new(); let notifier = UserNotifier::new();
Self { Self {
database: db, database: db,
config, config,
server, cloud_service,
session: RwLock::new(None), session: RwLock::new(None),
notifier, notifier,
} }
@ -92,7 +84,7 @@ impl UserSession {
if self.is_login(&params.email) { if self.is_login(&params.email) {
self.user_profile().await self.user_profile().await
} else { } else {
let resp = self.server.sign_in(params).await?; let resp = self.cloud_service.sign_in(params).await?;
let session: Session = resp.clone().into(); let session: Session = resp.clone().into();
let _ = self.set_session(Some(session))?; let _ = self.set_session(Some(session))?;
let user_table = self.save_user(resp.into()).await?; let user_table = self.save_user(resp.into()).await?;
@ -107,7 +99,7 @@ impl UserSession {
if self.is_login(&params.email) { if self.is_login(&params.email) {
self.user_profile().await self.user_profile().await
} else { } else {
let resp = self.server.sign_up(params).await?; let resp = self.cloud_service.sign_up(params).await?;
let session: Session = resp.clone().into(); let session: Session = resp.clone().into();
let _ = self.set_session(Some(session))?; let _ = self.set_session(Some(session))?;
let user_table = self.save_user(resp.into()).await?; let user_table = self.save_user(resp.into()).await?;
@ -180,7 +172,7 @@ impl UserSession {
impl UserSession { impl UserSession {
fn read_user_profile_on_server(&self, token: &str) -> Result<(), FlowyError> { fn read_user_profile_on_server(&self, token: &str) -> Result<(), FlowyError> {
let server = self.server.clone(); let server = self.cloud_service.clone();
let token = token.to_owned(); let token = token.to_owned();
tokio::spawn(async move { tokio::spawn(async move {
match server.get_user(&token).await { match server.get_user(&token).await {
@ -200,7 +192,7 @@ impl UserSession {
} }
async fn update_user_on_server(&self, token: &str, params: UpdateUserParams) -> Result<(), FlowyError> { async fn update_user_on_server(&self, token: &str, params: UpdateUserParams) -> Result<(), FlowyError> {
let server = self.server.clone(); let server = self.cloud_service.clone();
let token = token.to_owned(); let token = token.to_owned();
let _ = tokio::spawn(async move { let _ = tokio::spawn(async move {
match server.update_user(&token, params).await { match server.update_user(&token, params).await {
@ -216,7 +208,7 @@ impl UserSession {
} }
async fn sign_out_on_server(&self, token: &str) -> Result<(), FlowyError> { async fn sign_out_on_server(&self, token: &str) -> Result<(), FlowyError> {
let server = self.server.clone(); let server = self.cloud_service.clone();
let token = token.to_owned(); let token = token.to_owned();
let _ = tokio::spawn(async move { let _ = tokio::spawn(async move {
match server.sign_out(&token).await { match server.sign_out(&token).await {
@ -273,7 +265,7 @@ impl UserSession {
} }
pub async fn update_user( pub async fn update_user(
_server: Server, _cloud_service: Arc<dyn UserCloudService>,
pool: Arc<ConnectionPool>, pool: Arc<ConnectionPool>,
params: UpdateUserParams, params: UpdateUserParams,
) -> Result<(), FlowyError> { ) -> Result<(), FlowyError> {

View file

@ -1,6 +1,7 @@
use crate::helper::*; use crate::helper::*;
use flowy_test::{event_builder::UserModuleEventBuilder, FlowySDKTest}; use flowy_test::{event_builder::UserModuleEventBuilder, FlowySDKTest};
use flowy_user::{errors::ErrorCode, event::UserEvent::*, prelude::*}; use flowy_user::{errors::ErrorCode, event::UserEvent::*};
use flowy_user_data_model::entities::{SignInRequest, SignUpRequest, UserProfile};
#[tokio::test] #[tokio::test]
async fn sign_up_with_invalid_email() { async fn sign_up_with_invalid_email() {

View file

@ -1,6 +1,7 @@
use crate::helper::*; use crate::helper::*;
use flowy_test::{event_builder::UserModuleEventBuilder, FlowySDKTest}; use flowy_test::{event_builder::UserModuleEventBuilder, FlowySDKTest};
use flowy_user::{errors::ErrorCode, event::UserEvent::*, prelude::*}; use flowy_user::{errors::ErrorCode, event::UserEvent::*};
use flowy_user_data_model::entities::{UpdateUserRequest, UserProfile};
use lib_infra::uuid_string; use lib_infra::uuid_string;
use serial_test::*; use serial_test::*;

View file

@ -1,219 +0,0 @@
use crate::{configuration::HEADER_TOKEN, errors::ServerError, request::HttpRequestBuilder};
use flowy_core_data_model::entities::prelude::*;
use flowy_user_data_model::entities::prelude::*;
pub(crate) fn request_builder() -> HttpRequestBuilder {
HttpRequestBuilder::new().middleware(crate::middleware::BACKEND_API_MIDDLEWARE.clone())
}
pub async fn user_sign_up_request(params: SignUpParams, url: &str) -> Result<SignUpResponse, ServerError> {
let response = request_builder()
.post(&url.to_owned())
.protobuf(params)?
.response()
.await?;
Ok(response)
}
pub async fn user_sign_in_request(params: SignInParams, url: &str) -> Result<SignInResponse, ServerError> {
let response = request_builder()
.post(&url.to_owned())
.protobuf(params)?
.response()
.await?;
Ok(response)
}
pub async fn user_sign_out_request(token: &str, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.delete(&url.to_owned())
.header(HEADER_TOKEN, token)
.send()
.await?;
Ok(())
}
pub async fn get_user_profile_request(token: &str, url: &str) -> Result<UserProfile, ServerError> {
let user_profile = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.response()
.await?;
Ok(user_profile)
}
pub async fn update_user_profile_request(token: &str, params: UpdateUserParams, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.patch(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn create_workspace_request(
token: &str,
params: CreateWorkspaceParams,
url: &str,
) -> Result<Workspace, ServerError> {
let workspace = request_builder()
.post(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.response()
.await?;
Ok(workspace)
}
pub async fn read_workspaces_request(
token: &str,
params: WorkspaceId,
url: &str,
) -> Result<RepeatedWorkspace, ServerError> {
let repeated_workspace = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.response::<RepeatedWorkspace>()
.await?;
Ok(repeated_workspace)
}
pub async fn update_workspace_request(
token: &str,
params: UpdateWorkspaceParams,
url: &str,
) -> Result<(), ServerError> {
let _ = request_builder()
.patch(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn delete_workspace_request(token: &str, params: WorkspaceId, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.delete(url)
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
// App
pub async fn create_app_request(token: &str, params: CreateAppParams, url: &str) -> Result<App, ServerError> {
let app = request_builder()
.post(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.response()
.await?;
Ok(app)
}
pub async fn read_app_request(token: &str, params: AppId, url: &str) -> Result<Option<App>, ServerError> {
let app = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.option_response()
.await?;
Ok(app)
}
pub async fn update_app_request(token: &str, params: UpdateAppParams, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.patch(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn delete_app_request(token: &str, params: AppId, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.delete(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
// View
pub async fn create_view_request(token: &str, params: CreateViewParams, url: &str) -> Result<View, ServerError> {
let view = request_builder()
.post(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.response()
.await?;
Ok(view)
}
pub async fn read_view_request(token: &str, params: ViewId, url: &str) -> Result<Option<View>, ServerError> {
let view = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.option_response()
.await?;
Ok(view)
}
pub async fn update_view_request(token: &str, params: UpdateViewParams, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.patch(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn delete_view_request(token: &str, params: RepeatedViewId, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.delete(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn create_trash_request(token: &str, params: RepeatedTrashId, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.post(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn delete_trash_request(token: &str, params: RepeatedTrashId, url: &str) -> Result<(), ServerError> {
let _ = request_builder()
.delete(&url.to_owned())
.header(HEADER_TOKEN, token)
.protobuf(params)?
.send()
.await?;
Ok(())
}
pub async fn read_trash_request(token: &str, url: &str) -> Result<RepeatedTrash, ServerError> {
let repeated_trash = request_builder()
.get(&url.to_owned())
.header(HEADER_TOKEN, token)
.response::<RepeatedTrash>()
.await?;
Ok(repeated_trash)
}

View file

@ -1,6 +1,4 @@
pub mod configuration; pub mod configuration;
pub mod errors; pub mod errors;
pub mod http_request;
pub mod middleware;
pub mod request; pub mod request;
pub mod response; pub mod response;

View file

@ -1,39 +0,0 @@
use crate::{request::ResponseMiddleware, response::FlowyResponse};
use lazy_static::lazy_static;
use std::sync::Arc;
use tokio::sync::broadcast;
lazy_static! {
pub static ref BACKEND_API_MIDDLEWARE: Arc<WorkspaceMiddleware> = Arc::new(WorkspaceMiddleware::new());
}
pub struct WorkspaceMiddleware {
invalid_token_sender: broadcast::Sender<String>,
}
impl WorkspaceMiddleware {
fn new() -> Self {
let (sender, _) = broadcast::channel(10);
WorkspaceMiddleware {
invalid_token_sender: sender,
}
}
pub fn invalid_token_subscribe(&self) -> broadcast::Receiver<String> { self.invalid_token_sender.subscribe() }
}
impl ResponseMiddleware for WorkspaceMiddleware {
fn receive_response(&self, token: &Option<String>, response: &FlowyResponse) {
if let Some(error) = &response.error {
if error.is_unauthorized() {
log::error!("user is unauthorized");
match token {
None => {},
Some(token) => match self.invalid_token_sender.send(token.clone()) {
Ok(_) => {},
Err(e) => log::error!("{:?}", e),
},
}
}
}
}
}

View file

@ -16,7 +16,7 @@ use tokio::{
task::spawn_blocking, task::spawn_blocking,
}; };
pub trait DocumentPersistence: Send + Sync + Debug { pub trait ServerDocumentPersistence: Send + Sync + Debug {
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError>; fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError>;
fn create_document( fn create_document(
@ -40,11 +40,11 @@ pub trait DocumentPersistence: Send + Sync + Debug {
pub struct ServerDocumentManager { pub struct ServerDocumentManager {
open_doc_map: Arc<RwLock<HashMap<String, Arc<OpenDocHandle>>>>, open_doc_map: Arc<RwLock<HashMap<String, Arc<OpenDocHandle>>>>,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn ServerDocumentPersistence>,
} }
impl ServerDocumentManager { impl ServerDocumentManager {
pub fn new(persistence: Arc<dyn DocumentPersistence>) -> Self { pub fn new(persistence: Arc<dyn ServerDocumentPersistence>) -> Self {
Self { Self {
open_doc_map: Arc::new(RwLock::new(HashMap::new())), open_doc_map: Arc::new(RwLock::new(HashMap::new())),
persistence, persistence,
@ -169,12 +169,12 @@ impl std::ops::Drop for ServerDocumentManager {
struct OpenDocHandle { struct OpenDocHandle {
doc_id: String, doc_id: String,
sender: mpsc::Sender<DocumentCommand>, sender: mpsc::Sender<DocumentCommand>,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn ServerDocumentPersistence>,
users: DashMap<String, Arc<dyn RevisionUser>>, users: DashMap<String, Arc<dyn RevisionUser>>,
} }
impl OpenDocHandle { impl OpenDocHandle {
fn new(doc: DocumentInfo, persistence: Arc<dyn DocumentPersistence>) -> Result<Self, CollaborateError> { fn new(doc: DocumentInfo, persistence: Arc<dyn ServerDocumentPersistence>) -> Result<Self, CollaborateError> {
let doc_id = doc.doc_id.clone(); let doc_id = doc.doc_id.clone();
let (sender, receiver) = mpsc::channel(100); let (sender, receiver) = mpsc::channel(100);
let users = DashMap::new(); let users = DashMap::new();
@ -257,17 +257,17 @@ enum DocumentCommand {
ApplyRevisions { ApplyRevisions {
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevisionPB,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn ServerDocumentPersistence>,
ret: oneshot::Sender<CollaborateResult<()>>, ret: oneshot::Sender<CollaborateResult<()>>,
}, },
Ping { Ping {
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn ServerDocumentPersistence>,
rev_id: i64, rev_id: i64,
ret: oneshot::Sender<CollaborateResult<()>>, ret: oneshot::Sender<CollaborateResult<()>>,
}, },
Reset { Reset {
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn ServerDocumentPersistence>,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevisionPB,
ret: oneshot::Sender<CollaborateResult<()>>, ret: oneshot::Sender<CollaborateResult<()>>,
}, },

View file

@ -6,7 +6,7 @@ use crate::{
}, },
errors::CollaborateError, errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::DocumentPersistence, sync::ServerDocumentPersistence,
util::*, util::*,
}; };
use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
@ -54,7 +54,7 @@ impl RevisionSynchronizer {
&self, &self,
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevisionPB,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn ServerDocumentPersistence>,
) -> Result<(), CollaborateError> { ) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone(); let doc_id = self.doc_id.clone();
if repeated_revision.get_items().is_empty() { if repeated_revision.get_items().is_empty() {
@ -115,7 +115,7 @@ impl RevisionSynchronizer {
pub async fn pong( pub async fn pong(
&self, &self,
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn ServerDocumentPersistence>,
client_rev_id: i64, client_rev_id: i64,
) -> Result<(), CollaborateError> { ) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone(); let doc_id = self.doc_id.clone();
@ -144,7 +144,7 @@ impl RevisionSynchronizer {
#[tracing::instrument(level = "debug", skip(self, repeated_revision, persistence), fields(doc_id), err)] #[tracing::instrument(level = "debug", skip(self, repeated_revision, persistence), fields(doc_id), err)]
pub async fn reset( pub async fn reset(
&self, &self,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn ServerDocumentPersistence>,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevisionPB,
) -> Result<(), CollaborateError> { ) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone(); let doc_id = self.doc_id.clone();
@ -191,7 +191,11 @@ impl RevisionSynchronizer {
pub(crate) fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) } pub(crate) fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) }
async fn is_applied_before(&self, new_revision: &RevisionPB, persistence: &Arc<dyn DocumentPersistence>) -> bool { async fn is_applied_before(
&self,
new_revision: &RevisionPB,
persistence: &Arc<dyn ServerDocumentPersistence>,
) -> bool {
let rev_ids = Some(vec![new_revision.rev_id]); let rev_ids = Some(vec![new_revision.rev_id]);
if let Ok(revisions) = persistence.read_revisions(&self.doc_id, rev_ids).await { if let Ok(revisions) = persistence.read_revisions(&self.doc_id, rev_ids).await {
if let Some(revision) = revisions.first() { if let Some(revision) = revisions.first() {
@ -207,7 +211,7 @@ impl RevisionSynchronizer {
async fn push_revisions_to_user( async fn push_revisions_to_user(
&self, &self,
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn ServerDocumentPersistence>,
from: i64, from: i64,
to: i64, to: i64,
) { ) {

View file

@ -446,7 +446,7 @@ fn invert_from_other<T: Attributes>(
tracing::trace!("invert op: {} [{}:{}]", operation, start, end); tracing::trace!("invert op: {} [{}:{}]", operation, start, end);
let other_ops = DeltaIter::from_interval(other, Interval::new(start, end)).ops(); let other_ops = DeltaIter::from_interval(other, Interval::new(start, end)).ops();
other_ops.into_iter().for_each(|other_op| match operation { other_ops.into_iter().for_each(|other_op| match operation {
Operation::Delete(n) => { Operation::Delete(_n) => {
// tracing::trace!("invert delete: {} by add {}", n, other_op); // tracing::trace!("invert delete: {} by add {}", n, other_op);
base.add(other_op); base.add(other_op);
}, },