mod migration; pub mod version_1; mod version_2; use flowy_collaboration::{ client_folder::FolderPad, entities::revision::{Revision, RevisionState}, }; use std::sync::Arc; use tokio::sync::RwLock; pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*}; use crate::{ controller::FolderId, event_map::WorkspaceDatabase, services::{folder_editor::FolderEditor, persistence::migration::FolderMigration}, }; use flowy_error::{FlowyError, FlowyResult}; use flowy_folder_data_model::entities::{ app::App, trash::{RepeatedTrash, Trash}, view::View, workspace::Workspace, }; use flowy_sync::{mk_revision_disk_cache, RevisionRecord}; use lib_sqlite::ConnectionPool; pub trait FolderPersistenceTransaction { fn create_workspace(&self, user_id: &str, workspace: Workspace) -> FlowyResult<()>; fn read_workspaces(&self, user_id: &str, workspace_id: Option) -> FlowyResult>; fn update_workspace(&self, changeset: WorkspaceChangeset) -> FlowyResult<()>; fn delete_workspace(&self, workspace_id: &str) -> FlowyResult<()>; fn create_app(&self, app: App) -> FlowyResult<()>; fn update_app(&self, changeset: AppChangeset) -> FlowyResult<()>; fn read_app(&self, app_id: &str) -> FlowyResult; fn read_workspace_apps(&self, workspace_id: &str) -> FlowyResult>; fn delete_app(&self, app_id: &str) -> FlowyResult; fn create_view(&self, view: View) -> FlowyResult<()>; fn read_view(&self, view_id: &str) -> FlowyResult; fn read_views(&self, belong_to_id: &str) -> FlowyResult>; fn update_view(&self, changeset: ViewChangeset) -> FlowyResult<()>; fn delete_view(&self, view_id: &str) -> FlowyResult<()>; fn create_trash(&self, trashes: Vec) -> FlowyResult<()>; fn read_trash(&self, trash_id: Option) -> FlowyResult; fn delete_trash(&self, trash_ids: Option>) -> FlowyResult<()>; } pub struct FolderPersistence { database: Arc, folder_editor: Arc>>>, } impl FolderPersistence { pub fn new(database: Arc, folder_editor: Arc>>>) -> Self { Self { database, folder_editor, } } #[deprecated( since = "0.0.3", note = "please use `begin_transaction` instead, this interface will be removed in the future" )] #[allow(dead_code)] pub fn begin_transaction_v_1(&self, f: F) -> FlowyResult where F: for<'a> FnOnce(Box) -> FlowyResult, { //[[immediate_transaction]] // https://sqlite.org/lang_transaction.html // IMMEDIATE cause the database connection to start a new write immediately, // without waiting for a write statement. The BEGIN IMMEDIATE might fail // with SQLITE_BUSY if another write transaction is already active on another // database connection. // // EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started // immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in // other journaling modes, EXCLUSIVE prevents other database connections from // reading the database while the transaction is underway. let conn = self.database.db_connection()?; conn.immediate_transaction::<_, FlowyError, _>(|| f(Box::new(V1Transaction(&conn)))) } pub async fn begin_transaction(&self, f: F) -> FlowyResult where F: FnOnce(Arc) -> FlowyResult, { match self.folder_editor.read().await.clone() { None => Err(FlowyError::internal().context("FolderEditor should be initialized after user login in.")), Some(editor) => f(editor), } } pub fn db_pool(&self) -> FlowyResult> { self.database.db_pool() } pub async fn initialize(&self, user_id: &str, folder_id: &FolderId) -> FlowyResult<()> { let migrations = FolderMigration::new(user_id, self.database.clone()); if let Some(migrated_folder) = migrations.run_v1_migration()? { tracing::trace!("Save migration folder"); self.save_folder(user_id, folder_id, migrated_folder).await?; } Ok(()) } pub async fn save_folder(&self, user_id: &str, folder_id: &FolderId, folder: FolderPad) -> FlowyResult<()> { let pool = self.database.db_pool()?; let delta_data = folder.delta().to_bytes(); let md5 = folder.md5(); let revision = Revision::new(folder_id.as_ref(), 0, 0, delta_data, user_id, md5); let record = RevisionRecord { revision, state: RevisionState::Sync, write_to_disk: true, }; let conn = pool.get()?; let disk_cache = mk_revision_disk_cache(user_id, pool); disk_cache.create_revision_records(vec![record], &conn) } }