Merge pull request #7797 from AppFlowy-IO/integrate_workspace_template

Integrate workspace template
This commit is contained in:
Nathan.fooo 2025-04-22 13:35:03 +08:00 committed by GitHub
commit 7f74543125
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
81 changed files with 522 additions and 2494 deletions

View file

@ -145,7 +145,7 @@ class _MobileHomePageState extends State<MobileHomePage> {
void _onLatestViewChange() async {
final id = getIt<MenuSharedState>().latestOpenView?.id;
if (id == null) {
if (id == null || id.isEmpty) {
return;
}
await FolderEventSetLatestView(ViewIdPB(value: id)).send();

View file

@ -27,6 +27,7 @@ class ChatMemberBloc extends Bloc<ChatMemberEvent, ChatMemberState> {
final payload = WorkspaceMemberIdPB(
uid: Int64.parseInt(userId),
);
await UserEventGetMemberInfo(payload).send().then((result) {
result.fold(
(member) {

View file

@ -36,7 +36,7 @@ class BlankPagePlugin extends Plugin {
PluginWidgetBuilder get widgetBuilder => BlankPagePluginWidgetBuilder();
@override
PluginId get id => "BlankStack";
PluginId get id => "";
@override
PluginType get pluginType => PluginType.blank;

View file

@ -127,7 +127,7 @@ class UserBackendService implements IUserBackendService {
) {
final payload = OpenUserWorkspacePB()
..workspaceId = workspaceId
..authType = authType;
..workspaceAuthType = authType;
return UserEventOpenWorkspace(payload).send();
}
@ -244,13 +244,6 @@ class UserBackendService implements IUserBackendService {
return UserEventGetWorkspaceSubscriptionInfo(params).send();
}
Future<FlowyResult<WorkspaceMemberPB, FlowyError>>
getWorkspaceMember() async {
final data = WorkspaceMemberIdPB.create()..uid = userId;
return UserEventGetMemberInfo(data).send();
}
@override
Future<FlowyResult<PaymentLinkPB, FlowyError>> createSubscription(
String workspaceId,

View file

@ -30,6 +30,10 @@ class LocalAiPluginBloc extends Bloc<LocalAiPluginEvent, LocalAiPluginState> {
LocalAiPluginEvent event,
Emitter<LocalAiPluginState> emit,
) async {
if (isClosed) {
return;
}
await event.when(
didReceiveAiState: (aiState) {
emit(
@ -54,7 +58,9 @@ class LocalAiPluginBloc extends Bloc<LocalAiPluginEvent, LocalAiPluginState> {
emit(LocalAiPluginState.loading());
await AIEventToggleLocalAI().send().fold(
(aiState) {
if (!isClosed) {
add(LocalAiPluginEvent.didReceiveAiState(aiState));
}
},
Log.error,
);
@ -69,10 +75,14 @@ class LocalAiPluginBloc extends Bloc<LocalAiPluginEvent, LocalAiPluginState> {
void _startListening() {
listener.start(
stateCallback: (pluginState) {
if (!isClosed) {
add(LocalAiPluginEvent.didReceiveAiState(pluginState));
}
},
resourceCallback: (data) {
if (!isClosed) {
add(LocalAiPluginEvent.didReceiveLackOfResources(data));
}
},
);
}
@ -80,7 +90,9 @@ class LocalAiPluginBloc extends Bloc<LocalAiPluginEvent, LocalAiPluginState> {
void _getLocalAiState() {
AIEventGetLocalAIState().send().fold(
(aiState) {
if (!isClosed) {
add(LocalAiPluginEvent.didReceiveAiState(aiState));
}
},
Log.error,
);

View file

@ -76,12 +76,6 @@ class SpaceBloc extends Bloc<SpaceEvent, SpaceState> {
final (spaces, publicViews, privateViews) = await _getSpaces();
final shouldShowUpgradeDialog = await this.shouldShowUpgradeDialog(
spaces: spaces,
publicViews: publicViews,
privateViews: privateViews,
);
final currentSpace = await _getLastOpenedSpace(spaces);
final isExpanded = await _getSpaceExpandStatus(currentSpace);
emit(
@ -89,17 +83,11 @@ class SpaceBloc extends Bloc<SpaceEvent, SpaceState> {
spaces: spaces,
currentSpace: currentSpace,
isExpanded: isExpanded,
shouldShowUpgradeDialog: shouldShowUpgradeDialog,
shouldShowUpgradeDialog: false,
isInitialized: true,
),
);
if (shouldShowUpgradeDialog && !integrationMode().isTest) {
if (!isClosed) {
add(const SpaceEvent.migrate());
}
}
if (openFirstPage) {
if (currentSpace != null) {
if (!isClosed) {

View file

@ -404,7 +404,7 @@ class ViewBloc extends Bloc<ViewEvent, ViewState> {
});
}
if (update.updateChildViews.isNotEmpty) {
if (update.updateChildViews.isNotEmpty && update.parentViewId.isNotEmpty) {
final view = await ViewBackendService.getView(update.parentViewId);
final childViews = view.fold((l) => l.childViews, (r) => []);
bool isSameOrder = true;

View file

@ -111,6 +111,12 @@ class ViewBackendService {
static Future<FlowyResult<List<ViewPB>, FlowyError>> getChildViews({
required String viewId,
}) {
if (viewId.isEmpty) {
return Future.value(
FlowyResult<List<ViewPB>, FlowyError>.success(<ViewPB>[]),
);
}
final payload = ViewIdPB.create()..value = viewId;
return FolderEventGetView(payload).send().then((result) {
@ -262,6 +268,9 @@ class ViewBackendService {
static Future<FlowyResult<ViewPB, FlowyError>> getView(
String viewId,
) async {
if (viewId.isEmpty) {
Log.error('ViewId is empty');
}
final payload = ViewIdPB.create()..value = viewId;
return FolderEventGetView(payload).send();
}

View file

@ -631,7 +631,7 @@ class PageNotifier extends ChangeNotifier {
}
// Set the plugin view as the latest view.
if (setLatest) {
if (setLatest && newPlugin.id.isNotEmpty) {
FolderEventSetLatestView(ViewIdPB(value: newPlugin.id)).send();
}

View file

@ -47,7 +47,6 @@ class _LocalAISettingState extends State<LocalAISetting> {
),
header: LocalAiSettingHeader(
isEnabled: state.isEnabled,
isToggleable: state is ReadyLocalAiPluginState,
),
collapsed: const SizedBox.shrink(),
expanded: Padding(
@ -65,11 +64,9 @@ class LocalAiSettingHeader extends StatelessWidget {
const LocalAiSettingHeader({
super.key,
required this.isEnabled,
required this.isToggleable,
});
final bool isEnabled;
final bool isToggleable;
@override
Widget build(BuildContext context) {
@ -91,22 +88,20 @@ class LocalAiSettingHeader extends StatelessWidget {
],
),
),
IgnorePointer(
ignoring: !isToggleable,
child: Opacity(
opacity: isToggleable ? 1 : 0.5,
child: Toggle(
Toggle(
value: isEnabled,
onChanged: (_) => _onToggleChanged(context),
),
),
onChanged: (value) {
_onToggleChanged(value, context);
},
),
],
);
}
void _onToggleChanged(BuildContext context) {
if (isEnabled) {
void _onToggleChanged(bool value, BuildContext context) {
if (value) {
context.read<LocalAiPluginBloc>().add(const LocalAiPluginEvent.toggle());
} else {
showConfirmDialog(
context: context,
title: LocaleKeys.settings_aiPage_keys_disableLocalAITitle.tr(),
@ -119,8 +114,6 @@ class LocalAiSettingHeader extends StatelessWidget {
.add(const LocalAiPluginEvent.toggle());
},
);
} else {
context.read<LocalAiPluginBloc>().add(const LocalAiPluginEvent.toggle());
}
}
}

View file

@ -283,7 +283,7 @@ impl EventIntegrationTest {
pub async fn open_workspace(&self, workspace_id: &str, auth_type: AuthTypePB) {
let payload = OpenUserWorkspacePB {
workspace_id: workspace_id.to_string(),
auth_type,
workspace_auth_type: auth_type,
};
EventBuilder::new(self.clone())
.event(UserEvent::OpenWorkspace)

View file

@ -1,106 +0,0 @@
use std::ops::Deref;
use assert_json_diff::assert_json_eq;
use collab::core::collab::MutexCollab;
use collab::core::origin::CollabOrigin;
use collab::preclude::updates::decoder::Decode;
use collab::preclude::{Collab, JsonValue, Update};
use collab_entity::CollabType;
use event_integration_test::event_builder::EventBuilder;
use flowy_database2::entities::{DatabasePB, DatabaseViewIdPB, RepeatedDatabaseSnapshotPB};
use flowy_database2::event_map::DatabaseEvent::*;
use flowy_folder::entities::ViewPB;
use crate::util::FlowySupabaseTest;
pub struct FlowySupabaseDatabaseTest {
pub uuid: String,
inner: FlowySupabaseTest,
}
impl FlowySupabaseDatabaseTest {
#[allow(dead_code)]
pub async fn new_with_user(uuid: String) -> Option<Self> {
let inner = FlowySupabaseTest::new().await?;
inner.supabase_sign_up_with_uuid(&uuid, None).await.unwrap();
Some(Self { uuid, inner })
}
pub async fn new_with_new_user() -> Option<Self> {
let inner = FlowySupabaseTest::new().await?;
let uuid = uuid::Uuid::new_v4().to_string();
let _ = inner.supabase_sign_up_with_uuid(&uuid, None).await.unwrap();
Some(Self { uuid, inner })
}
pub async fn create_database(&self) -> (ViewPB, DatabasePB) {
let current_workspace = self.inner.get_current_workspace().await;
let view = self
.inner
.create_grid(&current_workspace.id, "my database".to_string(), vec![])
.await;
let database = self.inner.get_database(&view.id).await;
(view, database)
}
pub async fn get_collab_json(&self, database_id: &str) -> JsonValue {
let database_editor = self
.database_manager
.get_database(database_id)
.await
.unwrap();
// let address = Arc::into_raw(database_editor.clone());
let database = database_editor.get_mutex_database().lock();
database.get_mutex_collab().to_json_value()
}
pub async fn get_database_snapshots(&self, view_id: &str) -> RepeatedDatabaseSnapshotPB {
EventBuilder::new(self.inner.deref().clone())
.event(GetDatabaseSnapshots)
.payload(DatabaseViewIdPB {
value: view_id.to_string(),
})
.async_send()
.await
.parse::<RepeatedDatabaseSnapshotPB>()
}
pub async fn get_database_collab_update(&self, database_id: &str) -> Vec<u8> {
let workspace_id = self.user_manager.workspace_id().unwrap();
let cloud_service = self.database_manager.get_cloud_service().clone();
cloud_service
.get_database_object_doc_state(database_id, CollabType::Database, &workspace_id)
.await
.unwrap()
.unwrap()
}
}
pub fn assert_database_collab_content(
database_id: &str,
collab_update: &[u8],
expected: JsonValue,
) {
let collab = MutexCollab::new(Collab::new_with_origin(
CollabOrigin::Server,
database_id,
vec![],
false,
));
collab.lock().with_origin_transact_mut(|txn| {
let update = Update::decode_v1(collab_update).unwrap();
txn.apply_update(update).unwrap();
});
let json = collab.to_json_value();
assert_json_eq!(json, expected);
}
impl Deref for FlowySupabaseDatabaseTest {
type Target = FlowySupabaseTest;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

View file

@ -1,2 +0,0 @@
mod helper;
mod test;

View file

@ -1,108 +0,0 @@
use std::time::Duration;
use flowy_database2::entities::{
DatabaseSnapshotStatePB, DatabaseSyncState, DatabaseSyncStatePB, FieldChangesetPB, FieldType,
};
use flowy_database2::notification::DatabaseNotification::DidUpdateDatabaseSnapshotState;
use crate::database::supabase_test::helper::{
assert_database_collab_content, FlowySupabaseDatabaseTest,
};
use crate::util::receive_with_timeout;
#[tokio::test]
async fn supabase_initial_database_snapshot_test() {
if let Some(test) = FlowySupabaseDatabaseTest::new_with_new_user().await {
let (view, database) = test.create_database().await;
let rx = test
.notification_sender
.subscribe::<DatabaseSnapshotStatePB>(&database.id, DidUpdateDatabaseSnapshotState);
receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap();
let expected = test.get_collab_json(&database.id).await;
let snapshots = test.get_database_snapshots(&view.id).await;
assert_eq!(snapshots.items.len(), 1);
assert_database_collab_content(&database.id, &snapshots.items[0].data, expected);
}
}
#[tokio::test]
async fn supabase_edit_database_test() {
if let Some(test) = FlowySupabaseDatabaseTest::new_with_new_user().await {
let (view, database) = test.create_database().await;
let existing_fields = test.get_all_database_fields(&view.id).await;
for field in existing_fields.items {
if !field.is_primary {
test.delete_field(&view.id, &field.id).await;
}
}
let field = test.create_field(&view.id, FieldType::Checklist).await;
test
.update_field(FieldChangesetPB {
field_id: field.id.clone(),
view_id: view.id.clone(),
name: Some("hello world".to_string()),
..Default::default()
})
.await;
// wait all updates are send to the remote
let rx = test
.notification_sender
.subscribe_with_condition::<DatabaseSyncStatePB, _>(&database.id, |pb| {
pb.value == DatabaseSyncState::SyncFinished
});
receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap();
assert_eq!(test.get_all_database_fields(&view.id).await.items.len(), 2);
let expected = test.get_collab_json(&database.id).await;
let update = test.get_database_collab_update(&database.id).await;
assert_database_collab_content(&database.id, &update, expected);
}
}
// #[tokio::test]
// async fn cloud_test_supabase_login_sync_database_test() {
// if let Some(test) = FlowySupabaseDatabaseTest::new_with_new_user().await {
// let uuid = test.uuid.clone();
// let (view, database) = test.create_database().await;
// // wait all updates are send to the remote
// let mut rx = test
// .notification_sender
// .subscribe_with_condition::<DatabaseSyncStatePB, _>(&database.id, |pb| pb.is_finish);
// receive_with_timeout(&mut rx, Duration::from_secs(30))
// .await
// .unwrap();
// let expected = test.get_collab_json(&database.id).await;
// test.sign_out().await;
// // Drop the test will cause the test resources to be dropped, which will
// // delete the user data folder.
// drop(test);
//
// let new_test = FlowySupabaseDatabaseTest::new_with_user(uuid)
// .await
// .unwrap();
// // let actual = new_test.get_collab_json(&database.id).await;
// // assert_json_eq!(actual, json!(""));
//
// new_test.open_database(&view.id).await;
//
// // wait all updates are synced from the remote
// let mut rx = new_test
// .notification_sender
// .subscribe_with_condition::<DatabaseSyncStatePB, _>(&database.id, |pb| pb.is_finish);
// receive_with_timeout(&mut rx, Duration::from_secs(30))
// .await
// .unwrap();
//
// // when the new sync is finished, the database should be the same as the old one
// let actual = new_test.get_collab_json(&database.id).await;
// assert_json_eq!(actual, expected);
// }
// }

View file

@ -1,65 +0,0 @@
use std::time::Duration;
use event_integration_test::document_event::assert_document_data_equal;
use flowy_document::entities::{DocumentSyncState, DocumentSyncStatePB};
use crate::document::supabase_test::helper::FlowySupabaseDocumentTest;
use crate::util::receive_with_timeout;
#[tokio::test]
async fn supabase_document_edit_sync_test() {
if let Some(test) = FlowySupabaseDocumentTest::new().await {
let view = test.create_document().await;
let document_id = view.id.clone();
let cloned_test = test.clone();
let cloned_document_id = document_id.clone();
test.appflowy_core.dispatcher().spawn(async move {
cloned_test
.insert_document_text(&cloned_document_id, "hello world", 0)
.await;
});
// wait all update are send to the remote
let rx = test
.notification_sender
.subscribe_with_condition::<DocumentSyncStatePB, _>(&document_id, |pb| {
pb.value != DocumentSyncState::Syncing
});
receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap();
let document_data = test.get_document_data(&document_id).await;
let update = test.get_document_doc_state(&document_id).await;
assert_document_data_equal(&update, &document_id, document_data);
}
}
#[tokio::test]
async fn supabase_document_edit_sync_test2() {
if let Some(test) = FlowySupabaseDocumentTest::new().await {
let view = test.create_document().await;
let document_id = view.id.clone();
for i in 0..10 {
test
.insert_document_text(&document_id, "hello world", i)
.await;
}
// wait all update are send to the remote
let rx = test
.notification_sender
.subscribe_with_condition::<DocumentSyncStatePB, _>(&document_id, |pb| {
pb.value != DocumentSyncState::Syncing
});
receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap();
let document_data = test.get_document_data(&document_id).await;
let update = test.get_document_doc_state(&document_id).await;
assert_document_data_equal(&update, &document_id, document_data);
}
}

View file

@ -1,118 +0,0 @@
// use std::fs::File;
// use std::io::{Cursor, Read};
// use std::path::Path;
//
// use uuid::Uuid;
// use zip::ZipArchive;
//
// use flowy_storage::StorageObject;
//
// use crate::document::supabase_test::helper::FlowySupabaseDocumentTest;
//
// #[tokio::test]
// async fn supabase_document_upload_text_file_test() {
// if let Some(test) = FlowySupabaseDocumentTest::new().await {
// let workspace_id = test.get_current_workspace().await.id;
// let storage_service = test
// .document_manager
// .get_file_storage_service()
// .upgrade()
// .unwrap();
//
// let object = StorageObject::from_bytes(
// &workspace_id,
// &Uuid::new_v4().to_string(),
// "hello world".as_bytes(),
// "text/plain".to_string(),
// );
//
// let url = storage_service.create_object(object).await.unwrap();
//
// let bytes = storage_service
// .get_object(url.clone())
// .await
// .unwrap();
// let s = String::from_utf8(bytes.to_vec()).unwrap();
// assert_eq!(s, "hello world");
//
// // Delete the text file
// let _ = storage_service.delete_object(url).await;
// }
// }
//
// #[tokio::test]
// async fn supabase_document_upload_zip_file_test() {
// if let Some(test) = FlowySupabaseDocumentTest::new().await {
// let workspace_id = test.get_current_workspace().await.id;
// let storage_service = test
// .document_manager
// .get_file_storage_service()
// .upgrade()
// .unwrap();
//
// // Upload zip file
// let object = StorageObject::from_file(
// &workspace_id,
// &Uuid::new_v4().to_string(),
// "./tests/asset/test.txt.zip",
// );
// let url = storage_service.create_object(object).await.unwrap();
//
// // Read zip file
// let zip_data = storage_service
// .get_object(url.clone())
// .await
// .unwrap();
// let reader = Cursor::new(zip_data);
// let mut archive = ZipArchive::new(reader).unwrap();
// for i in 0..archive.len() {
// let mut file = archive.by_index(i).unwrap();
// let name = file.name().to_string();
// let mut out = Vec::new();
// file.read_to_end(&mut out).unwrap();
//
// if name.starts_with("__MACOSX/") {
// continue;
// }
// assert_eq!(name, "test.txt");
// assert_eq!(String::from_utf8(out).unwrap(), "hello world");
// }
//
// // Delete the zip file
// let _ = storage_service.delete_object(url).await;
// }
// }
// #[tokio::test]
// async fn supabase_document_upload_image_test() {
// if let Some(test) = FlowySupabaseDocumentTest::new().await {
// let workspace_id = test.get_current_workspace().await.id;
// let storage_service = test
// .document_manager
// .get_file_storage_service()
// .upgrade()
// .unwrap();
//
// // Upload zip file
// let object = StorageObject::from_file(
// &workspace_id,
// &Uuid::new_v4().to_string(),
// "./tests/asset/logo.png",
// );
// let url = storage_service.create_object(object).await.unwrap();
//
// let image_data = storage_service
// .get_object(url.clone())
// .await
// .unwrap();
//
// // Read the image file
// let mut file = File::open(Path::new("./tests/asset/logo.png")).unwrap();
// let mut local_data = Vec::new();
// file.read_to_end(&mut local_data).unwrap();
//
// assert_eq!(image_data, local_data);
//
// // Delete the image
// let _ = storage_service.delete_object(url).await;
// }
// }

View file

@ -1,49 +0,0 @@
use std::ops::Deref;
use event_integration_test::event_builder::EventBuilder;
use flowy_document::entities::{OpenDocumentPayloadPB, RepeatedDocumentSnapshotMetaPB};
use flowy_document::event_map::DocumentEvent::GetDocumentSnapshotMeta;
use flowy_folder::entities::ViewPB;
use crate::util::FlowySupabaseTest;
pub struct FlowySupabaseDocumentTest {
inner: FlowySupabaseTest,
}
impl FlowySupabaseDocumentTest {
pub async fn new() -> Option<Self> {
let inner = FlowySupabaseTest::new().await?;
let uuid = uuid::Uuid::new_v4().to_string();
let _ = inner.supabase_sign_up_with_uuid(&uuid, None).await;
Some(Self { inner })
}
pub async fn create_document(&self) -> ViewPB {
let current_workspace = self.inner.get_current_workspace().await;
self
.inner
.create_and_open_document(&current_workspace.id, "my document".to_string(), vec![])
.await
}
#[allow(dead_code)]
pub async fn get_document_snapshots(&self, view_id: &str) -> RepeatedDocumentSnapshotMetaPB {
EventBuilder::new(self.inner.deref().clone())
.event(GetDocumentSnapshotMeta)
.payload(OpenDocumentPayloadPB {
document_id: view_id.to_string(),
})
.async_send()
.await
.parse::<RepeatedDocumentSnapshotMetaPB>()
}
}
impl Deref for FlowySupabaseDocumentTest {
type Target = FlowySupabaseTest;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

View file

@ -1,3 +0,0 @@
mod edit_test;
mod file_test;
mod helper;

View file

@ -1,91 +0,0 @@
use std::ops::Deref;
use assert_json_diff::assert_json_eq;
use collab::core::collab::MutexCollab;
use collab::core::origin::CollabOrigin;
use collab::preclude::updates::decoder::Decode;
use collab::preclude::{Collab, JsonValue, Update};
use collab_entity::CollabType;
use collab_folder::FolderData;
use event_integration_test::event_builder::EventBuilder;
use flowy_folder::entities::{FolderSnapshotPB, RepeatedFolderSnapshotPB, WorkspaceIdPB};
use flowy_folder::event_map::FolderEvent::GetFolderSnapshots;
use crate::util::FlowySupabaseTest;
pub struct FlowySupabaseFolderTest {
inner: FlowySupabaseTest,
}
impl FlowySupabaseFolderTest {
pub async fn new() -> Option<Self> {
let inner = FlowySupabaseTest::new().await?;
let uuid = uuid::Uuid::new_v4().to_string();
let _ = inner.supabase_sign_up_with_uuid(&uuid, None).await;
Some(Self { inner })
}
pub async fn get_collab_json(&self) -> JsonValue {
let folder = self.folder_manager.get_mutex_folder().lock();
folder.as_ref().unwrap().to_json_value()
}
pub async fn get_local_folder_data(&self) -> FolderData {
let folder = self.folder_manager.get_mutex_folder().lock();
folder.as_ref().unwrap().get_folder_data().unwrap()
}
pub async fn get_folder_snapshots(&self, workspace_id: &str) -> Vec<FolderSnapshotPB> {
EventBuilder::new(self.inner.deref().clone())
.event(GetFolderSnapshots)
.payload(WorkspaceIdPB {
value: workspace_id.to_string(),
})
.async_send()
.await
.parse::<RepeatedFolderSnapshotPB>()
.items
}
pub async fn get_collab_update(&self, workspace_id: &str) -> Vec<u8> {
let cloud_service = self.folder_manager.get_cloud_service().clone();
cloud_service
.get_folder_doc_state(
workspace_id,
self.user_manager.user_id().unwrap(),
CollabType::Folder,
workspace_id,
)
.await
.unwrap()
}
}
pub fn assert_folder_collab_content(workspace_id: &str, collab_update: &[u8], expected: JsonValue) {
if collab_update.is_empty() {
panic!("collab update is empty");
}
let collab = MutexCollab::new(Collab::new_with_origin(
CollabOrigin::Server,
workspace_id,
vec![],
false,
));
collab.lock().with_origin_transact_mut(|txn| {
let update = Update::decode_v1(collab_update).unwrap();
txn.apply_update(update).unwrap();
});
let json = collab.to_json_value();
assert_json_eq!(json["folder"], expected);
}
impl Deref for FlowySupabaseFolderTest {
type Target = FlowySupabaseTest;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

View file

@ -1,2 +0,0 @@
mod helper;
mod test;

View file

@ -1,122 +0,0 @@
use std::time::Duration;
use assert_json_diff::assert_json_eq;
use serde_json::json;
use flowy_folder::entities::{FolderSnapshotStatePB, FolderSyncStatePB};
use flowy_folder::notification::FolderNotification::DidUpdateFolderSnapshotState;
use crate::folder::supabase_test::helper::{assert_folder_collab_content, FlowySupabaseFolderTest};
use crate::util::{get_folder_data_from_server, receive_with_timeout};
#[tokio::test]
async fn supabase_encrypt_folder_test() {
if let Some(test) = FlowySupabaseFolderTest::new().await {
let uid = test.user_manager.user_id().unwrap();
let secret = test.enable_encryption().await;
let local_folder_data = test.get_local_folder_data().await;
let workspace_id = test.get_current_workspace().await.id;
let remote_folder_data = get_folder_data_from_server(&uid, &workspace_id, Some(secret))
.await
.unwrap()
.unwrap();
assert_json_eq!(json!(local_folder_data), json!(remote_folder_data));
}
}
#[tokio::test]
async fn supabase_decrypt_folder_data_test() {
if let Some(test) = FlowySupabaseFolderTest::new().await {
let uid = test.user_manager.user_id().unwrap();
let secret = Some(test.enable_encryption().await);
let workspace_id = test.get_current_workspace().await.id;
test
.create_view(&workspace_id, "encrypt view".to_string())
.await;
let rx = test
.notification_sender
.subscribe_with_condition::<FolderSyncStatePB, _>(&workspace_id, |pb| pb.is_finish);
receive_with_timeout(rx, Duration::from_secs(10))
.await
.unwrap();
let folder_data = get_folder_data_from_server(&uid, &workspace_id, secret)
.await
.unwrap()
.unwrap();
assert_eq!(folder_data.views.len(), 2);
assert_eq!(folder_data.views[1].name, "encrypt view");
}
}
#[tokio::test]
#[should_panic]
async fn supabase_decrypt_with_invalid_secret_folder_data_test() {
if let Some(test) = FlowySupabaseFolderTest::new().await {
let uid = test.user_manager.user_id().unwrap();
let _ = Some(test.enable_encryption().await);
let workspace_id = test.get_current_workspace().await.id;
test
.create_view(&workspace_id, "encrypt view".to_string())
.await;
let rx = test
.notification_sender
.subscribe_with_condition::<FolderSyncStatePB, _>(&workspace_id, |pb| pb.is_finish);
receive_with_timeout(rx, Duration::from_secs(10))
.await
.unwrap();
let _ = get_folder_data_from_server(&uid, &workspace_id, Some("invalid secret".to_string()))
.await
.unwrap();
}
}
#[tokio::test]
async fn supabase_folder_snapshot_test() {
if let Some(test) = FlowySupabaseFolderTest::new().await {
let workspace_id = test.get_current_workspace().await.id;
let rx = test
.notification_sender
.subscribe::<FolderSnapshotStatePB>(&workspace_id, DidUpdateFolderSnapshotState);
receive_with_timeout(rx, Duration::from_secs(10))
.await
.unwrap();
let expected = test.get_collab_json().await;
let snapshots = test.get_folder_snapshots(&workspace_id).await;
assert_eq!(snapshots.len(), 1);
assert_folder_collab_content(&workspace_id, &snapshots[0].data, expected);
}
}
#[tokio::test]
async fn supabase_initial_folder_snapshot_test2() {
if let Some(test) = FlowySupabaseFolderTest::new().await {
let workspace_id = test.get_current_workspace().await.id;
test
.create_view(&workspace_id, "supabase test view1".to_string())
.await;
test
.create_view(&workspace_id, "supabase test view2".to_string())
.await;
test
.create_view(&workspace_id, "supabase test view3".to_string())
.await;
let rx = test
.notification_sender
.subscribe_with_condition::<FolderSyncStatePB, _>(&workspace_id, |pb| pb.is_finish);
receive_with_timeout(rx, Duration::from_secs(10))
.await
.unwrap();
let expected = test.get_collab_json().await;
let update = test.get_collab_update(&workspace_id).await;
assert_folder_collab_content(&workspace_id, &update, expected);
}
}

View file

@ -5,6 +5,8 @@ use collab_entity::CollabType;
use collab_folder::Folder;
use event_integration_test::user_event::use_localhost_af_cloud;
use event_integration_test::EventIntegrationTest;
use flowy_user::entities::AFRolePB;
use flowy_user_pub::cloud::UserCloudServiceProvider;
use flowy_user_pub::entities::AuthType;
use std::time::Duration;
use tokio::task::LocalSet;
@ -150,7 +152,7 @@ async fn af_cloud_open_workspace_test() {
test
.open_workspace(
&first_workspace.workspace_id,
first_workspace.workspace_auth_type.clone(),
first_workspace.workspace_auth_type,
)
.await;
sleep(Duration::from_millis(300)).await;
@ -161,7 +163,7 @@ async fn af_cloud_open_workspace_test() {
test
.open_workspace(
&second_workspace.workspace_id,
second_workspace.workspace_auth_type.clone(),
second_workspace.workspace_auth_type,
)
.await;
sleep(Duration::from_millis(200)).await;
@ -174,7 +176,7 @@ async fn af_cloud_open_workspace_test() {
test
.open_workspace(
&first_workspace.workspace_id,
first_workspace.workspace_auth_type.clone(),
first_workspace.workspace_auth_type,
)
.await;
let views_1 = test.get_all_workspace_views().await;
@ -186,7 +188,7 @@ async fn af_cloud_open_workspace_test() {
test
.open_workspace(
&second_workspace.workspace_id,
second_workspace.workspace_auth_type.clone(),
second_workspace.workspace_auth_type,
)
.await;
let views_2 = test.get_all_workspace_views().await;
@ -245,10 +247,7 @@ async fn af_cloud_different_open_same_workspace_test() {
let index = i % 2;
let iter_workspace_id = &all_workspaces[index].workspace_id;
client
.open_workspace(
iter_workspace_id,
all_workspaces[index].workspace_auth_type.clone(),
)
.open_workspace(iter_workspace_id, all_workspaces[index].workspace_auth_type)
.await;
if iter_workspace_id == &cloned_shared_workspace_id {
let views = client.get_all_workspace_views().await;
@ -295,36 +294,127 @@ async fn af_cloud_different_open_same_workspace_test() {
#[tokio::test]
async fn af_cloud_create_local_workspace_test() {
// Setup: Initialize test environment with AppFlowyCloud
use_localhost_af_cloud().await;
let test = EventIntegrationTest::new().await;
let _ = test.af_cloud_sign_up().await;
let workspaces = test.get_all_workspaces().await.items;
assert_eq!(workspaces.len(), 1);
// Verify initial state: User should have one default workspace
let initial_workspaces = test.get_all_workspaces().await.items;
assert_eq!(
initial_workspaces.len(),
1,
"User should start with one default workspace"
);
let created_workspace = test
// make sure the workspaces order is consistent
// tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
// Test: Create a local workspace
let local_workspace = test
.create_workspace("my local workspace", AuthType::Local)
.await;
assert_eq!(created_workspace.name, "my local workspace");
let workspaces = test.get_all_workspaces().await.items;
assert_eq!(workspaces.len(), 2);
assert_eq!(workspaces[1].name, "my local workspace");
// Verify: Local workspace was created correctly
assert_eq!(local_workspace.name, "my local workspace");
let updated_workspaces = test.get_all_workspaces().await.items;
assert_eq!(
updated_workspaces.len(),
2,
"Should now have two workspaces"
);
dbg!(&updated_workspaces);
// Find local workspace by name instead of using index
let found_local_workspace = updated_workspaces
.iter()
.find(|workspace| workspace.name == "my local workspace")
.expect("Local workspace should exist");
assert_eq!(found_local_workspace.name, "my local workspace");
// Test: Open the local workspace
test
.open_workspace(
&created_workspace.workspace_id,
created_workspace.workspace_auth_type,
&local_workspace.workspace_id,
local_workspace.workspace_auth_type,
)
.await;
// Verify: Views in the local workspace
let views = test.get_all_views().await;
assert_eq!(views.len(), 2);
assert!(views
assert_eq!(
views.len(),
2,
"Local workspace should have 2 default views"
);
assert!(
views
.iter()
.any(|view| view.parent_view_id == workspaces[1].workspace_id));
.any(|view| view.parent_view_id == local_workspace.workspace_id),
"Views should belong to the local workspace"
);
// Verify: Can access all views
for view in views {
test.get_view(&view.id).await;
}
// Verify: Local workspace members
let members = test
.get_workspace_members(&local_workspace.workspace_id)
.await;
assert_eq!(
members.len(),
1,
"Local workspace should have only one member"
);
assert_eq!(members[0].role, AFRolePB::Owner, "User should be the owner");
// Test: Create a server workspace
let server_workspace = test
.create_workspace("my server workspace", AuthType::AppFlowyCloud)
.await;
// Verify: Server workspace was created correctly
assert_eq!(server_workspace.name, "my server workspace");
let final_workspaces = test.get_all_workspaces().await.items;
assert_eq!(
final_workspaces.len(),
3,
"Should now have three workspaces"
);
dbg!(&final_workspaces);
// Find workspaces by name instead of using indices
let found_local_workspace = final_workspaces
.iter()
.find(|workspace| workspace.name == "my local workspace")
.expect("Local workspace should exist");
assert_eq!(found_local_workspace.name, "my local workspace");
let found_server_workspace = final_workspaces
.iter()
.find(|workspace| workspace.name == "my server workspace")
.expect("Server workspace should exist");
assert_eq!(found_server_workspace.name, "my server workspace");
// Verify: Server-side only recognizes cloud workspaces (not local ones)
let user_profile = test.get_user_profile().await.unwrap();
test
.server_provider
.set_server_auth_type(&AuthType::AppFlowyCloud, Some(user_profile.token.clone()))
.unwrap();
test.server_provider.set_token(&user_profile.token).unwrap();
let user_service = test.server_provider.get_server().unwrap().user_service();
let server_workspaces = user_service
.get_all_workspace(user_profile.id)
.await
.unwrap();
assert_eq!(
server_workspaces.len(),
2,
"Server should only see 2 workspaces (the default and server workspace, not the local one)"
);
}

View file

@ -1,502 +0,0 @@
use std::collections::HashMap;
use assert_json_diff::assert_json_eq;
use collab_database::rows::database_row_document_id_from_row_id;
use collab_document::blocks::DocumentData;
use collab_entity::CollabType;
use collab_folder::FolderData;
use nanoid::nanoid;
use serde_json::json;
use event_integration_test::document::document_event::DocumentEventTest;
use event_integration_test::event_builder::EventBuilder;
use event_integration_test::EventIntegrationTest;
use flowy_core::DEFAULT_NAME;
use flowy_encrypt::decrypt_text;
use flowy_server::supabase::define::{USER_DEVICE_ID, USER_EMAIL, USER_UUID};
use flowy_user::entities::{
AuthenticatorPB, OauthSignInPB, UpdateUserProfilePayloadPB, UserProfilePB,
};
use flowy_user::errors::ErrorCode;
use flowy_user::event_map::UserEvent::*;
use crate::util::*;
#[tokio::test]
async fn third_party_sign_up_test() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new().await;
let mut map = HashMap::new();
map.insert(USER_UUID.to_string(), uuid::Uuid::new_v4().to_string());
map.insert(
USER_EMAIL.to_string(),
format!("{}@appflowy.io", nanoid!(6)),
);
map.insert(USER_DEVICE_ID.to_string(), uuid::Uuid::new_v4().to_string());
let payload = OauthSignInPB {
map,
authenticator: AuthenticatorPB::Supabase,
};
let response = EventBuilder::new(test.clone())
.event(OauthSignIn)
.payload(payload)
.async_send()
.await
.parse::<UserProfilePB>();
dbg!(&response);
}
}
#[tokio::test]
async fn third_party_sign_up_with_encrypt_test() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new().await;
test.supabase_party_sign_up().await;
let user_profile = test.get_user_profile().await.unwrap();
assert!(user_profile.encryption_sign.is_empty());
let secret = test.enable_encryption().await;
let user_profile = test.get_user_profile().await.unwrap();
assert!(!user_profile.encryption_sign.is_empty());
let decryption_sign = decrypt_text(user_profile.encryption_sign, &secret).unwrap();
assert_eq!(decryption_sign, user_profile.id.to_string());
}
}
#[tokio::test]
async fn third_party_sign_up_with_duplicated_uuid() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new().await;
let email = format!("{}@appflowy.io", nanoid!(6));
let mut map = HashMap::new();
map.insert(USER_UUID.to_string(), uuid::Uuid::new_v4().to_string());
map.insert(USER_EMAIL.to_string(), email.clone());
map.insert(USER_DEVICE_ID.to_string(), uuid::Uuid::new_v4().to_string());
let response_1 = EventBuilder::new(test.clone())
.event(OauthSignIn)
.payload(OauthSignInPB {
map: map.clone(),
authenticator: AuthenticatorPB::Supabase,
})
.async_send()
.await
.parse::<UserProfilePB>();
dbg!(&response_1);
let response_2 = EventBuilder::new(test.clone())
.event(OauthSignIn)
.payload(OauthSignInPB {
map: map.clone(),
authenticator: AuthenticatorPB::Supabase,
})
.async_send()
.await
.parse::<UserProfilePB>();
assert_eq!(response_1, response_2);
};
}
#[tokio::test]
async fn third_party_sign_up_with_duplicated_email() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new().await;
let email = format!("{}@appflowy.io", nanoid!(6));
test
.supabase_sign_up_with_uuid(&uuid::Uuid::new_v4().to_string(), Some(email.clone()))
.await
.unwrap();
let error = test
.supabase_sign_up_with_uuid(&uuid::Uuid::new_v4().to_string(), Some(email.clone()))
.await
.err()
.unwrap();
assert_eq!(error.code, ErrorCode::Conflict);
};
}
#[tokio::test]
async fn sign_up_as_guest_and_then_update_to_new_cloud_user_test() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new_anon().await;
let old_views = test
.folder_manager
.get_current_workspace_public_views()
.await
.unwrap();
let old_workspace = test.folder_manager.get_current_workspace().await.unwrap();
let uuid = uuid::Uuid::new_v4().to_string();
test.supabase_sign_up_with_uuid(&uuid, None).await.unwrap();
let new_views = test
.folder_manager
.get_current_workspace_public_views()
.await
.unwrap();
let new_workspace = test.folder_manager.get_current_workspace().await.unwrap();
assert_eq!(old_views.len(), new_views.len());
assert_eq!(old_workspace.name, new_workspace.name);
assert_eq!(old_workspace.views.len(), new_workspace.views.len());
for (index, view) in old_views.iter().enumerate() {
assert_eq!(view.name, new_views[index].name);
assert_eq!(view.layout, new_views[index].layout);
assert_eq!(view.create_time, new_views[index].create_time);
}
}
}
#[tokio::test]
async fn sign_up_as_guest_and_then_update_to_existing_cloud_user_test() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new_anon().await;
let uuid = uuid::Uuid::new_v4().to_string();
let email = format!("{}@appflowy.io", nanoid!(6));
// The workspace of the guest will be migrated to the new user with given uuid
let _user_profile = test
.supabase_sign_up_with_uuid(&uuid, Some(email.clone()))
.await
.unwrap();
let old_cloud_workspace = test.folder_manager.get_current_workspace().await.unwrap();
let old_cloud_views = test
.folder_manager
.get_current_workspace_public_views()
.await
.unwrap();
assert_eq!(old_cloud_views.len(), 1);
assert_eq!(old_cloud_views.first().unwrap().child_views.len(), 1);
// sign out and then sign in as a guest
test.sign_out().await;
let _sign_up_context = test.sign_up_as_anon().await;
let new_workspace = test.folder_manager.get_current_workspace().await.unwrap();
test
.create_view(&new_workspace.id, "new workspace child view".to_string())
.await;
let new_workspace = test.folder_manager.get_current_workspace().await.unwrap();
assert_eq!(new_workspace.views.len(), 2);
// upload to cloud user with given uuid. This time the workspace of the guest will not be merged
// because the cloud user already has a workspace
test
.supabase_sign_up_with_uuid(&uuid, Some(email))
.await
.unwrap();
let new_cloud_workspace = test.folder_manager.get_current_workspace().await.unwrap();
let new_cloud_views = test
.folder_manager
.get_current_workspace_public_views()
.await
.unwrap();
assert_eq!(new_cloud_workspace, old_cloud_workspace);
assert_eq!(new_cloud_views, old_cloud_views);
}
}
#[tokio::test]
async fn get_user_profile_test() {
if let Some(test) = FlowySupabaseTest::new().await {
let uuid = uuid::Uuid::new_v4().to_string();
test.supabase_sign_up_with_uuid(&uuid, None).await.unwrap();
let result = test.get_user_profile().await;
assert!(result.is_ok());
}
}
#[tokio::test]
async fn update_user_profile_test() {
if let Some(test) = FlowySupabaseTest::new().await {
let uuid = uuid::Uuid::new_v4().to_string();
let profile = test.supabase_sign_up_with_uuid(&uuid, None).await.unwrap();
test
.update_user_profile(UpdateUserProfilePayloadPB::new(profile.id).name("lucas"))
.await;
let new_profile = test.get_user_profile().await.unwrap();
assert_eq!(new_profile.name, "lucas")
}
}
#[tokio::test]
async fn update_user_profile_with_existing_email_test() {
if let Some(test) = FlowySupabaseTest::new().await {
let email = format!("{}@appflowy.io", nanoid!(6));
let _ = test
.supabase_sign_up_with_uuid(&uuid::Uuid::new_v4().to_string(), Some(email.clone()))
.await;
let profile = test
.supabase_sign_up_with_uuid(
&uuid::Uuid::new_v4().to_string(),
Some(format!("{}@appflowy.io", nanoid!(6))),
)
.await
.unwrap();
let error = test
.update_user_profile(
UpdateUserProfilePayloadPB::new(profile.id)
.name("lucas")
.email(&email),
)
.await
.unwrap();
assert_eq!(error.code, ErrorCode::Conflict);
}
}
#[tokio::test]
async fn migrate_anon_document_on_cloud_signup() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new().await;
let user_profile = test.sign_up_as_anon().await.user_profile;
let view = test
.create_view(&user_profile.workspace_id, "My first view".to_string())
.await;
let document_event = DocumentEventTest::new_with_core(test.clone());
let block_id = document_event
.insert_index(&view.id, "hello world", 1, None)
.await;
let _ = test.supabase_party_sign_up().await;
let workspace_id = test.user_manager.workspace_id().unwrap();
// After sign up, the documents should be migrated to the cloud
// So, we can get the document data from the cloud
let data: DocumentData = test
.document_manager
.get_cloud_service()
.get_document_data(&view.id, &workspace_id)
.await
.unwrap()
.unwrap();
let block = data.blocks.get(&block_id).unwrap();
assert_json_eq!(
block.data,
json!({
"delta": [
{
"insert": "hello world"
}
]
})
);
}
}
#[tokio::test]
async fn migrate_anon_data_on_cloud_signup() {
if get_supabase_config().is_some() {
let (cleaner, user_db_path) = unzip(
"./tests/user/supabase_test/history_user_db",
"workspace_sync",
)
.unwrap();
let test =
EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()).await;
let user_profile = test.supabase_party_sign_up().await;
// Get the folder data from remote
let folder_data: FolderData = test
.folder_manager
.get_cloud_service()
.get_folder_data(&user_profile.workspace_id, &user_profile.id)
.await
.unwrap()
.unwrap();
let expected_folder_data = expected_workspace_sync_folder_data();
assert_eq!(folder_data.views.len(), expected_folder_data.views.len());
// After migration, the ids of the folder_data should be different from the expected_folder_data
for i in 0..folder_data.views.len() {
let left_view = &folder_data.views[i];
let right_view = &expected_folder_data.views[i];
assert_ne!(left_view.id, right_view.id);
assert_ne!(left_view.parent_view_id, right_view.parent_view_id);
assert_eq!(left_view.name, right_view.name);
}
assert_ne!(folder_data.workspace.id, expected_folder_data.workspace.id);
assert_ne!(folder_data.current_view, expected_folder_data.current_view);
let database_views = folder_data
.views
.iter()
.filter(|view| view.layout.is_database())
.collect::<Vec<_>>();
// Try to load the database from the cloud.
for (i, database_view) in database_views.iter().enumerate() {
let cloud_service = test.database_manager.get_cloud_service();
let database_id = test
.database_manager
.get_database_id_with_view_id(&database_view.id)
.await
.unwrap();
let editor = test
.database_manager
.get_database(&database_id)
.await
.unwrap();
// The database view setting should be loaded by the view id
let _ = editor
.get_database_view_setting(&database_view.id)
.await
.unwrap();
let rows = editor.get_rows(&database_view.id).await.unwrap();
assert_eq!(rows.len(), 3);
let workspace_id = test.user_manager.workspace_id().unwrap();
if i == 0 {
let first_row = rows.first().unwrap().as_ref();
let icon_url = first_row.meta.icon_url.clone().unwrap();
assert_eq!(icon_url, "😄");
let document_id = database_row_document_id_from_row_id(&first_row.row.id);
let document_data: DocumentData = test
.document_manager
.get_cloud_service()
.get_document_data(&document_id, &workspace_id)
.await
.unwrap()
.unwrap();
let editor = test
.document_manager
.get_document(&document_id)
.await
.unwrap();
let expected_document_data = editor.lock().get_document_data().unwrap();
// let expected_document_data = test
// .document_manager
// .get_document_data(&document_id)
// .await
// .unwrap();
assert_eq!(document_data, expected_document_data);
let json = json!(document_data);
assert_eq!(
json["blocks"]["LPMpo0Qaab"]["data"]["delta"][0]["insert"],
json!("Row document")
);
}
assert!(cloud_service
.get_database_object_doc_state(&database_id, CollabType::Database, &workspace_id)
.await
.is_ok());
}
drop(cleaner);
}
}
fn expected_workspace_sync_folder_data() -> FolderData {
serde_json::from_value::<FolderData>(json!({
"current_view": "e0811131-9928-4541-a174-20b7553d9e4c",
"current_workspace_id": "8df7f755-fa5d-480e-9f8e-48ea0fed12b3",
"views": [
{
"children": {
"items": [
{
"id": "e0811131-9928-4541-a174-20b7553d9e4c"
},
{
"id": "53333949-c262-447b-8597-107589697059"
}
]
},
"created_at": 1693147093,
"desc": "",
"icon": null,
"id": "e203afb3-de5d-458a-8380-33cd788a756e",
"is_favorite": false,
"layout": 0,
"name": "⭐️ Getting started",
"parent_view_id": "8df7f755-fa5d-480e-9f8e-48ea0fed12b3"
},
{
"children": {
"items": [
{
"id": "11c697ba-5ed1-41c0-adfc-576db28ad27b"
},
{
"id": "4a5c25e2-a734-440c-973b-4c0e7ab0039c"
}
]
},
"created_at": 1693147096,
"desc": "",
"icon": null,
"id": "e0811131-9928-4541-a174-20b7553d9e4c",
"is_favorite": false,
"layout": 1,
"name": "database",
"parent_view_id": "e203afb3-de5d-458a-8380-33cd788a756e"
},
{
"children": {
"items": []
},
"created_at": 1693147124,
"desc": "",
"icon": null,
"id": "11c697ba-5ed1-41c0-adfc-576db28ad27b",
"is_favorite": false,
"layout": 3,
"name": "calendar",
"parent_view_id": "e0811131-9928-4541-a174-20b7553d9e4c"
},
{
"children": {
"items": []
},
"created_at": 1693147125,
"desc": "",
"icon": null,
"id": "4a5c25e2-a734-440c-973b-4c0e7ab0039c",
"is_favorite": false,
"layout": 2,
"name": "board",
"parent_view_id": "e0811131-9928-4541-a174-20b7553d9e4c"
},
{
"children": {
"items": []
},
"created_at": 1693147133,
"desc": "",
"icon": null,
"id": "53333949-c262-447b-8597-107589697059",
"is_favorite": false,
"layout": 0,
"name": "document",
"parent_view_id": "e203afb3-de5d-458a-8380-33cd788a756e"
}
],
"workspaces": [
{
"child_views": {
"items": [
{
"id": "e203afb3-de5d-458a-8380-33cd788a756e"
}
]
},
"created_at": 1693147093,
"id": "8df7f755-fa5d-480e-9f8e-48ea0fed12b3",
"name": "Workspace"
}
]
}))
.unwrap()
}

View file

@ -1,4 +0,0 @@
## Don't modify the zip files in this folder
The zip files in this folder are used for integration tests. If the tests fail, it means users upgrading to this version of AppFlowy will encounter issues

View file

@ -1,2 +0,0 @@
mod auth_test;
mod workspace_test;

View file

@ -1,43 +0,0 @@
use std::collections::HashMap;
use event_integration_test::{event_builder::EventBuilder, EventIntegrationTest};
use flowy_folder::entities::WorkspaceSettingPB;
use flowy_folder::event_map::FolderEvent::GetCurrentWorkspaceSetting;
use flowy_server::supabase::define::{USER_EMAIL, USER_UUID};
use flowy_user::entities::{AuthenticatorPB, OauthSignInPB, UserProfilePB};
use flowy_user::event_map::UserEvent::*;
use crate::util::*;
#[tokio::test]
async fn initial_workspace_test() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new().await;
let mut map = HashMap::new();
map.insert(USER_UUID.to_string(), uuid::Uuid::new_v4().to_string());
map.insert(
USER_EMAIL.to_string(),
format!("{}@gmail.com", uuid::Uuid::new_v4()),
);
let payload = OauthSignInPB {
map,
authenticator: AuthenticatorPB::Supabase,
};
let _ = EventBuilder::new(test.clone())
.event(OauthSignIn)
.payload(payload)
.async_send()
.await
.parse::<UserProfilePB>();
let workspace_settings = EventBuilder::new(test.clone())
.event(GetCurrentWorkspaceSetting)
.async_send()
.await
.parse::<WorkspaceSettingPB>();
assert!(workspace_settings.latest_view.is_some());
dbg!(&workspace_settings);
}
}

View file

@ -1,2 +1,3 @@
pub mod cloud;
pub mod persistence;
pub mod user_service;

View file

@ -0,0 +1,14 @@
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
use lib_infra::async_trait::async_trait;
use std::path::PathBuf;
use uuid::Uuid;
#[async_trait]
pub trait AIUserService: Send + Sync + 'static {
fn user_id(&self) -> Result<i64, FlowyError>;
async fn is_local_model(&self) -> FlowyResult<bool>;
fn workspace_id(&self) -> Result<Uuid, FlowyError>;
fn sqlite_connection(&self, uid: i64) -> Result<DBConnection, FlowyError>;
fn application_root_dir(&self) -> Result<PathBuf, FlowyError>;
}

View file

@ -12,9 +12,8 @@ use dashmap::DashMap;
use flowy_ai_pub::cloud::{
AIModel, ChatCloudService, ChatSettings, UpdateChatParams, DEFAULT_AI_MODEL_NAME,
};
use flowy_error::{FlowyError, FlowyResult};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_sqlite::kv::KVStorePreferences;
use flowy_sqlite::DBConnection;
use crate::notification::{chat_notification_builder, ChatNotification};
use crate::util::ai_available_models_key;
@ -22,6 +21,7 @@ use collab_integrate::persistence::collab_metadata_sql::{
batch_insert_collab_metadata, batch_select_collab_metadata, AFCollabMetadata,
};
use flowy_ai_pub::cloud::ai_dto::AvailableModel;
use flowy_ai_pub::user_service::AIUserService;
use flowy_storage_pub::storage::StorageService;
use lib_infra::async_trait::async_trait;
use lib_infra::util::timestamp;
@ -33,15 +33,6 @@ use tokio::sync::RwLock;
use tracing::{error, info, instrument, trace};
use uuid::Uuid;
#[async_trait]
pub trait AIUserService: Send + Sync + 'static {
fn user_id(&self) -> Result<i64, FlowyError>;
async fn is_local_model(&self) -> FlowyResult<bool>;
fn workspace_id(&self) -> Result<Uuid, FlowyError>;
fn sqlite_connection(&self, uid: i64) -> Result<DBConnection, FlowyError>;
fn application_root_dir(&self) -> Result<PathBuf, FlowyError>;
}
/// AIExternalService is an interface for external services that AI plugin can interact with.
#[async_trait]
pub trait AIExternalService: Send + Sync + 'static {
@ -113,36 +104,86 @@ impl AIManager {
}
}
#[instrument(skip_all, err)]
pub async fn initialize(&self, _workspace_id: &str) -> Result<(), FlowyError> {
let local_ai = self.local_ai.clone();
tokio::spawn(async move {
if let Err(err) = local_ai.destroy_plugin().await {
error!("Failed to destroy plugin: {}", err);
async fn reload_with_workspace_id(&self, workspace_id: &str) {
// Check if local AI is enabled for this workspace and if we're in local mode
let result = self.user_service.is_local_model().await;
if let Err(err) = &result {
if matches!(err.code, ErrorCode::UserNotLogin) {
info!("[AI Manager] User not logged in, skipping local AI reload");
return;
}
}
if let Err(err) = local_ai.reload().await {
error!("[AI Manager] failed to reload local AI: {:?}", err);
let is_local = result.unwrap_or(false);
let is_enabled = self.local_ai.is_enabled_on_workspace(workspace_id);
let is_running = self.local_ai.is_running();
info!(
"[AI Manager] Reloading workspace: {}, is_local: {}, is_enabled: {}, is_running: {}",
workspace_id, is_local, is_enabled, is_running
);
// Shutdown AI if it's running but shouldn't be (not enabled and not in local mode)
if is_running && !is_enabled && !is_local {
info!("[AI Manager] Local AI is running but not enabled, shutting it down");
let local_ai = self.local_ai.clone();
tokio::spawn(async move {
// Wait for 5 seconds to allow other services to initialize
// TODO: pick a right time to start plugin service. Maybe [UserStatusCallback::did_launch]
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
if let Err(err) = local_ai.toggle_plugin(false).await {
error!("[AI Manager] failed to shutdown local AI: {:?}", err);
}
});
return;
}
// Start AI if it's enabled but not running
if is_enabled && !is_running {
info!("[AI Manager] Local AI is enabled but not running, starting it now");
let local_ai = self.local_ai.clone();
tokio::spawn(async move {
// Wait for 5 seconds to allow other services to initialize
// TODO: pick a right time to start plugin service. Maybe [UserStatusCallback::did_launch]
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
if let Err(err) = local_ai.toggle_plugin(true).await {
error!("[AI Manager] failed to start local AI: {:?}", err);
}
});
return;
}
// Log status for other cases
if is_running {
info!("[AI Manager] Local AI is already running");
}
}
#[instrument(skip_all, err)]
pub async fn on_launch_if_authenticated(&self, workspace_id: &str) -> Result<(), FlowyError> {
self.reload_with_workspace_id(workspace_id).await;
Ok(())
}
pub async fn initialize_after_sign_in(&self, workspace_id: &str) -> Result<(), FlowyError> {
self.reload_with_workspace_id(workspace_id).await;
Ok(())
}
pub async fn initialize_after_sign_up(&self, workspace_id: &str) -> Result<(), FlowyError> {
self.reload_with_workspace_id(workspace_id).await;
Ok(())
}
#[instrument(skip_all, err)]
pub async fn initialize_after_open_workspace(
&self,
_workspace_id: &Uuid,
workspace_id: &Uuid,
) -> Result<(), FlowyError> {
let local_ai = self.local_ai.clone();
tokio::spawn(async move {
if let Err(err) = local_ai.destroy_plugin().await {
error!("Failed to destroy plugin: {}", err);
}
if let Err(err) = local_ai.reload().await {
error!("[AI Manager] failed to reload local AI: {:?}", err);
}
});
self
.reload_with_workspace_id(&workspace_id.to_string())
.await;
Ok(())
}
@ -450,13 +491,9 @@ impl AIManager {
pub async fn get_available_models(&self, source: String) -> FlowyResult<AvailableModelsPB> {
let is_local_mode = self.user_service.is_local_model().await?;
if is_local_mode {
let mut selected_model = AIModel::default();
let mut models = vec![];
if let Some(local_model) = self.local_ai.get_plugin_chat_model() {
let model = AIModel::local(local_model, "".to_string());
selected_model = model.clone();
models.push(model);
}
let setting = self.local_ai.get_local_ai_setting();
let selected_model = AIModel::local(setting.chat_model_name, "".to_string());
let models = vec![selected_model.clone()];
Ok(AvailableModelsPB {
models: models.into_iter().map(|m| m.into()).collect(),

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use crate::entities::{
ChatMessageErrorPB, ChatMessageListPB, ChatMessagePB, PredefinedFormatPB,
RepeatedRelatedQuestionPB, StreamMessageParams,
@ -14,6 +13,7 @@ use flowy_ai_pub::persistence::{
select_answer_where_match_reply_message_id, select_chat_messages, upsert_chat_messages,
ChatMessageTable,
};
use flowy_ai_pub::user_service::AIUserService;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
use futures::{SinkExt, StreamExt};

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use crate::entities::{CompleteTextPB, CompleteTextTaskPB, CompletionTypePB};
use allo_isolate::Isolate;
use std::str::FromStr;
@ -14,6 +13,7 @@ use futures::{SinkExt, StreamExt};
use lib_infra::isolate_stream::IsolateSink;
use crate::stream_message::StreamMessage;
use flowy_ai_pub::user_service::AIUserService;
use std::sync::{Arc, Weak};
use tokio::select;
use tracing::{error, info};

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use crate::entities::{LocalAIPB, RunningStatePB};
use crate::local_ai::resource::{LLMResourceService, LocalAIResourceController};
use crate::notification::{
@ -17,6 +16,7 @@ use af_local_ai::ollama_plugin::OllamaAIPlugin;
use af_plugin::core::path::is_plugin_ready;
use af_plugin::core::plugin::RunningState;
use arc_swap::ArcSwapOption;
use flowy_ai_pub::user_service::AIUserService;
use futures_util::SinkExt;
use lib_infra::util::get_operating_system;
use serde::{Deserialize, Serialize};
@ -99,7 +99,7 @@ impl LocalAIController {
continue;
};
let key = local_ai_enabled_key(&workspace_id);
let key = local_ai_enabled_key(&workspace_id.to_string());
info!("[AI Plugin] state: {:?}", state);
// Read whether plugin is enabled from store; default to true
@ -157,14 +157,15 @@ impl LocalAIController {
}
#[instrument(level = "debug", skip_all)]
pub async fn observe_plugin_resource(&self) {
debug!(
"[AI Plugin] init plugin when first run. thread: {:?}",
std::thread::current().id()
);
let sys = get_operating_system();
if !sys.is_desktop() {
return;
}
debug!(
"[AI Plugin] observer plugin state. thread: {:?}",
std::thread::current().id()
);
async fn try_init_plugin(
resource: &Arc<LocalAIResourceController>,
ai_plugin: &Arc<OllamaAIPlugin>,
@ -196,12 +197,6 @@ impl LocalAIController {
});
}
pub async fn reload(&self) -> FlowyResult<()> {
let is_enabled = self.is_enabled();
self.toggle_plugin(is_enabled).await?;
Ok(())
}
fn upgrade_store_preferences(&self) -> FlowyResult<Arc<KVStorePreferences>> {
self
.store_preferences
@ -211,9 +206,6 @@ impl LocalAIController {
/// Indicate whether the local AI plugin is running.
pub fn is_running(&self) -> bool {
if !self.is_enabled() {
return false;
}
self.ai_plugin.get_plugin_running_state().is_running()
}
@ -225,18 +217,23 @@ impl LocalAIController {
return false;
}
if let Ok(key) = self
.user_service
.workspace_id()
.map(|workspace_id| local_ai_enabled_key(&workspace_id))
{
if let Ok(workspace_id) = self.user_service.workspace_id() {
self.is_enabled_on_workspace(&workspace_id.to_string())
} else {
false
}
}
pub fn is_enabled_on_workspace(&self, workspace_id: &str) -> bool {
let key = local_ai_enabled_key(workspace_id);
if !get_operating_system().is_desktop() {
return false;
}
match self.upgrade_store_preferences() {
Ok(store) => store.get_bool(&key).unwrap_or(false),
Err(_) => false,
}
} else {
false
}
}
pub fn get_plugin_chat_model(&self) -> Option<String> {
@ -298,7 +295,8 @@ impl LocalAIController {
);
if self.resource.set_llm_setting(setting).await.is_ok() {
self.reload().await?;
let is_enabled = self.is_enabled();
self.toggle_plugin(is_enabled).await?;
}
Ok(())
}
@ -373,7 +371,7 @@ impl LocalAIController {
pub async fn toggle_local_ai(&self) -> FlowyResult<bool> {
let workspace_id = self.user_service.workspace_id()?;
let key = local_ai_enabled_key(&workspace_id);
let key = local_ai_enabled_key(&workspace_id.to_string());
let store_preferences = self.upgrade_store_preferences()?;
let enabled = !store_preferences.get_bool(&key).unwrap_or(true);
store_preferences.set_bool(&key, enabled)?;
@ -482,7 +480,7 @@ impl LocalAIController {
}
#[instrument(level = "debug", skip_all)]
async fn toggle_plugin(&self, enabled: bool) -> FlowyResult<()> {
pub(crate) async fn toggle_plugin(&self, enabled: bool) -> FlowyResult<()> {
info!(
"[AI Plugin] enable: {}, thread id: {:?}",
enabled,
@ -618,6 +616,6 @@ impl LLMResourceService for LLMResourceServiceImpl {
}
const APPFLOWY_LOCAL_AI_ENABLED: &str = "appflowy_local_ai_enabled";
fn local_ai_enabled_key(workspace_id: &Uuid) -> String {
fn local_ai_enabled_key(workspace_id: &str) -> String {
format!("{}:{}", APPFLOWY_LOCAL_AI_ENABLED, workspace_id)
}

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use crate::local_ai::controller::LocalAISetting;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use lib_infra::async_trait::async_trait;
@ -11,6 +10,7 @@ use crate::notification::{
};
use af_local_ai::ollama_plugin::OllamaPluginConfig;
use af_plugin::core::path::{is_plugin_ready, ollama_plugin_path};
use flowy_ai_pub::user_service::AIUserService;
use lib_infra::util::{get_operating_system, OperatingSystem};
use reqwest::Client;
use serde::Deserialize;

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use crate::entities::{ChatStatePB, ModelTypePB};
use crate::local_ai::controller::LocalAIController;
use crate::notification::{
@ -19,6 +18,7 @@ use futures::{stream, StreamExt, TryStreamExt};
use lib_infra::async_trait::async_trait;
use crate::local_ai::stream_util::QuestionStream;
use flowy_ai_pub::user_service::AIUserService;
use flowy_storage_pub::storage::StorageService;
use serde_json::{json, Value};
use std::path::Path;

View file

@ -1,4 +1,3 @@
use crate::ai_manager::AIUserService;
use flowy_ai_pub::cloud::{
AIModel, ChatCloudService, ChatMessage, ChatMessageType, ChatSettings, CompleteTextParams,
MessageCursor, ModelList, RepeatedChatMessage, RepeatedRelatedQuestion, ResponseFormat,
@ -8,6 +7,7 @@ use flowy_ai_pub::persistence::{
update_chat_is_sync, update_chat_message_is_sync, upsert_chat, upsert_chat_messages,
ChatMessageTable, ChatTable,
};
use flowy_ai_pub::user_service::AIUserService;
use flowy_error::FlowyError;
use lib_infra::async_trait::async_trait;
use serde_json::Value;

View file

@ -5,9 +5,10 @@ use collab::preclude::{Collab, StateVector};
use collab::util::is_change_since_sv;
use collab_entity::CollabType;
use collab_integrate::persistence::collab_metadata_sql::AFCollabMetadata;
use flowy_ai::ai_manager::{AIExternalService, AIManager, AIUserService};
use flowy_ai::ai_manager::{AIExternalService, AIManager};
use flowy_ai::local_ai::controller::LocalAIController;
use flowy_ai_pub::cloud::ChatCloudService;
use flowy_ai_pub::user_service::AIUserService;
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder::ViewLayout;
use flowy_folder_pub::cloud::{FolderCloudService, FullSyncCollabParams};
@ -153,7 +154,7 @@ impl AIExternalService for ChatQueryServiceImpl {
}
}
struct ChatUserServiceImpl(Weak<AuthenticateUser>);
pub struct ChatUserServiceImpl(Weak<AuthenticateUser>);
impl ChatUserServiceImpl {
fn upgrade_user(&self) -> Result<Arc<AuthenticateUser>, FlowyError> {
let user = self

View file

@ -161,6 +161,7 @@ impl StorageCloudService for ServerProvider {
impl UserCloudServiceProvider for ServerProvider {
fn set_token(&self, token: &str) -> Result<(), FlowyError> {
let server = self.get_server()?;
info!("Set token");
server.set_token(token)?;
Ok(())
}
@ -191,8 +192,12 @@ impl UserCloudServiceProvider for ServerProvider {
/// to create a new [AppFlowyServer] if it doesn't exist. Once the [AuthType] is set,
/// it will be used when user open the app again.
///
fn set_server_auth_type(&self, auth_type: &AuthType) {
fn set_server_auth_type(&self, auth_type: &AuthType, token: Option<String>) -> FlowyResult<()> {
self.set_auth_type(*auth_type);
if let Some(token) = token {
self.set_token(&token)?;
}
Ok(())
}
fn get_server_auth_type(&self) -> AuthType {

View file

@ -5,10 +5,8 @@ use dashmap::mapref::one::Ref;
use dashmap::DashMap;
use flowy_ai::local_ai::controller::LocalAIController;
use flowy_error::{FlowyError, FlowyResult};
use flowy_server::af_cloud::{
define::{AIUserServiceImpl, LoggedUser},
AppFlowyCloudServer,
};
use flowy_server::af_cloud::define::AIUserServiceImpl;
use flowy_server::af_cloud::{define::LoggedUser, AppFlowyCloudServer};
use flowy_server::local_server::LocalServer;
use flowy_server::{AppFlowyEncryption, AppFlowyServer, EncryptionImpl};
use flowy_server_pub::AuthenticatorType;
@ -117,12 +115,14 @@ impl ServerProvider {
.cloud_config
.clone()
.ok_or_else(|| FlowyError::internal().with_context("Missing cloud config"))?;
let ai_user_service = Arc::new(AIUserServiceImpl(Arc::downgrade(&self.logged_user)));
Arc::new(AppFlowyCloudServer::new(
cfg,
self.user_enable_sync.load(Ordering::Acquire),
self.config.device_id.clone(),
self.config.app_version.clone(),
Arc::downgrade(&self.logged_user),
ai_user_service,
))
},
};

View file

@ -38,15 +38,6 @@ pub(crate) struct UserStatusCallbackImpl {
}
impl UserStatusCallbackImpl {
fn init_ai_component(&self, workspace_id: String) {
let cloned_ai_manager = self.ai_manager.clone();
self.runtime.spawn(async move {
if let Err(err) = cloned_ai_manager.initialize(&workspace_id).await {
error!("Failed to initialize AIManager: {:?}", err);
}
});
}
async fn folder_init_data_source(
&self,
user_id: i64,
@ -95,7 +86,6 @@ impl UserStatusCallback for UserStatusCallbackImpl {
auth_type: &AuthType,
) -> FlowyResult<()> {
let workspace_id = user_workspace.workspace_id()?;
if let Some(cloud_config) = cloud_config {
self
.server_provider
@ -124,7 +114,15 @@ impl UserStatusCallback for UserStatusCallbackImpl {
self.document_manager.initialize(user_id).await?;
let workspace_id = user_workspace.id.clone();
self.init_ai_component(workspace_id);
let cloned_ai_manager = self.ai_manager.clone();
self.runtime.spawn(async move {
if let Err(err) = cloned_ai_manager
.on_launch_if_authenticated(&workspace_id)
.await
{
error!("Failed to initialize AIManager: {:?}", err);
}
});
Ok(())
}
@ -158,8 +156,11 @@ impl UserStatusCallback for UserStatusCallbackImpl {
.initialize_after_sign_in(user_id)
.await?;
let workspace_id = user_workspace.id.clone();
self.init_ai_component(workspace_id);
self
.ai_manager
.initialize_after_sign_in(&user_workspace.id)
.await?;
Ok(())
}
@ -207,8 +208,10 @@ impl UserStatusCallback for UserStatusCallbackImpl {
.await
.context("DocumentManager error")?;
let workspace_id = user_workspace.id.clone();
self.init_ai_component(workspace_id);
self
.ai_manager
.initialize_after_sign_up(&user_workspace.id)
.await?;
Ok(())
}

View file

@ -2,12 +2,14 @@ use collab_folder::{View, ViewIcon, ViewLayout};
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use flowy_error::ErrorCode;
use flowy_folder_pub::cloud::gen_view_id;
use lib_infra::validator_fn::required_not_empty_str;
use std::collections::HashMap;
use std::convert::TryInto;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;
use std::sync::Arc;
use uuid::Uuid;
use validator::Validate;
use crate::entities::icon::ViewIconPB;
use crate::entities::parser::view::{ViewIdentify, ViewName, ViewThumbnail};
@ -394,9 +396,10 @@ impl TryInto<CreateViewParams> for CreateOrphanViewPayloadPB {
}
}
#[derive(Default, ProtoBuf, Clone, Debug)]
#[derive(Default, ProtoBuf, Validate, Clone, Debug)]
pub struct ViewIdPB {
#[pb(index = 1)]
#[validate(custom(function = "required_not_empty_str"))]
pub value: String,
}

View file

@ -111,7 +111,7 @@ pub(crate) async fn get_view_handler(
folder: AFPluginState<Weak<FolderManager>>,
) -> DataResult<ViewPB, FlowyError> {
let folder = upgrade_folder(folder)?;
let view_id: ViewIdPB = data.into_inner();
let view_id = data.try_into_inner()?;
let view_pb = folder.get_view_pb(&view_id.value).await?;
data_result_ok(view_pb)
}

View file

@ -9,7 +9,7 @@ use collab_integrate::CollabKVDB;
use flowy_error::{FlowyError, FlowyResult};
use std::sync::{Arc, Weak};
use tokio::task::spawn_blocking;
use tracing::{event, info, Level};
use tracing::{error, event, info, Level};
use uuid::Uuid;
impl FolderManager {
@ -139,9 +139,12 @@ impl FolderManager {
);
let weak_folder_indexer = Arc::downgrade(&self.folder_indexer);
let workspace_id = *workspace_id;
tokio::spawn(async move {
if let Some(folder_indexer) = weak_folder_indexer.upgrade() {
folder_indexer.initialize().await;
if let Err(err) = folder_indexer.initialize(&workspace_id).await {
error!("Failed to initialize folder indexer: {:?}", err);
}
}
});

View file

@ -38,7 +38,7 @@ pub trait IndexManager: Send + Sync {
#[async_trait]
pub trait FolderIndexManager: IndexManager {
async fn initialize(&self);
async fn initialize(&self, workspace_id: &Uuid) -> Result<(), FlowyError>;
fn index_all_views(&self, views: Vec<Arc<View>>, workspace_id: Uuid);

View file

@ -35,8 +35,6 @@ impl Drop for TantivyState {
}
}
const FOLDER_INDEX_DIR: &str = "folder_index";
#[derive(Clone)]
pub struct FolderIndexManagerImpl {
auth_user: Weak<AuthenticateUser>,
@ -64,7 +62,7 @@ impl FolderIndexManagerImpl {
}
/// Initializes the state using the workspace directory.
async fn initialize(&self) -> FlowyResult<()> {
async fn initialize(&self, workspace_id: &Uuid) -> FlowyResult<()> {
if let Some(state) = self.state.write().await.take() {
info!("Re-initializing folder indexer");
drop(state);
@ -82,7 +80,7 @@ impl FolderIndexManagerImpl {
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("AuthenticateUser is not available"))?;
let index_path = auth_user.get_index_path()?.join(FOLDER_INDEX_DIR);
let index_path = auth_user.get_index_path()?.join(workspace_id.to_string());
if !index_path.exists() {
fs::create_dir_all(&index_path).map_err(|e| {
error!("Failed to create folder index directory: {:?}", e);
@ -327,10 +325,9 @@ impl IndexManager for FolderIndexManagerImpl {
#[async_trait]
impl FolderIndexManager for FolderIndexManagerImpl {
async fn initialize(&self) {
if let Err(e) = self.initialize().await {
error!("Failed to initialize FolderIndexManager: {:?}", e);
}
async fn initialize(&self, workspace_id: &Uuid) -> Result<(), FlowyError> {
self.initialize(workspace_id).await?;
Ok(())
}
fn index_all_views(&self, views: Vec<Arc<View>>, workspace_id: Uuid) {

View file

@ -1,5 +1,5 @@
use collab_plugins::CollabKVDB;
use flowy_ai::ai_manager::AIUserService;
use flowy_ai_pub::user_service::AIUserService;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
use lib_infra::async_trait::async_trait;
@ -28,6 +28,7 @@ pub trait LoggedUser: Send + Sync {
fn application_root_dir(&self) -> Result<PathBuf, FlowyError>;
}
//
pub struct AIUserServiceImpl(pub Weak<dyn LoggedUser>);
impl AIUserServiceImpl {

View file

@ -656,6 +656,7 @@ fn to_user_workspace(af_workspace: AFWorkspace) -> UserWorkspace {
icon: af_workspace.icon,
member_count: af_workspace.member_count.unwrap_or(0),
role: af_workspace.role.map(|r| r.into()),
workspace_type: AuthType::AppFlowyCloud,
}
}

View file

@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use crate::af_cloud::define::{AIUserServiceImpl, LoggedUser};
use crate::af_cloud::define::LoggedUser;
use anyhow::Error;
use arc_swap::ArcSwap;
use client_api::collab_sync::ServerCollabMessage;
@ -28,7 +28,9 @@ use crate::af_cloud::impls::{
AFCloudDatabaseCloudServiceImpl, AFCloudDocumentCloudServiceImpl, AFCloudFileStorageServiceImpl,
AFCloudFolderCloudServiceImpl, AFCloudUserAuthServiceImpl, CloudChatServiceImpl,
};
use crate::AppFlowyServer;
use flowy_ai::offline::offline_message_sync::AutoSyncChatService;
use flowy_ai_pub::user_service::AIUserService;
use rand::Rng;
use semver::Version;
use tokio::select;
@ -39,8 +41,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::AppFlowyServer;
use super::impls::AFCloudSearchCloudServiceImpl;
pub(crate) type AFCloudClient = Client;
@ -54,6 +54,7 @@ pub struct AppFlowyCloudServer {
pub device_id: String,
ws_client: Arc<WSClient>,
logged_user: Weak<dyn LoggedUser>,
ai_user_service: Arc<dyn AIUserService>,
}
impl AppFlowyCloudServer {
@ -63,6 +64,7 @@ impl AppFlowyCloudServer {
mut device_id: String,
client_version: Version,
logged_user: Weak<dyn LoggedUser>,
ai_user_service: Arc<dyn AIUserService>,
) -> Self {
// The device id can't be empty, so we generate a new one if it is.
if device_id.is_empty() {
@ -101,6 +103,7 @@ impl AppFlowyCloudServer {
device_id,
ws_client,
logged_user,
ai_user_service,
}
}
@ -222,7 +225,7 @@ impl AppFlowyServer for AppFlowyCloudServer {
Arc::new(CloudChatServiceImpl {
inner: self.get_server_impl(),
}),
Arc::new(AIUserServiceImpl(self.logged_user.clone())),
self.ai_user_service.clone(),
))
}

View file

@ -2,6 +2,7 @@
use crate::af_cloud::define::LoggedUser;
use crate::local_server::uid::UserIDGenerator;
use anyhow::Context;
use client_api::entity::GotrueTokenResponse;
use collab::core::origin::CollabOrigin;
use collab::preclude::Collab;
@ -13,10 +14,10 @@ use flowy_error::FlowyError;
use flowy_user_pub::cloud::{UserCloudService, UserCollabParams};
use flowy_user_pub::entities::*;
use flowy_user_pub::sql::{
select_all_user_workspace, select_user_profile, select_user_workspace, select_workspace_member,
select_workspace_setting, update_user_profile, update_workspace_setting, upsert_workspace_member,
upsert_workspace_setting, UserTableChangeset, WorkspaceMemberTable, WorkspaceSettingsChangeset,
WorkspaceSettingsTable,
insert_local_workspace, select_all_user_workspace, select_user_profile, select_user_workspace,
select_workspace_member, select_workspace_setting, update_user_profile, update_workspace_setting,
upsert_workspace_member, upsert_workspace_setting, UserTableChangeset, WorkspaceMemberTable,
WorkspaceSettingsChangeset, WorkspaceSettingsTable,
};
use flowy_user_pub::DEFAULT_USER_NAME;
use lazy_static::lazy_static;
@ -161,10 +162,11 @@ impl UserCloudService for LocalServerUserServiceImpl {
async fn create_workspace(&self, workspace_name: &str) -> Result<UserWorkspace, FlowyError> {
let workspace_id = Uuid::new_v4();
Ok(UserWorkspace::new_local(
workspace_id.to_string(),
workspace_name,
))
let uid = self.logged_user.user_id()?;
let mut conn = self.logged_user.get_sqlite_db(uid)?;
let user_workspace =
insert_local_workspace(uid, &workspace_id.to_string(), workspace_name, &mut conn)?;
Ok(user_workspace)
}
async fn patch_workspace(
@ -180,6 +182,15 @@ impl UserCloudService for LocalServerUserServiceImpl {
Ok(())
}
async fn get_workspace_members(
&self,
workspace_id: Uuid,
) -> Result<Vec<WorkspaceMember>, FlowyError> {
let uid = self.logged_user.user_id()?;
let member = self.get_workspace_member(&workspace_id, uid).await?;
Ok(vec![member])
}
async fn get_user_awareness_doc_state(
&self,
uid: i64,
@ -227,15 +238,16 @@ impl UserCloudService for LocalServerUserServiceImpl {
Err(err) => {
if err.is_record_not_found() {
let mut conn = self.logged_user.get_sqlite_db(uid)?;
let profile = select_user_profile(uid, &workspace_id.to_string(), &mut conn)?;
let profile = select_user_profile(uid, &workspace_id.to_string(), &mut conn)
.context("Can't find user profile when create workspace member")?;
let row = WorkspaceMemberTable {
email: profile.email.to_string(),
role: 0,
role: Role::Owner as i32,
name: profile.name.to_string(),
avatar_url: Some(profile.icon_url),
uid,
workspace_id: workspace_id.to_string(),
updated_at: Default::default(),
updated_at: chrono::Utc::now().naive_utc(),
};
let member = WorkspaceMember::from(row.clone());

View file

@ -1,2 +0,0 @@
mod user_test;
mod util;

View file

@ -1,21 +0,0 @@
use flowy_server::AppFlowyServer;
use flowy_user_pub::entities::AuthResponse;
use lib_infra::box_any::BoxAny;
use crate::af_cloud_test::util::{
af_cloud_server, af_cloud_sign_up_param, generate_test_email, get_af_cloud_config,
};
#[tokio::test]
async fn sign_up_test() {
if let Some(config) = get_af_cloud_config() {
let server = af_cloud_server(config.clone());
let user_service = server.user_service();
let email = generate_test_email();
let params = af_cloud_sign_up_param(&email, &config).await;
let resp: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
assert_eq!(resp.email.unwrap(), email);
assert!(resp.is_new_user);
assert_eq!(resp.user_workspaces.len(), 1);
}
}

View file

@ -1,119 +0,0 @@
use client_api::ClientConfiguration;
use collab_plugins::CollabKVDB;
use flowy_error::{FlowyError, FlowyResult};
use semver::Version;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Weak};
use uuid::Uuid;
use crate::setup_log;
use flowy_server::af_cloud::define::LoggedUser;
use flowy_server::af_cloud::AppFlowyCloudServer;
use flowy_server_pub::af_cloud_config::AFCloudConfiguration;
use flowy_sqlite::DBConnection;
use lib_infra::async_trait::async_trait;
/// To run the test, create a .env.ci file in the 'flowy-server' directory and set the following environment variables:
///
/// - `APPFLOWY_CLOUD_BASE_URL=http://localhost:8000`
/// - `APPFLOWY_CLOUD_WS_BASE_URL=ws://localhost:8000/ws`
/// - `APPFLOWY_CLOUD_GOTRUE_URL=http://localhost:9998`
///
/// - `GOTRUE_ADMIN_EMAIL=admin@example.com`
/// - `GOTRUE_ADMIN_PASSWORD=password`
pub fn get_af_cloud_config() -> Option<AFCloudConfiguration> {
dotenv::from_filename("./.env.ci").ok()?;
setup_log();
AFCloudConfiguration::from_env().ok()
}
pub fn af_cloud_server(config: AFCloudConfiguration) -> Arc<AppFlowyCloudServer> {
let fake_device_id = uuid::Uuid::new_v4().to_string();
let logged_user = Arc::new(FakeServerUserImpl) as Arc<dyn LoggedUser>;
Arc::new(AppFlowyCloudServer::new(
config,
true,
fake_device_id,
Version::new(0, 5, 8),
// do nothing, just for test
Arc::downgrade(&logged_user),
))
}
struct FakeServerUserImpl;
#[async_trait]
impl LoggedUser for FakeServerUserImpl {
fn workspace_id(&self) -> FlowyResult<Uuid> {
todo!()
}
fn user_id(&self) -> FlowyResult<i64> {
todo!()
}
async fn is_local_mode(&self) -> FlowyResult<bool> {
Ok(true)
}
fn get_sqlite_db(&self, _uid: i64) -> Result<DBConnection, FlowyError> {
todo!()
}
fn get_collab_db(&self, _uid: i64) -> Result<Weak<CollabKVDB>, FlowyError> {
todo!()
}
fn application_root_dir(&self) -> Result<PathBuf, FlowyError> {
todo!()
}
}
pub async fn generate_sign_in_url(user_email: &str, config: &AFCloudConfiguration) -> String {
let client = client_api::Client::new(
&config.base_url,
&config.ws_base_url,
&config.gotrue_url,
"fake_device_id",
ClientConfiguration::default(),
"test",
);
let admin_email = std::env::var("GOTRUE_ADMIN_EMAIL").unwrap();
let admin_password = std::env::var("GOTRUE_ADMIN_PASSWORD").unwrap();
let admin_client = client_api::Client::new(
client.base_url(),
client.ws_addr(),
client.gotrue_url(),
"fake_device_id",
ClientConfiguration::default(),
&client.client_version.to_string(),
);
admin_client
.sign_in_password(&admin_email, &admin_password)
.await
.unwrap();
let action_link = admin_client
.generate_sign_in_action_link(user_email)
.await
.unwrap();
client.extract_sign_in_url(&action_link).await.unwrap()
}
pub async fn af_cloud_sign_up_param(
email: &str,
config: &AFCloudConfiguration,
) -> HashMap<String, String> {
let mut params = HashMap::new();
params.insert(
"sign_in_url".to_string(),
generate_sign_in_url(email, config).await,
);
params.insert("device_id".to_string(), Uuid::new_v4().to_string());
params
}
pub fn generate_test_email() -> String {
format!("{}@test.com", Uuid::new_v4())
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 15 KiB

View file

@ -1,24 +0,0 @@
use std::sync::Once;
use tracing_subscriber::fmt::Subscriber;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
mod af_cloud_test;
// mod supabase_test;
pub fn setup_log() {
static START: Once = Once::new();
START.call_once(|| {
let level = "trace";
let mut filters = vec![];
filters.push(format!("flowy_server={}", level));
std::env::set_var("RUST_LOG", filters.join(","));
let subscriber = Subscriber::builder()
.with_env_filter(EnvFilter::from_default_env())
.with_ansi(true)
.finish();
subscriber.try_init().unwrap();
});
}

View file

@ -1,63 +0,0 @@
use collab::core::collab::DataSource;
use collab_entity::{CollabObject, CollabType};
use uuid::Uuid;
use flowy_user_pub::entities::AuthResponse;
use lib_infra::box_any::BoxAny;
use crate::supabase_test::util::{
collab_service, database_service, get_supabase_ci_config, third_party_sign_up_param,
user_auth_service,
};
#[tokio::test]
async fn supabase_create_database_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_service = collab_service();
let database_service = database_service();
let mut row_ids = vec![];
for _i in 0..3 {
let row_id = uuid::Uuid::new_v4().to_string();
row_ids.push(row_id.clone());
let collab_object = CollabObject::new(
user.user_id,
row_id,
CollabType::DatabaseRow,
user.latest_workspace.id.clone(),
"fake_device_id".to_string(),
);
collab_service
.send_update(&collab_object, 0, vec![1, 2, 3])
.await
.unwrap();
collab_service
.send_update(&collab_object, 0, vec![4, 5, 6])
.await
.unwrap();
}
let updates_by_oid = database_service
.batch_get_database_object_doc_state(row_ids, CollabType::DatabaseRow, "fake_workspace_id")
.await
.unwrap();
assert_eq!(updates_by_oid.len(), 3);
for (_, source) in updates_by_oid {
match source {
DataSource::Disk => panic!("should not be from disk"),
DataSource::DocStateV1(doc_state) => {
assert_eq!(doc_state.len(), 2);
},
DataSource::DocStateV2(_) => {},
}
}
}

View file

@ -1,78 +0,0 @@
// use url::Url;
// use uuid::Uuid;
//
// use flowy_storage::StorageObject;
//
// use crate::supabase_test::util::{file_storage_service, get_supabase_ci_config};
//
// #[tokio::test]
// async fn supabase_get_object_test() {
// if get_supabase_ci_config().is_none() {
// return;
// }
//
// let service = file_storage_service();
// let file_name = format!("test-{}.txt", Uuid::new_v4());
// let object = StorageObject::from_file("1", &file_name, "tests/test.txt");
//
// // Upload a file
// let url = service
// .create_object(object)
// .await
// .unwrap()
// .parse::<Url>()
// .unwrap();
//
// // The url would be something like:
// // https://acfrqdbdtbsceyjbxsfc.supabase.co/storage/v1/object/data/test-1693472809.txt
// let name = url.path_segments().unwrap().last().unwrap();
// assert_eq!(name, &file_name);
//
// // Download the file
// let bytes = service.get_object(url.to_string()).await.unwrap();
// let s = String::from_utf8(bytes.to_vec()).unwrap();
// assert_eq!(s, "hello world");
// }
//
// #[tokio::test]
// async fn supabase_upload_image_test() {
// if get_supabase_ci_config().is_none() {
// return;
// }
//
// let service = file_storage_service();
// let file_name = format!("image-{}.png", Uuid::new_v4());
// let object = StorageObject::from_file("1", &file_name, "tests/logo.png");
//
// // Upload a file
// let url = service
// .create_object(object)
// .await
// .unwrap()
// .parse::<Url>()
// .unwrap();
//
// // Download object by url
// let bytes = service.get_object(url.to_string()).await.unwrap();
// assert_eq!(bytes.len(), 15694);
// }
//
// #[tokio::test]
// async fn supabase_delete_object_test() {
// if get_supabase_ci_config().is_none() {
// return;
// }
//
// let service = file_storage_service();
// let file_name = format!("test-{}.txt", Uuid::new_v4());
// let object = StorageObject::from_file("1", &file_name, "tests/test.txt");
// let url = service.create_object(object).await.unwrap();
//
// let result = service.get_object(url.clone()).await;
// assert!(result.is_ok());
//
// let _ = service.delete_object(url.clone()).await;
//
// let result = service.get_object(url.clone()).await;
// assert!(result.is_err());
// }

View file

@ -1,316 +0,0 @@
use assert_json_diff::assert_json_eq;
use collab_entity::{CollabObject, CollabType};
use serde_json::json;
use uuid::Uuid;
use yrs::types::ToJson;
use yrs::updates::decoder::Decode;
use yrs::{merge_updates_v1, Array, Doc, Map, MapPrelim, ReadTxn, StateVector, Transact, Update};
use flowy_user_pub::entities::AuthResponse;
use lib_infra::box_any::BoxAny;
use crate::supabase_test::util::{
collab_service, folder_service, get_supabase_ci_config, third_party_sign_up_param,
user_auth_service,
};
#[tokio::test]
async fn supabase_create_workspace_test() {
if get_supabase_ci_config().is_none() {
return;
}
let service = folder_service();
// will replace the uid with the real uid
let workspace = service.create_workspace(1, "test").await.unwrap();
dbg!(workspace);
}
#[tokio::test]
async fn supabase_get_folder_test() {
if get_supabase_ci_config().is_none() {
return;
}
let folder_service = folder_service();
let user_service = user_auth_service();
let collab_service = collab_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject::new(
user.user_id,
user.latest_workspace.id.clone(),
CollabType::Folder,
user.latest_workspace.id.clone(),
"fake_device_id".to_string(),
);
let doc = Doc::with_client_id(1);
let map = { doc.get_or_insert_map("map") };
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "1", "a");
collab_service
.send_update(&collab_object, 0, txn.encode_update_v1())
.await
.unwrap();
};
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "2", "b");
collab_service
.send_update(&collab_object, 1, txn.encode_update_v1())
.await
.unwrap();
};
// let updates = collab_service.get_all_updates(&collab_object).await.unwrap();
let updates = folder_service
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
&user.latest_workspace.id,
)
.await
.unwrap();
assert_eq!(updates.len(), 2);
for _ in 0..5 {
collab_service
.send_init_sync(&collab_object, 3, vec![])
.await
.unwrap();
}
let updates = folder_service
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
&user.latest_workspace.id,
)
.await
.unwrap();
// Other the init sync, try to get the updates from the server.
let expected_update = doc
.transact_mut()
.encode_state_as_update_v1(&StateVector::default());
// check the update is the same as local document update.
assert_eq!(updates, expected_update);
}
/// This async test function checks the behavior of updates duplication in Supabase.
/// It creates a new user and simulates two updates to the user's workspace with different values.
/// Then, it merges these updates and sends an initial synchronization request to test duplication handling.
/// Finally, it asserts that the duplicated updates don't affect the overall data consistency in Supabase.
#[tokio::test]
async fn supabase_duplicate_updates_test() {
if get_supabase_ci_config().is_none() {
return;
}
let folder_service = folder_service();
let user_service = user_auth_service();
let collab_service = collab_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject::new(
user.user_id,
user.latest_workspace.id.clone(),
CollabType::Folder,
user.latest_workspace.id.clone(),
"fake_device_id".to_string(),
);
let doc = Doc::with_client_id(1);
let map = { doc.get_or_insert_map("map") };
let mut duplicated_updates = vec![];
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "1", "a");
let update = txn.encode_update_v1();
duplicated_updates.push(update.clone());
collab_service
.send_update(&collab_object, 0, update)
.await
.unwrap();
};
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "2", "b");
let update = txn.encode_update_v1();
duplicated_updates.push(update.clone());
collab_service
.send_update(&collab_object, 1, update)
.await
.unwrap();
};
// send init sync
collab_service
.send_init_sync(&collab_object, 3, vec![])
.await
.unwrap();
let first_init_sync_update = folder_service
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
&user.latest_workspace.id,
)
.await
.unwrap();
// simulate the duplicated updates.
let merged_update = merge_updates_v1(
&duplicated_updates
.iter()
.map(|update| update.as_ref())
.collect::<Vec<&[u8]>>(),
)
.unwrap();
collab_service
.send_init_sync(&collab_object, 4, merged_update)
.await
.unwrap();
let second_init_sync_update = folder_service
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
&user.latest_workspace.id,
)
.await
.unwrap();
let doc_2 = Doc::new();
assert_eq!(first_init_sync_update.len(), second_init_sync_update.len());
let map = { doc_2.get_or_insert_map("map") };
{
let mut txn = doc_2.transact_mut();
let update = Update::decode_v1(&second_init_sync_update).unwrap();
txn.apply_update(update).unwrap();
}
{
let txn = doc_2.transact();
let json = map.to_json(&txn);
assert_json_eq!(
json,
json!({
"1": "a",
"2": "b"
})
);
}
}
/// The state vector of doc;
/// ```json
/// "map": {},
/// "array": []
/// ```
/// The old version of doc:
/// ```json
/// "map": {}
/// ```
///
/// Try to apply the updates from doc to old version doc and check the result.
#[tokio::test]
async fn supabase_diff_state_vector_test() {
if get_supabase_ci_config().is_none() {
return;
}
let folder_service = folder_service();
let user_service = user_auth_service();
let collab_service = collab_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject::new(
user.user_id,
user.latest_workspace.id.clone(),
CollabType::Folder,
user.latest_workspace.id.clone(),
"fake_device_id".to_string(),
);
let doc = Doc::with_client_id(1);
let map = { doc.get_or_insert_map("map") };
let array = { doc.get_or_insert_array("array") };
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "1", "a");
map.insert(&mut txn, "inner_map", MapPrelim::<String>::new());
array.push_back(&mut txn, "element 1");
let update = txn.encode_update_v1();
collab_service
.send_update(&collab_object, 0, update)
.await
.unwrap();
};
{
let mut txn = doc.transact_mut();
map.insert(&mut txn, "2", "b");
array.push_back(&mut txn, "element 2");
let update = txn.encode_update_v1();
collab_service
.send_update(&collab_object, 1, update)
.await
.unwrap();
};
// restore the doc with given updates.
let old_version_doc = Doc::new();
let map = { old_version_doc.get_or_insert_map("map") };
let doc_state = folder_service
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
&user.latest_workspace.id,
)
.await
.unwrap();
{
let mut txn = old_version_doc.transact_mut();
let update = Update::decode_v1(&doc_state).unwrap();
txn.apply_update(update).unwrap();
}
let txn = old_version_doc.transact();
let json = map.to_json(&txn);
assert_json_eq!(
json,
json!({
"1": "a",
"2": "b",
"inner_map": {}
})
);
}
// #[tokio::test]
// async fn print_folder_object_test() {
// if get_supabase_dev_config().is_none() {
// return;
// }
// let secret = Some("43bSxEPHeNkk5ZxxEYOfAjjd7sK2DJ$vVnxwuNc5ru0iKFvhs8wLg==".to_string());
// print_encryption_folder("f8b14b84-e8ec-4cf4-a318-c1e008ecfdfa", secret).await;
// }
//
// #[tokio::test]
// async fn print_folder_snapshot_object_test() {
// if get_supabase_dev_config().is_none() {
// return;
// }
// let secret = Some("NTXRXrDSybqFEm32jwMBDzbxvCtgjU$8np3TGywbBdJAzHtu1QIyQ==".to_string());
// // let secret = None;
// print_encryption_folder_snapshot("12533251-bdd4-41f4-995f-ff12fceeaa42", secret).await;
// }

View file

@ -1,5 +0,0 @@
mod database_test;
mod file_test;
mod folder_test;
mod user_test;
mod util;

View file

@ -1,141 +0,0 @@
use uuid::Uuid;
use flowy_encrypt::{encrypt_text, generate_encryption_secret};
use flowy_error::FlowyError;
use flowy_user_pub::entities::*;
use lib_infra::box_any::BoxAny;
use crate::supabase_test::util::{
get_supabase_ci_config, third_party_sign_up_param, user_auth_service,
};
// ‼️‼️‼️ Warning: this test will create a table in the database
#[tokio::test]
async fn supabase_user_sign_up_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
assert!(!user.latest_workspace.id.is_empty());
assert!(!user.user_workspaces.is_empty());
assert!(!user.latest_workspace.database_indexer_id.is_empty());
}
#[tokio::test]
async fn supabase_user_sign_up_with_existing_uuid_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let _user: AuthResponse = user_service
.sign_up(BoxAny::new(params.clone()))
.await
.unwrap();
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
assert!(!user.latest_workspace.id.is_empty());
assert!(!user.latest_workspace.database_indexer_id.is_empty());
assert!(!user.user_workspaces.is_empty());
}
#[tokio::test]
async fn supabase_update_user_profile_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service
.sign_up(BoxAny::new(params.clone()))
.await
.unwrap();
let params = UpdateUserProfileParams::new(user.user_id)
.with_name("123")
.with_email(format!("{}@test.com", Uuid::new_v4()));
user_service
.update_user(UserCredentials::from_uid(user.user_id), params)
.await
.unwrap();
let user_profile = user_service
.get_user_profile(UserCredentials::from_uid(user.user_id))
.await
.unwrap();
assert_eq!(user_profile.name, "123");
}
#[tokio::test]
async fn supabase_get_user_profile_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service
.sign_up(BoxAny::new(params.clone()))
.await
.unwrap();
let credential = UserCredentials::from_uid(user.user_id);
user_service
.get_user_profile(credential.clone())
.await
.unwrap();
}
#[tokio::test]
async fn supabase_get_not_exist_user_profile_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let result: FlowyError = user_service
.get_user_profile(UserCredentials::from_uid(i64::MAX))
.await
.unwrap_err();
// user not found
assert!(result.is_record_not_found());
}
#[tokio::test]
async fn user_encryption_sign_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: AuthResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
// generate encryption sign
let secret = generate_encryption_secret();
let sign = encrypt_text(user.user_id.to_string(), &secret).unwrap();
user_service
.update_user(
UserCredentials::from_uid(user.user_id),
UpdateUserProfileParams::new(user.user_id)
.with_encryption_type(EncryptionType::SelfEncryption(sign.clone())),
)
.await
.unwrap();
let user_profile: UserProfile = user_service
.get_user_profile(UserCredentials::from_uid(user.user_id))
.await
.unwrap();
assert_eq!(
user_profile.encryption_type,
EncryptionType::SelfEncryption(sign)
);
}

View file

@ -1,162 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use collab::core::collab::{DataSource, MutexCollab};
use collab::core::origin::CollabOrigin;
use collab::preclude::Collab;
use collab_plugins::cloud_storage::RemoteCollabStorage;
use uuid::Uuid;
use flowy_database_pub::cloud::DatabaseCloudService;
use flowy_error::FlowyError;
use flowy_folder_pub::cloud::{Folder, FolderCloudService};
use flowy_server::supabase::api::{
RESTfulPostgresServer, SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl,
SupabaseFolderServiceImpl, SupabaseServerServiceImpl, SupabaseUserServiceImpl,
};
use flowy_server::supabase::define::{USER_DEVICE_ID, USER_EMAIL, USER_UUID};
use flowy_server::{AppFlowyEncryption, EncryptionImpl};
use flowy_server_pub::supabase_config::SupabaseConfiguration;
use flowy_user_pub::cloud::UserCloudService;
use lib_infra::future::FutureResult;
use crate::setup_log;
pub fn get_supabase_ci_config() -> Option<SupabaseConfiguration> {
dotenv::from_filename("./.env.ci").ok()?;
setup_log();
SupabaseConfiguration::from_env().ok()
}
#[allow(dead_code)]
pub fn get_supabase_dev_config() -> Option<SupabaseConfiguration> {
dotenv::from_filename("./.env.dev").ok()?;
setup_log();
SupabaseConfiguration::from_env().ok()
}
pub fn collab_service() -> Arc<dyn RemoteCollabStorage> {
let (server, encryption_impl) = supabase_server_service(None);
Arc::new(SupabaseCollabStorageImpl::new(
server,
None,
Arc::downgrade(&encryption_impl),
))
}
pub fn database_service() -> Arc<dyn DatabaseCloudService> {
let (server, _encryption_impl) = supabase_server_service(None);
Arc::new(SupabaseDatabaseServiceImpl::new(server))
}
pub fn user_auth_service() -> Arc<dyn UserCloudService> {
let (server, _encryption_impl) = supabase_server_service(None);
Arc::new(SupabaseUserServiceImpl::new(server, vec![], None))
}
pub fn folder_service() -> Arc<dyn FolderCloudService> {
let (server, _encryption_impl) = supabase_server_service(None);
Arc::new(SupabaseFolderServiceImpl::new(server))
}
#[allow(dead_code)]
pub fn file_storage_service() -> Arc<dyn ObjectStorageCloudService> {
let encryption_impl: Arc<dyn AppFlowyEncryption> = Arc::new(EncryptionImpl::new(None));
let config = SupabaseConfiguration::from_env().unwrap();
Arc::new(
SupabaseFileStorage::new(
&config,
Arc::downgrade(&encryption_impl),
Arc::new(TestFileStoragePlan),
)
.unwrap(),
)
}
#[allow(dead_code)]
pub fn encryption_folder_service(
secret: Option<String>,
) -> (Arc<dyn FolderCloudService>, Arc<dyn AppFlowyEncryption>) {
let (server, encryption_impl) = supabase_server_service(secret);
let service = Arc::new(SupabaseFolderServiceImpl::new(server));
(service, encryption_impl)
}
#[allow(dead_code)]
pub fn encryption_collab_service(
secret: Option<String>,
) -> (Arc<dyn RemoteCollabStorage>, Arc<dyn AppFlowyEncryption>) {
let (server, encryption_impl) = supabase_server_service(secret);
let service = Arc::new(SupabaseCollabStorageImpl::new(
server,
None,
Arc::downgrade(&encryption_impl),
));
(service, encryption_impl)
}
#[allow(dead_code)]
pub async fn print_encryption_folder(
uid: &i64,
folder_id: &str,
encryption_secret: Option<String>,
) {
let (cloud_service, _encryption) = encryption_folder_service(encryption_secret);
let folder_data = cloud_service.get_folder_data(folder_id, uid).await.unwrap();
let json = serde_json::to_value(folder_data).unwrap();
println!("{}", serde_json::to_string_pretty(&json).unwrap());
}
#[allow(dead_code)]
pub async fn print_encryption_folder_snapshot(
uid: &i64,
folder_id: &str,
encryption_secret: Option<String>,
) {
let (cloud_service, _encryption) = encryption_collab_service(encryption_secret);
let snapshot = cloud_service
.get_snapshots(folder_id, 1)
.await
.pop()
.unwrap();
let collab = Arc::new(MutexCollab::new(
Collab::new_with_source(
CollabOrigin::Empty,
folder_id,
DataSource::DocStateV1(snapshot.blob),
vec![],
false,
)
.unwrap(),
));
let folder_data = Folder::open(uid, collab, None)
.unwrap()
.get_folder_data(folder_id)
.unwrap();
let json = serde_json::to_value(folder_data).unwrap();
println!("{}", serde_json::to_string_pretty(&json).unwrap());
}
pub fn supabase_server_service(
encryption_secret: Option<String>,
) -> (SupabaseServerServiceImpl, Arc<dyn AppFlowyEncryption>) {
let config = SupabaseConfiguration::from_env().unwrap();
let encryption_impl: Arc<dyn AppFlowyEncryption> =
Arc::new(EncryptionImpl::new(encryption_secret));
let encryption = Arc::downgrade(&encryption_impl);
let server = Arc::new(RESTfulPostgresServer::new(config, encryption));
(SupabaseServerServiceImpl::new(server), encryption_impl)
}
pub fn third_party_sign_up_param(uuid: String) -> HashMap<String, String> {
let mut params = HashMap::new();
params.insert(USER_UUID.to_string(), uuid);
params.insert(
USER_EMAIL.to_string(),
format!("{}@test.com", Uuid::new_v4()),
);
params.insert(USER_DEVICE_ID.to_string(), Uuid::new_v4().to_string());
params
}
pub struct TestFileStoragePlan;

View file

@ -1 +0,0 @@
hello world

View file

@ -84,7 +84,11 @@ pub trait UserCloudServiceProvider: Send + Sync {
/// * `enable_sync`: A boolean indicating whether synchronization should be enabled or disabled.
fn set_enable_sync(&self, uid: i64, enable_sync: bool);
fn set_server_auth_type(&self, auth_type: &AuthType);
fn set_server_auth_type(
&self,
auth_type: &AuthType,
token: Option<String>,
) -> Result<(), FlowyError>;
fn get_server_auth_type(&self) -> AuthType;
@ -231,9 +235,7 @@ pub trait UserCloudService: Send + Sync + 'static {
async fn get_workspace_members(
&self,
workspace_id: Uuid,
) -> Result<Vec<WorkspaceMember>, FlowyError> {
Ok(vec![])
}
) -> Result<Vec<WorkspaceMember>, FlowyError>;
async fn get_user_awareness_doc_state(
&self,

View file

@ -114,6 +114,12 @@ pub struct UserWorkspace {
pub member_count: i64,
#[serde(default)]
pub role: Option<Role>,
#[serde(default = "default_workspace_type")]
pub workspace_type: AuthType,
}
fn default_workspace_type() -> AuthType {
AuthType::AppFlowyCloud
}
impl UserWorkspace {
@ -131,6 +137,7 @@ impl UserWorkspace {
icon: "".to_string(),
member_count: 1,
role: Some(Role::Owner),
workspace_type: AuthType::Local,
}
}
}

View file

@ -1,4 +1,4 @@
use crate::entities::{UserAuthResponse, UserWorkspace};
use crate::entities::{AuthType, UserAuthResponse, UserWorkspace};
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use chrono::Utc;
@ -77,6 +77,7 @@ impl<'de> Visitor<'de> for SessionVisitor {
icon: "".to_owned(),
member_count: 1,
role: None,
workspace_type: AuthType::Local,
})
}
}

View file

@ -1,10 +1,12 @@
use crate::cloud::UserUpdate;
use crate::entities::{AuthType, UpdateUserProfileParams, UserProfile};
use crate::sql::select_user_workspace;
use crate::entities::{AuthType, Role, UpdateUserProfileParams, UserProfile, UserWorkspace};
use crate::sql::{
select_user_workspace, upsert_user_workspace, upsert_workspace_member, WorkspaceMemberTable,
};
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::schema::user_table;
use flowy_sqlite::{prelude::*, DBConnection, ExpressionMethods, RunQueryDsl};
use tracing::{trace, warn};
use tracing::trace;
/// The order of the fields in the struct must be the same as the order of the fields in the table.
/// Check out the [schema.rs] for table schema.
@ -92,6 +94,33 @@ pub fn update_user_profile(
Ok(())
}
pub fn insert_local_workspace(
uid: i64,
workspace_id: &str,
workspace_name: &str,
conn: &mut SqliteConnection,
) -> FlowyResult<UserWorkspace> {
let user_workspace = UserWorkspace::new_local(workspace_id.to_string(), workspace_name);
conn.immediate_transaction(|conn| {
let row = select_user_table_row(uid, conn)?;
let row = WorkspaceMemberTable {
email: row.email,
role: Role::Owner as i32,
name: row.name,
avatar_url: Some(row.icon_url),
uid,
workspace_id: workspace_id.to_string(),
updated_at: chrono::Utc::now().naive_utc(),
};
upsert_user_workspace(uid, AuthType::Local, user_workspace.clone(), conn)?;
upsert_workspace_member(conn, row)?;
Ok::<_, FlowyError>(())
})?;
Ok(user_workspace)
}
fn select_user_table_row(uid: i64, conn: &mut SqliteConnection) -> Result<UserTable, FlowyError> {
let row = user_table::dsl::user_table
.filter(user_table::id.eq(&uid.to_string()))
@ -128,26 +157,17 @@ pub fn select_user_profile(
Ok(user)
}
pub fn select_workspace_auth_type(
pub fn select_user_auth_type(
uid: i64,
workspace_id: &str,
conn: &mut SqliteConnection,
) -> Result<AuthType, FlowyError> {
match select_user_workspace(workspace_id, conn) {
Ok(workspace) => Ok(AuthType::from(workspace.workspace_type)),
Err(err) => {
if err.is_record_not_found() {
let row = select_user_table_row(uid, conn)?;
warn!(
"user user auth type:{} as workspace auth type",
row.auth_type
);
Ok(AuthType::from(row.auth_type))
} else {
Err(err)
}
},
}
}
pub fn select_user_token(uid: i64, conn: &mut SqliteConnection) -> Result<String, FlowyError> {
let row = select_user_table_row(uid, conn)?;
Ok(row.token)
}
pub fn upsert_user(user: UserTable, mut conn: DBConnection) -> FlowyResult<()> {

View file

@ -149,6 +149,7 @@ impl From<UserWorkspaceTable> for UserWorkspace {
icon: value.icon,
member_count: value.member_count,
role: value.role.map(|v| v.into()),
workspace_type: AuthType::from(value.workspace_type),
}
}
}

View file

@ -156,14 +156,10 @@ pub struct RepeatedUserWorkspacePB {
pub items: Vec<UserWorkspacePB>,
}
impl From<(AuthType, Vec<UserWorkspace>)> for RepeatedUserWorkspacePB {
fn from(value: (AuthType, Vec<UserWorkspace>)) -> Self {
let (auth_type, workspaces) = value;
impl From<Vec<UserWorkspace>> for RepeatedUserWorkspacePB {
fn from(workspaces: Vec<UserWorkspace>) -> Self {
Self {
items: workspaces
.into_iter()
.map(|w| UserWorkspacePB::from((auth_type, w)))
.collect(),
items: workspaces.into_iter().map(UserWorkspacePB::from).collect(),
}
}
}
@ -193,16 +189,16 @@ pub struct UserWorkspacePB {
pub workspace_auth_type: AuthTypePB,
}
impl From<(AuthType, UserWorkspace)> for UserWorkspacePB {
fn from(value: (AuthType, UserWorkspace)) -> Self {
impl From<UserWorkspace> for UserWorkspacePB {
fn from(workspace: UserWorkspace) -> Self {
Self {
workspace_id: value.1.id,
name: value.1.name,
created_at_timestamp: value.1.created_at.timestamp(),
icon: value.1.icon,
member_count: value.1.member_count,
role: value.1.role.map(AFRolePB::from),
workspace_auth_type: AuthTypePB::from(value.0),
workspace_id: workspace.id,
name: workspace.name,
created_at_timestamp: workspace.created_at.timestamp(),
icon: workspace.icon,
member_count: workspace.member_count,
role: workspace.role.map(AFRolePB::from),
workspace_auth_type: AuthTypePB::from(workspace.workspace_type),
}
}
}

View file

@ -147,7 +147,7 @@ pub struct UpdateWorkspaceMemberPB {
}
// Workspace Role
#[derive(Debug, ProtoBuf_Enum, Clone, Default)]
#[derive(Debug, ProtoBuf_Enum, Clone, Default, Eq, PartialEq)]
pub enum AFRolePB {
Owner = 0,
Member = 1,
@ -200,7 +200,7 @@ pub struct OpenUserWorkspacePB {
pub workspace_id: String,
#[pb(index = 2)]
pub auth_type: AuthTypePB,
pub workspace_auth_type: AuthTypePB,
}
#[derive(ProtoBuf, Default, Clone, Validate)]
@ -242,7 +242,7 @@ pub struct CreateWorkspacePB {
pub auth_type: AuthTypePB,
}
#[derive(ProtoBuf_Enum, Default, Debug, Clone, Eq, PartialEq)]
#[derive(ProtoBuf_Enum, Copy, Default, Debug, Clone, Eq, PartialEq)]
#[repr(u8)]
pub enum AuthTypePB {
#[default]

View file

@ -439,10 +439,7 @@ pub async fn get_all_workspace_handler(
.get_all_user_workspaces(profile.uid, profile.auth_type)
.await?;
data_result_ok(RepeatedUserWorkspacePB::from((
profile.auth_type,
user_workspaces,
)))
data_result_ok(RepeatedUserWorkspacePB::from(user_workspaces))
}
#[tracing::instrument(level = "info", skip(data, manager), err)]
@ -454,7 +451,7 @@ pub async fn open_workspace_handler(
let params = data.try_into_inner()?;
let workspace_id = Uuid::from_str(&params.workspace_id)?;
manager
.open_workspace(&workspace_id, AuthType::from(params.auth_type))
.open_workspace(&workspace_id, AuthType::from(params.workspace_auth_type))
.await?;
Ok(())
}
@ -627,7 +624,7 @@ pub async fn create_workspace_handler(
let auth_type = AuthType::from(data.auth_type);
let manager = upgrade_manager(manager)?;
let new_workspace = manager.create_workspace(&data.name, auth_type).await?;
data_result_ok(UserWorkspacePB::from((auth_type, new_workspace)))
data_result_ok(UserWorkspacePB::from(new_workspace))
}
#[tracing::instrument(level = "debug", skip_all, err)]

View file

@ -291,6 +291,11 @@ pub trait UserStatusCallback: Send + Sync + 'static {
) -> FlowyResult<()> {
Ok(())
}
async fn did_launch(&self) -> FlowyResult<()> {
Ok(())
}
/// Fires right after the user successfully signs in.
async fn on_sign_in(
&self,

View file

@ -1,7 +1,7 @@
use diesel::SqliteConnection;
use semver::Version;
use std::sync::Arc;
use tracing::{info, instrument};
use tracing::instrument;
use collab_integrate::CollabKVDB;
use flowy_error::FlowyResult;
@ -9,7 +9,7 @@ use flowy_user_pub::entities::AuthType;
use crate::migrations::migration::UserDataMigration;
use flowy_user_pub::session::Session;
use flowy_user_pub::sql::{select_user_workspace, upsert_user_workspace};
use flowy_user_pub::sql::upsert_user_workspace;
pub struct AnonUserWorkspaceTableMigration;
@ -34,23 +34,15 @@ impl UserDataMigration for AnonUserWorkspaceTableMigration {
&self,
session: &Session,
_collab_db: &Arc<CollabKVDB>,
auth_type: &AuthType,
user_auth_type: &AuthType,
db: &mut SqliteConnection,
) -> FlowyResult<()> {
// For historical reason, anon user doesn't have a workspace in user_workspace_table.
// So we need to create a new entry for the anon user in the user_workspace_table.
if matches!(auth_type, AuthType::Local) {
let user_workspace = &session.user_workspace;
let result = select_user_workspace(&user_workspace.id, db);
if let Err(e) = result {
if e.is_record_not_found() {
info!(
"Anon user workspace not found in the database, creating a new entry for user_id: {}",
session.user_id
);
upsert_user_workspace(session.user_id, *auth_type, user_workspace.clone(), db)?;
}
}
if matches!(user_auth_type, AuthType::Local) {
let mut user_workspace = session.user_workspace.clone();
user_workspace.workspace_type = AuthType::Local;
upsert_user_workspace(session.user_id, *user_auth_type, user_workspace, db)?;
}
Ok(())

View file

@ -40,7 +40,7 @@ impl UserDataMigration for CollabDocKeyWithWorkspaceIdMigration {
&self,
session: &Session,
collab_db: &Arc<CollabKVDB>,
_authenticator: &AuthType,
_user_auth_type: &AuthType,
_db: &mut SqliteConnection,
) -> FlowyResult<()> {
trace!(

View file

@ -42,13 +42,13 @@ impl UserDataMigration for HistoricalEmptyDocumentMigration {
&self,
session: &Session,
collab_db: &Arc<CollabKVDB>,
authenticator: &AuthType,
user_auth_type: &AuthType,
_db: &mut SqliteConnection,
) -> FlowyResult<()> {
// - The `empty document` struct has already undergone refactoring prior to the launch of the AppFlowy cloud version.
// - Consequently, if a user is utilizing the AppFlowy cloud version, there is no need to perform any migration for the `empty document` struct.
// - This migration step is only necessary for users who are transitioning from a local version of AppFlowy to the cloud version.
if !matches!(authenticator, AuthType::Local) {
if !matches!(user_auth_type, AuthType::Local) {
return Ok(());
}
collab_db.with_write_txn(|write_txn| {

View file

@ -54,7 +54,7 @@ impl UserLocalDataMigration {
pub fn run(
self,
migrations: Vec<Box<dyn UserDataMigration>>,
auth_type: &AuthType,
user_auth_type: &AuthType,
app_version: &Version,
) -> FlowyResult<Vec<String>> {
let mut applied_migrations = vec![];
@ -75,7 +75,7 @@ impl UserLocalDataMigration {
let migration_name = migration.name().to_string();
if !duplicated_names.contains(&migration_name) {
migration.run(&self.session, &self.collab_db, auth_type, &mut conn)?;
migration.run(&self.session, &self.collab_db, user_auth_type, &mut conn)?;
applied_migrations.push(migration.name().to_string());
save_migration_record(&mut conn, &migration_name);
duplicated_names.push(migration_name);
@ -98,7 +98,7 @@ pub trait UserDataMigration {
&self,
user: &Session,
collab_db: &Arc<CollabKVDB>,
authenticator: &AuthType,
user_auth_type: &AuthType,
db: &mut SqliteConnection,
) -> FlowyResult<()>;
}

View file

@ -40,7 +40,7 @@ impl UserDataMigration for FavoriteV1AndWorkspaceArrayMigration {
&self,
session: &Session,
collab_db: &Arc<CollabKVDB>,
_authenticator: &AuthType,
_user_auth_type: &AuthType,
_db: &mut SqliteConnection,
) -> FlowyResult<()> {
collab_db.with_write_txn(|write_txn| {

View file

@ -38,7 +38,7 @@ impl UserDataMigration for WorkspaceTrashMapToSectionMigration {
&self,
session: &Session,
collab_db: &Arc<CollabKVDB>,
_authenticator: &AuthType,
_user_auth_type: &AuthType,
_db: &mut SqliteConnection,
) -> FlowyResult<()> {
collab_db.with_write_txn(|write_txn| {

View file

@ -10,7 +10,7 @@ use collab_plugins::local_storage::kv::KVTransactionDB;
use flowy_error::{internal_error, ErrorCode, FlowyError, FlowyResult};
use flowy_sqlite::kv::KVStorePreferences;
use flowy_sqlite::DBConnection;
use flowy_user_pub::entities::UserWorkspace;
use flowy_user_pub::entities::{AuthType, UserWorkspace};
use flowy_user_pub::session::Session;
use std::path::PathBuf;
use std::str::FromStr;
@ -48,14 +48,11 @@ impl AuthenticateUser {
}
pub async fn is_local_mode(&self) -> FlowyResult<bool> {
let uid = self.user_id()?;
if let Ok(anon_user) = self.get_anon_user().await {
if anon_user == uid {
return Ok(true);
}
}
Ok(false)
let session = self.get_session()?;
Ok(matches!(
session.user_workspace.workspace_type,
AuthType::Local
))
}
pub fn device_id(&self) -> FlowyResult<String> {
@ -150,28 +147,24 @@ impl AuthenticateUser {
match self
.store_preferences
.get_object::<Arc<Session>>(&self.user_config.session_cache_key)
.get_object::<Session>(&self.user_config.session_cache_key)
{
None => Err(FlowyError::new(
ErrorCode::RecordNotFound,
"User is not logged in",
"Can't find user session. Please login again",
)),
Some(session) => {
Some(mut session) => {
// Set the workspace type to local if the user is anon.
if let Some(anon_session) = self.store_preferences.get_object::<Session>(ANON_USER) {
if session.user_id == anon_session.user_id {
session.user_workspace.workspace_type = AuthType::Local;
}
}
let session = Arc::new(session);
self.session.store(Some(session.clone()));
Ok(session)
},
}
}
async fn get_anon_user(&self) -> FlowyResult<i64> {
let anon_session = self
.store_preferences
.get_object::<Session>(ANON_USER)
.ok_or(FlowyError::new(
ErrorCode::RecordNotFound,
"Anon user not found",
))?;
Ok(anon_session.user_id)
}
}

View file

@ -3,7 +3,7 @@ use crate::migrations::session_migration::migrate_session_with_user_uuid;
use crate::services::data_import::importer::load_collab_by_object_ids;
use crate::services::db::UserDBPath;
use crate::services::entities::UserPaths;
use crate::user_manager::run_collab_data_migration;
use crate::user_manager::run_data_migration;
use anyhow::anyhow;
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
@ -36,7 +36,7 @@ use std::collections::{HashMap, HashSet};
use collab_document::blocks::TextDelta;
use collab_document::document::Document;
use flowy_user_pub::sql::{select_user_profile, select_workspace_auth_type};
use flowy_user_pub::sql::{select_user_auth_type, select_user_profile};
use semver::Version;
use serde_json::json;
use std::ops::{Deref, DerefMut};
@ -103,23 +103,17 @@ pub(crate) fn prepare_import(
);
let mut conn = imported_sqlite_db.get_connection()?;
let imported_workspace_auth_type = select_user_profile(
let imported_user_auth_type = select_user_profile(
imported_session.user_id,
&imported_session.user_workspace.id,
&mut conn,
)
.map(|v| v.workspace_auth_type)
.or_else(|_| {
select_workspace_auth_type(
imported_session.user_id,
&imported_session.user_workspace.id,
&mut conn,
)
})?;
.map(|v| v.auth_type)
.or_else(|_| select_user_auth_type(imported_session.user_id, &mut conn))?;
run_collab_data_migration(
run_data_migration(
&imported_session,
&imported_workspace_auth_type,
&imported_user_auth_type,
imported_collab_db.clone(),
imported_sqlite_db.get_pool(),
other_store_preferences.clone(),

View file

@ -7,7 +7,6 @@ use arc_swap::ArcSwapOption;
use collab::lock::RwLock;
use collab_user::core::UserAwareness;
use dashmap::DashMap;
use flowy_server_pub::AuthenticatorType;
use flowy_sqlite::kv::KVStorePreferences;
use flowy_sqlite::schema::user_table;
use flowy_sqlite::ConnectionPool;
@ -131,30 +130,16 @@ impl UserManager {
let user = self
.get_user_profile_from_disk(session.user_id, &session.user_workspace.id)
.await?;
self.cloud_service.set_server_auth_type(&user.auth_type);
// Get the current authenticator from the environment variable
let env_auth_type = current_authenticator();
// If the current authenticator is different from the authenticator in the session and it's
// not a local authenticator, we need to sign out the user.
if user.auth_type != AuthType::Local && user.auth_type != env_auth_type {
event!(
tracing::Level::INFO,
"Auth type changed from {:?} to {:?}",
user.auth_type,
env_auth_type
);
self.sign_out().await?;
return Ok(());
}
let auth_type = user.workspace_auth_type;
let token = self.token_from_auth_type(&auth_type)?;
self.cloud_service.set_server_auth_type(&auth_type, token)?;
event!(
tracing::Level::INFO,
"init user session: {}:{}, auth type: {:?}",
user.uid,
user.email,
user.auth_type,
auth_type,
);
self.prepare_user(&session).await;
@ -251,7 +236,7 @@ impl UserManager {
self.authenticate_user.database.get_pool(session.user_id),
) {
(Ok(collab_db), Ok(sqlite_pool)) => {
run_collab_data_migration(
run_data_migration(
&session,
&user.auth_type,
collab_db,
@ -267,7 +252,7 @@ impl UserManager {
self.set_first_time_installed_version();
let cloud_config = get_cloud_config(session.user_id, &self.store_preferences);
// Init the user awareness. here we ignore the error
let _ = self.initial_user_awareness(&session, &user.auth_type).await;
let _ = self.initial_user_awareness(&session, &auth_type).await;
user_status_callback
.on_launch_if_authenticated(
@ -275,7 +260,7 @@ impl UserManager {
&cloud_config,
&session.user_workspace,
&self.authenticate_user.user_config.device_id,
&user.auth_type,
&auth_type,
)
.await?;
} else {
@ -342,7 +327,7 @@ impl UserManager {
params: SignInParams,
auth_type: AuthType,
) -> Result<UserProfile, FlowyError> {
self.cloud_service.set_server_auth_type(&auth_type);
self.cloud_service.set_server_auth_type(&auth_type, None)?;
let response: AuthResponse = self
.cloud_service
@ -357,7 +342,7 @@ impl UserManager {
self.save_auth_data(&response, auth_type, &session).await?;
let _ = self
.initial_user_awareness(&session, &user_profile.auth_type)
.initial_user_awareness(&session, &user_profile.workspace_auth_type)
.await;
self
.user_status_callback
@ -390,7 +375,7 @@ impl UserManager {
auth_type: AuthType,
params: BoxAny,
) -> Result<UserProfile, FlowyError> {
self.cloud_service.set_server_auth_type(&auth_type);
self.cloud_service.set_server_auth_type(&auth_type, None)?;
// sign out the current user if there is one
let migration_user = self.get_migration_user(&auth_type).await;
@ -556,7 +541,7 @@ impl UserManager {
workspace_id: &str,
) -> FlowyResult<()> {
// If the user is a local user, no need to refresh the user profile
if old_user_profile.auth_type.is_local() {
if old_user_profile.workspace_auth_type.is_local() {
return Ok(());
}
@ -616,6 +601,16 @@ impl UserManager {
self.authenticate_user.user_paths.user_data_dir(uid)
}
pub fn token_from_auth_type(&self, auth_type: &AuthType) -> FlowyResult<Option<String>> {
match auth_type {
AuthType::Local => Ok(None),
AuthType::AppFlowyCloud => {
let uid = self.user_id()?;
let mut conn = self.db_connection(uid)?;
Ok(select_user_token(uid, &mut conn).ok())
},
}
}
pub fn user_setting(&self) -> Result<UserSettingPB, FlowyError> {
let session = self.get_session()?;
let user_setting = UserSettingPB {
@ -655,7 +650,9 @@ impl UserManager {
authenticator: &AuthType,
email: &str,
) -> Result<String, FlowyError> {
self.cloud_service.set_server_auth_type(authenticator);
self
.cloud_service
.set_server_auth_type(authenticator, None)?;
let auth_service = self.cloud_service.get_user_service()?;
let url = auth_service.generate_sign_in_url_with_email(email).await?;
@ -670,7 +667,7 @@ impl UserManager {
) -> Result<GotrueTokenResponse, FlowyError> {
self
.cloud_service
.set_server_auth_type(&AuthType::AppFlowyCloud);
.set_server_auth_type(&AuthType::AppFlowyCloud, None)?;
let auth_service = self.cloud_service.get_user_service()?;
let response = auth_service.sign_in_with_password(email, password).await?;
Ok(response)
@ -684,7 +681,7 @@ impl UserManager {
) -> Result<(), FlowyError> {
self
.cloud_service
.set_server_auth_type(&AuthType::AppFlowyCloud);
.set_server_auth_type(&AuthType::AppFlowyCloud, None)?;
let auth_service = self.cloud_service.get_user_service()?;
auth_service
.sign_in_with_magic_link(email, redirect_to)
@ -700,7 +697,7 @@ impl UserManager {
) -> Result<GotrueTokenResponse, FlowyError> {
self
.cloud_service
.set_server_auth_type(&AuthType::AppFlowyCloud);
.set_server_auth_type(&AuthType::AppFlowyCloud, None)?;
let auth_service = self.cloud_service.get_user_service()?;
let response = auth_service.sign_in_with_passcode(email, passcode).await?;
Ok(response)
@ -713,7 +710,7 @@ impl UserManager {
) -> Result<String, FlowyError> {
self
.cloud_service
.set_server_auth_type(&AuthType::AppFlowyCloud);
.set_server_auth_type(&AuthType::AppFlowyCloud, None)?;
let auth_service = self.cloud_service.get_user_service()?;
let url = auth_service
.generate_oauth_url_with_provider(oauth_provider)
@ -800,13 +797,6 @@ impl UserManager {
}
}
fn current_authenticator() -> AuthType {
match AuthenticatorType::from_env() {
AuthenticatorType::Local => AuthType::Local,
AuthenticatorType::AppFlowyCloud => AuthType::AppFlowyCloud,
}
}
pub fn upsert_user_profile_change(
uid: i64,
workspace_id: &str,
@ -867,9 +857,9 @@ fn mark_all_migrations_as_applied(sqlite_pool: &Arc<ConnectionPool>) {
}
}
pub(crate) fn run_collab_data_migration(
pub(crate) fn run_data_migration(
session: &Session,
auth_type: &AuthType,
user_auth_type: &AuthType,
collab_db: Arc<CollabKVDB>,
sqlite_pool: Arc<ConnectionPool>,
kv: Arc<KVStorePreferences>,
@ -878,7 +868,7 @@ pub(crate) fn run_collab_data_migration(
let migrations = collab_migration_list();
match UserLocalDataMigration::new(session.clone(), collab_db, sqlite_pool, kv).run(
migrations,
auth_type,
user_auth_type,
app_version,
) {
Ok(applied_migrations) => {

View file

@ -53,6 +53,18 @@ impl UserManager {
Ok(UserProfilePB::from(profile))
}
pub fn get_anon_user_id(&self) -> FlowyResult<i64> {
let anon_session = self
.store_preferences
.get_object::<Session>(ANON_USER)
.ok_or(FlowyError::new(
ErrorCode::RecordNotFound,
"Anon user not found",
))?;
Ok(anon_session.user_id)
}
/// Opens a historical user's session based on their user ID, device ID, and authentication type.
///
/// This function facilitates the re-opening of a user's session from historical tracking.

View file

@ -153,9 +153,10 @@ impl UserManager {
#[instrument(skip(self), err)]
pub async fn open_workspace(&self, workspace_id: &Uuid, auth_type: AuthType) -> FlowyResult<()> {
info!("open workspace: {}, auth_type:{}", workspace_id, auth_type);
info!("open workspace: {}, auth type:{}", workspace_id, auth_type);
let workspace_id_str = workspace_id.to_string();
self.cloud_service.set_server_auth_type(&auth_type);
let token = self.token_from_auth_type(&auth_type)?;
self.cloud_service.set_server_auth_type(&auth_type, token)?;
let uid = self.user_id()?;
let profile = self
@ -227,7 +228,8 @@ impl UserManager {
workspace_name: &str,
auth_type: AuthType,
) -> FlowyResult<UserWorkspace> {
self.cloud_service.set_server_auth_type(&auth_type);
let token = self.token_from_auth_type(&auth_type)?;
self.cloud_service.set_server_auth_type(&auth_type, token)?;
let new_workspace = self
.cloud_service
@ -451,7 +453,7 @@ impl UserManager {
);
// only send notification if there were real changes
if let Ok(updated_list) = select_all_user_workspace(uid, &mut conn) {
let repeated_pb = RepeatedUserWorkspacePB::from((auth_copy, updated_list));
let repeated_pb = RepeatedUserWorkspacePB::from(updated_list);
send_notification(&uid.to_string(), UserNotification::DidUpdateUserWorkspaces)
.payload(repeated_pb)
.send();