chore: remove local model

Nathan 2025-04-21 22:02:06 +08:00
parent 9cd49c2447
commit 4e2990e599
36 changed files with 101 additions and 1049 deletions

View file

@ -1,2 +1,3 @@
pub mod cloud;
pub mod persistence;
pub mod user_service;

View file

@ -0,0 +1,14 @@
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
use lib_infra::async_trait::async_trait;
use std::path::PathBuf;
use uuid::Uuid;
#[async_trait]
pub trait AIUserService: Send + Sync + 'static {
fn user_id(&self) -> Result<i64, FlowyError>;
async fn is_local_model(&self) -> FlowyResult<bool>;
fn workspace_id(&self) -> Result<Uuid, FlowyError>;
fn sqlite_connection(&self, uid: i64) -> Result<DBConnection, FlowyError>;
fn application_root_dir(&self) -> Result<PathBuf, FlowyError>;
}
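
The trait above was moved from flowy_ai::ai_manager into flowy_ai_pub::user_service, and consumers such as flowy_server now import it from flowy_ai_pub (see the define.rs hunk further down). A minimal sketch of an implementor, assuming only the trait surface shown above (the StubAIUserService name is hypothetical and not part of this commit):

use flowy_ai_pub::user_service::AIUserService;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
use lib_infra::async_trait::async_trait;
use std::path::PathBuf;
use uuid::Uuid;

// Hypothetical implementor used only to illustrate the relocated trait.
struct StubAIUserService;

#[async_trait]
impl AIUserService for StubAIUserService {
  fn user_id(&self) -> Result<i64, FlowyError> {
    Ok(1)
  }

  async fn is_local_model(&self) -> FlowyResult<bool> {
    // A real implementation would inspect the active workspace/session type.
    Ok(false)
  }

  fn workspace_id(&self) -> Result<Uuid, FlowyError> {
    Ok(Uuid::nil())
  }

  fn sqlite_connection(&self, _uid: i64) -> Result<DBConnection, FlowyError> {
    Err(FlowyError::internal().with_context("no sqlite database in this sketch"))
  }

  fn application_root_dir(&self) -> Result<PathBuf, FlowyError> {
    Ok(PathBuf::from("."))
  }
}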

View file

@ -14,7 +14,6 @@ use flowy_ai_pub::cloud::{
};
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::kv::KVStorePreferences;
use flowy_sqlite::DBConnection;
use crate::notification::{chat_notification_builder, ChatNotification};
use crate::util::ai_available_models_key;
@ -22,6 +21,7 @@ use collab_integrate::persistence::collab_metadata_sql::{
batch_insert_collab_metadata, batch_select_collab_metadata, AFCollabMetadata,
};
use flowy_ai_pub::cloud::ai_dto::AvailableModel;
use flowy_ai_pub::user_service::AIUserService;
use flowy_storage_pub::storage::StorageService;
use lib_infra::async_trait::async_trait;
use lib_infra::util::timestamp;
@ -33,15 +33,6 @@ use tokio::sync::RwLock;
use tracing::{error, info, instrument, trace};
use uuid::Uuid;
#[async_trait]
pub trait AIUserService: Send + Sync + 'static {
fn user_id(&self) -> Result<i64, FlowyError>;
async fn is_local_model(&self) -> FlowyResult<bool>;
fn workspace_id(&self) -> Result<Uuid, FlowyError>;
fn sqlite_connection(&self, uid: i64) -> Result<DBConnection, FlowyError>;
fn application_root_dir(&self) -> Result<PathBuf, FlowyError>;
}
/// AIExternalService is an interface for external services that the AI plugin can interact with.
#[async_trait]
pub trait AIExternalService: Send + Sync + 'static {
@ -450,13 +441,9 @@ impl AIManager {
pub async fn get_available_models(&self, source: String) -> FlowyResult<AvailableModelsPB> {
let is_local_mode = self.user_service.is_local_model().await?;
if is_local_mode {
let mut selected_model = AIModel::default();
let mut models = vec![];
if let Some(local_model) = self.local_ai.get_plugin_chat_model() {
let model = AIModel::local(local_model, "".to_string());
selected_model = model.clone();
models.push(model);
}
let setting = self.local_ai.get_local_ai_setting();
let selected_model = AIModel::local(setting.chat_model_name, "".to_string());
let models = vec![selected_model.clone()];
Ok(AvailableModelsPB {
models: models.into_iter().map(|m| m.into()).collect(),

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use crate::entities::{
ChatMessageErrorPB, ChatMessageListPB, ChatMessagePB, PredefinedFormatPB,
RepeatedRelatedQuestionPB, StreamMessageParams,
@ -14,6 +13,7 @@ use flowy_ai_pub::persistence::{
select_answer_where_match_reply_message_id, select_chat_messages, upsert_chat_messages,
ChatMessageTable,
};
use flowy_ai_pub::user_service::AIUserService;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
use futures::{SinkExt, StreamExt};

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use crate::entities::{CompleteTextPB, CompleteTextTaskPB, CompletionTypePB};
use allo_isolate::Isolate;
use std::str::FromStr;
@ -14,6 +13,7 @@ use futures::{SinkExt, StreamExt};
use lib_infra::isolate_stream::IsolateSink;
use crate::stream_message::StreamMessage;
use flowy_ai_pub::user_service::AIUserService;
use std::sync::{Arc, Weak};
use tokio::select;
use tracing::{error, info};

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use crate::entities::{LocalAIPB, RunningStatePB};
use crate::local_ai::resource::{LLMResourceService, LocalAIResourceController};
use crate::notification::{
@ -17,6 +16,7 @@ use af_local_ai::ollama_plugin::OllamaAIPlugin;
use af_plugin::core::path::is_plugin_ready;
use af_plugin::core::plugin::RunningState;
use arc_swap::ArcSwapOption;
use flowy_ai_pub::user_service::AIUserService;
use futures_util::SinkExt;
use lib_infra::util::get_operating_system;
use serde::{Deserialize, Serialize};

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use crate::local_ai::controller::LocalAISetting;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use lib_infra::async_trait::async_trait;
@ -11,6 +10,7 @@ use crate::notification::{
};
use af_local_ai::ollama_plugin::OllamaPluginConfig;
use af_plugin::core::path::{is_plugin_ready, ollama_plugin_path};
use flowy_ai_pub::user_service::AIUserService;
use lib_infra::util::{get_operating_system, OperatingSystem};
use reqwest::Client;
use serde::Deserialize;

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use crate::entities::{ChatStatePB, ModelTypePB};
use crate::local_ai::controller::LocalAIController;
use crate::notification::{
@ -19,6 +18,7 @@ use futures::{stream, StreamExt, TryStreamExt};
use lib_infra::async_trait::async_trait;
use crate::local_ai::stream_util::QuestionStream;
use flowy_ai_pub::user_service::AIUserService;
use flowy_storage_pub::storage::StorageService;
use serde_json::{json, Value};
use std::path::Path;

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use flowy_ai_pub::cloud::{
AIModel, ChatCloudService, ChatMessage, ChatMessageType, ChatSettings, CompleteTextParams,
MessageCursor, ModelList, RepeatedChatMessage, RepeatedRelatedQuestion, ResponseFormat,
@ -8,6 +7,7 @@ use flowy_ai_pub::persistence::{
update_chat_is_sync, update_chat_message_is_sync, upsert_chat, upsert_chat_messages,
ChatMessageTable, ChatTable,
};
use flowy_ai_pub::user_service::AIUserService;
use flowy_error::FlowyError;
use lib_infra::async_trait::async_trait;
use serde_json::Value;

View file

@ -5,9 +5,10 @@ use collab::preclude::{Collab, StateVector};
use collab::util::is_change_since_sv;
use collab_entity::CollabType;
use collab_integrate::persistence::collab_metadata_sql::AFCollabMetadata;
use flowy_ai::ai_manager::{AIExternalService, AIManager, AIUserService};
use flowy_ai::ai_manager::{AIExternalService, AIManager};
use flowy_ai::local_ai::controller::LocalAIController;
use flowy_ai_pub::cloud::ChatCloudService;
use flowy_ai_pub::user_service::AIUserService;
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder::ViewLayout;
use flowy_folder_pub::cloud::{FolderCloudService, FullSyncCollabParams};
@ -153,7 +154,7 @@ impl AIExternalService for ChatQueryServiceImpl {
}
}
struct ChatUserServiceImpl(Weak<AuthenticateUser>);
pub struct ChatUserServiceImpl(Weak<AuthenticateUser>);
impl ChatUserServiceImpl {
fn upgrade_user(&self) -> Result<Arc<AuthenticateUser>, FlowyError> {
let user = self

View file

@ -5,10 +5,8 @@ use dashmap::mapref::one::Ref;
use dashmap::DashMap;
use flowy_ai::local_ai::controller::LocalAIController;
use flowy_error::{FlowyError, FlowyResult};
use flowy_server::af_cloud::{
define::{AIUserServiceImpl, LoggedUser},
AppFlowyCloudServer,
};
use flowy_server::af_cloud::define::AIUserServiceImpl;
use flowy_server::af_cloud::{define::LoggedUser, AppFlowyCloudServer};
use flowy_server::local_server::LocalServer;
use flowy_server::{AppFlowyEncryption, AppFlowyServer, EncryptionImpl};
use flowy_server_pub::AuthenticatorType;
@ -117,12 +115,14 @@ impl ServerProvider {
.cloud_config
.clone()
.ok_or_else(|| FlowyError::internal().with_context("Missing cloud config"))?;
let ai_user_service = Arc::new(AIUserServiceImpl(Arc::downgrade(&self.logged_user)));
Arc::new(AppFlowyCloudServer::new(
cfg,
self.user_enable_sync.load(Ordering::Acquire),
self.config.device_id.clone(),
self.config.app_version.clone(),
Arc::downgrade(&self.logged_user),
ai_user_service,
))
},
};

View file

@ -1,5 +1,5 @@
use collab_plugins::CollabKVDB;
use flowy_ai::ai_manager::AIUserService;
use flowy_ai_pub::user_service::AIUserService;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
use lib_infra::async_trait::async_trait;
@ -28,6 +28,7 @@ pub trait LoggedUser: Send + Sync {
fn application_root_dir(&self) -> Result<PathBuf, FlowyError>;
}
//
pub struct AIUserServiceImpl(pub Weak<dyn LoggedUser>);
impl AIUserServiceImpl {

View file

@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use crate::af_cloud::define::{AIUserServiceImpl, LoggedUser};
use crate::af_cloud::define::LoggedUser;
use anyhow::Error;
use arc_swap::ArcSwap;
use client_api::collab_sync::ServerCollabMessage;
@ -28,7 +28,9 @@ use crate::af_cloud::impls::{
AFCloudDatabaseCloudServiceImpl, AFCloudDocumentCloudServiceImpl, AFCloudFileStorageServiceImpl,
AFCloudFolderCloudServiceImpl, AFCloudUserAuthServiceImpl, CloudChatServiceImpl,
};
use crate::AppFlowyServer;
use flowy_ai::offline::offline_message_sync::AutoSyncChatService;
use flowy_ai_pub::user_service::AIUserService;
use rand::Rng;
use semver::Version;
use tokio::select;
@ -39,8 +41,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::AppFlowyServer;
use super::impls::AFCloudSearchCloudServiceImpl;
pub(crate) type AFCloudClient = Client;
@ -54,6 +54,7 @@ pub struct AppFlowyCloudServer {
pub device_id: String,
ws_client: Arc<WSClient>,
logged_user: Weak<dyn LoggedUser>,
ai_user_service: Arc<dyn AIUserService>,
}
impl AppFlowyCloudServer {
@ -63,6 +64,7 @@ impl AppFlowyCloudServer {
mut device_id: String,
client_version: Version,
logged_user: Weak<dyn LoggedUser>,
ai_user_service: Arc<dyn AIUserService>,
) -> Self {
// The device id can't be empty, so we generate a new one if it is.
if device_id.is_empty() {
@ -101,6 +103,7 @@ impl AppFlowyCloudServer {
device_id,
ws_client,
logged_user,
ai_user_service,
}
}
@ -222,7 +225,7 @@ impl AppFlowyServer for AppFlowyCloudServer {
Arc::new(CloudChatServiceImpl {
inner: self.get_server_impl(),
}),
Arc::new(AIUserServiceImpl(self.logged_user.clone())),
self.ai_user_service.clone(),
))
}
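
With this change, AppFlowyCloudServer no longer builds its own AIUserServiceImpl; the caller constructs the AI user service and injects it, as ServerProvider does in the hunk above. A wiring sketch under that assumption (the build_cloud_server helper is illustrative only, not part of the commit):

use std::sync::Arc;

use flowy_ai_pub::user_service::AIUserService;
use flowy_server::af_cloud::define::{AIUserServiceImpl, LoggedUser};
use flowy_server::af_cloud::AppFlowyCloudServer;
use flowy_server_pub::af_cloud_config::AFCloudConfiguration;
use semver::Version;

// Illustrative helper that mirrors how ServerProvider now wires the server.
fn build_cloud_server(
  config: AFCloudConfiguration,
  logged_user: Arc<dyn LoggedUser>,
) -> Arc<AppFlowyCloudServer> {
  // The AI user service is created by the caller and passed in,
  // instead of being constructed inside AppFlowyCloudServer.
  let ai_user_service: Arc<dyn AIUserService> =
    Arc::new(AIUserServiceImpl(Arc::downgrade(&logged_user)));
  Arc::new(AppFlowyCloudServer::new(
    config,
    true,
    "fake_device_id".to_string(),
    Version::new(0, 5, 8),
    Arc::downgrade(&logged_user),
    ai_user_service,
  ))
}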

View file

@ -1,2 +0,0 @@
mod user_test;
mod util;

View file

@ -1,21 +0,0 @@
use flowy_server::AppFlowyServer;
use flowy_user_pub::entities::AuthResponse;
use lib_infra::box_any::BoxAny;
use crate::af_cloud_test::util::{
af_cloud_server, af_cloud_sign_up_param, generate_test_email, get_af_cloud_config,
};
#[tokio::test]
async fn sign_up_test() {
if let Some(config) = get_af_cloud_config() {
let server = af_cloud_server(config.clone());
let user_service = server.user_service();
let email = generate_test_email();
let params = af_cloud_sign_up_param(&email, &config).await;
let resp: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
assert_eq!(resp.email.unwrap(), email);
assert!(resp.is_new_user);
assert_eq!(resp.user_workspaces.len(), 1);
}
}

View file

@ -1,119 +0,0 @@
use client_api::ClientConfiguration;
use collab_plugins::CollabKVDB;
use flowy_error::{FlowyError, FlowyResult};
use semver::Version;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Weak};
use uuid::Uuid;
use crate::setup_log;
use flowy_server::af_cloud::define::LoggedUser;
use flowy_server::af_cloud::AppFlowyCloudServer;
use flowy_server_pub::af_cloud_config::AFCloudConfiguration;
use flowy_sqlite::DBConnection;
use lib_infra::async_trait::async_trait;
/// To run the test, create a .env.ci file in the 'flowy-server' directory and set the following environment variables:
///
/// - `APPFLOWY_CLOUD_BASE_URL=http://localhost:8000`
/// - `APPFLOWY_CLOUD_WS_BASE_URL=ws://localhost:8000/ws`
/// - `APPFLOWY_CLOUD_GOTRUE_URL=http://localhost:9998`
///
/// - `GOTRUE_ADMIN_EMAIL=admin@example.com`
/// - `GOTRUE_ADMIN_PASSWORD=password`
pub fn get_af_cloud_config() -> Option<AFCloudConfiguration> {
dotenv::from_filename("./.env.ci").ok()?;
setup_log();
AFCloudConfiguration::from_env().ok()
}
pub fn af_cloud_server(config: AFCloudConfiguration) -> Arc<AppFlowyCloudServer> {
let fake_device_id = uuid::Uuid::new_v4().to_string();
let logged_user = Arc::new(FakeServerUserImpl) as Arc<dyn LoggedUser>;
Arc::new(AppFlowyCloudServer::new(
config,
true,
fake_device_id,
Version::new(0, 5, 8),
// do nothing, just for test
Arc::downgrade(&logged_user),
))
}
struct FakeServerUserImpl;
#[async_trait]
impl LoggedUser for FakeServerUserImpl {
fn workspace_id(&self) -> FlowyResult<Uuid> {
todo!()
}
fn user_id(&self) -> FlowyResult<i64> {
todo!()
}
async fn is_local_mode(&self) -> FlowyResult<bool> {
Ok(true)
}
fn get_sqlite_db(&self, _uid: i64) -> Result<DBConnection, FlowyError> {
todo!()
}
fn get_collab_db(&self, _uid: i64) -> Result<Weak<CollabKVDB>, FlowyError> {
todo!()
}
fn application_root_dir(&self) -> Result<PathBuf, FlowyError> {
todo!()
}
}
pub async fn generate_sign_in_url(user_email: &str, config: &AFCloudConfiguration) -> String {
let client = client_api::Client::new(
&config.base_url,
&config.ws_base_url,
&config.gotrue_url,
"fake_device_id",
ClientConfiguration::default(),
"test",
);
let admin_email = std::env::var("GOTRUE_ADMIN_EMAIL").unwrap();
let admin_password = std::env::var("GOTRUE_ADMIN_PASSWORD").unwrap();
let admin_client = client_api::Client::new(
client.base_url(),
client.ws_addr(),
client.gotrue_url(),
"fake_device_id",
ClientConfiguration::default(),
&client.client_version.to_string(),
);
admin_client
.sign_in_password(&admin_email, &admin_password)
.await
.unwrap();
let action_link = admin_client
.generate_sign_in_action_link(user_email)
.await
.unwrap();
client.extract_sign_in_url(&action_link).await.unwrap()
}
pub async fn af_cloud_sign_up_param(
email: &str,
config: &AFCloudConfiguration,
) -> HashMap<String, String> {
let mut params = HashMap::new();
params.insert(
"sign_in_url".to_string(),
generate_sign_in_url(email, config).await,
);
params.insert("device_id".to_string(), Uuid::new_v4().to_string());
params
}
pub fn generate_test_email() -> String {
format!("{}@test.com", Uuid::new_v4())
}

Binary file not shown (removed image, 15 KiB)

View file

@ -1,24 +0,0 @@
use std::sync::Once;
use tracing_subscriber::fmt::Subscriber;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
mod af_cloud_test;
// mod supabase_test;
pub fn setup_log() {
static START: Once = Once::new();
START.call_once(|| {
let level = "trace";
let mut filters = vec![];
filters.push(format!("flowy_server={}", level));
std::env::set_var("RUST_LOG", filters.join(","));
let subscriber = Subscriber::builder()
.with_env_filter(EnvFilter::from_default_env())
.with_ansi(true)
.finish();
subscriber.try_init().unwrap();
});
}

View file

@ -1,63 +0,0 @@
use collab::core::collab::DataSource;
use collab_entity::{CollabObject, CollabType};
use uuid::Uuid;
use flowy_user_pub::entities::AuthResponse;
use lib_infra::box_any::BoxAny;
use crate::supabase_test::util::{
collab_service, database_service, get_supabase_ci_config, third_party_sign_up_param,
user_auth_service,
};
#[tokio::test]
async fn supabase_create_database_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_service = collab_service();
let database_service = database_service();
let mut row_ids = vec![];
for _i in 0..3 {
let row_id = uuid::Uuid::new_v4().to_string();
row_ids.push(row_id.clone());
let collab_object = CollabObject::new(
user.user_id,
row_id,
CollabType::DatabaseRow,
user.latest_workspace.id.clone(),
"fake_device_id".to_string(),
);
collab_service
.send_update(&collab_object, 0, vec![1, 2, 3])
.await
.unwrap();
collab_service
.send_update(&collab_object, 0, vec![4, 5, 6])
.await
.unwrap();
}
let updates_by_oid = database_service
.batch_get_database_object_doc_state(row_ids, CollabType::DatabaseRow, "fake_workspace_id")
.await
.unwrap();
assert_eq!(updates_by_oid.len(), 3);
for (_, source) in updates_by_oid {
match source {
DataSource::Disk => panic!("should not be from disk"),
DataSource::DocStateV1(doc_state) => {
assert_eq!(doc_state.len(), 2);
},
DataSource::DocStateV2(_) => {},
}
}
}

View file

@ -1,78 +0,0 @@
// use url::Url;
// use uuid::Uuid;
//
// use flowy_storage::StorageObject;
//
// use crate::supabase_test::util::{file_storage_service, get_supabase_ci_config};
//
// #[tokio::test]
// async fn supabase_get_object_test() {
// if get_supabase_ci_config().is_none() {
// return;
// }
//
// let service = file_storage_service();
// let file_name = format!("test-{}.txt", Uuid::new_v4());
// let object = StorageObject::from_file("1", &file_name, "tests/test.txt");
//
// // Upload a file
// let url = service
// .create_object(object)
// .await
// .unwrap()
// .parse::<Url>()
// .unwrap();
//
// // The url would be something like:
// // https://acfrqdbdtbsceyjbxsfc.supabase.co/storage/v1/object/data/test-1693472809.txt
// let name = url.path_segments().unwrap().last().unwrap();
// assert_eq!(name, &file_name);
//
// // Download the file
// let bytes = service.get_object(url.to_string()).await.unwrap();
// let s = String::from_utf8(bytes.to_vec()).unwrap();
// assert_eq!(s, "hello world");
// }
//
// #[tokio::test]
// async fn supabase_upload_image_test() {
// if get_supabase_ci_config().is_none() {
// return;
// }
//
// let service = file_storage_service();
// let file_name = format!("image-{}.png", Uuid::new_v4());
// let object = StorageObject::from_file("1", &file_name, "tests/logo.png");
//
// // Upload a file
// let url = service
// .create_object(object)
// .await
// .unwrap()
// .parse::<Url>()
// .unwrap();
//
// // Download object by url
// let bytes = service.get_object(url.to_string()).await.unwrap();
// assert_eq!(bytes.len(), 15694);
// }
//
// #[tokio::test]
// async fn supabase_delete_object_test() {
// if get_supabase_ci_config().is_none() {
// return;
// }
//
// let service = file_storage_service();
// let file_name = format!("test-{}.txt", Uuid::new_v4());
// let object = StorageObject::from_file("1", &file_name, "tests/test.txt");
// let url = service.create_object(object).await.unwrap();
//
// let result = service.get_object(url.clone()).await;
// assert!(result.is_ok());
//
// let _ = service.delete_object(url.clone()).await;
//
// let result = service.get_object(url.clone()).await;
// assert!(result.is_err());
// }

View file

@ -1,316 +0,0 @@
use assert_json_diff::assert_json_eq;
use collab_entity::{CollabObject, CollabType};
use serde_json::json;
use uuid::Uuid;
use yrs::types::ToJson;
use yrs::updates::decoder::Decode;
use yrs::{merge_updates_v1, Array, Doc, Map, MapPrelim, ReadTxn, StateVector, Transact, Update};
use flowy_user_pub::entities::AuthResponse;
use lib_infra::box_any::BoxAny;
use crate::supabase_test::util::{
collab_service, folder_service, get_supabase_ci_config, third_party_sign_up_param,
user_auth_service,
};
#[tokio::test]
async fn supabase_create_workspace_test() {
if get_supabase_ci_config().is_none() {
return;
}
let service = folder_service();
// will replace the uid with the real uid
let workspace = service.create_workspace(1, "test").await.unwrap();
dbg!(workspace);
}
#[tokio::test]
async fn supabase_get_folder_test() {
if get_supabase_ci_config().is_none() {
return;
}
let folder_service = folder_service();
let user_service = user_auth_service();
let collab_service = collab_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject::new(
user.user_id,
user.latest_workspace.id.clone(),
CollabType::Folder,
user.latest_workspace.id.clone(),
"fake_device_id".to_string(),
);
let doc = Doc::with_client_id(1);
let map = { doc.get_or_insert_map("map") };
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "1", "a");
collab_service
.send_update(&collab_object, 0, txn.encode_update_v1())
.await
.unwrap();
};
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "2", "b");
collab_service
.send_update(&collab_object, 1, txn.encode_update_v1())
.await
.unwrap();
};
// let updates = collab_service.get_all_updates(&collab_object).await.unwrap();
let updates = folder_service
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
&user.latest_workspace.id,
)
.await
.unwrap();
assert_eq!(updates.len(), 2);
for _ in 0..5 {
collab_service
.send_init_sync(&collab_object, 3, vec![])
.await
.unwrap();
}
let updates = folder_service
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
&user.latest_workspace.id,
)
.await
.unwrap();
// After the init sync, try to get the updates from the server.
let expected_update = doc
.transact_mut()
.encode_state_as_update_v1(&StateVector::default());
// check the update is the same as local document update.
assert_eq!(updates, expected_update);
}
/// This async test function checks the behavior of updates duplication in Supabase.
/// It creates a new user and simulates two updates to the user's workspace with different values.
/// Then, it merges these updates and sends an initial synchronization request to test duplication handling.
/// Finally, it asserts that the duplicated updates don't affect the overall data consistency in Supabase.
#[tokio::test]
async fn supabase_duplicate_updates_test() {
if get_supabase_ci_config().is_none() {
return;
}
let folder_service = folder_service();
let user_service = user_auth_service();
let collab_service = collab_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject::new(
user.user_id,
user.latest_workspace.id.clone(),
CollabType::Folder,
user.latest_workspace.id.clone(),
"fake_device_id".to_string(),
);
let doc = Doc::with_client_id(1);
let map = { doc.get_or_insert_map("map") };
let mut duplicated_updates = vec![];
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "1", "a");
let update = txn.encode_update_v1();
duplicated_updates.push(update.clone());
collab_service
.send_update(&collab_object, 0, update)
.await
.unwrap();
};
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "2", "b");
let update = txn.encode_update_v1();
duplicated_updates.push(update.clone());
collab_service
.send_update(&collab_object, 1, update)
.await
.unwrap();
};
// send init sync
collab_service
.send_init_sync(&collab_object, 3, vec![])
.await
.unwrap();
let first_init_sync_update = folder_service
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
&user.latest_workspace.id,
)
.await
.unwrap();
// simulate the duplicated updates.
let merged_update = merge_updates_v1(
&duplicated_updates
.iter()
.map(|update| update.as_ref())
.collect::<Vec<&[u8]>>(),
)
.unwrap();
collab_service
.send_init_sync(&collab_object, 4, merged_update)
.await
.unwrap();
let second_init_sync_update = folder_service
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
&user.latest_workspace.id,
)
.await
.unwrap();
let doc_2 = Doc::new();
assert_eq!(first_init_sync_update.len(), second_init_sync_update.len());
let map = { doc_2.get_or_insert_map("map") };
{
let mut txn = doc_2.transact_mut();
let update = Update::decode_v1(&second_init_sync_update).unwrap();
txn.apply_update(update).unwrap();
}
{
let txn = doc_2.transact();
let json = map.to_json(&txn);
assert_json_eq!(
json,
json!({
"1": "a",
"2": "b"
})
);
}
}
/// The state vector of doc;
/// ```json
/// "map": {},
/// "array": []
/// ```
/// The old version of doc:
/// ```json
/// "map": {}
/// ```
///
/// Apply the updates from the doc to the old-version doc and check the result.
#[tokio::test]
async fn supabase_diff_state_vector_test() {
if get_supabase_ci_config().is_none() {
return;
}
let folder_service = folder_service();
let user_service = user_auth_service();
let collab_service = collab_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject::new(
user.user_id,
user.latest_workspace.id.clone(),
CollabType::Folder,
user.latest_workspace.id.clone(),
"fake_device_id".to_string(),
);
let doc = Doc::with_client_id(1);
let map = { doc.get_or_insert_map("map") };
let array = { doc.get_or_insert_array("array") };
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "1", "a");
map.insert(&mut txn, "inner_map", MapPrelim::<String>::new());
array.push_back(&mut txn, "element 1");
let update = txn.encode_update_v1();
collab_service
.send_update(&collab_object, 0, update)
.await
.unwrap();
};
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "2", "b");
array.push_back(&mut txn, "element 2");
let update = txn.encode_update_v1();
collab_service
.send_update(&collab_object, 1, update)
.await
.unwrap();
};
// restore the doc with given updates.
let old_version_doc = Doc::new();
let map = { old_version_doc.get_or_insert_map("map") };
let doc_state = folder_service
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
&user.latest_workspace.id,
)
.await
.unwrap();
{
let mut txn = old_version_doc.transact_mut();
let update = Update::decode_v1(&doc_state).unwrap();
txn.apply_update(update).unwrap();
}
let txn = old_version_doc.transact();
let json = map.to_json(&txn);
assert_json_eq!(
json,
json!({
"1": "a",
"2": "b",
"inner_map": {}
})
);
}
// #[tokio::test]
// async fn print_folder_object_test() {
// if get_supabase_dev_config().is_none() {
// return;
// }
// let secret = Some("43bSxEPHeNkk5ZxxEYOfAjjd7sK2DJ$vVnxwuNc5ru0iKFvhs8wLg==".to_string());
// print_encryption_folder("f8b14b84-e8ec-4cf4-a318-c1e008ecfdfa", secret).await;
// }
//
// #[tokio::test]
// async fn print_folder_snapshot_object_test() {
// if get_supabase_dev_config().is_none() {
// return;
// }
// let secret = Some("NTXRXrDSybqFEm32jwMBDzbxvCtgjU$8np3TGywbBdJAzHtu1QIyQ==".to_string());
// // let secret = None;
// print_encryption_folder_snapshot("12533251-bdd4-41f4-995f-ff12fceeaa42", secret).await;
// }

View file

@ -1,5 +0,0 @@
mod database_test;
mod file_test;
mod folder_test;
mod user_test;
mod util;

View file

@ -1,141 +0,0 @@
use uuid::Uuid;
use flowy_encrypt::{encrypt_text, generate_encryption_secret};
use flowy_error::FlowyError;
use flowy_user_pub::entities::*;
use lib_infra::box_any::BoxAny;
use crate::supabase_test::util::{
get_supabase_ci_config, third_party_sign_up_param, user_auth_service,
};
// ‼️‼️‼️ Warning: this test will create a table in the database
#[tokio::test]
async fn supabase_user_sign_up_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
assert!(!user.latest_workspace.id.is_empty());
assert!(!user.user_workspaces.is_empty());
assert!(!user.latest_workspace.database_indexer_id.is_empty());
}
#[tokio::test]
async fn supabase_user_sign_up_with_existing_uuid_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let _user: AuthResponse = user_service
.sign_up(BoxAny::new(params.clone()))
.await
.unwrap();
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
assert!(!user.latest_workspace.id.is_empty());
assert!(!user.latest_workspace.database_indexer_id.is_empty());
assert!(!user.user_workspaces.is_empty());
}
#[tokio::test]
async fn supabase_update_user_profile_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service
.sign_up(BoxAny::new(params.clone()))
.await
.unwrap();
let params = UpdateUserProfileParams::new(user.user_id)
.with_name("123")
.with_email(format!("{}@test.com", Uuid::new_v4()));
user_service
.update_user(UserCredentials::from_uid(user.user_id), params)
.await
.unwrap();
let user_profile = user_service
.get_user_profile(UserCredentials::from_uid(user.user_id))
.await
.unwrap();
assert_eq!(user_profile.name, "123");
}
#[tokio::test]
async fn supabase_get_user_profile_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service
.sign_up(BoxAny::new(params.clone()))
.await
.unwrap();
let credential = UserCredentials::from_uid(user.user_id);
user_service
.get_user_profile(credential.clone())
.await
.unwrap();
}
#[tokio::test]
async fn supabase_get_not_exist_user_profile_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let result: FlowyError = user_service
.get_user_profile(UserCredentials::from_uid(i64::MAX))
.await
.unwrap_err();
// user not found
assert!(result.is_record_not_found());
}
#[tokio::test]
async fn user_encryption_sign_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
// generate encryption sign
let secret = generate_encryption_secret();
let sign = encrypt_text(user.user_id.to_string(), &secret).unwrap();
user_service
.update_user(
UserCredentials::from_uid(user.user_id),
UpdateUserProfileParams::new(user.user_id)
.with_encryption_type(EncryptionType::SelfEncryption(sign.clone())),
)
.await
.unwrap();
let user_profile: UserProfile = user_service
.get_user_profile(UserCredentials::from_uid(user.user_id))
.await
.unwrap();
assert_eq!(
user_profile.encryption_type,
EncryptionType::SelfEncryption(sign)
);
}

View file

@ -1,162 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use collab::core::collab::{DataSource, MutexCollab};
use collab::core::origin::CollabOrigin;
use collab::preclude::Collab;
use collab_plugins::cloud_storage::RemoteCollabStorage;
use uuid::Uuid;
use flowy_database_pub::cloud::DatabaseCloudService;
use flowy_error::FlowyError;
use flowy_folder_pub::cloud::{Folder, FolderCloudService};
use flowy_server::supabase::api::{
RESTfulPostgresServer, SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl,
SupabaseFolderServiceImpl, SupabaseServerServiceImpl, SupabaseUserServiceImpl,
};
use flowy_server::supabase::define::{USER_DEVICE_ID, USER_EMAIL, USER_UUID};
use flowy_server::{AppFlowyEncryption, EncryptionImpl};
use flowy_server_pub::supabase_config::SupabaseConfiguration;
use flowy_user_pub::cloud::UserCloudService;
use lib_infra::future::FutureResult;
use crate::setup_log;
pub fn get_supabase_ci_config() -> Option<SupabaseConfiguration> {
dotenv::from_filename("./.env.ci").ok()?;
setup_log();
SupabaseConfiguration::from_env().ok()
}
#[allow(dead_code)]
pub fn get_supabase_dev_config() -> Option<SupabaseConfiguration> {
dotenv::from_filename("./.env.dev").ok()?;
setup_log();
SupabaseConfiguration::from_env().ok()
}
pub fn collab_service() -> Arc<dyn RemoteCollabStorage> {
let (server, encryption_impl) = supabase_server_service(None);
Arc::new(SupabaseCollabStorageImpl::new(
server,
None,
Arc::downgrade(&encryption_impl),
))
}
pub fn database_service() -> Arc<dyn DatabaseCloudService> {
let (server, _encryption_impl) = supabase_server_service(None);
Arc::new(SupabaseDatabaseServiceImpl::new(server))
}
pub fn user_auth_service() -> Arc<dyn UserCloudService> {
let (server, _encryption_impl) = supabase_server_service(None);
Arc::new(SupabaseUserServiceImpl::new(server, vec![], None))
}
pub fn folder_service() -> Arc<dyn FolderCloudService> {
let (server, _encryption_impl) = supabase_server_service(None);
Arc::new(SupabaseFolderServiceImpl::new(server))
}
#[allow(dead_code)]
pub fn file_storage_service() -> Arc<dyn ObjectStorageCloudService> {
let encryption_impl: Arc<dyn AppFlowyEncryption> = Arc::new(EncryptionImpl::new(None));
let config = SupabaseConfiguration::from_env().unwrap();
Arc::new(
SupabaseFileStorage::new(
&config,
Arc::downgrade(&encryption_impl),
Arc::new(TestFileStoragePlan),
)
.unwrap(),
)
}
#[allow(dead_code)]
pub fn encryption_folder_service(
secret: Option<String>,
) -> (Arc<dyn FolderCloudService>, Arc<dyn AppFlowyEncryption>) {
let (server, encryption_impl) = supabase_server_service(secret);
let service = Arc::new(SupabaseFolderServiceImpl::new(server));
(service, encryption_impl)
}
#[allow(dead_code)]
pub fn encryption_collab_service(
secret: Option<String>,
) -> (Arc<dyn RemoteCollabStorage>, Arc<dyn AppFlowyEncryption>) {
let (server, encryption_impl) = supabase_server_service(secret);
let service = Arc::new(SupabaseCollabStorageImpl::new(
server,
None,
Arc::downgrade(&encryption_impl),
));
(service, encryption_impl)
}
#[allow(dead_code)]
pub async fn print_encryption_folder(
uid: &i64,
folder_id: &str,
encryption_secret: Option<String>,
) {
let (cloud_service, _encryption) = encryption_folder_service(encryption_secret);
let folder_data = cloud_service.get_folder_data(folder_id, uid).await.unwrap();
let json = serde_json::to_value(folder_data).unwrap();
println!("{}", serde_json::to_string_pretty(&json).unwrap());
}
#[allow(dead_code)]
pub async fn print_encryption_folder_snapshot(
uid: &i64,
folder_id: &str,
encryption_secret: Option<String>,
) {
let (cloud_service, _encryption) = encryption_collab_service(encryption_secret);
let snapshot = cloud_service
.get_snapshots(folder_id, 1)
.await
.pop()
.unwrap();
let collab = Arc::new(MutexCollab::new(
Collab::new_with_source(
CollabOrigin::Empty,
folder_id,
DataSource::DocStateV1(snapshot.blob),
vec![],
false,
)
.unwrap(),
));
let folder_data = Folder::open(uid, collab, None)
.unwrap()
.get_folder_data(folder_id)
.unwrap();
let json = serde_json::to_value(folder_data).unwrap();
println!("{}", serde_json::to_string_pretty(&json).unwrap());
}
pub fn supabase_server_service(
encryption_secret: Option<String>,
) -> (SupabaseServerServiceImpl, Arc<dyn AppFlowyEncryption>) {
let config = SupabaseConfiguration::from_env().unwrap();
let encryption_impl: Arc<dyn AppFlowyEncryption> =
Arc::new(EncryptionImpl::new(encryption_secret));
let encryption = Arc::downgrade(&encryption_impl);
let server = Arc::new(RESTfulPostgresServer::new(config, encryption));
(SupabaseServerServiceImpl::new(server), encryption_impl)
}
pub fn third_party_sign_up_param(uuid: String) -> HashMap<String, String> {
let mut params = HashMap::new();
params.insert(USER_UUID.to_string(), uuid);
params.insert(
USER_EMAIL.to_string(),
format!("{}@test.com", Uuid::new_v4()),
);
params.insert(USER_DEVICE_ID.to_string(), Uuid::new_v4().to_string());
params
}
pub struct TestFileStoragePlan;

View file

@ -1 +0,0 @@
hello world

View file

@ -6,7 +6,7 @@ use crate::sql::{
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::schema::user_table;
use flowy_sqlite::{prelude::*, DBConnection, ExpressionMethods, RunQueryDsl};
use tracing::{trace, warn};
use tracing::trace;
/// The order of the fields in the struct must be the same as the order of the fields in the table.
/// Check out the [schema.rs] for table schema.
@ -157,26 +157,12 @@ pub fn select_user_profile(
Ok(user)
}
pub fn select_workspace_auth_type(
pub fn select_user_auth_type(
uid: i64,
workspace_id: &str,
conn: &mut SqliteConnection,
) -> Result<AuthType, FlowyError> {
match select_user_workspace(workspace_id, conn) {
Ok(workspace) => Ok(AuthType::from(workspace.workspace_type)),
Err(err) => {
if err.is_record_not_found() {
let row = select_user_table_row(uid, conn)?;
warn!(
"user user auth type:{} as workspace auth type",
row.auth_type
);
Ok(AuthType::from(row.auth_type))
} else {
Err(err)
}
},
}
let row = select_user_table_row(uid, conn)?;
Ok(AuthType::from(row.auth_type))
}
pub fn upsert_user(user: UserTable, mut conn: DBConnection) -> FlowyResult<()> {

View file

@ -1,7 +1,7 @@
use diesel::SqliteConnection;
use semver::Version;
use std::sync::Arc;
use tracing::{info, instrument};
use tracing::instrument;
use collab_integrate::CollabKVDB;
use flowy_error::FlowyResult;
@ -9,7 +9,7 @@ use flowy_user_pub::entities::AuthType;
use crate::migrations::migration::UserDataMigration;
use flowy_user_pub::session::Session;
use flowy_user_pub::sql::{select_user_workspace, upsert_user_workspace};
use flowy_user_pub::sql::upsert_user_workspace;
pub struct AnonUserWorkspaceTableMigration;
@ -34,23 +34,15 @@ impl UserDataMigration for AnonUserWorkspaceTableMigration {
&self,
session: &Session,
_collab_db: &Arc<CollabKVDB>,
auth_type: &AuthType,
user_auth_type: &AuthType,
db: &mut SqliteConnection,
) -> FlowyResult<()> {
// For historical reasons, the anon user doesn't have a workspace in user_workspace_table.
// So we need to create a new entry for the anon user in the user_workspace_table.
if matches!(auth_type, AuthType::Local) {
let user_workspace = &session.user_workspace;
let result = select_user_workspace(&user_workspace.id, db);
if let Err(e) = result {
if e.is_record_not_found() {
info!(
"Anon user workspace not found in the database, creating a new entry for user_id: {}",
session.user_id
);
upsert_user_workspace(session.user_id, *auth_type, user_workspace.clone(), db)?;
}
}
if matches!(user_auth_type, AuthType::Local) {
let mut user_workspace = session.user_workspace.clone();
user_workspace.workspace_type = AuthType::Local;
upsert_user_workspace(session.user_id, *user_auth_type, user_workspace, db)?;
}
Ok(())

View file

@ -40,7 +40,7 @@ impl UserDataMigration for CollabDocKeyWithWorkspaceIdMigration {
&self,
session: &Session,
collab_db: &Arc<CollabKVDB>,
_authenticator: &AuthType,
_user_auth_type: &AuthType,
_db: &mut SqliteConnection,
) -> FlowyResult<()> {
trace!(

View file

@ -42,13 +42,13 @@ impl UserDataMigration for HistoricalEmptyDocumentMigration {
&self,
session: &Session,
collab_db: &Arc<CollabKVDB>,
authenticator: &AuthType,
user_auth_type: &AuthType,
_db: &mut SqliteConnection,
) -> FlowyResult<()> {
// - The `empty document` struct has already undergone refactoring prior to the launch of the AppFlowy cloud version.
// - Consequently, if a user is utilizing the AppFlowy cloud version, there is no need to perform any migration for the `empty document` struct.
// - This migration step is only necessary for users who are transitioning from a local version of AppFlowy to the cloud version.
if !matches!(authenticator, AuthType::Local) {
if !matches!(user_auth_type, AuthType::Local) {
return Ok(());
}
collab_db.with_write_txn(|write_txn| {

View file

@ -54,7 +54,7 @@ impl UserLocalDataMigration {
pub fn run(
self,
migrations: Vec<Box<dyn UserDataMigration>>,
auth_type: &AuthType,
user_auth_type: &AuthType,
app_version: &Version,
) -> FlowyResult<Vec<String>> {
let mut applied_migrations = vec![];
@ -75,7 +75,7 @@ impl UserLocalDataMigration {
let migration_name = migration.name().to_string();
if !duplicated_names.contains(&migration_name) {
migration.run(&self.session, &self.collab_db, auth_type, &mut conn)?;
migration.run(&self.session, &self.collab_db, user_auth_type, &mut conn)?;
applied_migrations.push(migration.name().to_string());
save_migration_record(&mut conn, &migration_name);
duplicated_names.push(migration_name);
@ -98,7 +98,7 @@ pub trait UserDataMigration {
&self,
user: &Session,
collab_db: &Arc<CollabKVDB>,
authenticator: &AuthType,
user_auth_type: &AuthType,
db: &mut SqliteConnection,
) -> FlowyResult<()>;
}

View file

@ -40,7 +40,7 @@ impl UserDataMigration for FavoriteV1AndWorkspaceArrayMigration {
&self,
session: &Session,
collab_db: &Arc<CollabKVDB>,
_authenticator: &AuthType,
_user_auth_type: &AuthType,
_db: &mut SqliteConnection,
) -> FlowyResult<()> {
collab_db.with_write_txn(|write_txn| {

View file

@ -38,7 +38,7 @@ impl UserDataMigration for WorkspaceTrashMapToSectionMigration {
&self,
session: &Session,
collab_db: &Arc<CollabKVDB>,
_authenticator: &AuthType,
_user_auth_type: &AuthType,
_db: &mut SqliteConnection,
) -> FlowyResult<()> {
collab_db.with_write_txn(|write_txn| {

View file

@ -10,7 +10,7 @@ use collab_plugins::local_storage::kv::KVTransactionDB;
use flowy_error::{internal_error, ErrorCode, FlowyError, FlowyResult};
use flowy_sqlite::kv::KVStorePreferences;
use flowy_sqlite::DBConnection;
use flowy_user_pub::entities::UserWorkspace;
use flowy_user_pub::entities::{AuthType, UserWorkspace};
use flowy_user_pub::session::Session;
use std::path::PathBuf;
use std::str::FromStr;
@ -48,14 +48,11 @@ impl AuthenticateUser {
}
pub async fn is_local_mode(&self) -> FlowyResult<bool> {
let uid = self.user_id()?;
if let Ok(anon_user) = self.get_anon_user().await {
if anon_user == uid {
return Ok(true);
}
}
Ok(false)
let session = self.get_session()?;
Ok(matches!(
session.user_workspace.workspace_type,
AuthType::Local
))
}
pub fn device_id(&self) -> FlowyResult<String> {
@ -150,28 +147,24 @@ impl AuthenticateUser {
match self
.store_preferences
.get_object::<Arc<Session>>(&self.user_config.session_cache_key)
.get_object::<Session>(&self.user_config.session_cache_key)
{
None => Err(FlowyError::new(
ErrorCode::RecordNotFound,
"User is not logged in",
"Can't find user session. Please login again",
)),
Some(session) => {
Some(mut session) => {
// Set the workspace type to local if the user is anon.
if let Some(anon_session) = self.store_preferences.get_object::<Session>(ANON_USER) {
if session.user_id == anon_session.user_id {
session.user_workspace.workspace_type = AuthType::Local;
}
}
let session = Arc::new(session);
self.session.store(Some(session.clone()));
Ok(session)
},
}
}
async fn get_anon_user(&self) -> FlowyResult<i64> {
let anon_session = self
.store_preferences
.get_object::<Session>(ANON_USER)
.ok_or(FlowyError::new(
ErrorCode::RecordNotFound,
"Anon user not found",
))?;
Ok(anon_session.user_id)
}
}
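
The net effect of the two hunks above: local mode is no longer inferred by comparing the current uid with the stored anon user id; it is read from the active session's workspace type, which get_session now forces to AuthType::Local for the anon user. A condensed sketch of the new check (written as a free function purely for illustration):

use flowy_error::FlowyResult;
use flowy_user_pub::entities::AuthType;
use flowy_user_pub::session::Session;

// Mirrors AuthenticateUser::is_local_mode after this change.
fn is_local_mode(session: &Session) -> FlowyResult<bool> {
  Ok(matches!(
    session.user_workspace.workspace_type,
    AuthType::Local
  ))
}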

View file

@ -3,7 +3,7 @@ use crate::migrations::session_migration::migrate_session_with_user_uuid;
use crate::services::data_import::importer::load_collab_by_object_ids;
use crate::services::db::UserDBPath;
use crate::services::entities::UserPaths;
use crate::user_manager::run_collab_data_migration;
use crate::user_manager::run_data_migration;
use anyhow::anyhow;
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
@ -36,7 +36,7 @@ use std::collections::{HashMap, HashSet};
use collab_document::blocks::TextDelta;
use collab_document::document::Document;
use flowy_user_pub::sql::{select_user_profile, select_workspace_auth_type};
use flowy_user_pub::sql::{select_user_auth_type, select_user_profile};
use semver::Version;
use serde_json::json;
use std::ops::{Deref, DerefMut};
@ -103,23 +103,17 @@ pub(crate) fn prepare_import(
);
let mut conn = imported_sqlite_db.get_connection()?;
let imported_workspace_auth_type = select_user_profile(
let imported_user_auth_type = select_user_profile(
imported_session.user_id,
&imported_session.user_workspace.id,
&mut conn,
)
.map(|v| v.workspace_auth_type)
.or_else(|_| {
select_workspace_auth_type(
imported_session.user_id,
&imported_session.user_workspace.id,
&mut conn,
)
})?;
.map(|v| v.auth_type)
.or_else(|_| select_user_auth_type(imported_session.user_id, &mut conn))?;
run_collab_data_migration(
run_data_migration(
&imported_session,
&imported_workspace_auth_type,
&imported_user_auth_type,
imported_collab_db.clone(),
imported_sqlite_db.get_pool(),
other_store_preferences.clone(),

View file

@ -235,9 +235,9 @@ impl UserManager {
self.authenticate_user.database.get_pool(session.user_id),
) {
(Ok(collab_db), Ok(sqlite_pool)) => {
run_collab_data_migration(
run_data_migration(
&session,
&auth_type,
&user.auth_type,
collab_db,
sqlite_pool,
self.store_preferences.clone(),
@ -844,9 +844,9 @@ fn mark_all_migrations_as_applied(sqlite_pool: &Arc<ConnectionPool>) {
}
}
pub(crate) fn run_collab_data_migration(
pub(crate) fn run_data_migration(
session: &Session,
auth_type: &AuthType,
user_auth_type: &AuthType,
collab_db: Arc<CollabKVDB>,
sqlite_pool: Arc<ConnectionPool>,
kv: Arc<KVStorePreferences>,
@ -855,7 +855,7 @@ pub(crate) fn run_collab_data_migration(
let migrations = collab_migration_list();
match UserLocalDataMigration::new(session.clone(), collab_db, sqlite_pool, kv).run(
migrations,
auth_type,
user_auth_type,
app_version,
) {
Ok(applied_migrations) => {

View file

@ -53,6 +53,18 @@ impl UserManager {
Ok(UserProfilePB::from(profile))
}
pub fn get_anon_user_id(&self) -> FlowyResult<i64> {
let anon_session = self
.store_preferences
.get_object::<Session>(ANON_USER)
.ok_or(FlowyError::new(
ErrorCode::RecordNotFound,
"Anon user not found",
))?;
Ok(anon_session.user_id)
}
/// Opens a historical user's session based on their user ID, device ID, and authentication type.
///
/// This function facilitates the re-opening of a user's session from historical tracking.