Merge pull request #7789 from AppFlowy-IO/imple_local_unsupprot

chore: implement local unsupported methods
This commit is contained in:
Nathan.fooo 2025-04-20 20:52:27 +08:00 committed by GitHub
commit c7bf8bb1ba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
30 changed files with 484 additions and 253 deletions

View file

@ -1270,7 +1270,7 @@ dependencies = [
[[package]]
name = "collab"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=1920e21f47e88a238e11356be0b3ef2f3acdc23e#1920e21f47e88a238e11356be0b3ef2f3acdc23e"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3#f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3"
dependencies = [
"anyhow",
"arc-swap",
@ -1295,7 +1295,7 @@ dependencies = [
[[package]]
name = "collab-database"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=1920e21f47e88a238e11356be0b3ef2f3acdc23e#1920e21f47e88a238e11356be0b3ef2f3acdc23e"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3#f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3"
dependencies = [
"anyhow",
"async-trait",
@ -1335,7 +1335,7 @@ dependencies = [
[[package]]
name = "collab-document"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=1920e21f47e88a238e11356be0b3ef2f3acdc23e#1920e21f47e88a238e11356be0b3ef2f3acdc23e"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3#f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3"
dependencies = [
"anyhow",
"arc-swap",
@ -1356,7 +1356,7 @@ dependencies = [
[[package]]
name = "collab-entity"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=1920e21f47e88a238e11356be0b3ef2f3acdc23e#1920e21f47e88a238e11356be0b3ef2f3acdc23e"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3#f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3"
dependencies = [
"anyhow",
"bytes",
@ -1376,7 +1376,7 @@ dependencies = [
[[package]]
name = "collab-folder"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=1920e21f47e88a238e11356be0b3ef2f3acdc23e#1920e21f47e88a238e11356be0b3ef2f3acdc23e"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3#f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3"
dependencies = [
"anyhow",
"arc-swap",
@ -1398,7 +1398,7 @@ dependencies = [
[[package]]
name = "collab-importer"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=1920e21f47e88a238e11356be0b3ef2f3acdc23e#1920e21f47e88a238e11356be0b3ef2f3acdc23e"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3#f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3"
dependencies = [
"anyhow",
"async-recursion",
@ -1461,7 +1461,7 @@ dependencies = [
[[package]]
name = "collab-plugins"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=1920e21f47e88a238e11356be0b3ef2f3acdc23e#1920e21f47e88a238e11356be0b3ef2f3acdc23e"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3#f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3"
dependencies = [
"anyhow",
"async-stream",
@ -1539,7 +1539,7 @@ dependencies = [
[[package]]
name = "collab-user"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=1920e21f47e88a238e11356be0b3ef2f3acdc23e#1920e21f47e88a238e11356be0b3ef2f3acdc23e"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3#f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3"
dependencies = [
"anyhow",
"collab",
@ -2835,6 +2835,7 @@ dependencies = [
"flowy-notification",
"flowy-search-pub",
"flowy-sqlite",
"flowy-user-pub",
"futures",
"lazy_static",
"lib-dispatch",

View file

@ -144,14 +144,14 @@ rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb", rev = "1710120
# To switch to the local path, run:
# scripts/tool/update_collab_source.sh
# ⚠️⚠️⚠️️
collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "1920e21f47e88a238e11356be0b3ef2f3acdc23e" }
collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "1920e21f47e88a238e11356be0b3ef2f3acdc23e" }
collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "1920e21f47e88a238e11356be0b3ef2f3acdc23e" }
collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "1920e21f47e88a238e11356be0b3ef2f3acdc23e" }
collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "1920e21f47e88a238e11356be0b3ef2f3acdc23e" }
collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "1920e21f47e88a238e11356be0b3ef2f3acdc23e" }
collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "1920e21f47e88a238e11356be0b3ef2f3acdc23e" }
collab-importer = { version = "0.1", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "1920e21f47e88a238e11356be0b3ef2f3acdc23e" }
collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3" }
collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3" }
collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3" }
collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3" }
collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3" }
collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3" }
collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3" }
collab-importer = { version = "0.1", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "f029a79e6112c296286cd7bb4c6dcaa4cf0d33f3" }
# Working directory: frontend
# To update the commit ID, run:

View file

@ -131,7 +131,7 @@ impl AIManager {
#[instrument(skip_all, err)]
pub async fn initialize_after_open_workspace(
&self,
_workspace_id: &str,
_workspace_id: &Uuid,
) -> Result<(), FlowyError> {
let local_ai = self.local_ai.clone();
tokio::spawn(async move {

View file

@ -31,7 +31,7 @@ impl FolderOperationHandler for ChatFolderOperation {
}
async fn duplicate_view(&self, _view_id: &Uuid) -> Result<Bytes, FlowyError> {
Err(FlowyError::not_support())
Err(FlowyError::not_support().with_context("Duplicate view"))
}
async fn create_view_with_view_data(
@ -39,7 +39,7 @@ impl FolderOperationHandler for ChatFolderOperation {
_user_id: i64,
_params: CreateViewParams,
) -> Result<Option<EncodedCollab>, FlowyError> {
Err(FlowyError::not_support())
Err(FlowyError::not_support().with_context("Can't create view"))
}
async fn create_default_view(
@ -65,7 +65,7 @@ impl FolderOperationHandler for ChatFolderOperation {
_import_type: ImportType,
_bytes: Vec<u8>,
) -> Result<Vec<ImportedData>, FlowyError> {
Err(FlowyError::not_support())
Err(FlowyError::not_support().with_context("import from data"))
}
async fn import_from_file_path(
@ -74,6 +74,6 @@ impl FolderOperationHandler for ChatFolderOperation {
_name: &str,
_path: String,
) -> Result<(), FlowyError> {
Err(FlowyError::not_support())
Err(FlowyError::not_support().with_context("import file from path"))
}
}

View file

@ -198,7 +198,9 @@ impl FolderOperationHandler for DatabaseFolderOperation {
ViewLayoutPB::Calendar => DatabaseLayoutPB::Calendar,
ViewLayoutPB::Grid => DatabaseLayoutPB::Grid,
ViewLayoutPB::Document | ViewLayoutPB::Chat => {
return Err(FlowyError::not_support());
return Err(
FlowyError::invalid_data().with_context("Can't handle document layout type"),
);
},
};
let name = params.name.to_string();

View file

@ -1,6 +1,7 @@
#![allow(unused_doc_comments)]
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_plugins::CollabKVDB;
use flowy_ai::ai_manager::AIManager;
use flowy_database2::DatabaseManager;
use flowy_document::manager::DocumentManager;
@ -252,6 +253,7 @@ impl AppFlowyCore {
.await;
let user_status_callback = UserStatusCallbackImpl {
user_manager: user_manager.clone(),
collab_builder,
folder_manager: folder_manager.clone(),
database_manager: database_manager.clone(),
@ -342,6 +344,10 @@ impl LoggedUser for ServerUserImpl {
self.upgrade_user()?.get_sqlite_connection(uid)
}
fn get_collab_db(&self, uid: i64) -> Result<Weak<CollabKVDB>, FlowyError> {
self.upgrade_user()?.get_collab_db(uid)
}
fn application_root_dir(&self) -> Result<PathBuf, FlowyError> {
Ok(PathBuf::from(
self.upgrade_user()?.get_application_root_dir(),

View file

@ -4,23 +4,27 @@ use anyhow::Context;
use client_api::entity::billing_dto::SubscriptionPlan;
use tracing::{error, event, info};
use crate::server_layer::ServerProvider;
use collab_entity::CollabType;
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_plugins::local_storage::kv::doc::CollabKVAction;
use collab_plugins::local_storage::kv::KVTransactionDB;
use flowy_ai::ai_manager::AIManager;
use flowy_database2::DatabaseManager;
use flowy_document::manager::DocumentManager;
use flowy_error::FlowyResult;
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder::manager::{FolderInitDataSource, FolderManager};
use flowy_storage::manager::StorageManager;
use flowy_user::event_map::UserStatusCallback;
use flowy_user::user_manager::UserManager;
use flowy_user_pub::cloud::{UserCloudConfig, UserCloudServiceProvider};
use flowy_user_pub::entities::{AuthType, UserProfile, UserWorkspace};
use lib_dispatch::runtime::AFPluginRuntime;
use lib_infra::async_trait::async_trait;
use crate::server_layer::ServerProvider;
use uuid::Uuid;
pub(crate) struct UserStatusCallbackImpl {
pub(crate) user_manager: Arc<UserManager>,
pub(crate) collab_builder: Arc<AppFlowyCollabBuilder>,
pub(crate) folder_manager: Arc<FolderManager>,
pub(crate) database_manager: Arc<DatabaseManager>,
@ -42,6 +46,42 @@ impl UserStatusCallbackImpl {
}
});
}
async fn folder_init_data_source(
&self,
user_id: i64,
workspace_id: &Uuid,
auth_type: &AuthType,
) -> FlowyResult<FolderInitDataSource> {
if self.is_object_exist_on_disk(user_id, workspace_id, workspace_id)? {
return Ok(FolderInitDataSource::LocalDisk {
create_if_not_exist: false,
});
}
let doc_state_result = self
.folder_manager
.cloud_service
.get_folder_doc_state(workspace_id, user_id, CollabType::Folder, workspace_id)
.await;
resolve_data_source(auth_type, doc_state_result)
}
fn is_object_exist_on_disk(
&self,
user_id: i64,
workspace_id: &Uuid,
object_id: &Uuid,
) -> FlowyResult<bool> {
let db = self
.user_manager
.get_collab_db(user_id)?
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Collab db is not initialized"))?;
let read = db.read_txn();
let workspace_id = workspace_id.to_string();
let object_id = object_id.to_string();
Ok(read.is_exist(user_id, &workspace_id, &object_id))
}
}
#[async_trait]
@ -101,9 +141,13 @@ impl UserStatusCallback for UserStatusCallbackImpl {
user_workspace,
device_id
);
let workspace_id = user_workspace.workspace_id()?;
let data_source = self
.folder_init_data_source(user_id, &workspace_id, auth_type)
.await?;
self
.folder_manager
.initialize_after_sign_in(user_id)
.initialize_after_sign_in(user_id, data_source)
.await?;
self
.database_manager
@ -135,37 +179,9 @@ impl UserStatusCallback for UserStatusCallbackImpl {
device_id
);
let workspace_id = user_workspace.workspace_id()?;
// In the current implementation, when a user signs up for AppFlowy Cloud, a default workspace
// is automatically created for them. However, for users who sign up through Supabase, the creation
// of the default workspace relies on the client-side operation. This means that the process
// for initializing a default workspace differs depending on the sign-up method used.
let data_source = match self
.folder_manager
.cloud_service
.get_folder_doc_state(
&workspace_id,
user_profile.uid,
CollabType::Folder,
&workspace_id,
)
.await
{
Ok(doc_state) => match auth_type {
AuthType::Local => FolderInitDataSource::LocalDisk {
create_if_not_exist: true,
},
AuthType::AppFlowyCloud => FolderInitDataSource::Cloud(doc_state),
},
Err(err) => match auth_type {
AuthType::Local => FolderInitDataSource::LocalDisk {
create_if_not_exist: true,
},
AuthType::AppFlowyCloud => {
return Err(err);
},
},
};
let data_source = self
.folder_init_data_source(user_profile.uid, &workspace_id, auth_type)
.await?;
self
.folder_manager
@ -204,12 +220,17 @@ impl UserStatusCallback for UserStatusCallbackImpl {
async fn on_workspace_opened(
&self,
user_id: i64,
user_workspace: &UserWorkspace,
workspace_id: &Uuid,
_user_workspace: &UserWorkspace,
auth_type: &AuthType,
) -> FlowyResult<()> {
let data_source = self
.folder_init_data_source(user_id, workspace_id, auth_type)
.await?;
self
.folder_manager
.initialize_after_open_workspace(user_id)
.initialize_after_open_workspace(user_id, data_source)
.await?;
self
.database_manager
@ -221,11 +242,11 @@ impl UserStatusCallback for UserStatusCallbackImpl {
.await?;
self
.ai_manager
.initialize_after_open_workspace(&user_workspace.id)
.initialize_after_open_workspace(workspace_id)
.await?;
self
.storage_manager
.initialize_after_open_workspace(&user_workspace.id)
.initialize_after_open_workspace(workspace_id)
.await;
Ok(())
}
@ -257,3 +278,23 @@ impl UserStatusCallback for UserStatusCallbackImpl {
}
}
}
fn resolve_data_source(
auth_type: &AuthType,
doc_state_result: Result<Vec<u8>, FlowyError>,
) -> FlowyResult<FolderInitDataSource> {
match doc_state_result {
Ok(doc_state) => Ok(match auth_type {
AuthType::Local => FolderInitDataSource::LocalDisk {
create_if_not_exist: true,
},
AuthType::AppFlowyCloud => FolderInitDataSource::Cloud(doc_state),
}),
Err(err) => match auth_type {
AuthType::Local => Ok(FolderInitDataSource::LocalDisk {
create_if_not_exist: true,
}),
AuthType::AppFlowyCloud => Err(err),
},
}
}

View file

@ -14,6 +14,7 @@ collab-plugins = { workspace = true }
collab-integrate = { workspace = true }
flowy-folder-pub = { workspace = true }
flowy-search-pub = { workspace = true }
flowy-user-pub = { workspace = true }
flowy-sqlite = { workspace = true }
flowy-derive.workspace = true
flowy-notification = { workspace = true }

View file

@ -262,16 +262,20 @@ impl FolderManager {
/// Initialize the folder with the given workspace id.
/// Fetch the folder updates from the cloud service and initialize the folder.
#[tracing::instrument(skip(self, user_id), err)]
pub async fn initialize_after_sign_in(&self, user_id: i64) -> FlowyResult<()> {
#[tracing::instrument(skip_all, err)]
pub async fn initialize_after_sign_in(
&self,
user_id: i64,
data_source: FolderInitDataSource,
) -> FlowyResult<()> {
let workspace_id = self.user.workspace_id()?;
let object_id = &workspace_id;
let is_exist = self
.user
.is_folder_exist_on_disk(user_id, &workspace_id)
.unwrap_or(false);
if is_exist {
if let Err(err) = self.initialize(user_id, &workspace_id, data_source).await {
// If failed to open folder with remote data, open from local disk. After open from the local
// disk. the data will be synced to the remote server.
error!(
"initialize folder for user {} with workspace {} encountered error: {:?}, fallback local",
user_id, workspace_id, err
);
self
.initialize(
user_id,
@ -281,39 +285,17 @@ impl FolderManager {
},
)
.await?;
} else {
let folder_doc_state = self
.cloud_service
.get_folder_doc_state(&workspace_id, user_id, CollabType::Folder, object_id)
.await?;
if let Err(err) = self
.initialize(
user_id,
&workspace_id,
FolderInitDataSource::Cloud(folder_doc_state),
)
.await
{
// If failed to open folder with remote data, open from local disk. After open from the local
// disk. the data will be synced to the remote server.
error!("initialize folder with error {:?}, fallback local", err);
self
.initialize(
user_id,
&workspace_id,
FolderInitDataSource::LocalDisk {
create_if_not_exist: false,
},
)
.await?;
}
}
Ok(())
}
pub async fn initialize_after_open_workspace(&self, uid: i64) -> FlowyResult<()> {
self.initialize_after_sign_in(uid).await
pub async fn initialize_after_open_workspace(
&self,
uid: i64,
data_source: FolderInitDataSource,
) -> FlowyResult<()> {
self.initialize_after_sign_in(uid, data_source).await
}
/// Initialize the folder for the new user.
@ -2139,6 +2121,7 @@ pub(crate) fn get_workspace_private_view_pbs(workspace_id: &Uuid, folder: &Folde
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum FolderInitDataSource {
/// It means using the data stored on local disk to initialize the folder
LocalDisk { create_if_not_exist: bool },

View file

@ -1,3 +1,4 @@
use collab_plugins::CollabKVDB;
use flowy_ai::ai_manager::AIUserService;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
@ -21,6 +22,9 @@ pub trait LoggedUser: Send + Sync {
async fn is_local_mode(&self) -> FlowyResult<bool>;
fn get_sqlite_db(&self, uid: i64) -> Result<DBConnection, FlowyError>;
fn get_collab_db(&self, uid: i64) -> Result<Weak<CollabKVDB>, FlowyError>;
fn application_root_dir(&self) -> Result<PathBuf, FlowyError>;
}

View file

@ -217,10 +217,10 @@ where
chat_id: &Uuid,
metadata: Option<HashMap<String, Value>>,
) -> Result<(), FlowyError> {
return Err(
Err(
FlowyError::not_support()
.with_context("indexing file with appflowy cloud is not suppotred yet"),
);
)
}
async fn get_chat_settings(

View file

@ -13,8 +13,8 @@ use client_api::entity::workspace_dto::{
WorkspaceMemberInvitation,
};
use client_api::entity::{
AFRole, AFWorkspace, AFWorkspaceInvitation, AFWorkspaceSettings, AFWorkspaceSettingsChange,
AuthProvider, CollabParams, CreateCollabParams, GotrueTokenResponse, QueryWorkspaceMember,
AFWorkspace, AFWorkspaceInvitation, AFWorkspaceSettings, AFWorkspaceSettingsChange, AuthProvider,
CollabParams, CreateCollabParams, GotrueTokenResponse, QueryWorkspaceMember,
};
use client_api::entity::{QueryCollab, QueryCollabParams};
use client_api::{Client, ClientConfiguration};
@ -341,18 +341,6 @@ where
Ok(members)
}
async fn get_workspace_member(
&self,
workspace_id: Uuid,
uid: i64,
) -> Result<WorkspaceMember, FlowyError> {
let try_get_client = self.server.try_get_client();
let client = try_get_client?;
let query = QueryWorkspaceMember { workspace_id, uid };
let member = client.get_workspace_member(query).await?;
Ok(from_af_workspace_member(member))
}
#[instrument(level = "debug", skip_all)]
async fn get_user_awareness_doc_state(
&self,
@ -452,7 +440,7 @@ where
Ok(payment_link)
}
async fn get_workspace_member_info(
async fn get_workspace_member(
&self,
workspace_id: &Uuid,
uid: i64,
@ -464,17 +452,8 @@ where
uid,
};
let member = client.get_workspace_member(params).await?;
let role = match member.role {
AFRole::Owner => Role::Owner,
AFRole::Member => Role::Member,
AFRole::Guest => Role::Guest,
};
Ok(WorkspaceMember {
email: member.email,
role,
name: member.name,
avatar_url: member.avatar_url,
})
Ok(from_af_workspace_member(member))
}
async fn get_workspace_subscriptions(

View file

@ -8,7 +8,7 @@ use flowy_ai_pub::cloud::chat_dto::{ChatAuthor, ChatAuthorType};
use flowy_ai_pub::cloud::{
AIModel, AppErrorCode, AppResponseError, ChatCloudService, ChatMessage, ChatMessageType,
ChatSettings, CompleteTextParams, MessageCursor, ModelList, RelatedQuestion, RepeatedChatMessage,
ResponseFormat, StreamAnswer, StreamComplete, UpdateChatParams,
ResponseFormat, StreamAnswer, StreamComplete, UpdateChatParams, DEFAULT_AI_MODEL_NAME,
};
use flowy_ai_pub::persistence::{
deserialize_chat_metadata, deserialize_rag_ids, read_chat,
@ -28,14 +28,14 @@ use tracing::trace;
use uuid::Uuid;
pub struct LocalChatServiceImpl {
pub user: Arc<dyn LoggedUser>,
pub logged_user: Arc<dyn LoggedUser>,
pub local_ai: Arc<LocalAIController>,
}
impl LocalChatServiceImpl {
fn get_message_content(&self, message_id: i64) -> FlowyResult<String> {
let uid = self.user.user_id()?;
let db = self.user.get_sqlite_db(uid)?;
let uid = self.logged_user.user_id()?;
let db = self.logged_user.get_sqlite_db(uid)?;
let content = select_message_content(db, message_id)?.ok_or_else(|| {
FlowyError::record_not_found().with_context(format!("Message not found: {}", message_id))
})?;
@ -43,8 +43,8 @@ impl LocalChatServiceImpl {
}
async fn upsert_message(&self, chat_id: &Uuid, message: ChatMessage) -> Result<(), FlowyError> {
let uid = self.user.user_id()?;
let conn = self.user.get_sqlite_db(uid)?;
let uid = self.logged_user.user_id()?;
let conn = self.logged_user.get_sqlite_db(uid)?;
let row = ChatMessageTable::from_message(chat_id.to_string(), message, true);
upsert_chat_messages(conn, &[row])?;
Ok(())
@ -62,8 +62,8 @@ impl ChatCloudService for LocalChatServiceImpl {
_name: &str,
metadata: Value,
) -> Result<(), FlowyError> {
let uid = self.user.user_id()?;
let db = self.user.get_sqlite_db(uid)?;
let uid = self.logged_user.user_id()?;
let db = self.logged_user.get_sqlite_db(uid)?;
let row = ChatTable::new(chat_id.to_string(), metadata, rag_ids, true);
upsert_chat(db, &row)?;
Ok(())
@ -139,8 +139,8 @@ impl ChatCloudService for LocalChatServiceImpl {
chat_id: &Uuid,
question_id: i64,
) -> Result<ChatMessage, FlowyError> {
let uid = self.user.user_id()?;
let db = self.user.get_sqlite_db(uid)?;
let uid = self.logged_user.user_id()?;
let db = self.logged_user.get_sqlite_db(uid)?;
match select_answer_where_match_reply_message_id(db, &chat_id.to_string(), question_id)? {
None => Err(FlowyError::record_not_found()),
@ -156,8 +156,8 @@ impl ChatCloudService for LocalChatServiceImpl {
limit: u64,
) -> Result<RepeatedChatMessage, FlowyError> {
let chat_id = chat_id.to_string();
let uid = self.user.user_id()?;
let db = self.user.get_sqlite_db(uid)?;
let uid = self.logged_user.user_id()?;
let db = self.logged_user.get_sqlite_db(uid)?;
let result = select_chat_messages(db, &chat_id, limit, offset)?;
let messages = result
@ -180,8 +180,8 @@ impl ChatCloudService for LocalChatServiceImpl {
answer_message_id: i64,
) -> Result<ChatMessage, FlowyError> {
let chat_id = chat_id.to_string();
let uid = self.user.user_id()?;
let db = self.user.get_sqlite_db(uid)?;
let uid = self.logged_user.user_id()?;
let db = self.logged_user.get_sqlite_db(uid)?;
let row = select_answer_where_match_reply_message_id(db, &chat_id, answer_message_id)?
.map(chat_message_from_row)
.ok_or_else(FlowyError::record_not_found)?;
@ -278,8 +278,8 @@ impl ChatCloudService for LocalChatServiceImpl {
chat_id: &Uuid,
) -> Result<ChatSettings, FlowyError> {
let chat_id = chat_id.to_string();
let uid = self.user.user_id()?;
let db = self.user.get_sqlite_db(uid)?;
let uid = self.logged_user.user_id()?;
let db = self.logged_user.get_sqlite_db(uid)?;
let row = read_chat(db, &chat_id)?;
let rag_ids = deserialize_rag_ids(&row.rag_ids);
let metadata = deserialize_chat_metadata::<Value>(&row.metadata);
@ -298,8 +298,8 @@ impl ChatCloudService for LocalChatServiceImpl {
id: &Uuid,
s: UpdateChatParams,
) -> Result<(), FlowyError> {
let uid = self.user.user_id()?;
let mut db = self.user.get_sqlite_db(uid)?;
let uid = self.logged_user.user_id()?;
let mut db = self.logged_user.get_sqlite_db(uid)?;
let changeset = ChatTableChangeset {
chat_id: id.to_string(),
name: s.name,
@ -313,11 +313,11 @@ impl ChatCloudService for LocalChatServiceImpl {
}
async fn get_available_models(&self, _workspace_id: &Uuid) -> Result<ModelList, FlowyError> {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
Ok(ModelList { models: vec![] })
}
async fn get_workspace_default_model(&self, _workspace_id: &Uuid) -> Result<String, FlowyError> {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
Ok(DEFAULT_AI_MODEL_NAME.to_string())
}
}

View file

@ -1,16 +1,18 @@
#![allow(unused_variables)]
use crate::af_cloud::define::LoggedUser;
use crate::local_server::util::default_encode_collab_for_collab_type;
use collab::entity::EncodedCollab;
use collab_database::database::default_database_data;
use collab_database::workspace_database::default_workspace_database_data;
use collab_document::document_data::default_document_collab_data;
use collab_entity::CollabType;
use collab_user::core::default_user_awareness_data;
use flowy_database_pub::cloud::{DatabaseCloudService, DatabaseSnapshot, EncodeCollabByOid};
use flowy_error::FlowyError;
use flowy_error::{ErrorCode, FlowyError};
use lib_infra::async_trait::async_trait;
use std::sync::Arc;
use uuid::Uuid;
pub(crate) struct LocalServerDatabaseCloudServiceImpl();
pub(crate) struct LocalServerDatabaseCloudServiceImpl {
pub logged_user: Arc<dyn LoggedUser>,
}
#[async_trait]
impl DatabaseCloudService for LocalServerDatabaseCloudServiceImpl {
@ -18,24 +20,20 @@ impl DatabaseCloudService for LocalServerDatabaseCloudServiceImpl {
&self,
object_id: &Uuid,
collab_type: CollabType,
workspace_id: &Uuid,
_workspace_id: &Uuid, // underscore to silence “unused” warning
) -> Result<Option<EncodedCollab>, FlowyError> {
let uid = self.logged_user.user_id()?;
let object_id = object_id.to_string();
match collab_type {
CollabType::Document => {
let encode_collab = default_document_collab_data(&object_id)?;
Ok(Some(encode_collab))
},
CollabType::Database => default_database_data(&object_id)
.await
.map(Some)
.map_err(Into::into),
CollabType::WorkspaceDatabase => Ok(Some(default_workspace_database_data(&object_id))),
CollabType::Folder => Ok(None),
CollabType::DatabaseRow => Ok(None),
CollabType::UserAwareness => Ok(Some(default_user_awareness_data(&object_id))),
CollabType::Unknown => Ok(None),
}
default_encode_collab_for_collab_type(uid, &object_id, collab_type)
.await
.map(Some)
.or_else(|err| {
if matches!(err.code, ErrorCode::NotSupportYet) {
Ok(None)
} else {
Err(err)
}
})
}
async fn create_database_encode_collab(

View file

@ -1,8 +1,14 @@
#![allow(unused_variables)]
use crate::af_cloud::define::LoggedUser;
use crate::local_server::util::default_encode_collab_for_collab_type;
use client_api::entity::workspace_dto::PublishInfoView;
use client_api::entity::PublishInfo;
use collab::core::origin::CollabOrigin;
use collab::preclude::Collab;
use collab_entity::CollabType;
use collab_plugins::local_storage::kv::doc::CollabKVAction;
use collab_plugins::local_storage::kv::KVTransactionDB;
use flowy_error::FlowyError;
use flowy_folder_pub::cloud::{
gen_workspace_id, FolderCloudService, FolderCollabParams, FolderData, FolderSnapshot,
@ -10,9 +16,13 @@ use flowy_folder_pub::cloud::{
};
use flowy_folder_pub::entities::PublishPayload;
use lib_infra::async_trait::async_trait;
use std::sync::Arc;
use uuid::Uuid;
pub(crate) struct LocalServerFolderCloudServiceImpl;
pub(crate) struct LocalServerFolderCloudServiceImpl {
#[allow(dead_code)]
pub logged_user: Arc<dyn LoggedUser>,
}
#[async_trait]
impl FolderCloudService for LocalServerFolderCloudServiceImpl {
@ -56,7 +66,27 @@ impl FolderCloudService for LocalServerFolderCloudServiceImpl {
collab_type: CollabType,
object_id: &Uuid,
) -> Result<Vec<u8>, FlowyError> {
Err(FlowyError::local_version_not_support())
let object_id = object_id.to_string();
let workspace_id = workspace_id.to_string();
let collab_db = self.logged_user.get_collab_db(uid)?.upgrade().unwrap();
let read_txn = collab_db.read_txn();
let is_exist = read_txn.is_exist(uid, &workspace_id.to_string(), &object_id.to_string());
if is_exist {
// load doc
let collab = Collab::new_with_origin(CollabOrigin::Empty, &object_id, vec![], false);
read_txn.load_doc(uid, &workspace_id, &object_id, collab.doc())?;
let data = collab.encode_collab_v1(|c| {
collab_type
.validate_require_data(c)
.map_err(|err| FlowyError::invalid_data().with_context(err))?;
Ok::<_, FlowyError>(())
})?;
Ok(data.doc_state.to_vec())
} else {
let data = default_encode_collab_for_collab_type(uid, &object_id, collab_type).await?;
drop(read_txn);
Ok(data.doc_state.to_vec())
}
}
async fn batch_create_folder_collab_objects(

View file

@ -1,32 +1,38 @@
#![allow(unused_variables)]
use crate::af_cloud::define::LoggedUser;
use crate::local_server::uid::UserIDGenerator;
use client_api::entity::GotrueTokenResponse;
use collab::core::origin::CollabOrigin;
use collab::preclude::Collab;
use collab_entity::CollabObject;
use collab_user::core::UserAwareness;
use lazy_static::lazy_static;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::af_cloud::define::LoggedUser;
use crate::local_server::uid::UserIDGenerator;
use flowy_ai_pub::cloud::billing_dto::WorkspaceUsageAndLimit;
use flowy_ai_pub::cloud::{AFWorkspaceSettings, AFWorkspaceSettingsChange};
use flowy_error::FlowyError;
use flowy_user_pub::cloud::{UserCloudService, UserCollabParams};
use flowy_user_pub::entities::*;
use flowy_user_pub::sql::{select_all_user_workspace, select_user_profile, select_user_workspace};
use flowy_user_pub::sql::{
select_all_user_workspace, select_user_profile, select_user_workspace, select_workspace_member,
select_workspace_setting, update_user_profile, update_workspace_setting, upsert_workspace_member,
upsert_workspace_setting, UserTableChangeset, WorkspaceMemberTable, WorkspaceSettingsChangeset,
WorkspaceSettingsTable,
};
use flowy_user_pub::DEFAULT_USER_NAME;
use lazy_static::lazy_static;
use lib_infra::async_trait::async_trait;
use lib_infra::box_any::BoxAny;
use lib_infra::util::timestamp;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
lazy_static! {
static ref ID_GEN: Mutex<UserIDGenerator> = Mutex::new(UserIDGenerator::new(1));
}
pub(crate) struct LocalServerUserServiceImpl {
pub user: Arc<dyn LoggedUser>,
pub logged_user: Arc<dyn LoggedUser>,
}
#[async_trait]
@ -121,26 +127,30 @@ impl UserCloudService for LocalServerUserServiceImpl {
Err(FlowyError::internal().with_context("Can't oauth url when using offline mode"))
}
async fn update_user(&self, _params: UpdateUserProfileParams) -> Result<(), FlowyError> {
async fn update_user(&self, params: UpdateUserProfileParams) -> Result<(), FlowyError> {
let uid = self.logged_user.user_id()?;
let mut conn = self.logged_user.get_sqlite_db(uid)?;
let changeset = UserTableChangeset::new(params);
update_user_profile(&mut conn, changeset)?;
Ok(())
}
async fn get_user_profile(&self, uid: i64) -> Result<UserProfile, FlowyError> {
let conn = self.user.get_sqlite_db(uid)?;
let profile = select_user_profile(uid, conn)?;
let mut conn = self.logged_user.get_sqlite_db(uid)?;
let profile = select_user_profile(uid, &mut conn)?;
Ok(profile)
}
async fn open_workspace(&self, workspace_id: &Uuid) -> Result<UserWorkspace, FlowyError> {
let uid = self.user.user_id()?;
let mut conn = self.user.get_sqlite_db(uid)?;
let uid = self.logged_user.user_id()?;
let mut conn = self.logged_user.get_sqlite_db(uid)?;
let workspace = select_user_workspace(&workspace_id.to_string(), &mut conn)?;
Ok(UserWorkspace::from(workspace))
}
async fn get_all_workspace(&self, uid: i64) -> Result<Vec<UserWorkspace>, FlowyError> {
let conn = self.user.get_sqlite_db(uid)?;
let conn = self.logged_user.get_sqlite_db(uid)?;
let workspaces = select_all_user_workspace(uid, conn)?;
Ok(workspaces)
}
@ -198,4 +208,117 @@ impl UserCloudService for LocalServerUserServiceImpl {
) -> Result<(), FlowyError> {
Ok(())
}
async fn get_workspace_member(
&self,
workspace_id: &Uuid,
uid: i64,
) -> Result<WorkspaceMember, FlowyError> {
// For local server, only current user is the member
let conn = self.logged_user.get_sqlite_db(uid)?;
let result = select_workspace_member(conn, &workspace_id.to_string(), uid);
match result {
Ok(row) => Ok(WorkspaceMember::from(row)),
Err(err) => {
if err.is_record_not_found() {
let mut conn = self.logged_user.get_sqlite_db(uid)?;
let profile = select_user_profile(uid, &mut conn)?;
let row = WorkspaceMemberTable {
email: profile.email.to_string(),
role: 0,
name: profile.name.to_string(),
avatar_url: Some(profile.icon_url),
uid,
workspace_id: workspace_id.to_string(),
updated_at: Default::default(),
};
let member = WorkspaceMember::from(row.clone());
upsert_workspace_member(&mut conn, row)?;
Ok(member)
} else {
Err(err)
}
},
}
}
async fn get_workspace_usage(
&self,
workspace_id: &Uuid,
) -> Result<WorkspaceUsageAndLimit, FlowyError> {
Ok(WorkspaceUsageAndLimit {
member_count: 1,
member_count_limit: 1,
storage_bytes: i64::MAX,
storage_bytes_limit: i64::MAX,
storage_bytes_unlimited: true,
single_upload_limit: i64::MAX,
single_upload_unlimited: true,
ai_responses_count: i64::MAX,
ai_responses_count_limit: i64::MAX,
ai_image_responses_count: i64::MAX,
ai_image_responses_count_limit: 0,
local_ai: true,
ai_responses_unlimited: true,
})
}
async fn get_workspace_setting(
&self,
workspace_id: &Uuid,
) -> Result<AFWorkspaceSettings, FlowyError> {
let uid = self.logged_user.user_id()?;
let mut conn = self.logged_user.get_sqlite_db(uid)?;
// By default, workspace setting is existed in local server
let result = select_workspace_setting(&mut conn, &workspace_id.to_string());
match result {
Ok(row) => Ok(AFWorkspaceSettings {
disable_search_indexing: row.disable_search_indexing,
ai_model: row.ai_model,
}),
Err(err) => {
if err.is_record_not_found() {
let row = WorkspaceSettingsTable {
id: workspace_id.to_string(),
disable_search_indexing: false,
ai_model: "".to_string(),
};
let setting = AFWorkspaceSettings {
disable_search_indexing: row.disable_search_indexing,
ai_model: row.ai_model.clone(),
};
upsert_workspace_setting(&mut conn, row)?;
Ok(setting)
} else {
Err(err)
}
},
}
}
async fn update_workspace_setting(
&self,
workspace_id: &Uuid,
workspace_settings: AFWorkspaceSettingsChange,
) -> Result<AFWorkspaceSettings, FlowyError> {
let uid = self.logged_user.user_id()?;
let mut conn = self.logged_user.get_sqlite_db(uid)?;
let changeset = WorkspaceSettingsChangeset {
id: workspace_id.to_string(),
disable_search_indexing: workspace_settings.disable_search_indexing,
ai_model: workspace_settings.ai_model,
};
update_workspace_setting(&mut conn, changeset)?;
let row = select_workspace_setting(&mut conn, &workspace_id.to_string())?;
Ok(AFWorkspaceSettings {
disable_search_indexing: row.disable_search_indexing,
ai_model: row.ai_model,
})
}
}

View file

@ -3,3 +3,4 @@ pub use server::*;
pub mod impls;
mod server;
pub(crate) mod uid;
mod util;

View file

@ -17,15 +17,15 @@ use flowy_user_pub::cloud::UserCloudService;
use tokio::sync::mpsc;
pub struct LocalServer {
user: Arc<dyn LoggedUser>,
logged_user: Arc<dyn LoggedUser>,
local_ai: Arc<LocalAIController>,
stop_tx: Option<mpsc::Sender<()>>,
}
impl LocalServer {
pub fn new(user: Arc<dyn LoggedUser>, local_ai: Arc<LocalAIController>) -> Self {
pub fn new(logged_user: Arc<dyn LoggedUser>, local_ai: Arc<LocalAIController>) -> Self {
Self {
user,
logged_user,
local_ai,
stop_tx: Default::default(),
}
@ -42,16 +42,20 @@ impl LocalServer {
impl AppFlowyServer for LocalServer {
fn user_service(&self) -> Arc<dyn UserCloudService> {
Arc::new(LocalServerUserServiceImpl {
user: self.user.clone(),
logged_user: self.logged_user.clone(),
})
}
fn folder_service(&self) -> Arc<dyn FolderCloudService> {
Arc::new(LocalServerFolderCloudServiceImpl)
Arc::new(LocalServerFolderCloudServiceImpl {
logged_user: self.logged_user.clone(),
})
}
fn database_service(&self) -> Arc<dyn DatabaseCloudService> {
Arc::new(LocalServerDatabaseCloudServiceImpl())
Arc::new(LocalServerDatabaseCloudServiceImpl {
logged_user: self.logged_user.clone(),
})
}
fn database_ai_service(&self) -> Option<Arc<dyn DatabaseAIService>> {
@ -64,7 +68,7 @@ impl AppFlowyServer for LocalServer {
fn chat_service(&self) -> Arc<dyn ChatCloudService> {
Arc::new(LocalChatServiceImpl {
user: self.user.clone(),
logged_user: self.logged_user.clone(),
local_ai: self.local_ai.clone(),
})
}

View file

@ -0,0 +1,47 @@
use collab::core::origin::CollabOrigin;
use collab::entity::EncodedCollab;
use collab::preclude::Collab;
use collab_database::database::default_database_data;
use collab_database::workspace_database::default_workspace_database_data;
use collab_document::document_data::default_document_collab_data;
use collab_entity::CollabType;
use collab_user::core::default_user_awareness_data;
use flowy_error::{FlowyError, FlowyResult};
pub async fn default_encode_collab_for_collab_type(
_uid: i64,
object_id: &str,
collab_type: CollabType,
) -> FlowyResult<EncodedCollab> {
match collab_type {
CollabType::Document => {
let encode_collab = default_document_collab_data(object_id)?;
Ok(encode_collab)
},
CollabType::Database => default_database_data(object_id).await.map_err(Into::into),
CollabType::WorkspaceDatabase => Ok(default_workspace_database_data(object_id)),
CollabType::Folder => {
// let collab = Collab::new_with_origin(CollabOrigin::Empty, object_id, vec![], false);
// let workspace = Workspace::new(object_id.to_string(), "".to_string(), uid);
// let folder_data = FolderData::new(workspace);
// let folder = Folder::create(uid, collab, None, folder_data);
// let data = folder.encode_collab_v1(|c| {
// collab_type
// .validate_require_data(c)
// .map_err(|err| FlowyError::invalid_data().with_context(err))?;
// Ok::<_, FlowyError>(())
// })?;
// Ok(data)
Err(FlowyError::not_support().with_context("Can not create default folder"))
},
CollabType::DatabaseRow => {
Err(FlowyError::not_support().with_context("Can not create default database row"))
},
CollabType::UserAwareness => Ok(default_user_awareness_data(object_id)),
CollabType::Unknown => {
let collab = Collab::new_with_origin(CollabOrigin::Empty, object_id, vec![], false);
let data = collab.encode_collab_v1(|_| Ok::<_, FlowyError>(()))?;
Ok(data)
},
}
}

View file

@ -1,10 +1,10 @@
use client_api::ClientConfiguration;
use collab_plugins::CollabKVDB;
use flowy_error::{FlowyError, FlowyResult};
use semver::Version;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use flowy_error::{FlowyError, FlowyResult};
use std::sync::{Arc, Weak};
use uuid::Uuid;
use crate::setup_log;
@ -61,6 +61,10 @@ impl LoggedUser for FakeServerUserImpl {
todo!()
}
fn get_collab_db(&self, _uid: i64) -> Result<Weak<CollabKVDB>, FlowyError> {
todo!()
}
fn application_root_dir(&self) -> Result<PathBuf, FlowyError> {
todo!()
}

View file

@ -181,7 +181,7 @@ impl StorageManager {
}
}
pub async fn initialize_after_open_workspace(&self, workspace_id: &str) {
pub async fn initialize_after_open_workspace(&self, workspace_id: &Uuid) {
self.enable_storage_write_access();
if let Err(err) = prepare_upload_task(self.uploader.clone(), self.user_service.clone()).await {

View file

@ -132,7 +132,7 @@ pub trait UserCloudService: Send + Sync + 'static {
/// Delete an account and all the data associated with the account
async fn delete_account(&self) -> Result<(), FlowyError> {
Err(FlowyError::not_support())
Ok(())
}
/// Generate a sign in url for the user with the given email
@ -234,14 +234,6 @@ pub trait UserCloudService: Send + Sync + 'static {
Ok(vec![])
}
async fn get_workspace_member(
&self,
workspace_id: Uuid,
uid: i64,
) -> Result<WorkspaceMember, FlowyError> {
Err(FlowyError::not_support())
}
async fn get_user_awareness_doc_state(
&self,
uid: i64,
@ -281,14 +273,11 @@ pub trait UserCloudService: Send + Sync + 'static {
Err(FlowyError::not_support())
}
async fn get_workspace_member_info(
async fn get_workspace_member(
&self,
workspace_id: &Uuid,
uid: i64,
) -> Result<WorkspaceMember, FlowyError> {
Err(FlowyError::not_support())
}
) -> Result<WorkspaceMember, FlowyError>;
/// Get all subscriptions for all workspaces for a user (email)
async fn get_workspace_subscriptions(
&self,
@ -323,9 +312,7 @@ pub trait UserCloudService: Send + Sync + 'static {
async fn get_workspace_usage(
&self,
workspace_id: &Uuid,
) -> Result<WorkspaceUsageAndLimit, FlowyError> {
Err(FlowyError::not_support())
}
) -> Result<WorkspaceUsageAndLimit, FlowyError>;
async fn get_billing_portal_url(&self) -> Result<String, FlowyError> {
Err(FlowyError::not_support())
@ -337,27 +324,23 @@ pub trait UserCloudService: Send + Sync + 'static {
plan: SubscriptionPlan,
recurring_interval: RecurringInterval,
) -> Result<(), FlowyError> {
Err(FlowyError::not_support())
Ok(())
}
async fn get_subscription_plan_details(&self) -> Result<Vec<SubscriptionPlanDetail>, FlowyError> {
Err(FlowyError::not_support())
Ok(vec![])
}
async fn get_workspace_setting(
&self,
workspace_id: &Uuid,
) -> Result<AFWorkspaceSettings, FlowyError> {
Err(FlowyError::not_support())
}
) -> Result<AFWorkspaceSettings, FlowyError>;
async fn update_workspace_setting(
&self,
workspace_id: &Uuid,
workspace_settings: AFWorkspaceSettingsChange,
) -> Result<AFWorkspaceSettings, FlowyError> {
Err(FlowyError::not_support())
}
) -> Result<AFWorkspaceSettings, FlowyError>;
}
pub type UserUpdateReceiver = tokio::sync::mpsc::Receiver<UserUpdate>;

View file

@ -1,10 +1,11 @@
use crate::entities::{Role, WorkspaceMember};
use diesel::{insert_into, RunQueryDsl};
use flowy_error::FlowyResult;
use flowy_sqlite::schema::workspace_members_table;
use flowy_sqlite::schema::workspace_members_table::dsl;
use flowy_sqlite::{prelude::*, DBConnection, ExpressionMethods};
#[derive(Queryable, Insertable, AsChangeset, Debug)]
#[derive(Queryable, Insertable, AsChangeset, Debug, Clone)]
#[diesel(table_name = workspace_members_table)]
#[diesel(primary_key(email, workspace_id))]
pub struct WorkspaceMemberTable {
@ -17,8 +18,19 @@ pub struct WorkspaceMemberTable {
pub updated_at: chrono::NaiveDateTime,
}
impl From<WorkspaceMemberTable> for WorkspaceMember {
fn from(value: WorkspaceMemberTable) -> Self {
Self {
email: value.email,
role: Role::from(value.role),
name: value.name,
avatar_url: value.avatar_url,
}
}
}
pub fn upsert_workspace_member<T: Into<WorkspaceMemberTable>>(
mut conn: DBConnection,
conn: &mut SqliteConnection,
member: T,
) -> FlowyResult<()> {
let member = member.into();
@ -31,7 +43,7 @@ pub fn upsert_workspace_member<T: Into<WorkspaceMemberTable>>(
))
.do_update()
.set(&member)
.execute(&mut conn)?;
.execute(conn)?;
Ok(())
}

View file

@ -92,10 +92,24 @@ impl From<UserUpdate> for UserTableChangeset {
}
}
pub fn select_user_profile(uid: i64, mut conn: DBConnection) -> Result<UserProfile, FlowyError> {
pub fn update_user_profile(
conn: &mut SqliteConnection,
changeset: UserTableChangeset,
) -> Result<(), FlowyError> {
let user_id = changeset.id.clone();
update(user_table::dsl::user_table.filter(user_table::id.eq(&user_id)))
.set(changeset)
.execute(conn)?;
Ok(())
}
pub fn select_user_profile(
uid: i64,
conn: &mut SqliteConnection,
) -> Result<UserProfile, FlowyError> {
let user: UserProfile = user_table::dsl::user_table
.filter(user_table::id.eq(&uid.to_string()))
.first::<UserTable>(&mut *conn)
.first::<UserTable>(conn)
.map_err(|err| {
FlowyError::record_not_found().with_context(format!(
"Can't find the user profile for user id: {}, error: {:?}",

View file

@ -45,7 +45,7 @@ pub fn update_workspace_setting(
/// Upserts a workspace setting into the database.
pub fn upsert_workspace_setting(
conn: &mut DBConnection,
conn: &mut SqliteConnection,
settings: WorkspaceSettingsTable,
) -> Result<(), FlowyError> {
diesel::insert_into(dsl::workspace_setting_table)
@ -62,11 +62,11 @@ pub fn upsert_workspace_setting(
/// Selects a workspace setting by id from the database.
pub fn select_workspace_setting(
conn: &mut DBConnection,
id: &str,
conn: &mut SqliteConnection,
workspace_id: &str,
) -> Result<WorkspaceSettingsTable, FlowyError> {
let setting = dsl::workspace_setting_table
.filter(workspace_setting_table::id.eq(id))
.filter(workspace_setting_table::id.eq(workspace_id))
.first::<WorkspaceSettingsTable>(conn)?;
Ok(setting)
}

View file

@ -1,13 +1,13 @@
use client_api::entity::billing_dto::SubscriptionPlan;
use std::sync::Weak;
use strum_macros::Display;
use flowy_derive::{Flowy_Event, ProtoBuf_Enum};
use flowy_error::FlowyResult;
use flowy_user_pub::cloud::UserCloudConfig;
use flowy_user_pub::entities::*;
use lib_dispatch::prelude::*;
use lib_infra::async_trait::async_trait;
use std::sync::Weak;
use strum_macros::Display;
use uuid::Uuid;
use crate::event_handler::*;
use crate::user_manager::UserManager;
@ -323,6 +323,7 @@ pub trait UserStatusCallback: Send + Sync + 'static {
async fn on_workspace_opened(
&self,
_user_id: i64,
_workspace_id: &Uuid,
_user_workspace: &UserWorkspace,
_auth_type: &AuthType,
) -> FlowyResult<()> {

View file

@ -103,7 +103,7 @@ pub(crate) fn prepare_import(
);
let imported_user = select_user_profile(
imported_session.user_id,
imported_sqlite_db.get_connection()?,
&mut *imported_sqlite_db.get_connection()?,
)?;
run_collab_data_migration(

View file

@ -126,8 +126,8 @@ impl UserDB {
pool: &Arc<ConnectionPool>,
uid: i64,
) -> Result<UserProfile, FlowyError> {
let conn = pool.get()?;
let profile = select_user_profile(uid, conn)?;
let mut conn = pool.get()?;
let profile = select_user_profile(uid, &mut conn)?;
Ok(profile)
}

View file

@ -1,7 +1,7 @@
use client_api::entity::GotrueTokenResponse;
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_integrate::CollabKVDB;
use flowy_error::{internal_error, FlowyResult};
use flowy_error::FlowyResult;
use arc_swap::ArcSwapOption;
use collab::lock::RwLock;
@ -506,8 +506,12 @@ impl UserManager {
self.db_connection(session.user_id)?,
changeset,
)?;
self
.cloud_service
.get_user_service()?
.update_user(params)
.await?;
self.update_user(params).await?;
Ok(())
}
@ -539,7 +543,8 @@ impl UserManager {
/// Fetches the user profile for the given user ID.
pub async fn get_user_profile_from_disk(&self, uid: i64) -> Result<UserProfile, FlowyError> {
select_user_profile(uid, self.db_connection(uid)?)
let mut conn = self.db_connection(uid)?;
select_user_profile(uid, &mut conn)
}
#[tracing::instrument(level = "info", skip_all, err)]
@ -625,14 +630,6 @@ impl UserManager {
Ok(None)
}
async fn update_user(&self, params: UpdateUserProfileParams) -> Result<(), FlowyError> {
let server = self.cloud_service.get_user_service()?;
tokio::spawn(async move { server.update_user(params).await })
.await
.map_err(internal_error)??;
Ok(())
}
async fn save_user(&self, uid: i64, user: UserTable) -> Result<(), FlowyError> {
let conn = self.db_connection(uid)?;
upsert_user(user, conn)?;
@ -816,7 +813,7 @@ pub fn upsert_user_profile_change(
"Update user profile with changeset: {:?}",
changeset
);
diesel_update_table!(user_table, changeset, &mut *conn);
update_user_profile(&mut conn, changeset)?;
let user: UserProfile = user_table::dsl::user_table
.filter(user_table::id.eq(&uid.to_string()))
.first::<UserTable>(&mut *conn)?

View file

@ -195,7 +195,7 @@ impl UserManager {
.user_status_callback
.read()
.await
.on_workspace_opened(uid, &user_workspace, &user_profile.auth_type)
.on_workspace_opened(uid, workspace_id, &user_workspace, &user_profile.auth_type)
.await
{
error!("Open workspace failed: {:?}", err);
@ -377,7 +377,7 @@ impl UserManager {
let member = self
.cloud_service
.get_user_service()?
.get_workspace_member(workspace_id, uid)
.get_workspace_member(&workspace_id, uid)
.await?;
Ok(member)
}
@ -655,7 +655,7 @@ impl UserManager {
let member = self
.cloud_service
.get_user_service()?
.get_workspace_member_info(workspace_id, uid)
.get_workspace_member(workspace_id, uid)
.await?;
let record = WorkspaceMemberTable {
@ -668,8 +668,8 @@ impl UserManager {
updated_at: Utc::now().naive_utc(),
};
let db = self.authenticate_user.get_sqlite_connection(uid)?;
upsert_workspace_member(db, record)?;
let mut db = self.authenticate_user.get_sqlite_connection(uid)?;
upsert_workspace_member(&mut db, record)?;
Ok(member)
}