chore: load user id and uuid when start (#329)

* chore: load user id and uuid when start

* chore: fix test

* chore: fix test
This commit is contained in:
Nathan.fooo 2024-02-20 05:43:33 +08:00 committed by GitHub
parent d4f7aac93a
commit 5cd16d7544
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 142 additions and 58 deletions

View file

@ -0,0 +1,26 @@
{
"db_name": "PostgreSQL",
"query": " SELECT uid, uuid FROM af_user",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "uid",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "uuid",
"type_info": "Uuid"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false
]
},
"hash": "1b1ff4352abb6dad982279ee99c8dccb3621b55a838998c1b9803982ae10f622"
}

View file

@ -8,7 +8,7 @@ use {
pub fn setup_log() {
static START: Once = Once::new();
START.call_once(|| {
let level = "trace";
let level = "info";
let mut filters = vec![];
filters.push(format!("client_api={}", level));
std::env::set_var("RUST_LOG", filters.join(","));

View file

@ -259,9 +259,11 @@ impl TestClient {
let duration = Duration::from_secs(secs);
while let Ok(Some(state)) = timeout(duration, sync_state.next()).await {
if state == SyncState::SyncFinished {
break;
return;
}
}
panic!("Timeout or SyncState stream ended before reaching SyncFinished");
}
#[allow(dead_code)]

View file

@ -19,6 +19,7 @@ use crate::{platform_spawn, retry_connect};
use realtime_entity::collab_msg::CollabMessage;
use realtime_entity::message::{RealtimeMessage, SystemMessage};
use realtime_entity::user::UserMessage;
use tokio::sync::{oneshot, Mutex};
use tracing::{debug, error, info, trace, warn};
use websocket::{CloseCode, CloseFrame, Message};
@ -68,7 +69,6 @@ pub struct WSClient {
rate_limiter:
Arc<tokio::sync::RwLock<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>>,
}
impl WSClient {
pub fn new<H>(config: WSClientConfig, http_sender: H) -> Self
where
@ -209,8 +209,9 @@ impl WSClient {
// ping from server
Message::Ping(_) => match sender.send(Message::Pong(vec![])) {
Ok(_) => {},
Err(e) => {
error!("failed to send pong message to websocket: {}", e);
Err(_e) => {
// if the sender returns an error, it means the receiver has been dropped
break;
},
},
Message::Close(close) => {
@ -261,6 +262,7 @@ impl WSClient {
}
}
}
info!("exit websocket send loop");
});
Ok(())

View file

@ -341,11 +341,11 @@ impl CollabStoragePgImpl {
COLLAB_SNAPSHOT_LIMIT,
)
.await?;
Ok(meta)
},
_ => Err(AppError::Internal(anyhow!(
"fail to acquire transaction to create snapshot",
"fail to acquire transaction to create snapshot for object:{}",
params.object_id,
))),
}
}

View file

@ -58,6 +58,12 @@ pub struct AFUserRow {
pub created_at: Option<DateTime<Utc>>,
}
#[derive(Debug, FromRow)]
pub struct AFUserIdRow {
pub uid: i64,
pub uuid: Uuid,
}
/// Represent the row of the af_user_profile_view
#[derive(Debug, FromRow, Deserialize, Serialize)]
pub struct AFUserProfileRow {

View file

@ -1,4 +1,6 @@
use crate::pg_row::AFUserIdRow;
use app_error::AppError;
use futures_util::stream::BoxStream;
use sqlx::postgres::PgArguments;
use sqlx::types::JsonValue;
use sqlx::{Arguments, Executor, PgPool, Postgres};
@ -143,6 +145,12 @@ pub async fn select_uid_from_uuid<'a, E: Executor<'a, Database = Postgres>>(
Ok(uid)
}
pub fn select_all_uid_uuid<'a, E: Executor<'a, Database = Postgres> + 'a>(
executor: E,
) -> BoxStream<'a, sqlx::Result<AFUserIdRow>> {
sqlx::query_as!(AFUserIdRow, r#" SELECT uid, uuid FROM af_user"#,).fetch(executor)
}
#[inline]
pub async fn select_uid_from_email<'a, E: Executor<'a, Database = Postgres>>(
executor: E,

View file

@ -197,7 +197,7 @@ pub async fn init_state(config: &Config) -> Result<AppState, Error> {
)
.await,
);
let users = UserCache::new(pg_pool.clone());
let users = UserCache::new(pg_pool.clone()).await;
info!("Application state initialized");
Ok(AppState {

View file

@ -79,10 +79,6 @@ impl AccessControl {
WorkspaceAccessControlImpl::new(self.clone())
}
pub async fn contains(&self, obj: &ObjectType<'_>) -> bool {
self.enforcer.contains(obj).await
}
pub async fn update(
&self,
uid: &i64,

View file

@ -31,14 +31,6 @@ impl AFEnforcer {
action_cache,
}
}
pub async fn contains(&self, obj: &ObjectType<'_>) -> bool {
self
.enforcer
.read()
.await
.get_all_objects()
.contains(&obj.to_object_id())
}
pub async fn policies_for_user_with_given_object(
&self,

View file

@ -8,14 +8,16 @@ use app_error::AppError;
use crate::api::metrics::RequestMetrics;
use crate::biz::casbin::access_control::AccessControl;
use crate::biz::casbin::metrics::AccessControlMetrics;
use dashmap::DashMap;
use database::file::bucket_s3_impl::S3BucketStorage;
use database::user::select_uid_from_uuid;
use database::user::{select_all_uid_uuid, select_uid_from_uuid};
use realtime::collaborate::RealtimeMetrics;
use snowflake::Snowflake;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_stream::StreamExt;
use uuid::Uuid;
pub type RedisClient = redis::aio::ConnectionManager;
@ -55,31 +57,38 @@ pub const EXPIRED_DURATION_DAYS: i64 = 30;
pub struct UserCache {
pool: PgPool,
users: RwLock<HashMap<Uuid, AuthenticateUser>>,
users: DashMap<Uuid, AuthenticateUser>,
}
impl UserCache {
pub fn new(pool: PgPool) -> Self {
Self {
pool,
users: RwLock::new(HashMap::new()),
}
/// Load all users from database when initializing the cache.
pub async fn new(pool: PgPool) -> Self {
let users = {
let users = DashMap::new();
let mut stream = select_all_uid_uuid(&pool);
while let Some(Ok(af_user_id)) = stream.next().await {
users.insert(
af_user_id.uuid,
AuthenticateUser {
uid: af_user_id.uid,
},
);
}
users
};
Self { pool, users }
}
/// Get the user's uid from the cache or the database.
pub async fn get_user_uid(&self, uuid: &Uuid) -> Result<i64, AppError> {
// Attempt to acquire a read lock and check if the user exists to minimize lock contention.
{
let users_read = self.users.read().await;
if let Some(user) = users_read.get(uuid) {
return Ok(user.uid);
}
if let Some(entry) = self.users.get(uuid) {
return Ok(entry.value().uid);
}
// If the user is not found in the cache, query the database.
let uid = select_uid_from_uuid(&self.pool, uuid).await?;
let mut users_write = self.users.write().await;
users_write.insert(*uuid, AuthenticateUser { uid });
self.users.insert(*uuid, AuthenticateUser { uid });
Ok(uid)
}
}

View file

@ -8,6 +8,8 @@ use database_entity::dto::{AFAccessLevel, AFRole};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use uuid::Uuid;
#[tokio::test]
@ -389,6 +391,9 @@ async fn multiple_user_with_read_and_write_permission_edit_same_collab_test() {
let workspace_id = workspace_id.clone();
let task = tokio::spawn(async move {
let mut new_user = TestClient::new_user().await;
// sleep 2 secs to make sure it do not trigger register user too fast in gotrue
sleep(Duration::from_secs(i % 3)).await;
owner
.add_workspace_member(&workspace_id, &new_user, AFRole::Member)
.await;
@ -472,6 +477,8 @@ async fn multiple_user_with_read_only_permission_edit_same_collab_test() {
let workspace_id = workspace_id.clone();
let task = tokio::spawn(async move {
let mut new_user = TestClient::new_user().await;
// sleep 2 secs to make sure it do not trigger register user too fast in gotrue
sleep(Duration::from_secs(i % 2)).await;
owner
.add_client_as_collab_member(
&workspace_id,
@ -493,7 +500,10 @@ async fn multiple_user_with_read_only_permission_edit_same_collab_test() {
.collab
.lock()
.insert(&i.to_string(), random_str.clone());
new_user.wait_object_sync_complete(&object_id).await;
// wait 3 seconds to let the client try to send the update to the server
// can't use want_object_sync_complete because the client do not have permission to send the update
sleep(Duration::from_secs(3)).await;
(random_str, new_user)
});
tasks.push(task);

View file

@ -2,6 +2,8 @@ use client_api_test_util::*;
use collab_entity::CollabType;
use database_entity::dto::AFRole;
use serde_json::json;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn edit_workspace_without_permission() {
@ -75,7 +77,7 @@ async fn edit_workspace_with_guest_permission() {
let workspace_id = client_1.workspace_id().await;
client_1.open_workspace_collab(&workspace_id).await;
// add client 2 as the member of the workspace then the client 2 will receive the update.
// add client 2 as the member of the workspace then the client 2 can receive the update.
client_1
.add_workspace_member(&workspace_id, &client_2, AFRole::Guest)
.await;
@ -91,9 +93,7 @@ async fn edit_workspace_with_guest_permission() {
client_2.open_workspace_collab(&workspace_id).await;
// make sure the client 2 has received the remote updates before the client 2 edits the collab
client_2
.wait_object_sync_complete_with_secs(&workspace_id, 10)
.await;
sleep(Duration::from_secs(3)).await;
// client_2 only has the guest permission, so it can not edit the collab
client_2
@ -103,9 +103,7 @@ async fn edit_workspace_with_guest_permission() {
.collab
.lock()
.insert("name", "nathan");
client_2
.wait_object_sync_complete_with_secs(&workspace_id, 5)
.await;
assert_client_collab_include_value(&mut client_1, &workspace_id, json!({"name": "zack"})).await;
assert_client_collab_include_value(&mut client_2, &workspace_id, json!({"name": "nathan"})).await;

View file

@ -5,6 +5,9 @@ use collab_entity::CollabType;
use database_entity::dto::AFAccessLevel;
use serde_json::json;
use std::time::Duration;
use tokio::time::sleep;
use uuid::Uuid;
#[tokio::test]
@ -407,20 +410,6 @@ async fn multiple_collab_edit_test() {
.await;
}
// #[tokio::test]
// async fn open_100_collab_test() {
// let mut clients = vec![];
// for _ in 0..40 {
// let mut test_client = TestClient::new_user().await;
// let workspace_id = test_client.workspace_id().await;
// let object_id = test_client
// .create_and_edit_collab(&workspace_id, CollabType::Document)
// .await;
// clients.push(test_client);
// }
//
// tokio::time::sleep(Duration::from_secs(50)).await;
// }
#[tokio::test]
async fn simulate_multiple_user_edit_collab_test() {
let mut tasks = Vec::new();
@ -467,3 +456,49 @@ async fn simulate_multiple_user_edit_collab_test() {
assert_json_eq!(expected_json, json);
}
}
#[tokio::test]
async fn post_realtime_message_test() {
let mut tasks = Vec::new();
let big_text = generate_random_string(1024 * 1024 * 3);
for i in 0..20 {
let cloned_text = big_text.clone();
let task = tokio::spawn(async move {
let mut new_user = TestClient::new_user().await;
// sleep 2 secs to make sure it do not trigger register user too fast in gotrue
sleep(Duration::from_secs(i % 3)).await;
let object_id = Uuid::new_v4().to_string();
let workspace_id = new_user.workspace_id().await;
let doc_state = make_big_collab_doc_state(&object_id, "text", cloned_text);
// the big doc_state will force the init_sync using the http request.
// It will trigger the POST_REALTIME_MESSAGE_STREAM_HANDLER to handle the request.
new_user
.open_collab_with_doc_state(&workspace_id, &object_id, CollabType::Document, doc_state)
.await;
new_user.wait_object_sync_complete(&object_id).await;
(new_user, object_id, workspace_id)
});
tasks.push(task);
}
let results = futures::future::join_all(tasks).await;
for result in results.into_iter() {
let (mut client, object_id, workspace_id) = result.unwrap();
assert_server_collab(
&workspace_id,
&mut client.api_client,
&object_id,
&CollabType::Document,
10,
json!({
"text": big_text
}),
)
.await;
drop(client);
}
}