fix: retain cycle

This commit is contained in:
Nathan 2025-04-23 15:59:53 +08:00
parent 1977cf6637
commit 7fc2fd56f0
37 changed files with 509 additions and 267 deletions

View file

@ -171,7 +171,7 @@ SPEC CHECKSUMS:
super_native_extensions: 85efee3a7495b46b04befcfc86ed12069264ebf3
url_launcher_macos: c82c93949963e55b228a30115bd219499a6fe404
webview_flutter_wkwebview: 0982481e3d9c78fd5c6f62a002fcd24fc791f1e4
window_manager: 3a1844359a6295ab1e47659b1a777e36773cd6e8
window_manager: 990c8e348c4da2a93b81da638245d40554ec9436
PODFILE CHECKSUM: 0532f3f001ca3110b8be345d6491fff690e95823

View file

@ -100,7 +100,7 @@ impl EventIntegrationTest {
Self::new_with_config(config).await
}
pub fn skip_clean(&mut self) {
pub fn skip_auto_remove_temp_dir(&mut self) {
self.cleaner.should_clean.store(false, Ordering::Release);
}

View file

@ -60,7 +60,7 @@ impl EventIntegrationTest {
pub async fn sign_up_as_anon(&self) -> SignUpContext {
let password = login_password();
let email = unique_email();
let email = "anon@appflowy.io".to_string();
let payload = SignUpPayloadPB {
email,
name: "appflowy".to_string(),

View file

@ -39,7 +39,7 @@ async fn af_cloud_upload_big_file_test() {
// Simulate a restart
let config = test.config.clone();
test.skip_clean();
test.skip_auto_remove_temp_dir();
drop(test);
tokio::time::sleep(Duration::from_secs(3)).await;

View file

@ -12,3 +12,34 @@ async fn af_cloud_sign_up_test() {
let user = test.af_cloud_sign_in_with_email(&email).await.unwrap();
assert_eq!(user.email, email);
}
#[tokio::test]
async fn af_cloud_sign_up_then_switch_to_anon_test() {
// user_localhost_af_cloud_with_nginx().await;
use_localhost_af_cloud().await;
let mut test = EventIntegrationTest::new().await;
test.skip_auto_remove_temp_dir();
let email = generate_test_email();
let user = test.af_cloud_sign_in_with_email(&email).await.unwrap();
assert_eq!(user.email, email);
test.sign_out().await;
let config = test.config.clone();
drop(test);
let mut test = EventIntegrationTest::new_with_config(config.clone()).await;
test.skip_auto_remove_temp_dir();
test.sign_up_as_anon().await;
drop(test);
let mut test = EventIntegrationTest::new_with_config(config.clone()).await;
test.skip_auto_remove_temp_dir();
let user = test.af_cloud_sign_in_with_email(&email).await.unwrap();
assert_eq!(user.email, email);
test.sign_out().await;
drop(test);
let mut test = EventIntegrationTest::new_with_config(config).await;
test.skip_auto_remove_temp_dir();
test.sign_up_as_anon().await;
}

View file

@ -69,6 +69,11 @@ pub struct AIManager {
pub store_preferences: Arc<KVStorePreferences>,
server_models: Arc<RwLock<ServerModelsCache>>,
}
impl Drop for AIManager {
fn drop(&mut self) {
tracing::trace!("[Drop] drop ai manager");
}
}
impl AIManager {
pub fn new(

View file

@ -21,7 +21,7 @@ impl DatabaseDepsResolver {
pub async fn resolve(
authenticate_user: Weak<AuthenticateUser>,
task_scheduler: Arc<RwLock<TaskDispatcher>>,
collab_builder: Arc<AppFlowyCollabBuilder>,
collab_builder: Weak<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DatabaseCloudService>,
ai_service: Arc<dyn DatabaseAIService>,
ai_manager: Arc<AIManager>,

View file

@ -1,7 +1,6 @@
use crate::deps_resolve::CollabSnapshotSql;
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_integrate::CollabKVDB;
use flowy_database2::DatabaseManager;
use flowy_document::entities::{DocumentSnapshotData, DocumentSnapshotMeta};
use flowy_document::manager::{DocumentManager, DocumentSnapshotService, DocumentUserService};
use flowy_document_pub::cloud::DocumentCloudService;
@ -15,8 +14,7 @@ pub struct DocumentDepsResolver();
impl DocumentDepsResolver {
pub fn resolve(
authenticate_user: Weak<AuthenticateUser>,
_database_manager: &Arc<DatabaseManager>,
collab_builder: Arc<AppFlowyCollabBuilder>,
collab_builder: Weak<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DocumentCloudService>,
storage_service: Weak<dyn StorageService>,
) -> Arc<DocumentManager> {

View file

@ -7,10 +7,16 @@ use flowy_folder::entities::CreateViewParams;
use flowy_folder::share::ImportType;
use flowy_folder::view_operation::{FolderOperationHandler, ImportedData};
use lib_infra::async_trait::async_trait;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use uuid::Uuid;
pub struct ChatFolderOperation(pub Arc<AIManager>);
pub struct ChatFolderOperation(pub Weak<AIManager>);
impl ChatFolderOperation {
fn ai_manager(&self) -> Result<Arc<AIManager>, FlowyError> {
self.0.upgrade().ok_or_else(FlowyError::ref_drop)
}
}
#[async_trait]
impl FolderOperationHandler for ChatFolderOperation {
@ -19,15 +25,15 @@ impl FolderOperationHandler for ChatFolderOperation {
}
async fn open_view(&self, view_id: &Uuid) -> Result<(), FlowyError> {
self.0.open_chat(view_id).await
self.ai_manager()?.open_chat(view_id).await
}
async fn close_view(&self, view_id: &Uuid) -> Result<(), FlowyError> {
self.0.close_chat(view_id).await
self.ai_manager()?.close_chat(view_id).await
}
async fn delete_view(&self, view_id: &Uuid) -> Result<(), FlowyError> {
self.0.delete_chat(view_id).await
self.ai_manager()?.delete_chat(view_id).await
}
async fn duplicate_view(&self, _view_id: &Uuid) -> Result<Bytes, FlowyError> {
@ -51,7 +57,7 @@ impl FolderOperationHandler for ChatFolderOperation {
_layout: ViewLayout,
) -> Result<(), FlowyError> {
self
.0
.ai_manager()?
.create_chat(&user_id, parent_view_id, view_id)
.await?;
Ok(())

View file

@ -19,21 +19,26 @@ use flowy_user::services::data_import::{load_collab_by_object_id, load_collab_by
use lib_infra::async_trait::async_trait;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use uuid::Uuid;
pub struct DatabaseFolderOperation(pub Arc<DatabaseManager>);
pub struct DatabaseFolderOperation(pub Weak<DatabaseManager>);
impl DatabaseFolderOperation {
fn database_manager(&self) -> Result<Arc<DatabaseManager>, FlowyError> {
self.0.upgrade().ok_or_else(FlowyError::ref_drop)
}
}
#[async_trait]
impl FolderOperationHandler for DatabaseFolderOperation {
async fn open_view(&self, view_id: &Uuid) -> Result<(), FlowyError> {
self.0.open_database_view(view_id).await?;
self.database_manager()?.open_database_view(view_id).await?;
Ok(())
}
async fn close_view(&self, view_id: &Uuid) -> Result<(), FlowyError> {
self
.0
.database_manager()?
.close_database_view(view_id.to_string().as_str())
.await?;
Ok(())
@ -41,7 +46,7 @@ impl FolderOperationHandler for DatabaseFolderOperation {
async fn delete_view(&self, view_id: &Uuid) -> Result<(), FlowyError> {
match self
.0
.database_manager()?
.delete_database_view(view_id.to_string().as_str())
.await
{
@ -58,17 +63,18 @@ impl FolderOperationHandler for DatabaseFolderOperation {
) -> Result<GatherEncodedCollab, FlowyError> {
let workspace_id = _user.workspace_id()?;
let view_id_str = view_id.to_string();
let database_manager = self.database_manager()?;
// get the collab_object_id for the database.
//
// the collab object_id for the database is not the view_id,
// we should use the view_id to get the database_id
let oid = self.0.get_database_id_with_view_id(&view_id_str).await?;
let row_oids = self
.0
let oid = database_manager
.get_database_id_with_view_id(&view_id_str)
.await?;
let row_oids = database_manager
.get_database_row_ids_with_view_id(&view_id_str)
.await?;
let row_metas = self
.0
let row_metas = database_manager
.get_database_row_metas_with_view_id(view_id, row_oids.clone())
.await?;
let row_document_ids = row_metas
@ -79,7 +85,7 @@ impl FolderOperationHandler for DatabaseFolderOperation {
.into_iter()
.map(|oid| oid.into_inner())
.collect::<Vec<_>>();
let database_metas = self.0.get_all_databases_meta().await;
let database_metas = database_manager.get_all_databases_meta().await;
let uid = _user
.user_id()
@ -178,14 +184,14 @@ impl FolderOperationHandler for DatabaseFolderOperation {
let duplicated_view_id =
String::from_utf8(data.to_vec()).map_err(|_| FlowyError::invalid_data())?;
let encoded_collab = self
.0
.database_manager()?
.duplicate_database(&duplicated_view_id, &params.view_id.to_string())
.await?;
Ok(Some(encoded_collab))
},
ViewData::Data(data) => {
let encoded_collab = self
.0
.database_manager()?
.create_database_with_data(&params.view_id.to_string(), data.to_vec())
.await?;
Ok(Some(encoded_collab))
@ -207,7 +213,7 @@ impl FolderOperationHandler for DatabaseFolderOperation {
let database_view_id = params.view_id.to_string();
let database_parent_view_id = params.parent_view_id.to_string();
self
.0
.database_manager()?
.create_linked_view(
name,
layout.into(),
@ -245,7 +251,7 @@ impl FolderOperationHandler for DatabaseFolderOperation {
);
},
};
let result = self.0.import_database(data).await;
let result = self.database_manager()?.import_database(data).await;
match result {
Ok(_) => Ok(()),
Err(err) => {
@ -276,7 +282,7 @@ impl FolderOperationHandler for DatabaseFolderOperation {
})
.await??;
let result = self
.0
.database_manager()?
.import_csv(view_id.to_string(), content, format)
.await?;
Ok(
@ -309,7 +315,7 @@ impl FolderOperationHandler for DatabaseFolderOperation {
let content =
String::from_utf8(data).map_err(|e| FlowyError::invalid_data().with_context(e))?;
let _ = self
.0
.database_manager()?
.import_csv(view_id.to_string(), content, CSVFormat::Original)
.await?;
Ok(())
@ -327,7 +333,7 @@ impl FolderOperationHandler for DatabaseFolderOperation {
if old.layout != new.layout {
self
.0
.database_manager()?
.update_database_layout(&new.id, database_layout)
.await?;
Ok(())

View file

@ -18,11 +18,17 @@ use lib_dispatch::prelude::ToBytes;
use lib_infra::async_trait::async_trait;
use std::convert::TryFrom;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use tokio::sync::RwLock;
use uuid::Uuid;
pub struct DocumentFolderOperation(pub Arc<DocumentManager>);
pub struct DocumentFolderOperation(pub Weak<DocumentManager>);
impl DocumentFolderOperation {
fn document_manager(&self) -> Result<Arc<DocumentManager>, FlowyError> {
self.0.upgrade().ok_or_else(FlowyError::ref_drop)
}
}
#[async_trait]
impl FolderOperationHandler for DocumentFolderOperation {
fn name(&self) -> &str {
@ -34,7 +40,7 @@ impl FolderOperationHandler for DocumentFolderOperation {
uid: i64,
workspace_view_builder: Arc<RwLock<NestedViewBuilder>>,
) -> Result<(), FlowyError> {
let manager = self.0.clone();
let manager = self.document_manager()?;
let mut write_guard = workspace_view_builder.write().await;
// Create a view named "Getting started" with an icon ⭐️ and the built-in README data.
@ -60,18 +66,18 @@ impl FolderOperationHandler for DocumentFolderOperation {
}
async fn open_view(&self, view_id: &Uuid) -> Result<(), FlowyError> {
self.0.open_document(view_id).await?;
self.document_manager()?.open_document(view_id).await?;
Ok(())
}
/// Close the document view.
async fn close_view(&self, view_id: &Uuid) -> Result<(), FlowyError> {
self.0.close_document(view_id).await?;
self.document_manager()?.close_document(view_id).await?;
Ok(())
}
async fn delete_view(&self, view_id: &Uuid) -> Result<(), FlowyError> {
match self.0.delete_document(view_id).await {
match self.document_manager()?.delete_document(view_id).await {
Ok(_) => tracing::trace!("Delete document: {}", view_id),
Err(e) => tracing::error!("🔴delete document failed: {}", e),
}
@ -79,7 +85,11 @@ impl FolderOperationHandler for DocumentFolderOperation {
}
async fn duplicate_view(&self, view_id: &Uuid) -> Result<Bytes, FlowyError> {
let data: DocumentDataPB = self.0.get_document_data(view_id).await?.into();
let data: DocumentDataPB = self
.document_manager()?
.get_document_data(view_id)
.await?
.into();
let data_bytes = data.into_bytes().map_err(|_| FlowyError::invalid_data())?;
Ok(data_bytes)
}
@ -107,7 +117,7 @@ impl FolderOperationHandler for DocumentFolderOperation {
ViewData::Empty => None,
};
let encoded_collab = self
.0
.document_manager()?
.create_document(user_id, &params.view_id, data.map(|d| d.into()))
.await?;
Ok(Some(encoded_collab))
@ -123,7 +133,11 @@ impl FolderOperationHandler for DocumentFolderOperation {
layout: ViewLayout,
) -> Result<(), FlowyError> {
debug_assert_eq!(layout, ViewLayout::Document);
match self.0.create_document(user_id, view_id, None).await {
match self
.document_manager()?
.create_document(user_id, view_id, None)
.await
{
Ok(_) => Ok(()),
Err(err) => {
if err.is_already_exists() {
@ -145,7 +159,7 @@ impl FolderOperationHandler for DocumentFolderOperation {
) -> Result<Vec<ImportedData>, FlowyError> {
let data = DocumentDataPB::try_from(Bytes::from(bytes))?;
let encoded_collab = self
.0
.document_manager()?
.create_document(uid, view_id, Some(data.into()))
.await?;
Ok(vec![(

View file

@ -34,7 +34,7 @@ impl FolderDepsResolver {
pub async fn resolve(
authenticate_user: Weak<AuthenticateUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,
server_provider: Arc<ServerProvider>,
server_provider: Weak<ServerProvider>,
folder_indexer: Arc<FolderIndexManagerImpl>,
store_preferences: Arc<KVStorePreferences>,
) -> Arc<FolderManager> {
@ -57,9 +57,9 @@ impl FolderDepsResolver {
pub fn register_handlers(
folder_manager: &Arc<FolderManager>,
document_manager: Arc<DocumentManager>,
database_manager: Arc<DatabaseManager>,
chat_manager: Arc<AIManager>,
document_manager: Weak<DocumentManager>,
database_manager: Weak<DatabaseManager>,
chat_manager: Weak<AIManager>,
) {
let document_folder_operation = Arc::new(DocumentFolderOperation(document_manager));
folder_manager.register_operation_handler(ViewLayout::Document, document_folder_operation);

View file

@ -2,7 +2,7 @@ use crate::server_layer::ServerProvider;
use collab_folder::hierarchy_builder::ParentChildViews;
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use flowy_database2::DatabaseManager;
use flowy_error::FlowyResult;
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder::manager::FolderManager;
use flowy_folder_pub::entities::ImportFrom;
use flowy_sqlite::kv::KVStorePreferences;
@ -11,7 +11,7 @@ use flowy_user::user_manager::UserManager;
use flowy_user_pub::workspace_service::UserWorkspaceService;
use lib_infra::async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use tracing::info;
use uuid::Uuid;
@ -20,11 +20,11 @@ pub struct UserDepsResolver();
impl UserDepsResolver {
pub async fn resolve(
authenticate_user: Arc<AuthenticateUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,
server_provider: Arc<ServerProvider>,
collab_builder: Weak<AppFlowyCollabBuilder>,
server_provider: Weak<ServerProvider>,
store_preference: Arc<KVStorePreferences>,
database_manager: Arc<DatabaseManager>,
folder_manager: Arc<FolderManager>,
database_manager: Weak<DatabaseManager>,
folder_manager: Weak<FolderManager>,
) -> Arc<UserManager> {
let workspace_service_impl = Arc::new(UserWorkspaceServiceImpl {
database_manager,
@ -33,7 +33,7 @@ impl UserDepsResolver {
UserManager::new(
server_provider,
store_preference,
Arc::downgrade(&collab_builder),
collab_builder,
authenticate_user,
workspace_service_impl,
)
@ -41,8 +41,8 @@ impl UserDepsResolver {
}
pub struct UserWorkspaceServiceImpl {
pub database_manager: Arc<DatabaseManager>,
pub folder_manager: Arc<FolderManager>,
pub database_manager: Weak<DatabaseManager>,
pub folder_manager: Weak<FolderManager>,
}
#[async_trait]
@ -58,12 +58,16 @@ impl UserWorkspaceService for UserWorkspaceServiceImpl {
ImportFrom::AnonUser => {
self
.folder_manager
.upgrade()
.ok_or_else(FlowyError::ref_drop)?
.insert_views_as_spaces(views, orphan_views)
.await?;
},
ImportFrom::AppFlowyDataFolder => {
self
.folder_manager
.upgrade()
.ok_or_else(FlowyError::ref_drop)?
.insert_views_with_parent(views, orphan_views, parent_view_id)
.await?;
},
@ -77,6 +81,8 @@ impl UserWorkspaceService for UserWorkspaceServiceImpl {
) -> FlowyResult<()> {
self
.database_manager
.upgrade()
.ok_or_else(FlowyError::ref_drop)?
.update_database_indexing(ids_by_database_id)
.await?;
Ok(())
@ -87,6 +93,8 @@ impl UserWorkspaceService for UserWorkspaceServiceImpl {
// Log the error and continue
if let Err(err) = self
.folder_manager
.upgrade()
.ok_or_else(FlowyError::ref_drop)?
.remove_indices_for_workspace(workspace_id)
.await
{

View file

@ -67,6 +67,13 @@ pub struct AppFlowyCore {
pub search_manager: Arc<SearchManager>,
pub ai_manager: Arc<AIManager>,
pub storage_manager: Arc<StorageManager>,
pub collab_builder: Arc<AppFlowyCollabBuilder>,
}
impl Drop for AppFlowyCore {
fn drop(&mut self) {
tracing::trace!("[Drop] drop appflowy core");
}
}
impl AppFlowyCore {
@ -174,7 +181,7 @@ impl AppFlowyCore {
let folder_manager = FolderDepsResolver::resolve(
Arc::downgrade(&authenticate_user),
collab_builder.clone(),
server_provider.clone(),
Arc::downgrade(&server_provider),
folder_indexer.clone(),
store_preference.clone(),
)
@ -198,7 +205,7 @@ impl AppFlowyCore {
let database_manager = DatabaseDepsResolver::resolve(
Arc::downgrade(&authenticate_user),
task_dispatcher.clone(),
collab_builder.clone(),
Arc::downgrade(&collab_builder),
server_provider.clone(),
server_provider.clone(),
ai_manager.clone(),
@ -207,19 +214,18 @@ impl AppFlowyCore {
let document_manager = DocumentDepsResolver::resolve(
Arc::downgrade(&authenticate_user),
&database_manager,
collab_builder.clone(),
Arc::downgrade(&collab_builder),
server_provider.clone(),
Arc::downgrade(&storage_manager.storage_service),
);
let user_manager = UserDepsResolver::resolve(
authenticate_user.clone(),
collab_builder.clone(),
server_provider.clone(),
Arc::downgrade(&collab_builder),
Arc::downgrade(&server_provider),
store_preference.clone(),
database_manager.clone(),
folder_manager.clone(),
Arc::downgrade(&database_manager),
Arc::downgrade(&folder_manager),
)
.await;
@ -233,9 +239,9 @@ impl AppFlowyCore {
// Register the folder operation handlers
register_handlers(
&folder_manager,
document_manager.clone(),
database_manager.clone(),
ai_manager.clone(),
Arc::downgrade(&document_manager),
Arc::downgrade(&database_manager),
Arc::downgrade(&ai_manager),
);
(
@ -253,14 +259,14 @@ impl AppFlowyCore {
.await;
let user_status_callback = UserStatusCallbackImpl {
user_manager: user_manager.clone(),
collab_builder,
folder_manager: folder_manager.clone(),
database_manager: database_manager.clone(),
document_manager: document_manager.clone(),
server_provider: server_provider.clone(),
storage_manager: storage_manager.clone(),
ai_manager: ai_manager.clone(),
user_manager: Arc::downgrade(&user_manager),
collab_builder: Arc::downgrade(&collab_builder),
folder_manager: Arc::downgrade(&folder_manager),
database_manager: Arc::downgrade(&database_manager),
document_manager: Arc::downgrade(&document_manager),
server_provider: Arc::downgrade(&server_provider),
storage_manager: Arc::downgrade(&storage_manager),
ai_manager: Arc::downgrade(&ai_manager),
runtime: runtime.clone(),
};
@ -268,16 +274,13 @@ impl AppFlowyCore {
database_manager: Arc::downgrade(&database_manager),
document_manager: Arc::downgrade(&document_manager),
};
let cloned_user_manager = Arc::downgrade(&user_manager);
if let Some(user_manager) = cloned_user_manager.upgrade() {
if let Err(err) = user_manager
.init_with_callback(user_status_callback, collab_interact_impl)
.await
{
error!("Init user failed: {}", err)
}
if let Err(err) = user_manager
.init_with_callback(user_status_callback, collab_interact_impl)
.await
{
error!("Init user failed: {}", err)
}
#[allow(clippy::arc_with_non_send_sync)]
let event_dispatcher = Arc::new(AFPluginDispatcher::new(
runtime,
@ -305,6 +308,7 @@ impl AppFlowyCore {
search_manager,
ai_manager,
storage_manager,
collab_builder,
}
}

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use anyhow::Context;
use client_api::entity::billing_dto::SubscriptionPlan;
@ -24,20 +24,63 @@ use lib_infra::async_trait::async_trait;
use uuid::Uuid;
pub(crate) struct UserStatusCallbackImpl {
pub(crate) user_manager: Arc<UserManager>,
pub(crate) collab_builder: Arc<AppFlowyCollabBuilder>,
pub(crate) folder_manager: Arc<FolderManager>,
pub(crate) database_manager: Arc<DatabaseManager>,
pub(crate) document_manager: Arc<DocumentManager>,
pub(crate) server_provider: Arc<ServerProvider>,
pub(crate) storage_manager: Arc<StorageManager>,
pub(crate) ai_manager: Arc<AIManager>,
pub(crate) user_manager: Weak<UserManager>,
pub(crate) collab_builder: Weak<AppFlowyCollabBuilder>,
pub(crate) folder_manager: Weak<FolderManager>,
pub(crate) database_manager: Weak<DatabaseManager>,
pub(crate) document_manager: Weak<DocumentManager>,
pub(crate) server_provider: Weak<ServerProvider>,
pub(crate) storage_manager: Weak<StorageManager>,
pub(crate) ai_manager: Weak<AIManager>,
// By default, all callback will run on the caller thread. If you don't want to block the caller
// thread, you can use runtime to spawn a new task.
pub(crate) runtime: Arc<AFPluginRuntime>,
}
impl UserStatusCallbackImpl {
fn user_manager(&self) -> Result<Arc<UserManager>, FlowyError> {
self.user_manager.upgrade().ok_or_else(FlowyError::ref_drop)
}
fn folder_manager(&self) -> Result<Arc<FolderManager>, FlowyError> {
self
.folder_manager
.upgrade()
.ok_or_else(FlowyError::ref_drop)
}
fn database_manager(&self) -> Result<Arc<DatabaseManager>, FlowyError> {
self
.database_manager
.upgrade()
.ok_or_else(FlowyError::ref_drop)
}
fn document_manager(&self) -> Result<Arc<DocumentManager>, FlowyError> {
self
.document_manager
.upgrade()
.ok_or_else(FlowyError::ref_drop)
}
fn server_provider(&self) -> Result<Arc<ServerProvider>, FlowyError> {
self
.server_provider
.upgrade()
.ok_or_else(FlowyError::ref_drop)
}
fn storage_manager(&self) -> Result<Arc<StorageManager>, FlowyError> {
self
.storage_manager
.upgrade()
.ok_or_else(FlowyError::ref_drop)
}
fn ai_manager(&self) -> Result<Arc<AIManager>, FlowyError> {
self.ai_manager.upgrade().ok_or_else(FlowyError::ref_drop)
}
async fn folder_init_data_source(
&self,
user_id: i64,
@ -50,8 +93,8 @@ impl UserStatusCallbackImpl {
});
}
let doc_state_result = self
.folder_manager
.cloud_service
.folder_manager()?
.cloud_service()?
.get_folder_doc_state(workspace_id, user_id, CollabType::Folder, workspace_id)
.await;
resolve_data_source(auth_type, doc_state_result)
@ -64,7 +107,7 @@ impl UserStatusCallbackImpl {
object_id: &Uuid,
) -> FlowyResult<bool> {
let db = self
.user_manager
.user_manager()?
.get_collab_db(user_id)?
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Collab db is not initialized"))?;
@ -87,17 +130,17 @@ impl UserStatusCallback for UserStatusCallbackImpl {
) -> FlowyResult<()> {
if let Some(cloud_config) = cloud_config {
self
.server_provider
.server_provider()?
.set_enable_sync(user_id, cloud_config.enable_sync);
if cloud_config.enable_encrypt {
self
.server_provider
.server_provider()?
.set_encrypt_secret(cloud_config.encrypt_secret.clone());
}
}
self
.folder_manager
.folder_manager()?
.initialize(
user_id,
workspace_id,
@ -107,12 +150,12 @@ impl UserStatusCallback for UserStatusCallbackImpl {
)
.await?;
self
.database_manager
.database_manager()?
.initialize(user_id, auth_type == &AuthType::Local)
.await?;
self.document_manager.initialize(user_id).await?;
self.document_manager()?.initialize(user_id).await?;
let cloned_ai_manager = self.ai_manager.clone();
let cloned_ai_manager = self.ai_manager()?;
let workspace_id = *workspace_id;
self.runtime.spawn(async move {
if let Err(err) = cloned_ai_manager
@ -142,20 +185,20 @@ impl UserStatusCallback for UserStatusCallbackImpl {
.folder_init_data_source(user_id, workspace_id, auth_type)
.await?;
self
.folder_manager
.folder_manager()?
.initialize_after_sign_in(user_id, data_source)
.await?;
self
.database_manager
.database_manager()?
.initialize_after_sign_in(user_id, auth_type.is_local())
.await?;
self
.document_manager
.document_manager()?
.initialize_after_sign_in(user_id)
.await?;
self
.ai_manager
.ai_manager()?
.initialize_after_sign_in(workspace_id)
.await?;
@ -182,7 +225,7 @@ impl UserStatusCallback for UserStatusCallbackImpl {
.await?;
self
.folder_manager
.folder_manager()?
.initialize_after_sign_up(
user_profile.uid,
&user_profile.token,
@ -194,26 +237,26 @@ impl UserStatusCallback for UserStatusCallbackImpl {
.context("FolderManager error")?;
self
.database_manager
.database_manager()?
.initialize_after_sign_up(user_profile.uid, auth_type.is_local())
.await
.context("DatabaseManager error")?;
self
.document_manager
.document_manager()?
.initialize_after_sign_up(user_profile.uid)
.await
.context("DocumentManager error")?;
self
.ai_manager
.ai_manager()?
.initialize_after_sign_up(workspace_id)
.await?;
Ok(())
}
async fn on_token_expired(&self, _token: &str, user_id: i64) -> FlowyResult<()> {
self.folder_manager.clear(user_id).await;
self.folder_manager()?.clear(user_id).await;
Ok(())
}
@ -229,23 +272,23 @@ impl UserStatusCallback for UserStatusCallbackImpl {
.await?;
self
.folder_manager
.folder_manager()?
.initialize_after_open_workspace(user_id, data_source)
.await?;
self
.database_manager
.database_manager()?
.initialize_after_open_workspace(user_id, auth_type.is_local())
.await?;
self
.document_manager
.document_manager()?
.initialize_after_open_workspace(user_id)
.await?;
self
.ai_manager
.ai_manager()?
.initialize_after_open_workspace(workspace_id)
.await?;
self
.storage_manager
.storage_manager()?
.initialize_after_open_workspace(workspace_id)
.await;
Ok(())
@ -253,8 +296,13 @@ impl UserStatusCallback for UserStatusCallbackImpl {
fn on_network_status_changed(&self, reachable: bool) {
info!("Notify did update network: reachable: {}", reachable);
self.collab_builder.update_network(reachable);
self.storage_manager.update_network_reachable(reachable);
if let Some(collab_builder) = self.collab_builder.upgrade() {
collab_builder.update_network(reachable);
}
if let Ok(storage) = self.storage_manager() {
storage.update_network_reachable(reachable);
}
}
fn on_subscription_plans_updated(&self, plans: Vec<SubscriptionPlan>) {
@ -266,15 +314,19 @@ impl UserStatusCallback for UserStatusCallbackImpl {
}
}
if storage_plan_changed {
self.storage_manager.enable_storage_write_access();
if let Ok(storage) = self.storage_manager() {
storage.enable_storage_write_access();
}
}
}
fn on_storage_permission_updated(&self, can_write: bool) {
if can_write {
self.storage_manager.enable_storage_write_access();
} else {
self.storage_manager.disable_storage_write_access();
if let Ok(storage) = self.storage_manager() {
if can_write {
storage.enable_storage_write_access();
} else {
storage.disable_storage_write_access();
}
}
}
}

View file

@ -59,16 +59,22 @@ pub struct DatabaseManager {
task_scheduler: Arc<TokioRwLock<TaskDispatcher>>,
pub(crate) editors: Mutex<DatabaseEditorMap>,
removing_editor: Arc<Mutex<HashMap<String, Arc<DatabaseEditor>>>>,
collab_builder: Arc<AppFlowyCollabBuilder>,
collab_builder: Weak<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DatabaseCloudService>,
ai_service: Arc<dyn DatabaseAIService>,
}
impl Drop for DatabaseManager {
fn drop(&mut self) {
tracing::trace!("[Drop] drop database manager");
}
}
impl DatabaseManager {
pub fn new(
database_user: Arc<dyn DatabaseUser>,
task_scheduler: Arc<TokioRwLock<TaskDispatcher>>,
collab_builder: Arc<AppFlowyCollabBuilder>,
collab_builder: Weak<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DatabaseCloudService>,
ai_service: Arc<dyn DatabaseAIService>,
) -> Self {
@ -84,6 +90,10 @@ impl DatabaseManager {
}
}
fn collab_builder(&self) -> FlowyResult<Arc<AppFlowyCollabBuilder>> {
self.collab_builder.upgrade().ok_or(FlowyError::ref_drop())
}
/// When initialize with new workspace, all the resources will be cleared.
pub async fn initialize(&self, uid: i64, is_local_user: bool) -> FlowyResult<()> {
// 1. Clear all existing tasks
@ -119,7 +129,7 @@ impl DatabaseManager {
.await?;
let collab_object = collab_service
.build_collab_object(&workspace_database_object_id, CollabType::WorkspaceDatabase)?;
let workspace_database = self.collab_builder.create_workspace_database_manager(
let workspace_database = self.collab_builder()?.create_workspace_database_manager(
collab_object,
workspace_database_collab,
collab_db,
@ -281,11 +291,12 @@ impl DatabaseManager {
// hasn't finished syncing yet. In such cases, get_or_create_database will return None.
// The workaround is to add a retry mechanism to attempt fetching the database again.
let database = open_database_with_retry(workspace_database, database_id).await?;
let collab_builder = self.collab_builder()?;
let editor = DatabaseEditor::new(
self.user.clone(),
database,
self.task_scheduler.clone(),
self.collab_builder.clone(),
collab_builder,
)
.await?;
@ -709,7 +720,7 @@ impl DatabaseManager {
struct WorkspaceDatabaseCollabServiceImpl {
is_local_user: bool,
user: Arc<dyn DatabaseUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,
collab_builder: Weak<AppFlowyCollabBuilder>,
persistence: Arc<dyn DatabaseCollabPersistenceService>,
cloud_service: Arc<dyn DatabaseCloudService>,
}
@ -718,7 +729,7 @@ impl WorkspaceDatabaseCollabServiceImpl {
fn new(
is_local_user: bool,
user: Arc<dyn DatabaseUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,
collab_builder: Weak<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DatabaseCloudService>,
) -> Self {
let persistence = DatabasePersistenceImpl { user: user.clone() };
@ -731,6 +742,13 @@ impl WorkspaceDatabaseCollabServiceImpl {
}
}
fn collab_builder(&self) -> Result<Arc<AppFlowyCollabBuilder>, DatabaseError> {
self
.collab_builder
.upgrade()
.ok_or_else(|| DatabaseError::Internal(anyhow!("Collab builder is not initialized")))
}
async fn get_encode_collab(
&self,
object_id: &Uuid,
@ -797,7 +815,7 @@ impl WorkspaceDatabaseCollabServiceImpl {
.workspace_id()
.map_err(|err| DatabaseError::Internal(err.into()))?;
let object = self
.collab_builder
.collab_builder()?
.collab_object(&workspace_id, uid, object_id, object_type)
.map_err(|err| DatabaseError::Internal(anyhow!("Failed to build collab object: {}", err)))?;
Ok(object)
@ -906,7 +924,7 @@ impl DatabaseCollabService for WorkspaceDatabaseCollabServiceImpl {
let collab_db = self.collab_db()?;
let collab = self
.collab_builder
.collab_builder()?
.build_collab(&object, &collab_db, data_source)
.await?;
Ok(collab)

View file

@ -65,7 +65,7 @@ pub struct DatabaseEditor {
pub(crate) database_views: Arc<DatabaseViews>,
#[allow(dead_code)]
user: Arc<dyn DatabaseUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,
collab_builder: Weak<AppFlowyCollabBuilder>,
is_loading_rows: ArcSwapOption<broadcast::Sender<()>>,
opening_ret_txs: Arc<RwLock<Vec<OpenDatabaseResult>>>,
#[allow(dead_code)]
@ -138,7 +138,7 @@ impl DatabaseEditor {
database,
cell_cache,
database_views,
collab_builder,
collab_builder: Arc::downgrade(&collab_builder),
is_loading_rows: Default::default(),
opening_ret_txs: Arc::new(Default::default()),
database_cancellation,
@ -150,6 +150,13 @@ impl DatabaseEditor {
Ok(this)
}
pub fn collab_builder(&self) -> FlowyResult<Arc<AppFlowyCollabBuilder>> {
self
.collab_builder
.upgrade()
.ok_or_else(FlowyError::ref_drop)
}
pub async fn close_view(&self, view_id: &str) {
self.database_views.remove_view(view_id).await;
}
@ -795,6 +802,7 @@ impl DatabaseEditor {
}
debug!("[Database]: Init database row: {}", row_id);
let collab_builder = self.collab_builder()?;
let database_row = self
.database
.read()
@ -810,14 +818,14 @@ impl DatabaseEditor {
if !is_finalized {
trace!("[Database]: finalize database row: {}", row_id);
let row_id = Uuid::from_str(row_id.as_str())?;
let collab_object = self.collab_builder.collab_object(
let collab_object = collab_builder.collab_object(
&self.user.workspace_id()?,
self.user.user_id()?,
&row_id,
CollabType::DatabaseRow,
)?;
if let Err(err) = self.collab_builder.finalize(
if let Err(err) = collab_builder.finalize(
collab_object,
CollabBuilderConfig::default(),
database_row.clone(),

View file

@ -53,7 +53,7 @@ pub trait DocumentSnapshotService: Send + Sync {
pub struct DocumentManager {
pub user_service: Arc<dyn DocumentUserService>,
collab_builder: Arc<AppFlowyCollabBuilder>,
collab_builder: Weak<AppFlowyCollabBuilder>,
documents: Arc<DashMap<Uuid, Arc<RwLock<Document>>>>,
removing_documents: Arc<DashMap<Uuid, Arc<RwLock<Document>>>>,
cloud_service: Arc<dyn DocumentCloudService>,
@ -61,10 +61,16 @@ pub struct DocumentManager {
snapshot_service: Arc<dyn DocumentSnapshotService>,
}
impl Drop for DocumentManager {
fn drop(&mut self) {
tracing::trace!("[Drop] drop document manager");
}
}
impl DocumentManager {
pub fn new(
user_service: Arc<dyn DocumentUserService>,
collab_builder: Arc<AppFlowyCollabBuilder>,
collab_builder: Weak<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DocumentCloudService>,
storage_service: Weak<dyn StorageService>,
snapshot_service: Arc<dyn DocumentSnapshotService>,
@ -80,6 +86,13 @@ impl DocumentManager {
}
}
fn collab_builder(&self) -> FlowyResult<Arc<AppFlowyCollabBuilder>> {
self
.collab_builder
.upgrade()
.ok_or_else(FlowyError::ref_drop)
}
/// Get the encoded collab of the document.
pub async fn get_encoded_collab_with_view_id(&self, doc_id: &Uuid) -> FlowyResult<EncodedCollab> {
let uid = self.user_service.user_id()?;
@ -190,10 +203,10 @@ impl DocumentManager {
let workspace_id = self.user_service.workspace_id()?;
let collab_object =
self
.collab_builder
.collab_builder()?
.collab_object(&workspace_id, uid, doc_id, CollabType::Document)?;
let document = self
.collab_builder
.collab_builder()?
.create_document(
collab_object,
data_source,

View file

@ -25,6 +25,8 @@ use tracing_subscriber::{fmt::Subscriber, util::SubscriberInitExt, EnvFilter};
use uuid::Uuid;
pub struct DocumentTest {
#[allow(dead_code)]
builder: Arc<AppFlowyCollabBuilder>,
inner: DocumentManager,
}
@ -44,12 +46,15 @@ impl DocumentTest {
let manager = DocumentManager::new(
Arc::new(user),
builder,
Arc::downgrade(&builder),
cloud_service,
Arc::downgrade(&file_storage),
document_snapshot,
);
Self { inner: manager }
Self {
inner: manager,
builder,
}
}
}

View file

@ -383,6 +383,9 @@ pub enum ErrorCode {
#[error("User not login")]
UserNotLogin = 131,
#[error("Reference resource is not available")]
WeakRefDrop = 132,
}
impl ErrorCode {

View file

@ -162,6 +162,7 @@ impl FlowyError {
static_flowy_error!(local_ai_not_ready, ErrorCode::LocalAINotReady);
static_flowy_error!(local_ai_disabled, ErrorCode::LocalAIDisabled);
static_flowy_error!(user_not_login, ErrorCode::UserNotLogin);
static_flowy_error!(ref_drop, ErrorCode::WeakRefDrop);
}
impl std::convert::From<ErrorCode> for FlowyError {

View file

@ -63,16 +63,22 @@ pub struct FolderManager {
pub(crate) collab_builder: Arc<AppFlowyCollabBuilder>,
pub(crate) user: Arc<dyn FolderUser>,
pub(crate) operation_handlers: FolderOperationHandlers,
pub cloud_service: Arc<dyn FolderCloudService>,
pub cloud_service: Weak<dyn FolderCloudService>,
pub(crate) folder_indexer: Arc<dyn FolderIndexManager>,
pub(crate) store_preferences: Arc<KVStorePreferences>,
}
impl Drop for FolderManager {
fn drop(&mut self) {
tracing::trace!("[Drop] drop folder manager");
}
}
impl FolderManager {
pub fn new(
user: Arc<dyn FolderUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn FolderCloudService>,
cloud_service: Weak<dyn FolderCloudService>,
folder_indexer: Arc<dyn FolderIndexManager>,
store_preferences: Arc<KVStorePreferences>,
) -> FlowyResult<Self> {
@ -89,6 +95,13 @@ impl FolderManager {
Ok(manager)
}
pub fn cloud_service(&self) -> FlowyResult<Arc<dyn FolderCloudService>> {
self
.cloud_service
.upgrade()
.ok_or_else(FlowyError::ref_drop)
}
pub fn register_operation_handler(
&self,
layout: ViewLayout,
@ -317,7 +330,7 @@ impl FolderManager {
// The folder updates should not be empty, as the folder data is stored
// when the user signs up for the first time.
let result = self
.cloud_service
.cloud_service()?
.get_folder_doc_state(workspace_id, user_id, CollabType::Folder, workspace_id)
.await
.map_err(FlowyError::from);
@ -326,7 +339,7 @@ impl FolderManager {
Ok(folder_doc_state) => {
info!(
"Get folder updates via {}, doc state len: {}",
self.cloud_service.service_name(),
self.cloud_service()?.service_name(),
folder_doc_state.len()
);
self
@ -1202,7 +1215,7 @@ impl FolderManager {
// Sync the view to the cloud
if sync_after_create {
self
.cloud_service
.cloud_service()?
.batch_create_folder_collab_objects(&workspace_id, objects)
.await?;
}
@ -1355,7 +1368,7 @@ impl FolderManager {
let workspace_id = self.user.workspace_id()?;
self
.cloud_service
.cloud_service()?
.publish_view(&workspace_id, payload)
.await?;
Ok(())
@ -1366,7 +1379,7 @@ impl FolderManager {
pub async fn unpublish_views(&self, view_ids: Vec<Uuid>) -> FlowyResult<()> {
let workspace_id = self.user.workspace_id()?;
self
.cloud_service
.cloud_service()?
.unpublish_views(&workspace_id, view_ids)
.await?;
Ok(())
@ -1376,7 +1389,7 @@ impl FolderManager {
/// The publish info contains the namespace and publish_name of the view.
#[tracing::instrument(level = "debug", skip(self))]
pub async fn get_publish_info(&self, view_id: &Uuid) -> FlowyResult<PublishInfo> {
let publish_info = self.cloud_service.get_publish_info(view_id).await?;
let publish_info = self.cloud_service()?.get_publish_info(view_id).await?;
Ok(publish_info)
}
@ -1385,7 +1398,7 @@ impl FolderManager {
pub async fn set_publish_name(&self, view_id: Uuid, new_name: String) -> FlowyResult<()> {
let workspace_id = self.user.workspace_id()?;
self
.cloud_service
.cloud_service()?
.set_publish_name(&workspace_id, view_id, new_name)
.await?;
Ok(())
@ -1397,7 +1410,7 @@ impl FolderManager {
pub async fn set_publish_namespace(&self, new_namespace: String) -> FlowyResult<()> {
let workspace_id = self.user.workspace_id()?;
self
.cloud_service
.cloud_service()?
.set_publish_namespace(&workspace_id, new_namespace)
.await?;
Ok(())
@ -1408,7 +1421,7 @@ impl FolderManager {
pub async fn get_publish_namespace(&self) -> FlowyResult<String> {
let workspace_id = self.user.workspace_id()?;
let namespace = self
.cloud_service
.cloud_service()?
.get_publish_namespace(&workspace_id)
.await?;
Ok(namespace)
@ -1419,7 +1432,7 @@ impl FolderManager {
pub async fn list_published_views(&self) -> FlowyResult<Vec<PublishInfoView>> {
let workspace_id = self.user.workspace_id()?;
let published_views = self
.cloud_service
.cloud_service()?
.list_published_views(&workspace_id)
.await?;
Ok(published_views)
@ -1429,7 +1442,7 @@ impl FolderManager {
pub async fn get_default_published_view_info(&self) -> FlowyResult<PublishInfo> {
let workspace_id = self.user.workspace_id()?;
let default_published_view_info = self
.cloud_service
.cloud_service()?
.get_default_published_view_info(&workspace_id)
.await?;
Ok(default_published_view_info)
@ -1439,7 +1452,7 @@ impl FolderManager {
pub async fn set_default_published_view(&self, view_id: uuid::Uuid) -> FlowyResult<()> {
let workspace_id = self.user.workspace_id()?;
self
.cloud_service
.cloud_service()?
.set_default_published_view(&workspace_id, view_id)
.await?;
Ok(())
@ -1449,7 +1462,7 @@ impl FolderManager {
pub async fn remove_default_published_view(&self) -> FlowyResult<()> {
let workspace_id = self.user.workspace_id()?;
self
.cloud_service
.cloud_service()?
.remove_default_published_view(&workspace_id)
.await?;
Ok(())
@ -1779,7 +1792,7 @@ impl FolderManager {
}
pub(crate) async fn import_zip_file(&self, zip_file_path: &str) -> FlowyResult<()> {
self.cloud_service.import_zip(zip_file_path).await?;
self.cloud_service()?.import_zip(zip_file_path).await?;
Ok(())
}
@ -1809,7 +1822,7 @@ impl FolderManager {
info!("Syncing the imported {} collab to the cloud", objects.len());
self
.cloud_service
.cloud_service()?
.batch_create_folder_collab_objects(&workspace_id, objects)
.await?;
@ -1939,7 +1952,7 @@ impl FolderManager {
limit: usize,
) -> FlowyResult<Vec<FolderSnapshotPB>> {
let snapshots = self
.cloud_service
.cloud_service()?
.get_folder_snapshots(workspace_id, limit)
.await?
.into_iter()

View file

@ -73,7 +73,7 @@ impl FolderManager {
// 3. If the folder doesn't exist and create_if_not_exist is false, try to fetch the folder data from cloud/
// This will happen user can't fetch the folder data when the user sign in.
let doc_state = self
.cloud_service
.cloud_service()?
.get_folder_doc_state(workspace_id, uid, CollabType::Folder, workspace_id)
.await?;

View file

@ -121,6 +121,12 @@ impl SearchManager {
}
}
impl Drop for SearchManager {
fn drop(&mut self) {
tracing::trace!("[Drop] drop search manager");
}
}
async fn is_current_search(
current_search: &Arc<tokio::sync::Mutex<Option<String>>>,
search_id: &str,

View file

@ -382,10 +382,9 @@ pub async fn set_cloud_config_handler(
let mut config = get_cloud_config(session.user_id, &store_preferences)
.ok_or(FlowyError::internal().with_context("Can't find any cloud config"))?;
let cloud_service = manager.cloud_service()?;
if let Some(enable_sync) = update.enable_sync {
manager
.cloud_service
.set_enable_sync(session.user_id, enable_sync);
cloud_service.set_enable_sync(session.user_id, enable_sync);
config.enable_sync = enable_sync;
}
@ -395,7 +394,7 @@ pub async fn set_cloud_config_handler(
enable_sync: config.enable_sync,
enable_encrypt: config.enable_encrypt,
encrypt_secret: config.encrypt_secret,
server_url: manager.cloud_service.service_url(),
server_url: cloud_service.service_url(),
};
send_notification(
@ -416,13 +415,14 @@ pub async fn get_cloud_config_handler(
let manager = upgrade_manager(manager)?;
let session = manager.get_session()?;
let store_preferences = upgrade_store_preferences(store_preferences)?;
let cloud_service = manager.cloud_service()?;
// Generate the default config if the config is not exist
let config = get_or_create_cloud_config(session.user_id, &store_preferences);
data_result_ok(CloudSettingPB {
enable_sync: config.enable_sync,
enable_encrypt: config.enable_encrypt,
encrypt_secret: config.encrypt_secret,
server_url: manager.cloud_service.service_url(),
server_url: cloud_service.service_url(),
})
}
@ -476,7 +476,7 @@ pub async fn update_network_state_handler(
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let reachable = data.into_inner().ty.is_reachable();
manager.cloud_service.set_network_reachable(reachable);
manager.cloud_service()?.set_network_reachable(reachable);
manager
.user_status_callback
.read()

View file

@ -1,6 +1,6 @@
use diesel::SqliteConnection;
use semver::Version;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use tracing::instrument;
use collab_integrate::CollabKVDB;
@ -35,7 +35,7 @@ impl UserDataMigration for AnonUserWorkspaceTableMigration {
fn run(
&self,
user: &Session,
_collab_db: &Arc<CollabKVDB>,
_collab_db: &Weak<CollabKVDB>,
user_auth_type: &AuthType,
db: &mut SqliteConnection,
store_preferences: &Arc<KVStorePreferences>,

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use collab_plugins::local_storage::kv::doc::migrate_old_keys;
use collab_plugins::local_storage::kv::KVTransactionDB;
@ -7,7 +7,7 @@ use semver::Version;
use tracing::{instrument, trace};
use collab_integrate::CollabKVDB;
use flowy_error::FlowyResult;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::kv::KVStorePreferences;
use flowy_user_pub::entities::AuthType;
@ -40,11 +40,14 @@ impl UserDataMigration for CollabDocKeyWithWorkspaceIdMigration {
fn run(
&self,
user: &Session,
collab_db: &Arc<CollabKVDB>,
collab_db: &Weak<CollabKVDB>,
_user_auth_type: &AuthType,
_db: &mut SqliteConnection,
_store_preferences: &Arc<KVStorePreferences>,
) -> FlowyResult<()> {
let collab_db = collab_db
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Failed to upgrade DB object"))?;
trace!("migrate key with workspace id:{}", user.workspace_id);
collab_db.with_write_txn(|txn| {
migrate_old_keys(txn, &user.workspace_id)?;

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::preclude::Collab;
@ -42,7 +42,7 @@ impl UserDataMigration for HistoricalEmptyDocumentMigration {
fn run(
&self,
user: &Session,
collab_db: &Arc<CollabKVDB>,
collab_db: &Weak<CollabKVDB>,
user_auth_type: &AuthType,
_db: &mut SqliteConnection,
_store_preferences: &Arc<KVStorePreferences>,
@ -53,6 +53,9 @@ impl UserDataMigration for HistoricalEmptyDocumentMigration {
if !matches!(user_auth_type, AuthType::Local) {
return Ok(());
}
let collab_db = collab_db
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Failed to upgrade DB object"))?;
collab_db.with_write_txn(|write_txn| {
let origin = CollabOrigin::Client(CollabClient::new(user.user_id, "phantom"));
let folder_collab = match load_collab(

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use chrono::NaiveDateTime;
use collab_integrate::CollabKVDB;
@ -18,7 +18,7 @@ pub const FIRST_TIME_INSTALL_VERSION: &str = "first_install_version";
pub struct UserLocalDataMigration {
session: Session,
collab_db: Arc<CollabKVDB>,
collab_db: Weak<CollabKVDB>,
sqlite_pool: Arc<ConnectionPool>,
kv: Arc<KVStorePreferences>,
}
@ -26,7 +26,7 @@ pub struct UserLocalDataMigration {
impl UserLocalDataMigration {
pub fn new(
session: Session,
collab_db: Arc<CollabKVDB>,
collab_db: Weak<CollabKVDB>,
sqlite_pool: Arc<ConnectionPool>,
kv: Arc<KVStorePreferences>,
) -> Self {
@ -103,7 +103,7 @@ pub trait UserDataMigration {
fn run(
&self,
user: &Session,
collab_db: &Arc<CollabKVDB>,
collab_db: &Weak<CollabKVDB>,
user_auth_type: &AuthType,
db: &mut SqliteConnection,
store_preferences: &Arc<KVStorePreferences>,

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use collab_folder::Folder;
use collab_plugins::local_storage::kv::{KVTransactionDB, PersistenceError};
@ -7,7 +7,7 @@ use semver::Version;
use tracing::instrument;
use collab_integrate::{CollabKVAction, CollabKVDB};
use flowy_error::FlowyResult;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::kv::KVStorePreferences;
use flowy_user_pub::entities::AuthType;
@ -40,11 +40,14 @@ impl UserDataMigration for FavoriteV1AndWorkspaceArrayMigration {
fn run(
&self,
user: &Session,
collab_db: &Arc<CollabKVDB>,
collab_db: &Weak<CollabKVDB>,
_user_auth_type: &AuthType,
_db: &mut SqliteConnection,
_store_preferences: &Arc<KVStorePreferences>,
) -> FlowyResult<()> {
let collab_db = collab_db
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Failed to upgrade DB object"))?;
collab_db.with_write_txn(|write_txn| {
if let Ok(collab) = load_collab(
user.user_id,

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use collab_folder::Folder;
use collab_plugins::local_storage::kv::{KVTransactionDB, PersistenceError};
@ -7,7 +7,7 @@ use semver::Version;
use tracing::instrument;
use collab_integrate::{CollabKVAction, CollabKVDB};
use flowy_error::FlowyResult;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::kv::KVStorePreferences;
use flowy_user_pub::entities::AuthType;
@ -38,11 +38,14 @@ impl UserDataMigration for WorkspaceTrashMapToSectionMigration {
fn run(
&self,
user: &Session,
collab_db: &Arc<CollabKVDB>,
collab_db: &Weak<CollabKVDB>,
_user_auth_type: &AuthType,
_db: &mut SqliteConnection,
_store_preferences: &Arc<KVStorePreferences>,
) -> FlowyResult<()> {
let collab_db = collab_db
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Failed to upgrade DB object"))?;
collab_db.with_write_txn(|write_txn| {
if let Ok(collab) = load_collab(
user.user_id,

View file

@ -26,6 +26,15 @@ pub struct AuthenticateUser {
session: ArcSwapOption<Session>,
}
impl Drop for AuthenticateUser {
fn drop(&mut self) {
tracing::trace!(
"[Drop ]Drop AuthenticateUser: {:?}",
self.session.load_full().map(|s| s.user_id)
);
}
}
impl AuthenticateUser {
pub fn new(user_config: UserConfig, store_preferences: Arc<KVStorePreferences>) -> Self {
let user_paths = UserPaths::new(user_config.storage_path.clone());
@ -71,10 +80,7 @@ impl AuthenticateUser {
}
pub fn get_collab_db(&self, uid: i64) -> FlowyResult<Weak<CollabKVDB>> {
self
.database
.get_collab_db(uid)
.map(|collab_db| Arc::downgrade(&collab_db))
self.database.get_collab_db(uid)
}
pub fn get_sqlite_connection(&self, uid: i64) -> FlowyResult<DBConnection> {
@ -104,7 +110,11 @@ impl AuthenticateUser {
pub fn is_collab_on_disk(&self, uid: i64, object_id: &str) -> FlowyResult<bool> {
let session = self.get_session()?;
let collab_db = self.database.get_collab_db(uid)?;
let collab_db = self
.database
.get_collab_db(uid)?
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Collab db is not initialized"))?;
let read_txn = collab_db.read_txn();
Ok(read_txn.is_exist(uid, session.workspace_id.as_str(), object_id))
}

View file

@ -47,7 +47,7 @@ use uuid::Uuid;
pub(crate) struct ImportedFolder {
pub imported_session: Session,
pub imported_collab_db: Arc<CollabKVDB>,
pub imported_collab_db: Weak<CollabKVDB>,
pub container_name: Option<String>,
pub parent_view_id: Option<String>,
pub source: ImportedSource,
@ -126,7 +126,7 @@ pub(crate) fn prepare_import(
run_data_migration(
&imported_session,
&imported_user_auth_type,
imported_collab_db.clone(),
Arc::downgrade(&imported_collab_db),
imported_sqlite_db.get_pool(),
other_store_preferences.clone(),
app_version,
@ -134,7 +134,7 @@ pub(crate) fn prepare_import(
Ok(ImportedFolder {
imported_session,
imported_collab_db,
imported_collab_db: Arc::downgrade(&imported_collab_db),
container_name: None,
parent_view_id,
source: ImportedSource::ExternalFolder,
@ -174,7 +174,10 @@ pub(crate) fn generate_import_data(
let imported_workspace_id = imported_folder.imported_session.workspace_id.clone();
let imported_session = imported_folder.imported_session.clone();
let imported_workspace_database_id = imported_folder.workspace_database_id.clone();
let imported_collab_db = imported_folder.imported_collab_db.clone();
let imported_collab_db = imported_folder
.imported_collab_db
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Failed to upgrade DB object"))?;
let imported_container_view_name = imported_folder.container_name.clone();
let mut database_view_ids_by_database_id: HashMap<String, Vec<String>> = HashMap::new();

View file

@ -1,6 +1,3 @@
use std::path::{Path, PathBuf};
use std::{fs, io, sync::Arc};
use chrono::{Days, Local};
use collab_integrate::{CollabKVAction, CollabKVDB, PersistenceError};
use collab_plugins::local_storage::kv::KVTransactionDB;
@ -12,6 +9,9 @@ use flowy_sqlite::{DBConnection, Database};
use flowy_user_pub::entities::UserProfile;
use flowy_user_pub::sql::select_user_profile;
use lib_infra::file_util::{unzip_and_replace, zip_folder};
use std::path::{Path, PathBuf};
use std::sync::Weak;
use std::{fs, io, sync::Arc};
use tracing::{error, event, info, instrument};
pub trait UserDBPath: Send + Sync + 'static {
@ -97,9 +97,9 @@ impl UserDB {
Ok(pool)
}
pub(crate) fn get_collab_db(&self, user_id: i64) -> Result<Arc<CollabKVDB>, FlowyError> {
pub(crate) fn get_collab_db(&self, user_id: i64) -> Result<Weak<CollabKVDB>, FlowyError> {
let collab_db = self.open_collab_db(self.paths.collab_db_path(user_id), user_id)?;
Ok(collab_db)
Ok(Arc::downgrade(&collab_db))
}
pub fn open_sqlite_db(
@ -340,7 +340,7 @@ pub(crate) fn validate_collab_db(
read_txn.is_exist(uid, workspace_id, workspace_id)
},
Err(err) => {
error!("open collab db error, {:?}", err);
error!("open collab db error when validate collab, {:?}", err);
!matches!(
err,
PersistenceError::RocksdbCorruption(_) | PersistenceError::RocksdbRepairFail(_)

View file

@ -44,7 +44,7 @@ use flowy_user_pub::session::Session;
use flowy_user_pub::sql::*;
pub struct UserManager {
pub(crate) cloud_service: Arc<dyn UserCloudServiceProvider>,
pub(crate) cloud_service: Weak<dyn UserCloudServiceProvider>,
pub(crate) store_preferences: Arc<KVStorePreferences>,
pub(crate) user_awareness: Arc<ArcSwapOption<RwLock<UserAwareness>>>,
pub(crate) user_status_callback: RwLock<Arc<dyn UserStatusCallback>>,
@ -56,9 +56,15 @@ pub struct UserManager {
pub(crate) is_loading_awareness: Arc<DashMap<Uuid, bool>>,
}
impl Drop for UserManager {
fn drop(&mut self) {
tracing::trace!("[Drop] drop user manager");
}
}
impl UserManager {
pub fn new(
cloud_services: Arc<dyn UserCloudServiceProvider>,
cloud_services: Weak<dyn UserCloudServiceProvider>,
store_preferences: Arc<KVStorePreferences>,
collab_builder: Weak<AppFlowyCollabBuilder>,
authenticate_user: Arc<AuthenticateUser>,
@ -82,7 +88,12 @@ impl UserManager {
});
let weak_user_manager = Arc::downgrade(&user_manager);
if let Ok(user_service) = user_manager.cloud_service.get_user_service() {
if let Ok(user_service) = user_manager
.cloud_service
.upgrade()
.ok_or_else(FlowyError::ref_drop)
.and_then(|v| v.get_user_service())
{
if let Some(mut rx) = user_service.subscribe_user_update() {
tokio::spawn(async move {
while let Some(update) = rx.recv().await {
@ -99,6 +110,13 @@ impl UserManager {
user_manager
}
pub fn cloud_service(&self) -> FlowyResult<Arc<dyn UserCloudServiceProvider>> {
self
.cloud_service
.upgrade()
.ok_or_else(FlowyError::ref_drop)
}
pub fn close_db(&self) {
if let Err(err) = self.authenticate_user.close_db() {
error!("Close db failed: {:?}", err);
@ -125,6 +143,7 @@ impl UserManager {
let user_status_callback = Arc::new(user_status_callback);
*self.user_status_callback.write().await = user_status_callback.clone();
*self.collab_interact.write().await = Arc::new(collab_interact);
let cloud_service = self.cloud_service()?;
if let Ok(session) = self.get_session() {
info!(
@ -137,9 +156,7 @@ impl UserManager {
let uid = session.user_id;
let token = self.token_from_auth_type(&auth_type)?;
self
.cloud_service
.set_server_auth_type(&auth_type, token.clone())?;
cloud_service.set_server_auth_type(&auth_type, token.clone())?;
event!(
tracing::Level::INFO,
@ -157,12 +174,12 @@ impl UserManager {
if auth_type.is_appflowy_cloud() {
let local_token = token.unwrap_or_default();
// Subscribe the token state
let weak_cloud_services = Arc::downgrade(&self.cloud_service);
let weak_cloud_services = self.cloud_service.clone();
let weak_authenticate_user = Arc::downgrade(&self.authenticate_user);
let weak_pool = Arc::downgrade(&self.db_pool(uid)?);
let workspace_id = session.workspace_id.clone();
let cloned_session = session.clone();
if let Some(mut token_state_rx) = self.cloud_service.subscribe_token_state() {
if let Some(mut token_state_rx) = cloud_service.subscribe_token_state() {
event!(tracing::Level::DEBUG, "Listen token state change");
let user_uid = uid;
tokio::spawn(async move {
@ -305,11 +322,7 @@ impl UserManager {
}
pub fn get_collab_db(&self, uid: i64) -> Result<Weak<CollabKVDB>, FlowyError> {
self
.authenticate_user
.database
.get_collab_db(uid)
.map(|collab_db| Arc::downgrade(&collab_db))
self.authenticate_user.database.get_collab_db(uid)
}
#[cfg(debug_assertions)]
@ -331,10 +344,10 @@ impl UserManager {
params: SignInParams,
auth_type: AuthType,
) -> Result<UserProfile, FlowyError> {
self.cloud_service.set_server_auth_type(&auth_type, None)?;
let cloud_service = self.cloud_service()?;
cloud_service.set_server_auth_type(&auth_type, None)?;
let response: AuthResponse = self
.cloud_service
let response: AuthResponse = cloud_service
.get_user_service()?
.sign_in(BoxAny::new(params))
.await?;
@ -380,11 +393,12 @@ impl UserManager {
auth_type: AuthType,
params: BoxAny,
) -> Result<UserProfile, FlowyError> {
self.cloud_service.set_server_auth_type(&auth_type, None)?;
let cloud_service = self.cloud_service()?;
cloud_service.set_server_auth_type(&auth_type, None)?;
// sign out the current user if there is one
let migration_user = self.get_migration_user(&auth_type).await;
let auth_service = self.cloud_service.get_user_service()?;
let auth_service = cloud_service.get_user_service()?;
let response: AuthResponse = auth_service.sign_up(params).await?;
let new_user_profile = UserProfile::from((&response, &auth_type));
self
@ -462,7 +476,7 @@ impl UserManager {
pub async fn sign_out(&self) -> Result<(), FlowyError> {
if let Ok(session) = self.get_session() {
sign_out(
&self.cloud_service,
&self.cloud_service()?,
&session,
&self.authenticate_user,
self.db_connection(session.user_id)?,
@ -475,7 +489,7 @@ impl UserManager {
#[tracing::instrument(level = "info", skip(self))]
pub async fn delete_account(&self) -> Result<(), FlowyError> {
self
.cloud_service
.cloud_service()?
.get_user_service()?
.delete_account()
.await?;
@ -502,7 +516,7 @@ impl UserManager {
changeset,
)?;
self
.cloud_service
.cloud_service()?
.get_user_service()?
.update_user(params)
.await?;
@ -560,7 +574,7 @@ impl UserManager {
let uid = old_user_profile.uid;
let result: Result<UserProfile, FlowyError> = self
.cloud_service
.cloud_service()?
.get_user_service()?
.get_user_profile(uid, workspace_id)
.await;
@ -645,7 +659,7 @@ impl UserManager {
}
pub async fn receive_realtime_event(&self, json: Value) {
if let Ok(user_service) = self.cloud_service.get_user_service() {
if let Ok(user_service) = self.cloud_service().and_then(|v| v.get_user_service()) {
user_service.receive_realtime_event(json)
}
}
@ -656,11 +670,10 @@ impl UserManager {
authenticator: &AuthType,
email: &str,
) -> Result<String, FlowyError> {
self
.cloud_service
.set_server_auth_type(authenticator, None)?;
let cloud_service = self.cloud_service()?;
cloud_service.set_server_auth_type(authenticator, None)?;
let auth_service = self.cloud_service.get_user_service()?;
let auth_service = cloud_service.get_user_service()?;
let url = auth_service.generate_sign_in_url_with_email(email).await?;
Ok(url)
}
@ -672,9 +685,9 @@ impl UserManager {
password: &str,
) -> Result<GotrueTokenResponse, FlowyError> {
self
.cloud_service
.cloud_service()?
.set_server_auth_type(&AuthType::AppFlowyCloud, None)?;
let auth_service = self.cloud_service.get_user_service()?;
let auth_service = self.cloud_service()?.get_user_service()?;
let response = auth_service.sign_in_with_password(email, password).await?;
Ok(response)
}
@ -686,9 +699,9 @@ impl UserManager {
redirect_to: &str,
) -> Result<(), FlowyError> {
self
.cloud_service
.cloud_service()?
.set_server_auth_type(&AuthType::AppFlowyCloud, None)?;
let auth_service = self.cloud_service.get_user_service()?;
let auth_service = self.cloud_service()?.get_user_service()?;
auth_service
.sign_in_with_magic_link(email, redirect_to)
.await?;
@ -702,9 +715,9 @@ impl UserManager {
passcode: &str,
) -> Result<GotrueTokenResponse, FlowyError> {
self
.cloud_service
.cloud_service()?
.set_server_auth_type(&AuthType::AppFlowyCloud, None)?;
let auth_service = self.cloud_service.get_user_service()?;
let auth_service = self.cloud_service()?.get_user_service()?;
let response = auth_service.sign_in_with_passcode(email, passcode).await?;
Ok(response)
}
@ -715,9 +728,9 @@ impl UserManager {
oauth_provider: &str,
) -> Result<String, FlowyError> {
self
.cloud_service
.cloud_service()?
.set_server_auth_type(&AuthType::AppFlowyCloud, None)?;
let auth_service = self.cloud_service.get_user_service()?;
let auth_service = self.cloud_service()?.get_user_service()?;
let url = auth_service
.generate_oauth_url_with_provider(oauth_provider)
.await?;
@ -855,7 +868,7 @@ fn mark_all_migrations_as_applied(sqlite_pool: &Arc<ConnectionPool>) {
pub(crate) fn run_data_migration(
session: &Session,
user_auth_type: &AuthType,
collab_db: Arc<CollabKVDB>,
collab_db: Weak<CollabKVDB>,
sqlite_pool: Arc<ConnectionPool>,
kv: Arc<KVStorePreferences>,
app_version: &Version,
@ -885,7 +898,13 @@ pub async fn sign_out(
authenticate_user: &AuthenticateUser,
conn: DBConnection,
) -> Result<(), FlowyError> {
info!("[Sign out] Sign out user: {}", session.user_id);
let _ = remove_user_token(session.user_id, conn);
info!(
"[Sign out] Close user related database: {}",
session.user_id
);
authenticate_user.database.close(session.user_id)?;
authenticate_user.set_session(None)?;

View file

@ -215,7 +215,7 @@ impl UserManager {
let collab_db = self.get_collab_db(session.user_id)?;
let weak_builder = self.collab_builder.clone();
let user_awareness = Arc::downgrade(&self.user_awareness);
let cloud_services = self.cloud_service.clone();
let cloud_services = self.cloud_service()?;
let authenticate_user = self.authenticate_user.clone();
let is_loading_awareness = self.is_loading_awareness.clone();

View file

@ -3,7 +3,7 @@ use client_api::entity::billing_dto::{RecurringInterval, SubscriptionPlanDetail}
use client_api::entity::billing_dto::{SubscriptionPlan, WorkspaceUsageAndLimit};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use crate::entities::{
RepeatedUserWorkspacePB, SubscribeWorkspacePB, SuccessWorkspaceSubscriptionPB,
@ -40,7 +40,9 @@ impl UserManager {
let user_collab_db = self
.authenticate_user
.database
.get_collab_db(current_session.user_id)?;
.get_collab_db(current_session.user_id)?
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Collab db not found"))?;
let cloned_current_session = current_session.clone();
let import_data = tokio::task::spawn_blocking(move || {
@ -111,7 +113,7 @@ impl UserManager {
let user_id = current_session.user_id;
let workspace_id = Uuid::parse_str(&current_session.workspace_id)?;
let weak_user_collab_db = Arc::downgrade(&user_collab_db);
let weak_user_cloud_service = self.cloud_service.get_user_service()?;
let weak_user_cloud_service = self.cloud_service()?.get_user_service()?;
match upload_collab_objects_data(
user_id,
weak_user_collab_db,
@ -139,7 +141,7 @@ impl UserManager {
pub async fn migration_anon_user_on_appflowy_cloud_sign_up(
&self,
old_user: &AnonUser,
old_collab_db: &Arc<CollabKVDB>,
old_collab_db: &Weak<CollabKVDB>,
) -> FlowyResult<()> {
let import_context = ImportedFolder {
imported_session: old_user.session.as_ref().clone(),
@ -158,13 +160,14 @@ impl UserManager {
info!("open workspace: {}, auth type:{}", workspace_id, auth_type);
let workspace_id_str = workspace_id.to_string();
let token = self.token_from_auth_type(&auth_type)?;
self.cloud_service.set_server_auth_type(&auth_type, token)?;
let cloud_service = self.cloud_service()?;
cloud_service.set_server_auth_type(&auth_type, token)?;
let uid = self.user_id()?;
let profile = self
.get_user_profile_from_disk(uid, &workspace_id_str)
.await?;
if let Err(err) = self.cloud_service.set_token(&profile.token) {
if let Err(err) = cloud_service.set_token(&profile.token) {
error!("Set token failed: {}", err);
}
@ -174,7 +177,7 @@ impl UserManager {
if err.is_record_not_found() {
sync_workspace(
workspace_id,
self.cloud_service.get_user_service()?,
cloud_service.get_user_service()?,
uid,
auth_type,
self.db_pool(uid)?,
@ -187,7 +190,7 @@ impl UserManager {
Ok(row) => {
let user_workspace = UserWorkspace::from(row);
let workspace_id = *workspace_id;
let user_service = self.cloud_service.get_user_service()?;
let user_service = cloud_service.get_user_service()?;
let pool = self.db_pool(uid)?;
tokio::spawn(async move {
let _ = sync_workspace(&workspace_id, user_service, uid, auth_type, pool).await;
@ -231,10 +234,11 @@ impl UserManager {
auth_type: AuthType,
) -> FlowyResult<UserWorkspace> {
let token = self.token_from_auth_type(&auth_type)?;
self.cloud_service.set_server_auth_type(&auth_type, token)?;
let cloud_service = self.cloud_service()?;
cloud_service.set_server_auth_type(&auth_type, token)?;
let new_workspace = self
.cloud_service
.cloud_service()?
.get_user_service()?
.create_workspace(workspace_name)
.await?;
@ -257,7 +261,7 @@ impl UserManager {
changeset: UserWorkspaceChangeset,
) -> FlowyResult<()> {
self
.cloud_service
.cloud_service()?
.get_user_service()?
.patch_workspace(workspace_id, changeset.name.clone(), changeset.icon.clone())
.await?;
@ -280,7 +284,7 @@ impl UserManager {
pub async fn leave_workspace(&self, workspace_id: &Uuid) -> FlowyResult<()> {
info!("leave workspace: {}", workspace_id);
self
.cloud_service
.cloud_service()?
.get_user_service()?
.leave_workspace(workspace_id)
.await?;
@ -300,7 +304,7 @@ impl UserManager {
pub async fn delete_workspace(&self, workspace_id: &Uuid) -> FlowyResult<()> {
info!("delete workspace: {}", workspace_id);
self
.cloud_service
.cloud_service()?
.get_user_service()?
.delete_workspace(workspace_id)
.await?;
@ -323,7 +327,7 @@ impl UserManager {
role: Role,
) -> FlowyResult<()> {
self
.cloud_service
.cloud_service()?
.get_user_service()?
.invite_workspace_member(invitee_email, workspace_id, role)
.await?;
@ -333,7 +337,7 @@ impl UserManager {
pub async fn list_pending_workspace_invitations(&self) -> FlowyResult<Vec<WorkspaceInvitation>> {
let status = Some(WorkspaceInvitationStatus::Pending);
let invitations = self
.cloud_service
.cloud_service()?
.get_user_service()?
.list_workspace_invitations(status)
.await?;
@ -342,7 +346,7 @@ impl UserManager {
pub async fn accept_workspace_invitation(&self, invite_id: String) -> FlowyResult<()> {
self
.cloud_service
.cloud_service()?
.get_user_service()?
.accept_workspace_invitations(invite_id)
.await?;
@ -355,7 +359,7 @@ impl UserManager {
workspace_id: Uuid,
) -> FlowyResult<()> {
self
.cloud_service
.cloud_service()?
.get_user_service()?
.remove_workspace_member(user_email, workspace_id)
.await?;
@ -367,7 +371,7 @@ impl UserManager {
workspace_id: Uuid,
) -> FlowyResult<Vec<WorkspaceMember>> {
let members = self
.cloud_service
.cloud_service()?
.get_user_service()?
.get_workspace_members(workspace_id)
.await?;
@ -380,7 +384,7 @@ impl UserManager {
uid: i64,
) -> FlowyResult<WorkspaceMember> {
let member = self
.cloud_service
.cloud_service()?
.get_user_service()?
.get_workspace_member(&workspace_id, uid)
.await?;
@ -394,7 +398,7 @@ impl UserManager {
role: Role,
) -> FlowyResult<()> {
self
.cloud_service
.cloud_service()?
.get_user_service()?
.update_workspace_member(user_email, workspace_id, role)
.await?;
@ -420,7 +424,10 @@ impl UserManager {
let local_workspaces = select_all_user_workspace(uid, &mut conn)?;
// 2) If both cloud service and pool are available, fire off a background sync
if let (Ok(service), Ok(pool)) = (self.cloud_service.get_user_service(), self.db_pool(uid)) {
if let (Ok(service), Ok(pool)) = (
self.cloud_service().and_then(|v| v.get_user_service()),
self.db_pool(uid),
) {
// capture only what we need
let auth_copy = auth_type;
@ -477,7 +484,7 @@ impl UserManager {
) -> FlowyResult<String> {
let workspace_id = Uuid::from_str(&workspace_subscription.workspace_id)?;
let payment_link = self
.cloud_service
.cloud_service()?
.get_user_service()?
.subscribe_workspace(
workspace_id,
@ -497,7 +504,7 @@ impl UserManager {
) -> FlowyResult<WorkspaceSubscriptionInfoPB> {
let workspace_id = Uuid::from_str(&workspace_id)?;
let subscriptions = self
.cloud_service
.cloud_service()?
.get_user_service()?
.get_workspace_subscription_one(&workspace_id)
.await?;
@ -513,7 +520,7 @@ impl UserManager {
reason: Option<String>,
) -> FlowyResult<()> {
self
.cloud_service
.cloud_service()?
.get_user_service()?
.cancel_workspace_subscription(workspace_id, plan, reason)
.await?;
@ -528,7 +535,7 @@ impl UserManager {
recurring_interval: RecurringInterval,
) -> FlowyResult<()> {
self
.cloud_service
.cloud_service()?
.get_user_service()?
.update_workspace_subscription_payment_period(workspace_id, plan, recurring_interval)
.await?;
@ -538,7 +545,7 @@ impl UserManager {
#[instrument(level = "info", skip(self), err)]
pub async fn get_subscription_plan_details(&self) -> FlowyResult<Vec<SubscriptionPlanDetail>> {
let plan_details = self
.cloud_service
.cloud_service()?
.get_user_service()?
.get_subscription_plan_details()
.await?;
@ -551,7 +558,7 @@ impl UserManager {
workspace_id: &Uuid,
) -> FlowyResult<WorkspaceUsageAndLimit> {
let workspace_usage = self
.cloud_service
.cloud_service()?
.get_user_service()?
.get_workspace_usage(workspace_id)
.await?;
@ -577,7 +584,7 @@ impl UserManager {
#[instrument(level = "info", skip(self), err)]
pub async fn get_billing_portal_url(&self) -> FlowyResult<String> {
let url = self
.cloud_service
.cloud_service()?
.get_user_service()?
.get_billing_portal_url()
.await?;
@ -589,7 +596,7 @@ impl UserManager {
updated_settings: UpdateUserWorkspaceSettingPB,
) -> FlowyResult<()> {
let workspace_id = Uuid::from_str(&updated_settings.workspace_id)?;
let cloud_service = self.cloud_service.get_user_service()?;
let cloud_service = self.cloud_service()?.get_user_service()?;
let settings = cloud_service
.update_workspace_setting(&workspace_id, updated_settings.clone().into())
.await?;
@ -629,7 +636,7 @@ impl UserManager {
// Spawn a task to sync remote settings using the helper
let pool = self.db_pool(uid)?;
let cloud_service = self.cloud_service.clone();
let cloud_service = self.cloud_service()?;
tokio::spawn(async move {
let _ = sync_workspace_settings(cloud_service, workspace_id, old_pb, uid, pool).await;
});
@ -638,7 +645,7 @@ impl UserManager {
Err(err) => {
if err.is_record_not_found() {
trace!("No workspace settings found, fetch from remote");
let service = self.cloud_service.get_user_service()?;
let service = self.cloud_service()?.get_user_service()?;
let settings = service.get_workspace_setting(workspace_id).await?;
let pb = WorkspaceSettingsPB::from(&settings);
let mut conn = self.db_connection(uid)?;
@ -691,7 +698,7 @@ impl UserManager {
) -> FlowyResult<WorkspaceMember> {
trace!("get workspace member info from remote: {}", workspace_id);
let member = self
.cloud_service
.cloud_service()?
.get_user_service()?
.get_workspace_member(workspace_id, uid)
.await?;
@ -721,7 +728,7 @@ impl UserManager {
let plans = PeriodicallyCheckBillingState::new(
workspace_id,
success.plan.map(SubscriptionPlan::from),
Arc::downgrade(&self.cloud_service),
self.cloud_service.clone(),
Arc::downgrade(&self.authenticate_user),
)
.start()