refactor: only notify when user workspaces were changed

This commit is contained in:
Nathan 2025-04-21 11:41:58 +08:00
parent c7bf8bb1ba
commit 1356382524
54 changed files with 497 additions and 494 deletions

View file

@ -2,8 +2,10 @@ use crate::entities::{AuthType, UserWorkspace};
use chrono::{TimeZone, Utc};
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::schema::user_workspace_table;
use flowy_sqlite::schema::user_workspace_table::dsl;
use flowy_sqlite::DBConnection;
use flowy_sqlite::{prelude::*, ExpressionMethods, RunQueryDsl, SqliteConnection};
use std::collections::{HashMap, HashSet};
use tracing::{info, warn};
#[derive(Clone, Default, Queryable, Identifiable, Insertable)]
@ -26,11 +28,43 @@ pub struct UserWorkspaceChangeset {
pub id: String,
pub name: Option<String>,
pub icon: Option<String>,
pub role: Option<i32>,
pub member_count: Option<i64>,
}
impl UserWorkspaceChangeset {
pub fn has_changes(&self) -> bool {
self.name.is_some() || self.icon.is_some() || self.role.is_some() || self.member_count.is_some()
}
pub fn from_version(old: &UserWorkspace, new: &UserWorkspace) -> Self {
let mut changeset = Self {
id: new.id.clone(),
name: None,
icon: None,
role: None,
member_count: None,
};
if old.name != new.name {
changeset.name = Some(new.name.clone());
}
if old.icon != new.icon {
changeset.icon = Some(new.icon.clone());
}
if old.role != new.role {
changeset.role = new.role.map(|v| v as i32);
}
if old.member_count != new.member_count {
changeset.member_count = Some(new.member_count);
}
changeset
}
}
impl UserWorkspaceTable {
pub fn from_workspace(
uid: i64,
uid_val: i64,
workspace: &UserWorkspace,
auth_type: AuthType,
) -> Result<Self, FlowyError> {
@ -44,12 +78,12 @@ impl UserWorkspaceTable {
Ok(Self {
id: workspace.id.clone(),
name: workspace.name.clone(),
uid,
uid: uid_val,
created_at: workspace.created_at.timestamp(),
database_storage_id: workspace.workspace_database_id.clone(),
icon: workspace.icon.clone(),
member_count: workspace.member_count,
role: workspace.role.clone().map(|v| v as i32),
role: workspace.role.map(|v| v as i32),
workspace_type: auth_type as i32,
})
}
@ -59,19 +93,20 @@ pub fn select_user_workspace(
workspace_id: &str,
conn: &mut SqliteConnection,
) -> FlowyResult<UserWorkspaceTable> {
let row = user_workspace_table::dsl::user_workspace_table
let row = dsl::user_workspace_table
.filter(user_workspace_table::id.eq(workspace_id))
.first::<UserWorkspaceTable>(conn)?;
Ok(row)
}
pub fn select_all_user_workspace(
user_id: i64,
mut conn: DBConnection,
uid: i64,
conn: &mut SqliteConnection,
) -> Result<Vec<UserWorkspace>, FlowyError> {
let rows = user_workspace_table::dsl::user_workspace_table
.filter(user_workspace_table::uid.eq(user_id))
.load::<UserWorkspaceTable>(&mut *conn)?;
.filter(user_workspace_table::uid.eq(uid))
.order(user_workspace_table::created_at.desc())
.load::<UserWorkspaceTable>(conn)?;
Ok(rows.into_iter().map(UserWorkspace::from).collect())
}
@ -87,32 +122,6 @@ pub fn update_user_workspace(
Ok(())
}
pub fn upsert_user_workspace(
uid: i64,
auth_type: AuthType,
user_workspace: UserWorkspace,
conn: &mut SqliteConnection,
) -> Result<(), FlowyError> {
let row = UserWorkspaceTable::from_workspace(uid, &user_workspace, auth_type)?;
diesel::insert_into(user_workspace_table::table)
.values(row.clone())
.on_conflict(user_workspace_table::id)
.do_update()
.set((
user_workspace_table::name.eq(row.name),
user_workspace_table::uid.eq(row.uid),
user_workspace_table::created_at.eq(row.created_at),
user_workspace_table::database_storage_id.eq(row.database_storage_id),
user_workspace_table::icon.eq(row.icon),
user_workspace_table::member_count.eq(row.member_count),
user_workspace_table::role.eq(row.role),
user_workspace_table::workspace_type.eq(row.workspace_type),
))
.execute(conn)?;
Ok(())
}
pub fn delete_user_workspace(mut conn: DBConnection, workspace_id: &str) -> FlowyResult<()> {
let n = conn.immediate_transaction(|conn| {
let rows_affected: usize =
@ -151,7 +160,7 @@ pub fn delete_user_all_workspace(
conn: &mut SqliteConnection,
) -> FlowyResult<()> {
let n = diesel::delete(
user_workspace_table::dsl::user_workspace_table
dsl::user_workspace_table
.filter(user_workspace_table::uid.eq(uid))
.filter(user_workspace_table::workspace_type.eq(auth_type as i32)),
)
@ -163,24 +172,93 @@ pub fn delete_user_all_workspace(
Ok(())
}
/// Delete all user workspaces for the given user and auth type, then insert the provided user workspaces.
pub fn delete_all_then_insert_user_workspaces(
uid: i64,
mut conn: DBConnection,
#[derive(Debug)]
pub enum WorkspaceChange {
Inserted(String),
Updated(String),
}
pub fn upsert_user_workspace(
uid_val: i64,
auth_type: AuthType,
user_workspace: UserWorkspace,
conn: &mut SqliteConnection,
) -> Result<usize, FlowyError> {
let row = UserWorkspaceTable::from_workspace(uid_val, &user_workspace, auth_type)?;
let n = insert_into(user_workspace_table::table)
.values(row.clone())
.on_conflict(user_workspace_table::id)
.do_update()
.set((
user_workspace_table::name.eq(row.name),
user_workspace_table::uid.eq(row.uid),
user_workspace_table::created_at.eq(row.created_at),
user_workspace_table::database_storage_id.eq(row.database_storage_id),
user_workspace_table::icon.eq(row.icon),
user_workspace_table::member_count.eq(row.member_count),
user_workspace_table::role.eq(row.role),
))
.execute(conn)?;
Ok(n)
}
pub fn sync_user_workspaces_with_diff(
uid_val: i64,
auth_type: AuthType,
user_workspaces: &[UserWorkspace],
) -> FlowyResult<()> {
conn.immediate_transaction(|conn| {
delete_user_all_workspace(uid, auth_type, conn)?;
info!(
"Insert {} workspaces for user {} and auth type {:?}",
user_workspaces.len(),
uid,
auth_type
);
for user_workspace in user_workspaces {
upsert_user_workspace(uid, auth_type, user_workspace.clone(), conn)?;
conn: &mut SqliteConnection,
) -> FlowyResult<Vec<WorkspaceChange>> {
let diff = conn.immediate_transaction(|conn| {
// 1) Load all existing workspaces into a map
let existing_rows: Vec<UserWorkspaceTable> = dsl::user_workspace_table
.filter(user_workspace_table::uid.eq(uid_val))
.filter(user_workspace_table::workspace_type.eq(auth_type as i32))
.load(conn)?;
let mut existing_map: HashMap<String, UserWorkspaceTable> = existing_rows
.into_iter()
.map(|r| (r.id.clone(), r))
.collect();
// 2) Build incoming ID set and delete any stale ones
let incoming_ids: HashSet<String> = user_workspaces.iter().map(|uw| uw.id.clone()).collect();
let to_delete: Vec<String> = existing_map
.keys()
.filter(|id| !incoming_ids.contains(*id))
.cloned()
.collect();
if !to_delete.is_empty() {
diesel::delete(dsl::user_workspace_table.filter(user_workspace_table::id.eq_any(&to_delete)))
.execute(conn)?;
}
Ok::<(), FlowyError>(())
})
// 3) For each incoming workspace, either INSERT or UPDATE if changed
let mut diffs = Vec::new();
for uw in user_workspaces {
match existing_map.remove(&uw.id) {
None => {
// new workspace → insert
let new_row = UserWorkspaceTable::from_workspace(uid_val, uw, auth_type)?;
diesel::insert_into(user_workspace_table::table)
.values(new_row)
.execute(conn)?;
diffs.push(WorkspaceChange::Inserted(uw.id.clone()));
},
Some(old) => {
let changes = UserWorkspaceChangeset::from_version(&UserWorkspace::from(old), uw);
if changes.has_changes() {
diesel::update(dsl::user_workspace_table.find(&uw.id))
.set(&changes)
.execute(conn)?;
diffs.push(WorkspaceChange::Updated(uw.id.clone()));
}
},
}
}
Ok::<_, FlowyError>(diffs)
})?;
Ok(diff)
}