chore: passed through first circle of hell

This commit is contained in:
Bartosz Sypytkowski 2025-03-20 07:23:50 +01:00
parent 4c9dd9eaf1
commit a5d6cc0a59
47 changed files with 707 additions and 664 deletions

1
Cargo.lock generated
View file

@ -2088,6 +2088,7 @@ dependencies = [
"thiserror 1.0.63",
"tokio",
"tracing",
"uuid",
"yrs",
]

View file

@ -1,13 +1,13 @@
use app_error::AppError;
use async_trait::async_trait;
use database_entity::dto::AFAccessLevel;
use tracing::instrument;
use crate::{
act::Action,
collab::{CollabAccessControl, RealtimeAccessControl},
entity::ObjectType,
};
use app_error::AppError;
use async_trait::async_trait;
use database_entity::dto::AFAccessLevel;
use tracing::instrument;
use uuid::Uuid;
use super::access::AccessControl;
@ -26,9 +26,9 @@ impl CollabAccessControlImpl {
impl CollabAccessControl for CollabAccessControlImpl {
async fn enforce_action(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
_oid: &str,
_oid: &Uuid,
action: Action,
) -> Result<(), AppError> {
// TODO: allow non workspace member to read a collab.
@ -57,9 +57,9 @@ impl CollabAccessControl for CollabAccessControlImpl {
async fn enforce_access_level(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
_oid: &str,
_oid: &Uuid,
access_level: AFAccessLevel,
) -> Result<(), AppError> {
// TODO: allow non workspace member to read a collab.
@ -91,7 +91,7 @@ impl CollabAccessControl for CollabAccessControlImpl {
async fn update_access_level_policy(
&self,
_uid: &i64,
_oid: &str,
_oid: &Uuid,
_level: AFAccessLevel,
) -> Result<(), AppError> {
// TODO: allow non workspace member to read a collab.
@ -99,7 +99,7 @@ impl CollabAccessControl for CollabAccessControlImpl {
}
#[instrument(level = "info", skip_all)]
async fn remove_access_level(&self, _uid: &i64, _oid: &str) -> Result<(), AppError> {
async fn remove_access_level(&self, _uid: &i64, _oid: &Uuid) -> Result<(), AppError> {
// TODO: allow non workspace member to read a collab.
Ok(())
}
@ -117,9 +117,9 @@ impl RealtimeCollabAccessControlImpl {
async fn can_perform_action(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
_oid: &str,
_oid: &Uuid,
required_action: Action,
) -> Result<bool, AppError> {
// TODO: allow non workspace member to read a collab.
@ -146,9 +146,9 @@ impl RealtimeCollabAccessControlImpl {
impl RealtimeAccessControl for RealtimeCollabAccessControlImpl {
async fn can_write_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
oid: &str,
oid: &Uuid,
) -> Result<bool, AppError> {
self
.can_perform_action(workspace_id, uid, oid, Action::Write)
@ -157,9 +157,9 @@ impl RealtimeAccessControl for RealtimeCollabAccessControlImpl {
async fn can_read_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
oid: &str,
oid: &Uuid,
) -> Result<bool, AppError> {
self
.can_perform_action(workspace_id, uid, oid, Action::Read)

View file

@ -25,7 +25,7 @@ impl WorkspaceAccessControl for WorkspaceAccessControlImpl {
async fn enforce_role(
&self,
uid: &i64,
workspace_id: Uuid,
workspace_id: &Uuid,
role: AFRole,
) -> Result<(), AppError> {
let result = self
@ -42,7 +42,7 @@ impl WorkspaceAccessControl for WorkspaceAccessControlImpl {
async fn enforce_action(
&self,
uid: &i64,
workspace_id: Uuid,
workspace_id: &Uuid,
action: Action,
) -> Result<(), AppError> {
let result = self
@ -137,21 +137,21 @@ mod tests {
let workspace_access_control = super::WorkspaceAccessControlImpl::new(access_control);
for uid in [member_uid, owner_uid] {
workspace_access_control
.enforce_role(&uid, workspace_id, AFRole::Member)
.enforce_role(&uid, &workspace_id, AFRole::Member)
.await
.unwrap_or_else(|_| panic!("Failed to enforce role for {}", uid));
workspace_access_control
.enforce_action(&uid, workspace_id, crate::act::Action::Read)
.enforce_action(&uid, &workspace_id, crate::act::Action::Read)
.await
.unwrap_or_else(|_| panic!("Failed to enforce action for {}", uid));
}
let result = workspace_access_control
.enforce_action(&member_uid, workspace_id, crate::act::Action::Delete)
.enforce_action(&member_uid, &workspace_id, crate::act::Action::Delete)
.await;
let error_code = result.unwrap_err().code();
assert_eq!(error_code, ErrorCode::NotEnoughPermissions);
workspace_access_control
.enforce_action(&owner_uid, workspace_id, crate::act::Action::Delete)
.enforce_action(&owner_uid, &workspace_id, crate::act::Action::Delete)
.await
.unwrap();
}

View file

@ -2,6 +2,7 @@ use crate::act::Action;
use app_error::AppError;
use async_trait::async_trait;
use database_entity::dto::AFAccessLevel;
use uuid::Uuid;
#[async_trait]
pub trait CollabAccessControl: Sync + Send + 'static {
@ -9,9 +10,9 @@ pub trait CollabAccessControl: Sync + Send + 'static {
/// Returns AppError::NotEnoughPermission if the user does not have the permission.
async fn enforce_action(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
oid: &str,
oid: &Uuid,
action: Action,
) -> Result<(), AppError>;
@ -19,9 +20,9 @@ pub trait CollabAccessControl: Sync + Send + 'static {
/// Returns AppError::NotEnoughPermission if the user does not have the access level.
async fn enforce_access_level(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
oid: &str,
oid: &Uuid,
access_level: AFAccessLevel,
) -> Result<(), AppError>;
@ -29,11 +30,11 @@ pub trait CollabAccessControl: Sync + Send + 'static {
async fn update_access_level_policy(
&self,
uid: &i64,
oid: &str,
oid: &Uuid,
level: AFAccessLevel,
) -> Result<(), AppError>;
async fn remove_access_level(&self, uid: &i64, oid: &str) -> Result<(), AppError>;
async fn remove_access_level(&self, uid: &i64, oid: &Uuid) -> Result<(), AppError>;
}
#[async_trait]
@ -47,9 +48,9 @@ pub trait RealtimeAccessControl: Sync + Send + 'static {
/// 3. If the collab object is not found which means the collab object is created by the user.
async fn can_write_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
oid: &str,
oid: &Uuid,
) -> Result<bool, AppError>;
/// Return true if the user is allowed to observe the changes of given collab.
@ -58,8 +59,8 @@ pub trait RealtimeAccessControl: Sync + Send + 'static {
/// The user can recv the message if the user is the member of the collab object
async fn can_read_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
oid: &str,
oid: &Uuid,
) -> Result<bool, AppError>;
}

View file

@ -1,6 +1,7 @@
use app_error::AppError;
use async_trait::async_trait;
use database_entity::dto::AFAccessLevel;
use uuid::Uuid;
use crate::{
act::Action,
@ -26,9 +27,9 @@ impl Default for CollabAccessControlImpl {
impl CollabAccessControl for CollabAccessControlImpl {
async fn enforce_action(
&self,
_workspace_id: &str,
_workspace_id: &Uuid,
_uid: &i64,
_oid: &str,
_oid: &Uuid,
_action: Action,
) -> Result<(), AppError> {
Ok(())
@ -36,9 +37,9 @@ impl CollabAccessControl for CollabAccessControlImpl {
async fn enforce_access_level(
&self,
_workspace_id: &str,
_workspace_id: &Uuid,
_uid: &i64,
_oid: &str,
_oid: &Uuid,
_access_level: AFAccessLevel,
) -> Result<(), AppError> {
Ok(())
@ -47,13 +48,13 @@ impl CollabAccessControl for CollabAccessControlImpl {
async fn update_access_level_policy(
&self,
_uid: &i64,
_oid: &str,
_oid: &Uuid,
_level: AFAccessLevel,
) -> Result<(), AppError> {
Ok(())
}
async fn remove_access_level(&self, _uid: &i64, _oid: &str) -> Result<(), AppError> {
async fn remove_access_level(&self, _uid: &i64, _oid: &Uuid) -> Result<(), AppError> {
Ok(())
}
}
@ -77,18 +78,18 @@ impl Default for RealtimeCollabAccessControlImpl {
impl RealtimeAccessControl for RealtimeCollabAccessControlImpl {
async fn can_write_collab(
&self,
_workspace_id: &str,
_workspace_id: &Uuid,
_uid: &i64,
_oid: &str,
_oid: &Uuid,
) -> Result<bool, AppError> {
Ok(true)
}
async fn can_read_collab(
&self,
_workspace_id: &str,
_workspace_id: &Uuid,
_uid: &i64,
_oid: &str,
_oid: &Uuid,
) -> Result<bool, AppError> {
Ok(true)
}

View file

@ -26,7 +26,7 @@ impl WorkspaceAccessControl for WorkspaceAccessControlImpl {
async fn enforce_role(
&self,
_uid: &i64,
_workspace_id: Uuid,
_workspace_id: &Uuid,
_role: AFRole,
) -> Result<(), AppError> {
Ok(())
@ -35,7 +35,7 @@ impl WorkspaceAccessControl for WorkspaceAccessControlImpl {
async fn enforce_action(
&self,
_uid: &i64,
_workspace_id: Uuid,
_workspace_id: &Uuid,
_action: Action,
) -> Result<(), AppError> {
Ok(())

View file

@ -8,15 +8,19 @@ use sqlx::types::Uuid;
pub trait WorkspaceAccessControl: Send + Sync + 'static {
/// Check if the user has the role in the workspace.
/// Returns AppError::NotEnoughPermission if the user does not have the role.
async fn enforce_role(&self, uid: &i64, workspace_id: Uuid, role: AFRole)
-> Result<(), AppError>;
async fn enforce_role(
&self,
uid: &i64,
workspace_id: &Uuid,
role: AFRole,
) -> Result<(), AppError>;
/// Check if the user can perform action on the workspace.
/// Returns AppError::NotEnoughPermission if the user does not have the role.
async fn enforce_action(
&self,
uid: &i64,
workspace_id: Uuid,
workspace_id: &Uuid,
action: Action,
) -> Result<(), AppError>;

View file

@ -1,6 +1,6 @@
use uuid::Uuid;
pub fn user_awareness_object_id(user_uuid: &Uuid, workspace_id: &str) -> Uuid {
pub fn user_awareness_object_id(user_uuid: &Uuid, workspace_id: &Uuid) -> Uuid {
Uuid::new_v5(
user_uuid,
format!("user_awareness:{}", workspace_id).as_bytes(),

View file

@ -9,8 +9,10 @@ use collab_database::workspace_database::{
use collab_entity::CollabType;
use database_entity::dto::QueryCollabResult::{Failed, Success};
use database_entity::dto::{QueryCollab, QueryCollabParams};
use futures::TryStreamExt;
use std::sync::Arc;
use tracing::error;
use uuid::{Error, Uuid};
pub struct TestDatabaseCollabService {
pub api_client: client_api::Client,
@ -28,9 +30,9 @@ impl DatabaseCollabService for TestDatabaseCollabService {
let encoded_collab = match encoded_collab {
None => {
let params = QueryCollabParams {
workspace_id: self.workspace_id.clone(),
workspace_id: self.workspace_id.parse()?,
inner: QueryCollab {
object_id: object_id.to_string(),
object_id: object_id.parse()?,
collab_type: object_type,
},
};
@ -62,7 +64,10 @@ impl DatabaseCollabService for TestDatabaseCollabService {
) -> Result<EncodeCollabByOid, DatabaseError> {
let params = object_ids
.into_iter()
.map(|object_id| QueryCollab::new(object_id, collab_type))
.flat_map(|object_id| match Uuid::parse_str(&object_id) {
Ok(object_id) => Ok(QueryCollab::new(object_id, collab_type.clone())),
Err(err) => Err(err),
})
.collect();
let results = self
.api_client

View file

@ -148,7 +148,7 @@ impl TestClient {
view_name: &str,
view_layout: ViewLayout,
) {
let mut folder = self.get_folder(workspace_id).await;
let mut folder = self.get_folder(workspace_id.parse().unwrap()).await;
let general_space_id = folder
.get_view(workspace_id)
.unwrap()
@ -185,14 +185,14 @@ impl TestClient {
.unwrap();
}
pub async fn get_folder(&self, workspace_id: &str) -> Folder {
pub async fn get_folder(&self, workspace_id: Uuid) -> Folder {
let uid = self.uid().await;
let folder_collab = self
.api_client
.get_collab(QueryCollabParams::new(
workspace_id.to_string(),
workspace_id,
CollabType::Folder,
workspace_id.to_string(),
workspace_id,
))
.await
.unwrap()
@ -201,7 +201,7 @@ impl TestClient {
uid,
CollabOrigin::Client(CollabClient::new(uid, self.device_id.clone())),
folder_collab.into(),
workspace_id,
&workspace_id.to_string(),
vec![],
)
.unwrap()
@ -216,39 +216,34 @@ impl TestClient {
Database::open(database_id, context).await.unwrap()
}
pub async fn get_document(&self, workspace_id: &str, document_id: &str) -> Document {
pub async fn get_document(&self, workspace_id: Uuid, document_id: Uuid) -> Document {
let collab = self
.get_collab_to_collab(
workspace_id.to_string(),
document_id.to_string(),
CollabType::Document,
)
.get_collab_to_collab(workspace_id, document_id, CollabType::Document)
.await
.unwrap();
Document::open(collab).unwrap()
}
pub async fn get_workspace_database(&self, workspace_id: &str) -> WorkspaceDatabase {
pub async fn get_workspace_database(&self, workspace_id: Uuid) -> WorkspaceDatabase {
let workspaces = self.api_client.get_workspaces().await.unwrap();
let workspace_database_id = workspaces
.iter()
.find(|w| w.workspace_id.to_string() == workspace_id)
.find(|w| w.workspace_id == workspace_id)
.unwrap()
.database_storage_id
.to_string();
.database_storage_id;
let collab = self
.api_client
.get_collab(QueryCollabParams::new(
workspace_database_id.clone(),
CollabType::WorkspaceDatabase,
workspace_id.to_string(),
workspace_id,
))
.await
.unwrap();
WorkspaceDatabase::from_collab_doc_state(
&workspace_database_id,
&workspace_database_id.to_string(),
CollabOrigin::Empty,
collab.encode_collab.into(),
)
@ -297,7 +292,7 @@ impl TestClient {
self.api_client.get_user_workspace_info().await.unwrap()
}
pub async fn open_workspace(&self, workspace_id: &str) -> AFWorkspace {
pub async fn open_workspace(&self, workspace_id: &Uuid) -> AFWorkspace {
self.api_client.open_workspace(workspace_id).await.unwrap()
}
@ -307,9 +302,9 @@ impl TestClient {
let data = self
.api_client
.get_collab(QueryCollabParams::new(
&workspace_id,
workspace_id,
CollabType::Folder,
&workspace_id,
workspace_id,
))
.await
.unwrap();
@ -318,20 +313,16 @@ impl TestClient {
uid,
CollabOrigin::Empty,
data.encode_collab.into(),
&workspace_id,
&workspace_id.to_string(),
vec![],
)
.unwrap()
}
pub async fn get_workspace_database_collab(&self, workspace_id: &str) -> Collab {
let db_storage_id = self.open_workspace(workspace_id).await.database_storage_id;
pub async fn get_workspace_database_collab(&self, workspace_id: Uuid) -> Collab {
let db_storage_id = self.open_workspace(&workspace_id).await.database_storage_id;
let collab_resp = self
.get_collab(
workspace_id.to_string(),
db_storage_id.to_string(),
CollabType::WorkspaceDatabase,
)
.get_collab(workspace_id, db_storage_id, CollabType::WorkspaceDatabase)
.await
.unwrap();
Collab::new_with_source(
@ -344,18 +335,14 @@ impl TestClient {
.unwrap()
}
pub async fn create_document_collab(&self, workspace_id: &str, object_id: &str) -> Document {
pub async fn create_document_collab(&self, workspace_id: Uuid, object_id: Uuid) -> Document {
let collab_resp = self
.get_collab(
workspace_id.to_string(),
object_id.to_string(),
CollabType::Document,
)
.get_collab(workspace_id, object_id, CollabType::Document)
.await
.unwrap();
let collab = Collab::new_with_source(
CollabOrigin::Server,
object_id,
&object_id.to_string(),
collab_resp.encode_collab.into(),
vec![],
false,
@ -364,7 +351,7 @@ impl TestClient {
Document::open(collab).unwrap()
}
pub async fn get_db_collab_from_view(&mut self, workspace_id: &str, view_id: &str) -> Collab {
pub async fn get_db_collab_from_view(&mut self, workspace_id: Uuid, view_id: &str) -> Collab {
let ws_db_collab = self.get_workspace_database_collab(workspace_id).await;
let ws_db_body = WorkspaceDatabase::open(ws_db_collab).unwrap();
let db_id = ws_db_body
@ -372,18 +359,16 @@ impl TestClient {
.into_iter()
.find(|db_meta| db_meta.linked_views.contains(&view_id.to_string()))
.unwrap()
.database_id;
.database_id
.parse::<Uuid>()
.unwrap();
let db_collab_collab_resp = self
.get_collab(
workspace_id.to_string(),
db_id.clone(),
CollabType::Database,
)
.get_collab(workspace_id, db_id, CollabType::Database)
.await
.unwrap();
Collab::new_with_source(
CollabOrigin::Server,
&db_id,
&db_id.to_string(),
db_collab_collab_resp.encode_collab.into(),
vec![],
false,
@ -394,19 +379,19 @@ impl TestClient {
pub async fn get_user_awareness(&self) -> UserAwareness {
let workspace_id = self.workspace_id().await;
let profile = self.get_user_profile().await;
let awareness_object_id = user_awareness_object_id(&profile.uuid, &workspace_id).to_string();
let awareness_object_id = user_awareness_object_id(&profile.uuid, &workspace_id);
let data = self
.api_client
.get_collab(QueryCollabParams::new(
&awareness_object_id,
awareness_object_id,
CollabType::UserAwareness,
&workspace_id,
workspace_id,
))
.await
.unwrap();
let collab = Collab::new_with_source(
CollabOrigin::Empty,
&awareness_object_id,
&awareness_object_id.to_string(),
DataSource::DocStateV1(data.encode_collab.doc_state.to_vec()),
vec![],
false,
@ -501,9 +486,9 @@ impl TestClient {
self.api_client.get_workspace_members(workspace_id).await
}
pub async fn get_workspace_member(&self, workspace_id: &str, user_id: i64) -> AFWorkspaceMember {
pub async fn get_workspace_member(&self, workspace_id: Uuid, user_id: i64) -> AFWorkspaceMember {
let params = QueryWorkspaceMember {
workspace_id: workspace_id.to_string(),
workspace_id,
uid: user_id,
};
self.api_client.get_workspace_member(params).await.unwrap()
@ -511,11 +496,11 @@ impl TestClient {
pub async fn try_get_workspace_member(
&self,
workspace_id: &str,
workspace_id: Uuid,
user_id: i64,
) -> Result<AFWorkspaceMember, AppResponseError> {
let params = QueryWorkspaceMember {
workspace_id: workspace_id.to_string(),
workspace_id,
uid: user_id,
};
@ -551,7 +536,7 @@ impl TestClient {
}
#[allow(dead_code)]
pub async fn get_blob_metadata(&self, workspace_id: &str, file_id: &str) -> BlobMetadata {
pub async fn get_blob_metadata(&self, workspace_id: &Uuid, file_id: &str) -> BlobMetadata {
let url = self.api_client.get_blob_url(workspace_id, file_id);
self.api_client.get_blob_metadata(&url).await.unwrap()
}
@ -577,7 +562,7 @@ impl TestClient {
.unwrap()
}
pub async fn workspace_id(&self) -> String {
pub async fn workspace_id(&self) -> Uuid {
self
.api_client
.get_workspaces()
@ -586,7 +571,6 @@ impl TestClient {
.first()
.unwrap()
.workspace_id
.to_string()
}
pub async fn email(&self) -> String {
@ -783,8 +767,8 @@ impl TestClient {
pub async fn get_collab(
&self,
workspace_id: String,
object_id: String,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
) -> Result<CollabResponse, AppResponseError> {
self
@ -801,16 +785,16 @@ impl TestClient {
pub async fn get_collab_to_collab(
&self,
workspace_id: String,
object_id: String,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
) -> Result<Collab, AppResponseError> {
let resp = self
.get_collab(workspace_id, object_id.clone(), collab_type)
.get_collab(workspace_id, object_id, collab_type)
.await?;
let collab = Collab::new_with_source(
CollabOrigin::Server,
&object_id,
&object_id.to_string(),
resp.encode_collab.into(),
vec![],
false,
@ -830,12 +814,12 @@ impl TestClient {
#[allow(clippy::await_holding_lock)]
pub async fn create_and_edit_collab(
&mut self,
workspace_id: &str,
workspace_id: Uuid,
collab_type: CollabType,
) -> String {
let object_id = Uuid::new_v4().to_string();
) -> Uuid {
let object_id = Uuid::new_v4();
self
.create_and_edit_collab_with_data(&object_id, workspace_id, collab_type, None)
.create_and_edit_collab_with_data(object_id, workspace_id, collab_type, None)
.await;
object_id
}
@ -843,18 +827,18 @@ impl TestClient {
#[allow(unused_variables)]
pub async fn create_and_edit_collab_with_data(
&mut self,
object_id: &str,
workspace_id: &str,
object_id: Uuid,
workspace_id: Uuid,
collab_type: CollabType,
encoded_collab_v1: Option<EncodedCollab>,
) {
// Subscribe to object
let origin = CollabOrigin::Client(CollabClient::new(self.uid().await, self.device_id.clone()));
let mut collab = match encoded_collab_v1 {
None => Collab::new_with_origin(origin.clone(), object_id, vec![], false),
None => Collab::new_with_origin(origin.clone(), object_id.to_string(), vec![], false),
Some(data) => Collab::new_with_source(
origin.clone(),
object_id,
&object_id.to_string(),
DataSource::DocStateV1(data.doc_state.to_vec()),
vec![],
false,
@ -872,10 +856,10 @@ impl TestClient {
self
.api_client
.create_collab(CreateCollabParams {
object_id: object_id.to_string(),
object_id,
encoded_collab_v1,
collab_type,
workspace_id: workspace_id.to_string(),
collab_type: collab_type.clone(),
workspace_id,
})
.await
.unwrap();
@ -912,10 +896,13 @@ impl TestClient {
}
let test_collab = TestCollab { origin, collab };
self.collabs.insert(object_id.to_string(), test_collab);
self.wait_object_sync_complete(object_id).await.unwrap();
self
.wait_object_sync_complete(&object_id.to_string())
.await
.unwrap();
}
pub async fn open_workspace_collab(&mut self, workspace_id: &str) {
pub async fn open_workspace_collab(&mut self, workspace_id: Uuid) {
self
.open_collab(workspace_id, workspace_id, CollabType::Unknown)
.await;
@ -924,8 +911,8 @@ impl TestClient {
#[allow(clippy::await_holding_lock)]
pub async fn open_collab(
&mut self,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
) {
self
@ -936,8 +923,8 @@ impl TestClient {
#[allow(unused_variables)]
pub async fn open_collab_with_doc_state(
&mut self,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
doc_state: Vec<u8>,
) {
@ -945,7 +932,7 @@ impl TestClient {
let origin = CollabOrigin::Client(CollabClient::new(self.uid().await, self.device_id.clone()));
let mut collab = Collab::new_with_source(
origin.clone(),
object_id,
&object_id.to_string(),
DataSource::DocStateV1(doc_state),
vec![],
false,
@ -991,8 +978,8 @@ impl TestClient {
#[allow(unused_variables)]
pub async fn create_collab_with_data(
&mut self,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
encoded_collab_v1: EncodedCollab,
) -> Result<(), AppResponseError> {
@ -1000,7 +987,7 @@ impl TestClient {
let origin = CollabOrigin::Client(CollabClient::new(self.uid().await, self.device_id.clone()));
let collab = Collab::new_with_source(
origin.clone(),
object_id,
&object_id.to_string(),
DataSource::DocStateV1(encoded_collab_v1.doc_state.to_vec()),
vec![],
false,
@ -1016,10 +1003,10 @@ impl TestClient {
self
.api_client
.create_collab(CreateCollabParams {
object_id: object_id.to_string(),
object_id,
encoded_collab_v1,
collab_type,
workspace_id: workspace_id.to_string(),
collab_type: collab_type.clone(),
workspace_id,
})
.await
}
@ -1156,15 +1143,15 @@ pub async fn assert_server_snapshot(
}
pub async fn assert_server_collab(
workspace_id: &str,
workspace_id: Uuid,
client: &mut client_api::Client,
object_id: &str,
collab_type: CollabType,
object_id: Uuid,
collab_type: &CollabType,
timeout_secs: u64,
expected: Value,
) -> Result<(), Error> {
let duration = Duration::from_secs(timeout_secs);
let object_id = object_id.to_string();
let collab_type = collab_type.clone();
let final_json = Arc::new(Mutex::from(json!({})));
// Use tokio::time::timeout to apply a timeout to the entire operation
@ -1173,8 +1160,8 @@ pub async fn assert_server_collab(
loop {
let result = client
.get_collab(QueryCollabParams::new(
&object_id,
collab_type,
object_id,
collab_type.clone(),
workspace_id,
))
.await;
@ -1183,7 +1170,7 @@ pub async fn assert_server_collab(
Ok(data) => {
let json = Collab::new_with_source(
CollabOrigin::Empty,
&object_id,
&object_id.to_string(),
DataSource::DocStateV1(data.encode_collab.doc_state.to_vec()),
vec![],
false,
@ -1292,29 +1279,6 @@ pub async fn assert_client_collab_include_value(
}
}
#[allow(dead_code)]
pub async fn get_collab_json_from_server(
client: &client_api::Client,
workspace_id: &str,
object_id: &str,
collab_type: CollabType,
) -> Value {
let bytes = client
.get_collab(QueryCollabParams::new(object_id, collab_type, workspace_id))
.await
.unwrap();
Collab::new_with_source(
CollabOrigin::Empty,
object_id,
DataSource::DocStateV1(bytes.encode_collab.doc_state.to_vec()),
vec![],
false,
)
.unwrap()
.to_json_value()
}
pub async fn collect_answer(mut stream: QuestionStream) -> String {
let mut answer = String::new();
while let Some(value) = stream.next().await {

View file

@ -12,6 +12,7 @@ use futures_util::{SinkExt, StreamExt};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, trace, warn};
use uuid::Uuid;
use yrs::encoding::read::Cursor;
use yrs::updates::decoder::DecoderV1;
use yrs::updates::encoder::Encode;
@ -31,7 +32,7 @@ pub type CollabRef = Weak<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>
/// Use to continuously receive updates from remote.
pub struct ObserveCollab<Sink, Stream> {
object_id: String,
object_id: Uuid,
#[allow(dead_code)]
weak_collab: CollabRef,
phantom_sink: PhantomData<Sink>,
@ -76,7 +77,7 @@ where
sink.clone(),
cloned_weak_collab.clone(),
interval,
object_id.clone(),
object_id.to_string(),
));
}
tokio::spawn(ObserveCollab::<Sink, Stream>::observer_collab_message(
@ -339,7 +340,7 @@ where
// before sending the SyncStep1 to the server.
if is_server_sync_step_1 && sync_object.collab_type == CollabType::Folder {
let lock = collab.read().await;
validate_data_for_folder((*lock).borrow(), &sync_object.workspace_id)
validate_data_for_folder((*lock).borrow(), &sync_object.workspace_id.to_string())
.map_err(|err| SyncError::OverrideWithIncorrectData(err.to_string()))?;
}
@ -352,14 +353,14 @@ where
if is_server_sync_step_1 {
ClientCollabMessage::new_server_init_sync(ServerInit::new(
message_origin.clone(),
object_id,
object_id.to_string(),
return_payload,
msg_id,
))
} else {
ClientCollabMessage::new_update_sync(UpdateSync::new(
message_origin.clone(),
object_id,
object_id.to_string(),
return_payload,
msg_id,
))
@ -450,7 +451,7 @@ impl SeqNumCounter {
/// messages may have been missed, and an error is returned.
pub fn check_broadcast_contiguous(
&self,
_object_id: &str,
_object_id: &Uuid,
broadcast_seq_num: u32,
) -> Result<(), SyncError> {
let current = self.broadcast_seq_counter.load(Ordering::SeqCst);
@ -467,7 +468,7 @@ impl SeqNumCounter {
Ok(())
}
pub fn check_ack_broadcast_contiguous(&self, object_id: &str) -> Result<(), SyncError> {
pub fn check_ack_broadcast_contiguous(&self, object_id: &Uuid) -> Result<(), SyncError> {
let ack_seq_num = self.ack_seq_counter.load(Ordering::SeqCst);
let broadcast_seq_num = self.broadcast_seq_counter.load(Ordering::SeqCst);
if cfg!(feature = "sync_verbose_log") {

View file

@ -15,6 +15,7 @@ use tokio_retry::strategy::FixedInterval;
use tokio_retry::{Action, Condition, RetryIf};
use tokio_stream::StreamExt;
use tracing::{error, trace};
use uuid::Uuid;
use yrs::updates::encoder::Encode;
use client_api_entity::{CollabObject, CollabType};
@ -170,7 +171,7 @@ where
self.sync_queue.queue_msg(|msg_id| {
let update_sync = UpdateSync::new(
origin.clone(),
self.object.object_id.clone(),
self.object.object_id.to_string(),
payload,
msg_id,
);
@ -219,36 +220,37 @@ where
#[derive(Clone, Debug)]
pub struct SyncObject {
pub object_id: String,
pub workspace_id: String,
pub object_id: Uuid,
pub workspace_id: Uuid,
pub collab_type: CollabType,
pub device_id: String,
}
impl SyncObject {
pub fn new(
object_id: &str,
workspace_id: &str,
object_id: Uuid,
workspace_id: Uuid,
collab_type: CollabType,
device_id: &str,
) -> Self {
Self {
object_id: object_id.to_string(),
workspace_id: workspace_id.to_string(),
object_id,
workspace_id,
collab_type,
device_id: device_id.to_string(),
}
}
}
impl From<CollabObject> for SyncObject {
fn from(collab_object: CollabObject) -> Self {
Self {
object_id: collab_object.object_id,
workspace_id: collab_object.workspace_id,
impl TryFrom<CollabObject> for SyncObject {
type Error = anyhow::Error;
fn try_from(collab_object: CollabObject) -> Result<Self, Self::Error> {
Ok(Self {
object_id: Uuid::parse_str(&collab_object.object_id)?,
workspace_id: Uuid::parse_str(&collab_object.workspace_id)?,
collab_type: collab_object.collab_type,
device_id: collab_object.device_id,
}
})
}
}

View file

@ -209,9 +209,9 @@ where
sink.queue_init_sync(|msg_id| {
let init_sync = InitSync::new(
origin,
sync_object.object_id.clone(),
sync_object.collab_type,
sync_object.workspace_id.clone(),
sync_object.object_id.to_string(),
sync_object.collab_type.clone(),
sync_object.workspace_id.to_string(),
msg_id,
payload,
);
@ -228,7 +228,7 @@ where
sink.queue_msg(|msg_id| {
let update_sync = UpdateSync::new(
origin.clone(),
sync_object.object_id.clone(),
sync_object.object_id.to_string(),
update,
msg_id,
);
@ -250,9 +250,9 @@ where
sink.queue_init_sync(|msg_id| {
let init_sync = InitSync::new(
origin,
sync_object.object_id.clone(),
sync_object.collab_type,
sync_object.workspace_id.clone(),
sync_object.object_id.to_string(),
sync_object.collab_type.clone(),
sync_object.workspace_id.to_string(),
msg_id,
payload,
);

View file

@ -23,11 +23,15 @@ use parking_lot::RwLock;
use reqwest::Method;
use reqwest::RequestBuilder;
use crate::retry::{RefreshTokenAction, RefreshTokenRetryCondition};
use crate::ws::ConnectInfo;
use anyhow::anyhow;
use client_api_entity::SignUpResponse::{Authenticated, NotAuthenticated};
use client_api_entity::{
AFSnapshotMeta, AFSnapshotMetas, AFUserProfile, AFUserWorkspaceInfo, AFWorkspace,
QuerySnapshotParams, SnapshotData,
};
use client_api_entity::{GotrueTokenResponse, UpdateGotrueUserParams, User};
use semver::Version;
use shared_entity::dto::auth_dto::SignInTokenResponse;
use shared_entity::dto::auth_dto::UpdateUserParams;
@ -40,11 +44,7 @@ use tokio_retry::strategy::FixedInterval;
use tokio_retry::RetryIf;
use tracing::{debug, error, event, info, instrument, trace, warn};
use url::Url;
use crate::retry::{RefreshTokenAction, RefreshTokenRetryCondition};
use crate::ws::ConnectInfo;
use client_api_entity::SignUpResponse::{Authenticated, NotAuthenticated};
use client_api_entity::{GotrueTokenResponse, UpdateGotrueUserParams, User};
use uuid::Uuid;
pub const X_COMPRESSION_TYPE: &str = "X-Compression-Type";
pub const X_COMPRESSION_BUFFER_SIZE: &str = "X-Compression-Buffer-Size";
@ -713,7 +713,7 @@ impl Client {
}
#[instrument(level = "info", skip_all, err)]
pub async fn open_workspace(&self, workspace_id: &str) -> Result<AFWorkspace, AppResponseError> {
pub async fn open_workspace(&self, workspace_id: &Uuid) -> Result<AFWorkspace, AppResponseError> {
let url = format!("{}/api/workspace/{}/open", self.base_url, workspace_id);
let resp = self
.http_client_with_auth(Method::PUT, &url)
@ -959,7 +959,7 @@ impl Client {
#[instrument(level = "info", skip_all)]
pub async fn get_workspace_usage(
&self,
workspace_id: &str,
workspace_id: &Uuid,
) -> Result<WorkspaceSpaceUsage, AppResponseError> {
let url = format!("{}/api/file_storage/{}/usage", self.base_url, workspace_id);
let resp = self

View file

@ -13,9 +13,10 @@ use shared_entity::response::{AppResponse, AppResponseError};
use shared_entity::dto::file_dto::PutFileResponse;
use tracing::instrument;
use url::Url;
use uuid::Uuid;
impl Client {
pub fn get_blob_url(&self, workspace_id: &str, file_id: &str) -> String {
pub fn get_blob_url(&self, workspace_id: &Uuid, file_id: &str) -> String {
format!(
"{}/api/file_storage/{}/blob/{}",
self.base_url, workspace_id, file_id

View file

@ -18,6 +18,7 @@ tracing.workspace = true
async-trait.workspace = true
tokio = "1.36.0"
collab-entity.workspace = true
uuid.workspace = true
[features]
verbose_log = []

View file

@ -5,9 +5,10 @@ use collab::entity::EncodedCollab;
use collab::preclude::Collab;
use collab_entity::CollabType;
use tracing::instrument;
use uuid::Uuid;
#[inline]
pub async fn collab_from_encode_collab(object_id: &str, data: &[u8]) -> Result<Collab, Error> {
pub async fn collab_from_encode_collab(object_id: &Uuid, data: &[u8]) -> Result<Collab, Error> {
let object_id = object_id.to_string();
let data = data.to_vec();
@ -29,7 +30,7 @@ pub async fn collab_from_encode_collab(object_id: &str, data: &[u8]) -> Result<C
#[instrument(level = "trace", skip(data), fields(len = %data.len()))]
#[inline]
pub async fn validate_encode_collab(
object_id: &str,
object_id: &Uuid,
data: &[u8],
collab_type: &CollabType,
) -> Result<(), Error> {

View file

@ -52,8 +52,8 @@ impl AwarenessGossip {
pub async fn sink(
&self,
workspace_id: &str,
object_id: &str,
workspace_id: &Uuid,
object_id: &Uuid,
) -> Result<AwarenessUpdateSink, StreamError> {
let sink = AwarenessUpdateSink::new(self.conn.clone(), workspace_id, object_id);
Ok(sink)

View file

@ -72,15 +72,15 @@ impl CollabRedisStream {
.await
}
pub fn collab_update_sink(&self, workspace_id: &str, object_id: &str) -> CollabUpdateSink {
pub fn collab_update_sink(&self, workspace_id: &Uuid, object_id: &Uuid) -> CollabUpdateSink {
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
CollabUpdateSink::new(self.connection_manager.clone(), stream_key)
}
pub async fn awareness_update_sink(
&self,
workspace_id: &str,
object_id: &str,
workspace_id: &Uuid,
object_id: &Uuid,
) -> Result<AwarenessUpdateSink, StreamError> {
self.awareness_gossip.sink(workspace_id, object_id).await
}
@ -89,8 +89,8 @@ impl CollabRedisStream {
/// from a given message id. Once Redis stream return no more results, the stream will be closed.
pub async fn current_collab_updates(
&self,
workspace_id: &str,
object_id: &str,
workspace_id: &Uuid,
object_id: &Uuid,
since: Option<MessageId>,
) -> Result<Vec<(MessageId, CollabStreamUpdate)>, StreamError> {
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
@ -115,8 +115,8 @@ impl CollabRedisStream {
/// coming from corresponding Redis stream until explicitly closed.
pub fn live_collab_updates(
&self,
workspace_id: &str,
object_id: &str,
workspace_id: &Uuid,
object_id: &Uuid,
since: Option<MessageId>,
) -> impl Stream<Item = Result<(MessageId, CollabStreamUpdate), StreamError>> {
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);

View file

@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use uuid::Uuid;
/// The [MessageId] generated by XADD has two parts: a timestamp and a sequence number, separated by
/// a hyphen (-). The timestamp is based on the server's time when the message is added, and the
@ -111,7 +112,7 @@ impl CollabStreamUpdate {
}
/// Returns Redis stream key, that's storing entries mapped to/from [CollabStreamUpdate].
pub fn stream_key(workspace_id: &str, object_id: &str) -> String {
pub fn stream_key(workspace_id: &Uuid, object_id: &Uuid) -> String {
// use `:` separator as it adheres to Redis naming conventions
format!("af:{}:{}:updates", workspace_id, object_id)
}

View file

@ -19,16 +19,36 @@ use tracing::error;
use uuid::Uuid;
use validator::Validate;
mod uuid_str {
use serde::Deserialize;
use uuid::Uuid;
pub fn serialize<S>(uuid: &Uuid, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&uuid.to_string())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Uuid, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Uuid::parse_str(&s).map_err(serde::de::Error::custom)
}
}
/// The default compression level of ZSTD-compressed collabs.
pub const ZSTD_COMPRESSION_LEVEL: i32 = 3;
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct CreateCollabParams {
#[validate(custom(function = "validate_not_empty_str"))]
pub workspace_id: String,
#[serde(with = "uuid_str")]
pub workspace_id: Uuid,
#[validate(custom(function = "validate_not_empty_str"))]
pub object_id: String,
#[serde(with = "uuid_str")]
pub object_id: Uuid,
#[validate(custom(function = "validate_not_empty_payload"))]
pub encoded_collab_v1: Vec<u8>,
@ -36,8 +56,8 @@ pub struct CreateCollabParams {
pub collab_type: CollabType,
}
impl From<(String, CollabParams)> for CreateCollabParams {
fn from((workspace_id, collab_params): (String, CollabParams)) -> Self {
impl From<(Uuid, CollabParams)> for CreateCollabParams {
fn from((workspace_id, collab_params): (Uuid, CollabParams)) -> Self {
Self {
workspace_id,
object_id: collab_params.object_id,
@ -48,7 +68,7 @@ impl From<(String, CollabParams)> for CreateCollabParams {
}
impl CreateCollabParams {
pub fn split(self) -> (CollabParams, String) {
pub fn split(self) -> (CollabParams, Uuid) {
(
CollabParams {
object_id: self.object_id,
@ -70,13 +90,13 @@ impl CreateCollabParams {
pub struct CollabIndexParams {}
pub struct PendingCollabWrite {
pub workspace_id: String,
pub workspace_id: Uuid,
pub uid: i64,
pub params: CollabParams,
}
impl PendingCollabWrite {
pub fn new(workspace_id: String, uid: i64, params: CollabParams) -> Self {
pub fn new(workspace_id: Uuid, uid: i64, params: CollabParams) -> Self {
PendingCollabWrite {
workspace_id,
uid,
@ -87,8 +107,8 @@ impl PendingCollabWrite {
#[derive(Debug, Clone, Validate, Serialize, Deserialize, PartialEq)]
pub struct CollabParams {
#[validate(custom(function = "validate_not_empty_str"))]
pub object_id: String,
#[serde(with = "uuid_str")]
pub object_id: Uuid,
#[validate(custom(function = "validate_not_empty_payload"))]
pub encoded_collab_v1: Bytes,
pub collab_type: CollabType,
@ -107,12 +127,11 @@ impl Display for CollabParams {
}
impl CollabParams {
pub fn new<T: ToString, B: Into<Bytes>>(
object_id: T,
pub fn new<B: Into<Bytes>>(
object_id: Uuid,
collab_type: CollabType,
encoded_collab_v1: B,
) -> Self {
let object_id = object_id.to_string();
Self {
object_id,
collab_type,
@ -141,7 +160,7 @@ impl CollabParams {
pub fn to_proto(&self) -> proto::collab::CollabParams {
proto::collab::CollabParams {
object_id: self.object_id.clone(),
object_id: self.object_id.to_string(),
encoded_collab: self.encoded_collab_v1.to_vec(),
collab_type: self.collab_type.to_proto() as i32,
embeddings: None,
@ -167,7 +186,8 @@ impl TryFrom<proto::collab::CollabParams> for CollabParams {
let collab_type_proto = proto::collab::CollabType::try_from(proto.collab_type).unwrap();
let collab_type = CollabType::from_proto(&collab_type_proto);
Ok(Self {
object_id: proto.object_id,
object_id: Uuid::from_str(&proto.object_id)
.map_err(|e| EntityError::DeserializationError(e.to_string()))?,
encoded_collab_v1: Bytes::from(proto.encoded_collab),
collab_type,
})
@ -176,7 +196,8 @@ impl TryFrom<proto::collab::CollabParams> for CollabParams {
#[derive(Serialize, Deserialize)]
struct CollabParamsV0 {
object_id: String,
#[serde(with = "uuid_str")]
object_id: Uuid,
encoded_collab_v1: Vec<u8>,
collab_type: CollabType,
}
@ -206,28 +227,30 @@ pub struct UpdateCollabWebParams {
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct DeleteCollabParams {
#[validate(custom(function = "validate_not_empty_str"))]
pub object_id: String,
#[validate(custom(function = "validate_not_empty_str"))]
pub workspace_id: String,
#[serde(with = "uuid_str")]
pub object_id: Uuid,
#[serde(with = "uuid_str")]
pub workspace_id: Uuid,
}
#[derive(Debug, Clone, Validate)]
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct InsertSnapshotParams {
#[validate(custom(function = "validate_not_empty_str"))]
pub object_id: String,
#[serde(with = "uuid_str")]
pub object_id: Uuid,
#[validate(custom(function = "validate_not_empty_payload"))]
pub doc_state: Bytes,
#[validate(custom(function = "validate_not_empty_str"))]
pub workspace_id: String,
#[serde(with = "uuid_str")]
pub workspace_id: Uuid,
pub collab_type: CollabType,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotData {
pub object_id: String,
#[serde(with = "uuid_str")]
pub object_id: Uuid,
pub encoded_collab_v1: Vec<u8>,
pub workspace_id: String,
#[serde(with = "uuid_str")]
pub workspace_id: Uuid,
}
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
@ -235,10 +258,9 @@ pub struct QuerySnapshotParams {
pub snapshot_id: i64,
}
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
#[derive(Debug, Clone, Validate)]
pub struct QueryCollabParams {
#[validate(custom(function = "validate_not_empty_str"))]
pub workspace_id: String,
pub workspace_id: Uuid,
#[validate(nested)]
pub inner: QueryCollab,
}
@ -254,13 +276,7 @@ impl Display for QueryCollabParams {
}
impl QueryCollabParams {
pub fn new<T1: Into<String>, T2: Into<String>>(
object_id: T1,
collab_type: CollabType,
workspace_id: T2,
) -> Self {
let workspace_id = workspace_id.into();
let object_id = object_id.into();
pub fn new(object_id: Uuid, collab_type: CollabType, workspace_id: Uuid) -> Self {
let inner = QueryCollab {
object_id,
collab_type,
@ -282,11 +298,11 @@ impl Deref for QueryCollabParams {
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct QueryCollab {
pub object_id: String,
pub object_id: Uuid,
pub collab_type: CollabType,
}
impl QueryCollab {
pub fn new(object_id: String, collab_type: CollabType) -> Self {
pub fn new(object_id: Uuid, collab_type: CollabType) -> Self {
Self {
object_id,
collab_type,
@ -323,7 +339,8 @@ pub struct AFSnapshotMetas(pub Vec<AFSnapshotMeta>);
#[derive(Debug, Clone, Deserialize)]
pub struct QueryObjectSnapshotParams {
pub object_id: String,
#[serde(with = "uuid_str")]
pub object_id: Uuid,
}
#[derive(Serialize, Deserialize, Debug)]
@ -354,10 +371,10 @@ pub struct WorkspaceUsage {
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct InsertCollabMemberParams {
pub uid: i64,
#[validate(custom(function = "validate_not_empty_str"))]
pub workspace_id: String,
#[validate(custom(function = "validate_not_empty_str"))]
pub object_id: String,
#[serde(with = "uuid_str")]
pub workspace_id: Uuid,
#[serde(with = "uuid_str")]
pub object_id: Uuid,
pub access_level: AFAccessLevel,
}
@ -366,10 +383,10 @@ pub type UpdateCollabMemberParams = InsertCollabMemberParams;
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct WorkspaceCollabIdentify {
pub uid: i64,
#[validate(custom(function = "validate_not_empty_str"))]
pub workspace_id: String,
#[validate(custom(function = "validate_not_empty_str"))]
pub object_id: String,
#[serde(with = "uuid_str")]
pub workspace_id: Uuid,
#[serde(with = "uuid_str")]
pub object_id: Uuid,
}
#[derive(Serialize, Deserialize)]
@ -391,16 +408,16 @@ pub struct DefaultPublishViewInfoMeta {
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct QueryCollabMembers {
#[validate(custom(function = "validate_not_empty_str"))]
pub workspace_id: String,
#[validate(custom(function = "validate_not_empty_str"))]
pub object_id: String,
#[serde(with = "uuid_str")]
pub workspace_id: Uuid,
#[serde(with = "uuid_str")]
pub object_id: Uuid,
}
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct QueryWorkspaceMember {
#[validate(custom(function = "validate_not_empty_str"))]
pub workspace_id: String,
#[serde(with = "uuid_str")]
pub workspace_id: Uuid,
pub uid: i64,
}
@ -741,7 +758,8 @@ impl From<i16> for AFWorkspaceInvitationStatus {
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AFCollabEmbeddedChunk {
pub fragment_id: String,
pub object_id: String,
#[serde(with = "uuid_str")]
pub object_id: Uuid,
pub content_type: EmbeddingContentType,
pub content: String,
pub embedding: Option<Vec<f32>>,

View file

@ -46,25 +46,23 @@ use uuid::Uuid;
pub async fn insert_into_af_collab(
tx: &mut Transaction<'_, sqlx::Postgres>,
uid: &i64,
workspace_id: &str,
workspace_id: &Uuid,
params: &CollabParams,
) -> Result<(), AppError> {
let partition_key = crate::collab::partition_key_from_collab_type(&params.collab_type);
let workspace_id = Uuid::from_str(workspace_id)?;
tracing::trace!(
"upsert collab:{}, len:{}",
params.object_id,
params.encoded_collab_v1.len(),
);
let oid = params.object_id.parse::<Uuid>()?;
sqlx::query!(
r#"
INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (oid)
DO UPDATE SET blob = $2, len = $3, owner_uid = $5 WHERE excluded.workspace_id = af_collab.workspace_id;
"#,
oid,
params.object_id,
params.encoded_collab_v1.as_ref(),
params.encoded_collab_v1.len() as i32,
partition_key,
@ -141,7 +139,7 @@ pub async fn insert_into_af_collab(
pub async fn insert_into_af_collab_bulk_for_user(
tx: &mut Transaction<'_, Postgres>,
uid: &i64,
workspace_id: &str,
workspace_id: Uuid,
collab_params_list: &[CollabParams],
) -> Result<(), AppError> {
if collab_params_list.is_empty() {
@ -149,7 +147,6 @@ pub async fn insert_into_af_collab_bulk_for_user(
}
let encrypt = 0;
let workspace_uuid = Uuid::from_str(workspace_id)?;
// Insert values into `af_collab` tables in bulk
let len = collab_params_list.len();
@ -159,7 +156,7 @@ pub async fn insert_into_af_collab_bulk_for_user(
let mut partition_keys: Vec<i32> = Vec::with_capacity(len);
let mut visited = HashSet::with_capacity(collab_params_list.len());
for params in collab_params_list {
let oid = params.object_id.parse::<Uuid>()?;
let oid = params.object_id;
if visited.insert(oid) {
let partition_key = partition_key_from_collab_type(&params.collab_type);
object_ids.push(oid);
@ -170,7 +167,7 @@ pub async fn insert_into_af_collab_bulk_for_user(
}
let uids: Vec<i64> = vec![*uid; object_ids.len()];
let workspace_ids: Vec<Uuid> = vec![workspace_uuid; object_ids.len()];
let workspace_ids: Vec<Uuid> = vec![workspace_id; object_ids.len()];
// Bulk insert into `af_collab` for the provided collab params
sqlx::query!(
r#"
@ -257,7 +254,7 @@ pub async fn batch_select_collab_blob(
object_ids_by_collab_type
.entry(params.collab_type)
.or_default()
.push(params.object_id.parse().unwrap());
.push(params.object_id);
}
for (collab_type, mut object_ids) in object_ids_by_collab_type.into_iter() {
@ -338,7 +335,7 @@ pub async fn create_snapshot(
///
#[inline]
pub async fn latest_snapshot_time<'a, E: Executor<'a, Database = Postgres>>(
oid: &str,
oid: &Uuid,
executor: E,
) -> Result<Option<chrono::DateTime<Utc>>, sqlx::Error> {
let latest_snapshot_time: Option<chrono::DateTime<Utc>> = sqlx::query_scalar(
@ -352,7 +349,7 @@ pub async fn latest_snapshot_time<'a, E: Executor<'a, Database = Postgres>>(
}
#[inline]
pub async fn should_create_snapshot2<'a, E: Executor<'a, Database = Postgres>>(
oid: &str,
oid: &Uuid,
executor: E,
) -> Result<bool, sqlx::Error> {
let hours = Utc::now() - Duration::hours(SNAPSHOT_PER_HOUR);
@ -375,12 +372,11 @@ pub async fn should_create_snapshot2<'a, E: Executor<'a, Database = Postgres>>(
///
pub async fn create_snapshot_and_maintain_limit<'a>(
mut transaction: Transaction<'a, Postgres>,
workspace_id: &str,
oid: &str,
workspace_id: &Uuid,
oid: &Uuid,
encoded_collab_v1: &[u8],
snapshot_limit: i64,
) -> Result<AFSnapshotMeta, AppError> {
let workspace_id = Uuid::from_str(workspace_id)?;
let snapshot_meta = sqlx::query_as!(
AFSnapshotMeta,
r#"
@ -388,7 +384,7 @@ pub async fn create_snapshot_and_maintain_limit<'a>(
VALUES ($1, $2, $3, $4, $5)
RETURNING sid AS snapshot_id, oid AS object_id, created_at
"#,
oid,
oid.to_string(),
encoded_collab_v1,
encoded_collab_v1.len() as i64,
0,
@ -420,11 +416,10 @@ pub async fn create_snapshot_and_maintain_limit<'a>(
#[inline]
pub async fn select_snapshot(
pg_pool: &PgPool,
workspace_id: &str,
object_id: &str,
workspace_id: &Uuid,
object_id: &Uuid,
snapshot_id: &i64,
) -> Result<Option<AFSnapshotRow>, Error> {
let workspace_id = Uuid::from_str(workspace_id).map_err(|err| Error::Decode(err.into()))?;
let row = sqlx::query_as!(
AFSnapshotRow,
r#"
@ -432,7 +427,7 @@ pub async fn select_snapshot(
WHERE sid = $1 AND oid = $2 AND workspace_id = $3 AND deleted_at IS NULL;
"#,
snapshot_id,
object_id,
object_id.to_string(),
workspace_id
)
.fetch_optional(pg_pool)
@ -465,7 +460,7 @@ pub async fn select_latest_snapshot(
/// Returns list of snapshots for given object_id in descending order of creation time.
pub async fn get_all_collab_snapshot_meta(
pg_pool: &PgPool,
object_id: &str,
object_id: &Uuid,
) -> Result<AFSnapshotMetas, Error> {
let snapshots: Vec<AFSnapshotMeta> = sqlx::query_as!(
AFSnapshotMeta,
@ -475,7 +470,7 @@ pub async fn get_all_collab_snapshot_meta(
WHERE oid = $1 AND deleted_at IS NULL
ORDER BY created_at DESC;
"#,
object_id
object_id.to_string()
)
.fetch_all(pg_pool)
.await?;

View file

@ -11,6 +11,7 @@ use collab::entity::EncodedCollab;
use serde::{Deserialize, Serialize};
use sqlx::Transaction;
use std::collections::HashMap;
use uuid::Uuid;
pub const COLLAB_SNAPSHOT_LIMIT: i64 = 30;
pub const SNAPSHOT_PER_HOUR: i64 = 6;
@ -21,30 +22,39 @@ pub type AppResult<T, E = AppError> = core::result::Result<T, E>;
#[async_trait]
pub trait CollabStorageAccessControl: Send + Sync + 'static {
/// Updates the cache of the access level of the user for given collab object.
async fn update_policy(&self, uid: &i64, oid: &str, level: AFAccessLevel)
-> Result<(), AppError>;
async fn update_policy(
&self,
uid: &i64,
oid: &Uuid,
level: AFAccessLevel,
) -> Result<(), AppError>;
/// Removes the access level of the user for given collab object.
async fn enforce_read_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
oid: &str,
oid: &Uuid,
) -> Result<(), AppError>;
/// Enforce the user's permission to write to the collab object.
async fn enforce_write_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
oid: &str,
oid: &Uuid,
) -> Result<(), AppError>;
/// Enforce the user's permission to write to the workspace.
async fn enforce_write_workspace(&self, uid: &i64, workspace_id: &str) -> Result<(), AppError>;
async fn enforce_write_workspace(&self, uid: &i64, workspace_id: &Uuid) -> Result<(), AppError>;
/// Enforce the user's permission to delete the collab object.
async fn enforce_delete(&self, workspace_id: &str, uid: &i64, oid: &str) -> Result<(), AppError>;
async fn enforce_delete(
&self,
workspace_id: &Uuid,
uid: &i64,
oid: &Uuid,
) -> Result<(), AppError>;
}
#[derive(Clone)]
@ -70,7 +80,7 @@ pub trait CollabStorage: Send + Sync + 'static {
///
async fn queue_insert_or_update_collab(
&self,
workspace_id: &str,
workspace_id: Uuid,
uid: &i64,
params: CollabParams,
flush_to_disk: bool,
@ -78,7 +88,7 @@ pub trait CollabStorage: Send + Sync + 'static {
async fn batch_insert_new_collab(
&self,
workspace_id: &str,
workspace_id: Uuid,
uid: &i64,
params: Vec<CollabParams>,
) -> AppResult<()>;
@ -94,7 +104,7 @@ pub trait CollabStorage: Send + Sync + 'static {
/// * `Result<()>` - Returns `Ok(())` if the collaboration was created successfully, `Err` otherwise.
async fn upsert_new_collab_with_transaction(
&self,
workspace_id: &str,
workspace_id: Uuid,
uid: &i64,
params: CollabParams,
transaction: &mut Transaction<'_, sqlx::Postgres>,
@ -120,7 +130,7 @@ pub trait CollabStorage: Send + Sync + 'static {
async fn batch_get_collab(
&self,
uid: &i64,
workspace_id: &str,
workspace_id: Uuid,
queries: Vec<QueryCollab>,
from_editing_collab: bool,
) -> HashMap<String, QueryCollabResult>;
@ -134,37 +144,60 @@ pub trait CollabStorage: Send + Sync + 'static {
/// # Returns
///
/// * `Result<()>` - Returns `Ok(())` if the collaboration was deleted successfully, `Err` otherwise.
async fn delete_collab(&self, workspace_id: &str, uid: &i64, object_id: &str) -> AppResult<()>;
async fn delete_collab(&self, workspace_id: &Uuid, uid: &i64, object_id: &Uuid) -> AppResult<()>;
async fn should_create_snapshot(&self, workspace_id: &str, oid: &str) -> Result<bool, AppError>;
async fn should_create_snapshot(&self, workspace_id: &Uuid, oid: &Uuid)
-> Result<bool, AppError>;
async fn create_snapshot(&self, params: InsertSnapshotParams) -> AppResult<AFSnapshotMeta>;
async fn queue_snapshot(&self, params: InsertSnapshotParams) -> AppResult<()>;
async fn get_collab_snapshot(
&self,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
snapshot_id: &i64,
) -> AppResult<SnapshotData>;
async fn get_latest_snapshot(
&self,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
) -> AppResult<Option<SnapshotData>>;
/// Returns list of snapshots for given object_id in descending order of creation time.
async fn get_collab_snapshot_list(
&self,
workspace_id: &str,
oid: &str,
workspace_id: &Uuid,
oid: &Uuid,
) -> AppResult<AFSnapshotMetas>;
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollabMetadata {
pub object_id: String,
pub workspace_id: String,
#[serde(with = "uuid_str")]
pub object_id: Uuid,
#[serde(with = "uuid_str")]
pub workspace_id: Uuid,
}
mod uuid_str {
use serde::Deserialize;
use uuid::Uuid;
pub fn serialize<S>(uuid: &Uuid, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&uuid.to_string())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Uuid, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Uuid::parse_str(&s).map_err(serde::de::Error::custom)
}
}

View file

@ -164,7 +164,7 @@ async fn insert_snapshot_state<'a, E: Executor<'a, Database = Postgres>>(
/// that has a `created_at` timestamp greater than or equal to the specified timestamp.
///
pub async fn get_latest_snapshot_state<'a, E: Executor<'a, Database = Postgres>>(
oid: &str,
oid: &Uuid,
timestamp: i64,
collab_type: &CollabType,
executor: E,
@ -179,7 +179,7 @@ pub async fn get_latest_snapshot_state<'a, E: Executor<'a, Database = Postgres>>
ORDER BY created_at ASC
LIMIT 1
"#,
oid,
oid.to_string(),
partition_key,
timestamp,
)
@ -190,7 +190,7 @@ pub async fn get_latest_snapshot_state<'a, E: Executor<'a, Database = Postgres>>
/// Gets the latest snapshot for the specified object identifier and partition key.
pub async fn get_latest_snapshot(
oid: &str,
oid: &Uuid,
collab_type: &CollabType,
pool: &PgPool,
) -> Result<Option<SingleSnapshotInfoPb>, sqlx::Error> {
@ -206,7 +206,7 @@ pub async fn get_latest_snapshot(
ORDER BY created_at DESC
LIMIT 1
"#,
oid,
oid.to_string(),
partition_key,
)
.fetch_optional(transaction.deref_mut())

View file

@ -213,9 +213,9 @@ pub struct CollabId {
impl From<CollabId> for QueryCollabParams {
fn from(value: CollabId) -> Self {
QueryCollabParams {
workspace_id: value.workspace_id.to_string(),
workspace_id: value.workspace_id,
inner: QueryCollab {
object_id: value.object_id.to_string(),
object_id: value.object_id,
collab_type: value.collab_type,
},
}

View file

@ -25,7 +25,7 @@ impl Indexer for DocumentIndexer {
collab: &Collab,
embedding_model: EmbeddingModel,
) -> Result<Vec<AFCollabEmbeddedChunk>, AppError> {
let object_id = collab.object_id().to_string();
let object_id = collab.object_id().parse()?;
let document = DocumentBody::from_collab(collab).ok_or_else(|| {
anyhow!(
"Failed to get document body from collab `{}`: schema is missing required fields",
@ -48,7 +48,7 @@ impl Indexer for DocumentIndexer {
fn create_embedded_chunks_from_text(
&self,
object_id: String,
object_id: Uuid,
text: String,
model: EmbeddingModel,
) -> Result<Vec<AFCollabEmbeddedChunk>, AppError> {
@ -102,7 +102,7 @@ impl Indexer for DocumentIndexer {
}
fn split_text_into_chunks(
object_id: String,
object_id: Uuid,
content: String,
collab_type: CollabType,
embedding_model: &EmbeddingModel,
@ -118,8 +118,7 @@ fn split_text_into_chunks(
// We assume that every token is ~4 bytes. We're going to split document content into fragments
// of ~2000 tokens each.
let split_contents = split_text_by_max_content_len(content, 8000)?;
let metadata =
json!({"id": object_id, "source": "appflowy", "name": "document", "collab_type": collab_type });
let metadata = json!({"id": object_id.to_string(), "source": "appflowy", "name": "document", "collab_type": collab_type });
Ok(
split_contents
.into_iter()

View file

@ -9,6 +9,7 @@ use infra::env_util::get_env_var;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::info;
use uuid::Uuid;
pub trait Indexer: Send + Sync {
fn create_embedded_chunks_from_collab(
@ -19,7 +20,7 @@ pub trait Indexer: Send + Sync {
fn create_embedded_chunks_from_text(
&self,
object_id: String,
object_id: Uuid,
text: String,
model: EmbeddingModel,
) -> Result<Vec<AFCollabEmbeddedChunk>, AppError>;

View file

@ -5,7 +5,7 @@ use uuid::Uuid;
pub struct UnindexedCollab {
pub workspace_id: Uuid,
pub object_id: String,
pub object_id: Uuid,
pub collab_type: CollabType,
pub collab: EncodedCollab,
}

View file

@ -241,8 +241,8 @@ impl IndexerScheduler {
pub async fn index_collab_immediately(
&self,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
collab: &Collab,
collab_type: CollabType,
) -> Result<(), AppError> {
@ -263,8 +263,8 @@ impl IndexerScheduler {
if let Some(text) = text {
if !text.is_empty() {
let pending = UnindexedCollabTask::new(
Uuid::parse_str(workspace_id)?,
Uuid::parse_str(object_id)?,
workspace_id,
object_id,
collab_type.clone(),
UnindexedData::Text(text),
);
@ -342,8 +342,8 @@ async fn spawn_rayon_generate_embeddings(
Ok(embedder) => {
records.into_par_iter().for_each(|record| {
let result = threads.install(|| {
let indexer = indexer_provider.indexer_for(record.collab_type);
match process_collab(&embedder, indexer, &record.object_id, record.data, &metrics) {
let indexer = indexer_provider.indexer_for(&record.collab_type);
match process_collab(&embedder, indexer, record.object_id, record.data, &metrics) {
Ok(Some((tokens_used, contents))) => {
if let Err(err) = write_embedding_tx.send(EmbeddingRecord {
workspace_id: record.workspace_id,
@ -483,14 +483,14 @@ pub(crate) async fn batch_insert_records(
fn process_collab(
embedder: &Embedder,
indexer: Option<Arc<dyn Indexer>>,
object_id: &str,
object_id: Uuid,
data: UnindexedData,
metrics: &EmbeddingMetrics,
) -> Result<Option<(u32, Vec<AFCollabEmbeddedChunk>)>, AppError> {
if let Some(indexer) = indexer {
let chunks = match data {
UnindexedData::Text(text) => {
indexer.create_embedded_chunks_from_text(object_id.to_string(), text, embedder.model())?
indexer.create_embedded_chunks_from_text(object_id, text, embedder.model())?
},
};

View file

@ -173,7 +173,7 @@ async fn create_embeddings(
let indexer = indexer_provider.indexer_for(unindexed.collab_type)?;
let collab = Collab::new_with_source(
CollabOrigin::Empty,
&unindexed.object_id,
&unindexed.object_id.to_string(),
DataSource::DocStateV1(unindexed.collab.doc_state.into()),
vec![],
false,

View file

@ -8,6 +8,8 @@ use collab_rt_entity::user::RealtimeUser;
pub use collab_rt_entity::RealtimeMessage;
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::fmt::Debug;
use uuid::Uuid;
#[derive(Debug, Message, Clone)]
#[rtype(result = "Result<(), RealtimeError>")]
pub struct Connect {
@ -49,8 +51,8 @@ pub struct ClientHttpStreamMessage {
#[rtype(result = "Result<(), AppError>")]
pub struct ClientHttpUpdateMessage {
pub user: RealtimeUser,
pub workspace_id: String,
pub object_id: String,
pub workspace_id: Uuid,
pub object_id: Uuid,
/// Encoded yrs::Update or doc state
pub update: Bytes,
/// If the state_vector is not None, it will calculate missing updates base on
@ -65,7 +67,7 @@ pub struct ClientHttpUpdateMessage {
#[derive(Message)]
#[rtype(result = "Result<(), AppError>")]
pub struct ClientGenerateEmbeddingMessage {
pub workspace_id: String,
pub object_id: String,
pub workspace_id: Uuid,
pub object_id: Uuid,
pub return_tx: Option<tokio::sync::oneshot::Sender<Result<(), AppError>>>,
}

View file

@ -1,15 +1,15 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
use tokio_stream::StreamExt;
use tracing::{error, trace};
use access_control::collab::RealtimeAccessControl;
use async_trait::async_trait;
use collab_rt_entity::user::RealtimeUser;
use collab_rt_entity::ClientCollabMessage;
use collab_rt_entity::{MessageByObjectId, RealtimeMessage};
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
use tokio_stream::StreamExt;
use tracing::{error, trace};
use uuid::Uuid;
use crate::util::channel_ext::UnboundedSenderSink;
@ -55,9 +55,9 @@ impl ClientMessageRouter {
///
pub fn init_client_communication<T>(
&mut self,
workspace_id: &str,
workspace_id: Uuid,
user: &RealtimeUser,
object_id: &str,
object_id: Uuid,
access_control: Arc<dyn RealtimeAccessControl>,
) -> (UnboundedSenderSink<T>, ReceiverStream<MessageByObjectId>)
where
@ -65,19 +65,17 @@ impl ClientMessageRouter {
{
let client_ws_sink = self.sink.clone();
let mut stream_rx = BroadcastStream::new(self.stream_tx.subscribe());
let target_object_id = object_id.to_string();
// Send the message to the connected websocket client. When the client receive the message,
// it will apply the changes.
let (client_sink_tx, mut client_sink_rx) = tokio::sync::mpsc::unbounded_channel::<T>();
let sink_access_control = access_control.clone();
let sink_workspace_id = workspace_id.to_string();
let uid = user.uid;
let client_sink = UnboundedSenderSink::<T>::new(client_sink_tx);
tokio::spawn(async move {
while let Some(msg) = client_sink_rx.recv().await {
let result = sink_access_control
.can_read_collab(&sink_workspace_id, &uid, &target_object_id)
.can_read_collab(&workspace_id, &uid, &object_id)
.await;
match result {
Ok(is_allowed) => {
@ -85,7 +83,7 @@ impl ClientMessageRouter {
let rt_msg = msg.into();
client_ws_sink.do_send(rt_msg);
} else {
trace!("user:{} is not allowed to read {}", uid, target_object_id);
trace!("user:{} is not allowed to read {}", uid, object_id);
tokio::time::sleep(Duration::from_secs(2)).await;
}
},
@ -96,14 +94,13 @@ impl ClientMessageRouter {
}
}
});
let target_object_id = object_id.to_string();
let stream_workspace_id = workspace_id.to_string();
let user = user.clone();
// stream_rx continuously receive messages from the websocket client and then
// forward the message to the subscriber which is the broadcast channel [CollabBroadcast].
let (client_msg_rx, rx) = tokio::sync::mpsc::channel(100);
let client_stream = ReceiverStream::new(rx);
tokio::spawn(async move {
let target_object_id = object_id.to_string();
while let Some(Ok(messages_by_oid)) = stream_rx.next().await {
for (message_object_id, original_messages) in messages_by_oid.into_inner() {
// if the message is not for the target object, skip it. The stream_rx receives different
@ -116,9 +113,9 @@ impl ClientMessageRouter {
// valid_messages contains the messages that the user is allowed to apply
// invalid_message contains the messages that the user is not allowed to apply
let (valid_messages, invalid_message) = Self::access_control(
&stream_workspace_id,
&workspace_id,
&user.uid,
&message_object_id,
&object_id,
access_control.clone(),
original_messages,
)
@ -164,9 +161,9 @@ impl ClientMessageRouter {
#[inline]
async fn access_control(
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
object_id: &str,
object_id: &Uuid,
access_control: Arc<dyn RealtimeAccessControl>,
messages: Vec<ClientCollabMessage>,
) -> (Vec<ClientCollabMessage>, Vec<ClientCollabMessage>) {

View file

@ -1,14 +1,13 @@
use async_trait::async_trait;
use std::sync::Arc;
use uuid::Uuid;
use crate::collab::cache::CollabCache;
use access_control::act::Action;
use access_control::collab::CollabAccessControl;
use access_control::workspace::WorkspaceAccessControl;
use app_error::AppError;
use async_trait::async_trait;
use database::collab::CollabStorageAccessControl;
use database_entity::dto::AFAccessLevel;
use std::sync::Arc;
use uuid::Uuid;
#[derive(Clone)]
pub struct CollabStorageAccessControlImpl {
@ -22,7 +21,7 @@ impl CollabStorageAccessControl for CollabStorageAccessControlImpl {
async fn update_policy(
&self,
uid: &i64,
oid: &str,
oid: &Uuid,
level: AFAccessLevel,
) -> Result<(), AppError> {
self
@ -33,9 +32,9 @@ impl CollabStorageAccessControl for CollabStorageAccessControlImpl {
async fn enforce_read_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
oid: &str,
oid: &Uuid,
) -> Result<(), AppError> {
let collab_exists = self.cache.is_exist(workspace_id, oid).await?;
if !collab_exists {
@ -51,9 +50,9 @@ impl CollabStorageAccessControl for CollabStorageAccessControlImpl {
async fn enforce_write_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
oid: &str,
oid: &Uuid,
) -> Result<(), AppError> {
let collab_exists = self.cache.is_exist(workspace_id, oid).await?;
if !collab_exists {
@ -67,15 +66,19 @@ impl CollabStorageAccessControl for CollabStorageAccessControlImpl {
.await
}
async fn enforce_write_workspace(&self, uid: &i64, workspace_id: &str) -> Result<(), AppError> {
let workspace_id = Uuid::parse_str(workspace_id)?;
async fn enforce_write_workspace(&self, uid: &i64, workspace_id: &Uuid) -> Result<(), AppError> {
self
.workspace_access_control
.enforce_action(uid, workspace_id, Action::Write)
.await
}
async fn enforce_delete(&self, workspace_id: &str, uid: &i64, oid: &str) -> Result<(), AppError> {
async fn enforce_delete(
&self,
workspace_id: &Uuid,
uid: &i64,
oid: &Uuid,
) -> Result<(), AppError> {
self
.collab_access_control
.enforce_access_level(workspace_id, uid, oid, AFAccessLevel::FullAccess)

View file

@ -1,19 +1,19 @@
use super::disk_cache::CollabDiskCache;
use super::mem_cache::{cache_exp_secs_from_collab_type, CollabMemCache};
use crate::CollabMetrics;
use app_error::AppError;
use bytes::Bytes;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use database::file::s3_client_impl::AwsS3BucketClientImpl;
use database_entity::dto::{CollabParams, PendingCollabWrite, QueryCollab, QueryCollabResult};
use futures_util::{stream, StreamExt};
use itertools::{Either, Itertools};
use sqlx::{PgPool, Transaction};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{error, event, Level};
use super::disk_cache::CollabDiskCache;
use super::mem_cache::{cache_exp_secs_from_collab_type, CollabMemCache};
use crate::CollabMetrics;
use app_error::AppError;
use database::file::s3_client_impl::AwsS3BucketClientImpl;
use database_entity::dto::{CollabParams, PendingCollabWrite, QueryCollab, QueryCollabResult};
use uuid::{Error, Uuid};
#[derive(Clone)]
pub struct CollabCache {
@ -48,7 +48,7 @@ impl CollabCache {
pub async fn bulk_insert_collab(
&self,
workspace_id: &str,
workspace_id: Uuid,
uid: &i64,
params_list: Vec<CollabParams>,
) -> Result<(), AppError> {
@ -86,7 +86,7 @@ impl CollabCache {
pub async fn get_encode_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
query: QueryCollab,
) -> Result<EncodedCollab, AppError> {
// Attempt to retrieve encoded collab from memory cache, falling back to disk cache if necessary.
@ -123,9 +123,9 @@ impl CollabCache {
/// returns a hashmap of the object_id to the encoded collab data.
pub async fn batch_get_encode_collab<T: Into<QueryCollab>>(
&self,
workspace_id: &str,
workspace_id: &Uuid,
queries: Vec<T>,
) -> HashMap<String, QueryCollabResult> {
) -> HashMap<Uuid, QueryCollabResult> {
let queries = queries.into_iter().map(Into::into).collect::<Vec<_>>();
let mut results = HashMap::new();
// 1. Processes valid queries against the in-memory cache to retrieve cached values.
@ -166,7 +166,7 @@ impl CollabCache {
/// The data is inserted into both the memory and disk cache.
pub async fn insert_encode_collab_data(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
params: CollabParams,
transaction: &mut Transaction<'_, sqlx::Postgres>,
@ -192,7 +192,7 @@ impl CollabCache {
Ok(())
}
fn cache_collab(&self, object_id: String, collab_type: CollabType, encode_collab_data: Bytes) {
fn cache_collab(&self, object_id: Uuid, collab_type: CollabType, encode_collab_data: Bytes) {
let mem_cache = self.mem_cache.clone();
tokio::spawn(async move {
if let Err(err) = mem_cache
@ -214,7 +214,7 @@ impl CollabCache {
pub async fn insert_encode_collab_to_disk(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
params: CollabParams,
) -> Result<(), AppError> {
@ -227,7 +227,7 @@ impl CollabCache {
Ok(())
}
pub async fn delete_collab(&self, workspace_id: &str, object_id: &str) -> Result<(), AppError> {
pub async fn delete_collab(&self, workspace_id: &Uuid, object_id: &Uuid) -> Result<(), AppError> {
self.mem_cache.remove_encode_collab(object_id).await?;
self
.disk_cache
@ -236,7 +236,7 @@ impl CollabCache {
Ok(())
}
pub async fn is_exist(&self, workspace_id: &str, oid: &str) -> Result<bool, AppError> {
pub async fn is_exist(&self, workspace_id: &Uuid, oid: &Uuid) -> Result<bool, AppError> {
if let Ok(value) = self.mem_cache.is_exist(oid).await {
if value {
return Ok(value);

View file

@ -1,18 +1,9 @@
use anyhow::{anyhow, Context};
use bytes::Bytes;
use collab::entity::{EncodedCollab, EncoderVersion};
use sqlx::{Error, PgPool, Transaction};
use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::task::JoinSet;
use tokio::time::sleep;
use tracing::{error, instrument};
use crate::collab::cache::encode_collab_from_bytes;
use crate::CollabMetrics;
use anyhow::{anyhow, Context};
use app_error::AppError;
use bytes::Bytes;
use collab::entity::{EncodedCollab, EncoderVersion};
use database::collab::{
batch_select_collab_blob, insert_into_af_collab, insert_into_af_collab_bulk_for_user,
is_collab_exists, select_blob_from_af_collab, AppResult,
@ -22,6 +13,15 @@ use database::file::{BucketClient, ResponseBlob};
use database_entity::dto::{
CollabParams, PendingCollabWrite, QueryCollab, QueryCollabResult, ZSTD_COMPRESSION_LEVEL,
};
use sqlx::{Error, PgPool, Transaction};
use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::task::JoinSet;
use tokio::time::sleep;
use tracing::{error, instrument};
use uuid::Uuid;
#[derive(Clone)]
pub struct CollabDiskCache {
@ -46,7 +46,7 @@ impl CollabDiskCache {
}
}
pub async fn is_exist(&self, workspace_id: &str, object_id: &str) -> AppResult<bool> {
pub async fn is_exist(&self, workspace_id: &Uuid, object_id: &Uuid) -> AppResult<bool> {
let dir = collab_key_prefix(workspace_id, object_id);
let resp = self.s3.list_dir(&dir, 1).await?;
if resp.is_empty() {
@ -59,7 +59,7 @@ impl CollabDiskCache {
pub async fn upsert_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
params: CollabParams,
) -> AppResult<()> {
@ -100,7 +100,7 @@ impl CollabDiskCache {
}
pub async fn upsert_collab_with_transaction(
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
mut params: CollabParams,
transaction: &mut Transaction<'_, sqlx::Postgres>,
@ -133,7 +133,7 @@ impl CollabDiskCache {
#[instrument(level = "trace", skip_all)]
pub async fn get_collab_encoded_from_disk(
&self,
workspace_id: &str,
workspace_id: &Uuid,
query: QueryCollab,
) -> Result<EncodedCollab, AppError> {
tracing::debug!("try get {}:{} from s3", query.collab_type, query.object_id);
@ -205,7 +205,7 @@ impl CollabDiskCache {
//FIXME: this and `batch_insert_collab` duplicate similar logic.
pub async fn bulk_insert_collab(
&self,
workspace_id: &str,
workspace_id: Uuid,
uid: &i64,
mut params_list: Vec<CollabParams>,
) -> Result<(), AppError> {
@ -216,7 +216,7 @@ impl CollabDiskCache {
let mut delete_from_s3 = Vec::new();
let mut blobs = HashMap::new();
for param in params_list.iter_mut() {
let key = collab_key(workspace_id, &param.object_id);
let key = collab_key(&workspace_id, &param.object_id);
if param.encoded_collab_v1.len() > self.s3_collab_threshold {
let blob = std::mem::take(&mut param.encoded_collab_v1);
blobs.insert(key, blob);
@ -315,9 +315,9 @@ impl CollabDiskCache {
pub async fn batch_get_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
queries: Vec<QueryCollab>,
) -> HashMap<String, QueryCollabResult> {
) -> HashMap<Uuid, QueryCollabResult> {
let mut results = HashMap::new();
let not_found = batch_get_collab_from_s3(&self.s3, workspace_id, queries, &mut results).await;
let s3_fetch = results.len() as u64;
@ -328,7 +328,7 @@ impl CollabDiskCache {
results
}
pub async fn delete_collab(&self, workspace_id: &str, object_id: &str) -> AppResult<()> {
pub async fn delete_collab(&self, workspace_id: &Uuid, object_id: &Uuid) -> AppResult<()> {
sqlx::query!(
r#"
UPDATE af_collab
@ -420,19 +420,19 @@ async fn batch_put_collab_to_s3(
async fn batch_get_collab_from_s3(
s3: &AwsS3BucketClientImpl,
workspace_id: &str,
workspace_id: &Uuid,
params: Vec<QueryCollab>,
results: &mut HashMap<String, QueryCollabResult>,
results: &mut HashMap<Uuid, QueryCollabResult>,
) -> Vec<QueryCollab> {
enum GetResult {
Found(String, Vec<u8>),
Found(Uuid, Vec<u8>),
NotFound(QueryCollab),
Error(String, String),
Error(Uuid, String),
}
async fn gather(
join_set: &mut JoinSet<GetResult>,
results: &mut HashMap<String, QueryCollabResult>,
results: &mut HashMap<Uuid, QueryCollabResult>,
not_found: &mut Vec<QueryCollab>,
) {
while let Some(result) = join_set.join_next().await {
@ -499,11 +499,11 @@ async fn batch_get_collab_from_s3(
not_found
}
fn collab_key_prefix(workspace_id: &str, object_id: &str) -> String {
fn collab_key_prefix(workspace_id: &Uuid, object_id: &Uuid) -> String {
format!("collabs/{}/{}/", workspace_id, object_id)
}
fn collab_key(workspace_id: &str, object_id: &str) -> String {
fn collab_key(workspace_id: &Uuid, object_id: &Uuid) -> String {
format!(
"collabs/{}/{}/encoded_collab.v1.zstd",
workspace_id, object_id

View file

@ -1,14 +1,14 @@
use crate::collab::cache::encode_collab_from_bytes;
use crate::CollabMetrics;
use anyhow::anyhow;
use app_error::AppError;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use database::collab::CollabMetadata;
use redis::{pipe, AsyncCommands};
use std::sync::Arc;
use tracing::{error, instrument, trace};
use crate::collab::cache::encode_collab_from_bytes;
use crate::CollabMetrics;
use app_error::AppError;
use database::collab::CollabMetadata;
use uuid::Uuid;
const SEVEN_DAYS: u64 = 604800;
const ONE_MONTH: u64 = 2592000;
@ -43,7 +43,7 @@ impl CollabMemCache {
Ok(())
}
pub async fn get_collab_meta(&self, object_id: &str) -> Result<CollabMetadata, AppError> {
pub async fn get_collab_meta(&self, object_id: &Uuid) -> Result<CollabMetadata, AppError> {
let key = collab_meta_key(object_id);
let value: Option<String> = self
.connection_manager
@ -66,7 +66,7 @@ impl CollabMemCache {
}
/// Checks if an object with the given ID exists in the cache.
pub async fn is_exist(&self, object_id: &str) -> Result<bool, AppError> {
pub async fn is_exist(&self, object_id: &Uuid) -> Result<bool, AppError> {
let cache_object_id = encode_collab_key(object_id);
let exists: bool = self
.connection_manager
@ -77,7 +77,7 @@ impl CollabMemCache {
Ok(exists)
}
pub async fn remove_encode_collab(&self, object_id: &str) -> Result<(), AppError> {
pub async fn remove_encode_collab(&self, object_id: &Uuid) -> Result<(), AppError> {
let cache_object_id = encode_collab_key(object_id);
self
.connection_manager
@ -92,7 +92,7 @@ impl CollabMemCache {
})
}
pub async fn get_encode_collab_data(&self, object_id: &str) -> Option<Vec<u8>> {
pub async fn get_encode_collab_data(&self, object_id: &Uuid) -> Option<Vec<u8>> {
match self.get_data_with_timestamp(object_id).await {
Ok(None) => None,
Ok(Some((_, bytes))) => Some(bytes),
@ -104,7 +104,7 @@ impl CollabMemCache {
}
#[instrument(level = "trace", skip_all)]
pub async fn get_encode_collab(&self, object_id: &str) -> Option<EncodedCollab> {
pub async fn get_encode_collab(&self, object_id: &Uuid) -> Option<EncodedCollab> {
match self.get_encode_collab_data(object_id).await {
Some(bytes) => encode_collab_from_bytes(bytes).await.ok(),
None => {
@ -120,7 +120,7 @@ impl CollabMemCache {
#[instrument(level = "trace", skip_all, fields(object_id=%object_id))]
pub async fn insert_encode_collab(
&self,
object_id: &str,
object_id: &Uuid,
encoded_collab: EncodedCollab,
timestamp: i64,
expiration_seconds: u64,
@ -149,7 +149,7 @@ impl CollabMemCache {
/// if the expiration_seconds is None, the data will be expired after 7 days.
pub async fn insert_encode_collab_data(
&self,
object_id: &str,
object_id: &Uuid,
data: &[u8],
timestamp: i64,
expiration_seconds: Option<u64>,
@ -175,7 +175,7 @@ impl CollabMemCache {
/// A Redis result indicating the success or failure of the operation.
async fn insert_data_with_timestamp(
&self,
object_id: &str,
object_id: &Uuid,
data: &[u8],
timestamp: i64,
expiration_seconds: Option<u64>,
@ -257,7 +257,7 @@ impl CollabMemCache {
/// The function returns `Ok(None)` if no data is found for the given `object_id`.
async fn get_data_with_timestamp(
&self,
object_id: &str,
object_id: &Uuid,
) -> redis::RedisResult<Option<(i64, Vec<u8>)>> {
let cache_object_id = encode_collab_key(object_id);
let mut conn = self.connection_manager.clone();
@ -296,12 +296,12 @@ impl CollabMemCache {
/// changing the prefix, allowing the old data to expire naturally.
///
#[inline]
fn encode_collab_key(object_id: &str) -> String {
fn encode_collab_key(object_id: &Uuid) -> String {
format!("encode_collab_v0:{}", object_id)
}
#[inline]
fn collab_meta_key(object_id: &str) -> String {
fn collab_meta_key(object_id: &Uuid) -> String {
format!("collab_meta_v0:{}", object_id)
}

View file

@ -93,7 +93,7 @@ where
async fn insert_collab(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
params: CollabParams,
) -> AppResult<()> {
@ -106,7 +106,7 @@ where
async fn check_write_workspace_permission(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
) -> Result<(), AppError> {
// If the collab doesn't exist, check if the user has enough permissions to create collab.
@ -120,9 +120,9 @@ where
async fn check_write_collab_permission(
&self,
workspace_id: &str,
workspace_id: &Uuid,
uid: &i64,
object_id: &str,
object_id: &Uuid,
) -> Result<(), AppError> {
// If the collab already exists, check if the user has enough permissions to update collab
self
@ -131,8 +131,7 @@ where
.await?;
Ok(())
}
async fn get_encode_collab_from_editing(&self, oid: &str) -> Option<EncodedCollab> {
let object_id = oid.to_string();
async fn get_encode_collab_from_editing(&self, object_id: Uuid) -> Option<EncodedCollab> {
let (ret, rx) = tokio::sync::oneshot::channel();
let timeout_duration = Duration::from_secs(5);
@ -153,20 +152,20 @@ where
match timeout(timeout_duration, rx).await {
Ok(Ok(Some(encode_collab))) => Some(encode_collab),
Ok(Ok(None)) => {
trace!("Editing collab not found: `{}`", oid);
trace!("Editing collab not found: `{}`", object_id);
None
},
Ok(Err(err)) => {
error!(
"Failed to get collab from realtime server `{}`: {}",
oid, err
object_id, err
);
None
},
Err(_) => {
error!(
"Timeout trying to read collab `{}` from realtime server",
oid
object_id
);
None
},
@ -175,8 +174,8 @@ where
async fn batch_get_encode_collab_from_editing(
&self,
object_ids: Vec<String>,
) -> HashMap<String, EncodedCollab> {
object_ids: Vec<Uuid>,
) -> HashMap<Uuid, EncodedCollab> {
let (ret, rx) = tokio::sync::oneshot::channel();
let timeout_duration = Duration::from_secs(10);
@ -209,7 +208,7 @@ where
async fn queue_insert_collab(
&self,
workspace_id: &str,
workspace_id: Uuid,
uid: &i64,
params: CollabParams,
) -> Result<(), AppError> {
@ -225,7 +224,7 @@ where
)));
}
let pending = PendingCollabWrite::new(workspace_id.into(), *uid, params);
let pending = PendingCollabWrite::new(workspace_id, *uid, params);
if let Err(e) = self.queue.send(pending).await {
error!("Failed to queue insert collab doc state: {}", e);
}
@ -234,7 +233,7 @@ where
async fn batch_insert_collabs(
&self,
workspace_id: &str,
workspace_id: Uuid,
uid: &i64,
params_list: Vec<CollabParams>,
) -> Result<(), AppError> {
@ -250,7 +249,7 @@ where
/// * `collab_messages` - The list of collab messages to broadcast.
pub async fn broadcast_encode_collab(
&self,
object_id: String,
object_id: Uuid,
collab_messages: Vec<ClientCollabMessage>,
) -> Result<(), AppError> {
let (sender, recv) = tokio::sync::oneshot::channel();
@ -291,22 +290,25 @@ where
{
async fn queue_insert_or_update_collab(
&self,
workspace_id: &str,
workspace_id: Uuid,
uid: &i64,
params: CollabParams,
flush_to_disk: bool,
) -> AppResult<()> {
params.validate()?;
let is_exist = self.cache.is_exist(workspace_id, &params.object_id).await?;
let is_exist = self
.cache
.is_exist(&workspace_id, &params.object_id)
.await?;
// If the collab already exists, check if the user has enough permissions to update collab
// Otherwise, check if the user has enough permissions to create collab.
if is_exist {
self
.check_write_collab_permission(workspace_id, uid, &params.object_id)
.check_write_collab_permission(&workspace_id, uid, &params.object_id)
.await?;
} else {
self
.check_write_workspace_permission(workspace_id, uid)
.check_write_workspace_permission(&workspace_id, uid)
.await?;
trace!(
"Update policy for user:{} to create collab:{}",
@ -319,7 +321,7 @@ where
.await?;
}
if flush_to_disk {
self.insert_collab(workspace_id, uid, params).await?;
self.insert_collab(&workspace_id, uid, params).await?;
} else {
self.queue_insert_collab(workspace_id, uid, params).await?;
}
@ -328,12 +330,12 @@ where
async fn batch_insert_new_collab(
&self,
workspace_id: &str,
workspace_id: Uuid,
uid: &i64,
params_list: Vec<CollabParams>,
) -> AppResult<()> {
self
.check_write_workspace_permission(workspace_id, uid)
.check_write_workspace_permission(&workspace_id, uid)
.await?;
// TODO(nathan): batch insert permission
@ -362,7 +364,7 @@ where
#[allow(clippy::blocks_in_conditions)]
async fn upsert_new_collab_with_transaction(
&self,
workspace_id: &str,
workspace_id: Uuid,
uid: &i64,
params: CollabParams,
transaction: &mut Transaction<'_, sqlx::Postgres>,
@ -370,7 +372,7 @@ where
) -> AppResult<()> {
params.validate()?;
self
.check_write_workspace_permission(workspace_id, uid)
.check_write_workspace_permission(&workspace_id, uid)
.await?;
self
.access_control
@ -381,7 +383,7 @@ where
Duration::from_secs(120),
self
.cache
.insert_encode_collab_data(workspace_id, uid, params, transaction),
.insert_encode_collab_data(&workspace_id, uid, params, transaction),
)
.await
{
@ -419,7 +421,7 @@ where
// Early return if editing collab is initialized, as it indicates no need to query further.
if from_editing_collab {
// Attempt to retrieve encoded collab from the editing collab
if let Some(value) = self.get_encode_collab_from_editing(&params.object_id).await {
if let Some(value) = self.get_encode_collab_from_editing(params.object_id).await {
trace!(
"Did get encode collab {} from editing collab",
params.object_id
@ -438,7 +440,7 @@ where
async fn batch_get_collab(
&self,
_uid: &i64,
workspace_id: &str,
workspace_id: Uuid,
queries: Vec<QueryCollab>,
from_editing_collab: bool,
) -> HashMap<String, QueryCollabResult> {
@ -462,14 +464,9 @@ where
let cache_queries = if from_editing_collab {
let editing_queries = valid_queries.clone();
let editing_results = self
.batch_get_encode_collab_from_editing(
editing_queries
.iter()
.map(|q| q.object_id.clone())
.collect(),
)
.batch_get_encode_collab_from_editing(editing_queries.iter().map(|q| q.object_id).collect())
.await;
let editing_query_collab_results: HashMap<String, QueryCollabResult> =
let editing_query_collab_results: HashMap<Uuid, QueryCollabResult> =
tokio::task::spawn_blocking(move || {
let par_iter = editing_results.into_par_iter();
par_iter
@ -490,7 +487,7 @@ where
})
.await
.unwrap();
let editing_object_ids: Vec<String> = editing_query_collab_results.keys().cloned().collect();
let editing_object_ids: Vec<_> = editing_query_collab_results.keys().cloned().collect();
results.extend(editing_query_collab_results);
valid_queries
.into_iter()
@ -503,13 +500,16 @@ where
results.extend(
self
.cache
.batch_get_encode_collab(workspace_id, cache_queries)
.batch_get_encode_collab(&workspace_id, cache_queries)
.await,
);
results
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect()
}
async fn delete_collab(&self, workspace_id: &str, uid: &i64, object_id: &str) -> AppResult<()> {
async fn delete_collab(&self, workspace_id: &Uuid, uid: &i64, object_id: &Uuid) -> AppResult<()> {
self
.access_control
.enforce_delete(workspace_id, uid, object_id)
@ -518,7 +518,11 @@ where
Ok(())
}
async fn should_create_snapshot(&self, workspace_id: &str, oid: &str) -> Result<bool, AppError> {
async fn should_create_snapshot(
&self,
workspace_id: &Uuid,
oid: &Uuid,
) -> Result<bool, AppError> {
self
.snapshot_control
.should_create_snapshot(workspace_id, oid)
@ -535,8 +539,8 @@ where
async fn get_collab_snapshot(
&self,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
snapshot_id: &i64,
) -> AppResult<SnapshotData> {
self
@ -547,8 +551,8 @@ where
async fn get_latest_snapshot(
&self,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
) -> AppResult<Option<SnapshotData>> {
self
@ -559,8 +563,8 @@ where
async fn get_collab_snapshot_list(
&self,
workspace_id: &str,
oid: &str,
workspace_id: &Uuid,
oid: &Uuid,
) -> AppResult<AFSnapshotMetas> {
self
.snapshot_control

View file

@ -15,22 +15,24 @@ use std::{
sync::{Arc, Weak},
};
use tracing::error;
use uuid::Uuid;
pub type CLCommandSender = tokio::sync::mpsc::Sender<CollaborationCommand>;
pub type CLCommandReceiver = tokio::sync::mpsc::Receiver<CollaborationCommand>;
pub type EncodeCollabSender = tokio::sync::oneshot::Sender<Option<EncodedCollab>>;
pub type BatchEncodeCollabSender = tokio::sync::oneshot::Sender<HashMap<String, EncodedCollab>>;
pub type BatchEncodeCollabSender = tokio::sync::oneshot::Sender<HashMap<Uuid, EncodedCollab>>;
pub enum CollaborationCommand {
GetEncodeCollab {
object_id: String,
object_id: Uuid,
ret: EncodeCollabSender,
},
BatchGetEncodeCollab {
object_ids: Vec<String>,
object_ids: Vec<Uuid>,
ret: BatchEncodeCollabSender,
},
ServerSendCollabMessage {
object_id: String,
object_id: Uuid,
collab_messages: Vec<ClientCollabMessage>,
ret: tokio::sync::oneshot::Sender<Result<(), RealtimeError>>,
},
@ -40,7 +42,7 @@ const BATCH_GET_ENCODE_COLLAB_CONCURRENCY: usize = 10;
pub(crate) fn spawn_collaboration_command<S>(
mut command_recv: CLCommandReceiver,
group_sender_by_object_id: &Arc<DashMap<String, GroupCommandSender>>,
group_sender_by_object_id: &Arc<DashMap<Uuid, GroupCommandSender>>,
weak_groups: Weak<GroupManager<S>>,
) where
S: CollabStorage,
@ -53,10 +55,7 @@ pub(crate) fn spawn_collaboration_command<S>(
match group_sender_by_object_id.get(&object_id) {
Some(sender) => {
if let Err(err) = sender
.send(GroupCommand::EncodeCollab {
object_id: object_id.clone(),
ret,
})
.send(GroupCommand::EncodeCollab { object_id, ret })
.await
{
error!("Send group command error: {}", err);
@ -85,7 +84,7 @@ pub(crate) fn spawn_collaboration_command<S>(
.collect::<Vec<_>>()
.await;
let mut outputs: HashMap<String, EncodedCollab> = HashMap::new();
let mut outputs: HashMap<_, EncodedCollab> = HashMap::new();
for (object_id, encoded_collab) in tasks.into_iter().flatten() {
if let Some(encoded_collab) = encoded_collab {
outputs.insert(object_id, encoded_collab);

View file

@ -81,6 +81,9 @@ pub enum RealtimeError {
#[error("failed to send ws message: {0}")]
SendWSMessageFailed(String),
#[error("failed to parse UUID: {0}")]
Uuid(#[from] uuid::Error),
}
#[derive(Debug)]

View file

@ -20,6 +20,7 @@ use collab_rt_entity::{
use collab_rt_protocol::{Message, SyncMessage};
use database::collab::CollabStorage;
use tracing::{error, instrument, trace, warn};
use uuid::Uuid;
use yrs::updates::encoder::Encode;
use yrs::StateVector;
@ -30,32 +31,32 @@ use yrs::StateVector;
pub enum GroupCommand {
HandleClientCollabMessage {
user: RealtimeUser,
object_id: String,
object_id: Uuid,
collab_messages: Vec<ClientCollabMessage>,
ret: tokio::sync::oneshot::Sender<Result<(), RealtimeError>>,
},
HandleClientHttpUpdate {
user: RealtimeUser,
workspace_id: String,
object_id: String,
workspace_id: Uuid,
object_id: Uuid,
update: Bytes,
collab_type: CollabType,
ret: tokio::sync::oneshot::Sender<Result<(), RealtimeError>>,
},
EncodeCollab {
object_id: String,
object_id: Uuid,
ret: tokio::sync::oneshot::Sender<Option<EncodedCollab>>,
},
HandleServerCollabMessage {
object_id: String,
object_id: Uuid,
collab_messages: Vec<ClientCollabMessage>,
ret: tokio::sync::oneshot::Sender<Result<(), RealtimeError>>,
},
GenerateCollabEmbedding {
object_id: String,
object_id: Uuid,
},
CalculateMissingUpdate {
object_id: String,
object_id: Uuid,
state_vector: StateVector,
ret: tokio::sync::oneshot::Sender<Result<Vec<u8>, RealtimeError>>,
},
@ -81,7 +82,7 @@ impl<S> GroupCommandRunner<S>
where
S: CollabStorage,
{
pub async fn run(mut self, object_id: String) {
pub async fn run(mut self, object_id: Uuid) {
let mut receiver = self.recv.take().expect("Only take once");
let stream = stream! {
while let Some(msg) = receiver.recv().await {
@ -135,13 +136,7 @@ where
ret,
} => {
let result = self
.handle_client_posted_http_update(
&user,
&workspace_id,
&object_id,
collab_type,
update,
)
.handle_client_posted_http_update(&user, workspace_id, object_id, collab_type, update)
.await;
if let Err(err) = ret.send(result) {
warn!("Send handle client update message result fail: {:?}", err);
@ -163,7 +158,7 @@ where
let group = self.group_manager.get_group(&object_id).await;
match group {
None => {
let _ = ret.send(Err(RealtimeError::GroupNotFound(object_id.clone())));
let _ = ret.send(Err(RealtimeError::GroupNotFound(object_id.to_string())));
},
Some(group) => {
let result = group.calculate_missing_update(state_vector).await;
@ -191,7 +186,7 @@ where
async fn handle_client_collab_message(
&self,
user: &RealtimeUser,
object_id: String,
object_id: Uuid,
messages: Vec<ClientCollabMessage>,
) -> Result<(), RealtimeError> {
if messages.is_empty() {
@ -260,8 +255,8 @@ where
async fn handle_client_posted_http_update(
&self,
user: &RealtimeUser,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
collab_type: collab_entity::CollabType,
update: Bytes,
) -> Result<(), RealtimeError> {
@ -281,7 +276,7 @@ where
}
// Create group if it's not exist
let is_group_exist = self.group_manager.contains_group(object_id);
let is_group_exist = self.group_manager.contains_group(&object_id);
if !is_group_exist {
trace!("The group:{} is not found, create a new group", object_id);
self
@ -290,7 +285,7 @@ where
}
// Only subscribe when the user is not subscribed to the group
if !self.group_manager.contains_user(object_id, user) {
if !self.group_manager.contains_user(&object_id, user) {
self.subscribe_group(user, object_id, &origin).await?;
}
if let Some(client_stream) = self.msg_router_by_user.get(user) {
@ -322,7 +317,7 @@ where
#[instrument(level = "trace", skip_all)]
async fn handle_server_collab_messages(
&self,
object_id: String,
object_id: Uuid,
messages: Vec<ClientCollabMessage>,
) -> Result<(), RealtimeError> {
if messages.is_empty() {
@ -346,7 +341,7 @@ where
NullSender::default(),
message_by_oid_receiver,
);
let message = HashMap::from([(object_id.clone(), messages)]);
let message = HashMap::from([(object_id.to_string(), messages)]);
if let Err(err) = message_by_oid_sender.try_send(MessageByObjectId(message)) {
error!(
"failed to send message to group: {}, object_id: {}",
@ -363,7 +358,7 @@ where
user: &RealtimeUser,
collab_message: &ClientCollabMessage,
) -> Result<(), RealtimeError> {
let object_id = collab_message.object_id();
let object_id = Uuid::parse_str(collab_message.object_id())?;
let message_origin = collab_message.origin();
self.subscribe_group(user, object_id, message_origin).await
}
@ -371,7 +366,7 @@ where
async fn subscribe_group(
&self,
user: &RealtimeUser,
object_id: &str,
object_id: Uuid,
collab_origin: &CollabOrigin,
) -> Result<(), RealtimeError> {
match self.msg_router_by_user.get_mut(user) {
@ -399,11 +394,12 @@ where
user: &RealtimeUser,
collab_message: &ClientCollabMessage,
) -> Result<(), RealtimeError> {
let object_id = collab_message.object_id();
let object_id = Uuid::parse_str(collab_message.object_id())?;
match collab_message {
ClientCollabMessage::ClientInitSync { data, .. } => {
let workspace_id = Uuid::parse_str(&data.workspace_id)?;
self
.create_group(user, &data.workspace_id, object_id, data.collab_type)
.create_group(user, workspace_id, object_id, data.collab_type.clone())
.await?;
Ok(())
},
@ -415,8 +411,8 @@ where
async fn create_group(
&self,
user: &RealtimeUser,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
collab_type: collab_entity::CollabType,
) -> Result<(), RealtimeError> {
self
@ -433,7 +429,7 @@ where
#[inline]
pub async fn forward_message_to_group(
user: &RealtimeUser,
object_id: String,
object_id: Uuid,
collab_messages: Vec<ClientCollabMessage>,
client_msg_router: &Arc<DashMap<RealtimeUser, ClientMessageRouter>>,
) {
@ -448,7 +444,7 @@ pub async fn forward_message_to_group(
.map(|v| v.msg_id())
.collect::<Vec<_>>()
);
let message = MessageByObjectId::new_with_message(object_id, collab_messages);
let message = MessageByObjectId::new_with_message(object_id.to_string(), collab_messages);
let err = client_stream.stream_tx.send(message);
if let Err(err) = err {
warn!("Send user:{} message to group:{}", user.uid, err);

View file

@ -49,8 +49,8 @@ pub struct CollabGroup {
/// Inner state of [CollabGroup] that's private and hidden behind Arc, so that it can be moved into
/// tasks.
struct CollabGroupState {
workspace_id: String,
object_id: String,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
/// A list of subscribers to this group. Each subscriber will receive updates from the
/// broadcast.
@ -77,8 +77,8 @@ impl CollabGroup {
#[allow(clippy::too_many_arguments)]
pub async fn new<S>(
uid: i64,
workspace_id: String,
object_id: String,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
metrics: Arc<CollabRealtimeMetrics>,
storage: Arc<S>,
@ -160,13 +160,13 @@ impl CollabGroup {
}
#[inline]
pub fn workspace_id(&self) -> &str {
pub fn workspace_id(&self) -> &Uuid {
&self.state.workspace_id
}
#[inline]
#[allow(dead_code)]
pub fn object_id(&self) -> &str {
pub fn object_id(&self) -> &Uuid {
&self.state.object_id
}
@ -233,7 +233,7 @@ impl CollabGroup {
seq_num
);
let payload = Message::Sync(SyncMessage::Update(update.data)).encode_v1();
let message = BroadcastSync::new(update.sender, state.object_id.clone(), payload, seq_num);
let message = BroadcastSync::new(update.sender, state.object_id.to_string(), payload, seq_num);
for mut e in state.subscribers.iter_mut() {
let subscription = e.value_mut();
if message.origin == subscription.collab_origin {
@ -290,7 +290,7 @@ impl CollabGroup {
);
let sender = update.sender;
let message = AwarenessSync::new(
state.object_id.clone(),
state.object_id.to_string(),
Message::Awareness(update.data.encode_v1()).encode_v1(),
CollabOrigin::Empty,
);
@ -355,15 +355,15 @@ impl CollabGroup {
.map_err(|e| AppError::Internal(e.into()))?;
let collab = Collab::new_with_source(
CollabOrigin::Server,
self.object_id(),
&self.object_id().to_string(),
DataSource::DocStateV1(collab.doc_state.into()),
vec![],
false,
)
.map_err(|e| AppError::Internal(e.into()))?;
let workspace_id = &self.state.workspace_id;
let object_id = &self.state.object_id;
let collab_type = self.state.collab_type;
let workspace_id = self.state.workspace_id;
let object_id = self.state.object_id;
let collab_type = &self.state.collab_type;
self
.state
.persister
@ -387,7 +387,7 @@ impl CollabGroup {
let encoded_collab = self.encode_collab().await?;
let collab = Collab::new_with_source(
CollabOrigin::Server,
self.object_id(),
&self.object_id().to_string(),
DataSource::DocStateV1(encoded_collab.doc_state.into()),
vec![],
false,
@ -519,8 +519,9 @@ impl CollabGroup {
where
Sink: SubscriptionSink + 'static,
{
let object_id = state.object_id.to_string();
for (message_object_id, messages) in msg.0 {
if state.object_id != message_object_id {
if object_id != message_object_id {
error!(
"Expect object id:{} but got:{}",
state.object_id, message_object_id
@ -869,8 +870,8 @@ impl Drop for Subscription {
struct CollabPersister {
uid: i64,
workspace_id: String,
object_id: String,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
storage: Arc<dyn CollabStorage>,
collab_redis_stream: Arc<CollabRedisStream>,
@ -887,8 +888,8 @@ impl CollabPersister {
#[allow(clippy::too_many_arguments)]
pub async fn new(
uid: i64,
workspace_id: String,
object_id: String,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
storage: Arc<dyn CollabStorage>,
collab_redis_stream: Arc<CollabRedisStream>,
@ -954,7 +955,12 @@ impl CollabPersister {
let start = Instant::now();
let mut collab = match self.load_collab_full().await? {
Some(collab) => collab,
None => Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], false),
None => Collab::new_with_origin(
CollabOrigin::Server,
self.object_id.to_string(),
vec![],
false,
),
};
self.metrics.load_collab_count.inc();
@ -1016,9 +1022,12 @@ impl CollabPersister {
if collab.is_none() {
collab = Some(match self.load_collab_full().await? {
Some(collab) => collab,
None => {
Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], false)
},
None => Collab::new_with_origin(
CollabOrigin::Server,
self.object_id.to_string(),
vec![],
false,
),
})
};
let collab = collab.as_mut().unwrap();
@ -1092,7 +1101,7 @@ impl CollabPersister {
// perform snapshot at the same time, so we'll use lease to let only one of them atm.
if let Some(mut lease) = self
.collab_redis_stream
.lease(&self.workspace_id, &self.object_id)
.lease(&self.workspace_id.to_string(), &self.object_id.to_string())
.await?
{
let doc_state_light = collab
@ -1149,32 +1158,30 @@ impl CollabPersister {
.metrics
.collab_size
.observe(encoded_collab.len() as f64);
let params = CollabParams::new(&self.object_id, self.collab_type, encoded_collab);
let params = CollabParams::new(self.object_id, self.collab_type.clone(), encoded_collab);
self
.storage
.queue_insert_or_update_collab(&self.workspace_id, &self.uid, params, true)
.queue_insert_or_update_collab(self.workspace_id, &self.uid, params, true)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
Ok(())
}
fn index_collab_content(&self, text: String) {
if let Ok(workspace_id) = Uuid::parse_str(&self.workspace_id) {
let indexed_collab = UnindexedCollabTask::new(
workspace_id,
self.object_id.clone(),
self.collab_type,
UnindexedData::Text(text),
let indexed_collab = UnindexedCollabTask::new(
self.workspace_id,
self.object_id,
self.collab_type.clone(),
UnindexedData::Text(text),
);
if let Err(err) = self
.indexer_scheduler
.index_pending_collab_one(indexed_collab, false)
{
warn!(
"failed to index collab `{}/{}`: {}",
self.workspace_id, self.object_id, err
);
if let Err(err) = self
.indexer_scheduler
.index_pending_collab_one(indexed_collab, false)
{
warn!(
"failed to index collab `{}/{}`: {}",
self.workspace_id, self.object_id, err
);
}
}
}
@ -1197,7 +1204,7 @@ impl CollabPersister {
let collab: Collab = Collab::new_with_source(
CollabOrigin::Server,
&self.object_id,
&self.object_id.to_string(),
DataSource::DocStateV1(doc_state.into()),
vec![],
false,

View file

@ -14,6 +14,7 @@ use collab_stream::client::CollabRedisStream;
use database::collab::{CollabStorage, GetCollabOrigin};
use database_entity::dto::QueryCollabParams;
use tracing::{instrument, trace};
use uuid::Uuid;
use yrs::{ReadTxn, StateVector};
use crate::client::client_msg_router::ClientMessageRouter;
@ -61,11 +62,11 @@ where
})
}
pub fn get_inactive_groups(&self) -> Vec<String> {
pub fn get_inactive_groups(&self) -> Vec<Uuid> {
self.state.remove_inactive_groups()
}
pub fn contains_user(&self, object_id: &str, user: &RealtimeUser) -> bool {
pub fn contains_user(&self, object_id: &Uuid, user: &RealtimeUser) -> bool {
self.state.contains_user(object_id, user)
}
@ -73,27 +74,27 @@ where
self.state.remove_user(user);
}
pub fn contains_group(&self, object_id: &str) -> bool {
pub fn contains_group(&self, object_id: &Uuid) -> bool {
self.state.contains_group(object_id)
}
pub async fn get_group(&self, object_id: &str) -> Option<Arc<CollabGroup>> {
pub async fn get_group(&self, object_id: &Uuid) -> Option<Arc<CollabGroup>> {
self.state.get_group(object_id).await
}
pub async fn subscribe_group(
&self,
user: &RealtimeUser,
object_id: &str,
object_id: Uuid,
message_origin: &CollabOrigin,
client_msg_router: &mut ClientMessageRouter,
) -> Result<(), RealtimeError> {
// Lock the group and subscribe the user to the group.
if let Some(mut e) = self.state.get_mut_group(object_id).await {
if let Some(mut e) = self.state.get_mut_group(&object_id).await {
let group = e.value_mut();
trace!("[realtime]: {} subscribe group:{}", user, object_id,);
let (sink, stream) = client_msg_router.init_client_communication::<CollabMessage>(
group.workspace_id(),
*group.workspace_id(),
user,
object_id,
self.access_control.clone(),
@ -114,8 +115,8 @@ where
pub async fn create_group(
&self,
user: &RealtimeUser,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
) -> Result<(), RealtimeError> {
let params = QueryCollabParams::new(object_id, collab_type, workspace_id);
@ -126,7 +127,7 @@ where
let state_vector = match res {
Ok(collab) => Collab::new_with_source(
CollabOrigin::Server,
object_id,
&object_id.to_string(),
DataSource::DocStateV1(collab.doc_state.into()),
vec![],
false,
@ -147,8 +148,8 @@ where
let group = CollabGroup::new(
user.uid,
workspace_id.to_string(),
object_id.to_string(),
workspace_id,
object_id,
collab_type,
self.metrics_calculate.clone(),
self.storage.clone(),
@ -168,7 +169,7 @@ where
#[instrument(level = "trace", skip_all)]
async fn load_collab<S>(
uid: i64,
object_id: &str,
object_id: &Uuid,
params: QueryCollabParams,
storage: Arc<S>,
) -> Result<(Collab, EncodedCollab), AppError>
@ -180,7 +181,7 @@ where
.await?;
let result = Collab::new_with_source(
CollabOrigin::Server,
object_id,
&object_id.to_string(),
DataSource::DocStateV1(encode_collab.doc_state.to_vec()),
vec![],
false,
@ -194,7 +195,7 @@ where
}
async fn load_collab_from_snapshot<S>(
object_id: &str,
object_id: &Uuid,
params: QueryCollabParams,
storage: Arc<S>,
) -> Option<(Collab, EncodedCollab)>
@ -210,7 +211,7 @@ where
.await?;
let collab = Collab::new_with_source(
CollabOrigin::Server,
object_id,
&object_id.to_string(),
DataSource::DocStateV1(encode_collab.doc_state.to_vec()),
vec![],
false,
@ -220,8 +221,8 @@ where
}
async fn get_latest_snapshot<S>(
workspace_id: &str,
object_id: &str,
workspace_id: &Uuid,
object_id: &Uuid,
storage: &S,
collab_type: &CollabType,
) -> Option<EncodedCollab>
@ -234,14 +235,15 @@ where
.ok()?
.0;
for meta in metas {
let object_id = Uuid::parse_str(&meta.object_id).ok()?;
let snapshot_data = storage
.get_collab_snapshot(workspace_id, &meta.object_id, &meta.snapshot_id)
.get_collab_snapshot(*workspace_id, object_id, &meta.snapshot_id)
.await
.ok()?;
if let Ok(encoded_collab) = EncodedCollab::decode_from_bytes(&snapshot_data.encoded_collab_v1) {
if let Ok(collab) = Collab::new_with_source(
CollabOrigin::Empty,
object_id,
&object_id.to_string(),
DataSource::DocStateV1(encoded_collab.doc_state.to_vec()),
vec![],
false,

View file

@ -6,6 +6,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use tokio::time::sleep;
use tracing::{error, trace};
use uuid::Uuid;
use yrs::TransactionMut;
use database::collab::CollabStorage;
@ -13,8 +14,8 @@ use database_entity::dto::InsertSnapshotParams;
/// [HistoryPlugin] will be moved to history collab server. For now, it's temporarily placed here.
pub struct HistoryPlugin<S> {
workspace_id: String,
object_id: String,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
storage: Arc<S>,
did_create_snapshot: AtomicBool,
@ -29,8 +30,8 @@ where
{
#[allow(dead_code)]
pub fn new(
workspace_id: String,
object_id: String,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
weak_collab: Weak<RwLock<Collab>>,
storage: Arc<S>,
@ -51,8 +52,8 @@ where
async fn enqueue_snapshot(
weak_collab: Weak<RwLock<Collab>>,
storage: Arc<S>,
workspace_id: String,
object_id: String,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
) -> Result<(), anyhow::Error> {
trace!("trying to enqueue snapshot for object_id: {}", object_id);
@ -96,10 +97,10 @@ where
self.did_create_snapshot.store(true, Ordering::SeqCst);
let storage = self.storage.clone();
let weak_collab = self.weak_collab.clone();
let collab_type = self.collab_type;
let object_id = self.object_id.clone();
let workspace_id = self.workspace_id.clone();
let collab_type = self.collab_type.clone();
let workspace_id = self.workspace_id;
let object_id = self.object_id;
tokio::spawn(async move {
sleep(std::time::Duration::from_secs(2)).await;
match storage

View file

@ -1,3 +1,8 @@
use crate::config::get_env_var;
use crate::error::RealtimeError;
use crate::group::group_init::CollabGroup;
use crate::metrics::CollabRealtimeMetrics;
use collab_rt_entity::user::RealtimeUser;
use dashmap::mapref::one::RefMut;
use dashmap::try_result::TryResult;
use dashmap::DashMap;
@ -6,16 +11,11 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{error, event, trace, warn};
use crate::config::get_env_var;
use crate::error::RealtimeError;
use crate::group::group_init::CollabGroup;
use crate::metrics::CollabRealtimeMetrics;
use collab_rt_entity::user::RealtimeUser;
use uuid::Uuid;
#[derive(Clone)]
pub(crate) struct GroupManagementState {
group_by_object_id: Arc<DashMap<String, Arc<CollabGroup>>>,
group_by_object_id: Arc<DashMap<Uuid, Arc<CollabGroup>>>,
/// Keep track of all [Collab] objects that a user is subscribed to.
editing_by_user: Arc<DashMap<RealtimeUser, HashSet<Editing>>>,
metrics_calculate: Arc<CollabRealtimeMetrics>,
@ -37,7 +37,7 @@ impl GroupManagementState {
}
/// Returns group ids of inactive groups.
pub fn remove_inactive_groups(&self) -> Vec<String> {
pub fn remove_inactive_groups(&self) -> Vec<Uuid> {
let mut inactive_group_ids = vec![];
for entry in self.group_by_object_id.iter() {
let (object_id, group) = (entry.key(), entry.value());
@ -57,7 +57,7 @@ impl GroupManagementState {
inactive_group_ids
}
pub async fn get_group(&self, object_id: &str) -> Option<Arc<CollabGroup>> {
pub async fn get_group(&self, object_id: &Uuid) -> Option<Arc<CollabGroup>> {
let mut attempts = 0;
let max_attempts = 3;
let retry_delay = Duration::from_millis(100);
@ -85,8 +85,8 @@ impl GroupManagementState {
/// may deadlock when holding the RefMut and trying to read group_by_object_id.
pub(crate) async fn get_mut_group(
&self,
object_id: &str,
) -> Option<RefMut<String, Arc<CollabGroup>>> {
object_id: &Uuid,
) -> Option<RefMut<Uuid, Arc<CollabGroup>>> {
let mut attempts = 0;
let max_attempts = 3;
let retry_delay = Duration::from_millis(300);
@ -108,14 +108,12 @@ impl GroupManagementState {
}
}
pub(crate) fn insert_group(&self, object_id: &str, group: CollabGroup) {
self
.group_by_object_id
.insert(object_id.to_string(), group.into());
pub(crate) fn insert_group(&self, object_id: Uuid, group: CollabGroup) {
self.group_by_object_id.insert(object_id, group.into());
self.metrics_calculate.opening_collab_count.inc();
}
pub(crate) fn contains_group(&self, object_id: &str) -> bool {
pub(crate) fn contains_group(&self, object_id: &Uuid) -> bool {
if let Some(group) = self.group_by_object_id.get(object_id) {
let cancelled = group.is_cancelled();
!cancelled
@ -124,7 +122,7 @@ impl GroupManagementState {
}
}
pub(crate) fn remove_group(&self, object_id: &str) {
pub(crate) fn remove_group(&self, object_id: &Uuid) {
let group_not_found = self.group_by_object_id.remove(object_id).is_none();
if group_not_found {
// Log error if the group doesn't exist
@ -139,11 +137,9 @@ impl GroupManagementState {
pub(crate) fn insert_user(
&self,
user: &RealtimeUser,
object_id: &str,
object_id: Uuid,
) -> Result<(), RealtimeError> {
let editing = Editing {
object_id: object_id.to_string(),
};
let editing = Editing { object_id };
let entry = self.editing_by_user.entry(user.clone());
match entry {
@ -189,7 +185,7 @@ impl GroupManagementState {
}
}
pub fn contains_user(&self, object_id: &str, user: &RealtimeUser) -> bool {
pub fn contains_user(&self, object_id: &Uuid, user: &RealtimeUser) -> bool {
match self.group_by_object_id.try_get(object_id) {
TryResult::Present(entry) => entry.value().contains_user(user),
TryResult::Absent => false,
@ -203,5 +199,5 @@ impl GroupManagementState {
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
struct Editing {
pub object_id: String,
pub object_id: Uuid,
}

View file

@ -26,6 +26,7 @@ use tokio::sync::mpsc::Sender;
use tokio::task::yield_now;
use tokio::time::interval;
use tracing::{error, info, trace, warn};
use uuid::Uuid;
use yrs::updates::decoder::Decode;
use yrs::StateVector;
@ -37,7 +38,7 @@ pub struct CollaborationServer<S> {
/// Keep track of all collab groups
group_manager: Arc<GroupManager<S>>,
connect_state: ConnectState,
group_sender_by_object_id: Arc<DashMap<String, GroupCommandSender>>,
group_sender_by_object_id: Arc<DashMap<Uuid, GroupCommandSender>>,
#[allow(dead_code)]
metrics: Arc<CollabRealtimeMetrics>,
enable_custom_runtime: bool,
@ -88,7 +89,7 @@ where
)
.await?,
);
let group_sender_by_object_id: Arc<DashMap<String, GroupCommandSender>> =
let group_sender_by_object_id: Arc<DashMap<_, GroupCommandSender>> =
Arc::new(Default::default());
spawn_period_check_inactive_group(Arc::downgrade(&group_manager), &group_sender_by_object_id);
@ -164,7 +165,8 @@ where
message_by_oid: MessageByObjectId,
) -> Result<(), RealtimeError> {
for (object_id, collab_messages) in message_by_oid.into_inner() {
let group_cmd_sender = self.create_group_if_not_exist(&object_id);
let object_id = Uuid::parse_str(&object_id)?;
let group_cmd_sender = self.create_group_if_not_exist(object_id);
let cloned_user = user.clone();
// Create a new task to send a message to the group command runner without waiting for the
// result. This approach is used to prevent potential issues with the actor's mailbox in
@ -210,7 +212,7 @@ where
&self,
message: ClientHttpUpdateMessage,
) -> Result<(), RealtimeError> {
let group_cmd_sender = self.create_group_if_not_exist(&message.object_id);
let group_cmd_sender = self.create_group_if_not_exist(message.object_id);
tokio::spawn(async move {
let object_id = message.object_id.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
@ -316,15 +318,15 @@ where
}
#[inline]
fn create_group_if_not_exist(&self, object_id: &str) -> Sender<GroupCommand> {
fn create_group_if_not_exist(&self, object_id: Uuid) -> Sender<GroupCommand> {
let old_sender = self
.group_sender_by_object_id
.get(object_id)
.get(&object_id)
.map(|entry| entry.value().clone());
let sender = match old_sender {
Some(sender) => sender,
None => match self.group_sender_by_object_id.entry(object_id.to_string()) {
None => match self.group_sender_by_object_id.entry(object_id) {
Entry::Occupied(entry) => entry.get().clone(),
Entry::Vacant(entry) => {
let (new_sender, recv) = tokio::sync::mpsc::channel(2000);
@ -354,7 +356,7 @@ where
&self,
message: ClientGenerateEmbeddingMessage,
) -> Result<(), RealtimeError> {
let group_cmd_sender = self.create_group_if_not_exist(&message.object_id);
let group_cmd_sender = self.create_group_if_not_exist(message.object_id);
tokio::spawn(async move {
let result = group_cmd_sender
.send(GroupCommand::GenerateCollabEmbedding {
@ -387,7 +389,7 @@ where
fn spawn_period_check_inactive_group<S>(
weak_groups: Weak<GroupManager<S>>,
group_sender_by_object_id: &Arc<DashMap<String, GroupCommandSender>>,
group_sender_by_object_id: &Arc<DashMap<Uuid, GroupCommandSender>>,
) where
S: CollabStorage,
{

View file

@ -6,6 +6,7 @@ use collab::entity::{EncodedCollab, EncoderVersion};
use collab_entity::CollabType;
use sqlx::PgPool;
use tracing::{debug, error, trace, warn};
use uuid::Uuid;
use validator::Validate;
use app_error::AppError;
@ -24,7 +25,7 @@ use crate::metrics::CollabMetrics;
pub const SNAPSHOT_TICK_INTERVAL: Duration = Duration::from_secs(2);
fn collab_snapshot_key(workspace_id: &str, object_id: &str, snapshot_id: i64) -> String {
fn collab_snapshot_key(workspace_id: &Uuid, object_id: &Uuid, snapshot_id: i64) -> String {
let snapshot_id = u64::MAX - snapshot_id as u64;
format!(
"collabs/{}/{}/snapshot_{:16x}.v1.zstd",
@ -32,7 +33,7 @@ fn collab_snapshot_key(workspace_id: &str, object_id: &str, snapshot_id: i64) ->
)
}
fn collab_snapshot_prefix(workspace_id: &str, object_id: &str) -> String {
fn collab_snapshot_prefix(workspace_id: &Uuid, object_id: &Uuid) -> String {
format!("collabs/{}/{}/snapshot_", workspace_id, object_id)
}
@ -83,14 +84,9 @@ impl SnapshotControl {
pub async fn should_create_snapshot(
&self,
workspace_id: &str,
oid: &str,
workspace_id: &Uuid,
oid: &Uuid,
) -> Result<bool, AppError> {
if oid.is_empty() {
warn!("unexpected empty object id when checking should_create_snapshot");
return Ok(false);
}
let latest_created_at = self.latest_snapshot_time(workspace_id, oid).await?;
// Subtracting a fixed duration that is known not to cause underflow. If `checked_sub_signed` returns `None`,
// it indicates an error in calculation, thus defaulting to creating a snapshot just in case.
@ -151,18 +147,18 @@ impl SnapshotControl {
Ok(AFSnapshotMeta {
snapshot_id,
object_id: params.object_id,
object_id: params.object_id.to_string(),
created_at: timestamp,
})
}
pub async fn get_collab_snapshot(
&self,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
snapshot_id: &i64,
) -> AppResult<SnapshotData> {
let key = collab_snapshot_key(workspace_id, object_id, *snapshot_id);
let key = collab_snapshot_key(&workspace_id, &object_id, *snapshot_id);
match self.s3.get_blob(&key).await {
Ok(resp) => {
self.collab_metrics.read_snapshot.inc();
@ -173,9 +169,9 @@ impl SnapshotControl {
version: EncoderVersion::V1,
};
Ok(SnapshotData {
object_id: object_id.to_string(),
object_id,
encoded_collab_v1: encoded_collab.encode_to_bytes()?,
workspace_id: workspace_id.to_string(),
workspace_id,
})
},
Err(AppError::RecordNotFound(_)) => {
@ -183,15 +179,15 @@ impl SnapshotControl {
"snapshot {} for `{}` not found in s3: fallback to postgres",
snapshot_id, object_id
);
match select_snapshot(&self.pg_pool, workspace_id, object_id, snapshot_id).await? {
match select_snapshot(&self.pg_pool, &workspace_id, &object_id, snapshot_id).await? {
None => Err(AppError::RecordNotFound(format!(
"Can't find the snapshot with id:{}",
snapshot_id
))),
Some(row) => Ok(SnapshotData {
object_id: object_id.to_string(),
object_id,
encoded_collab_v1: row.blob,
workspace_id: workspace_id.to_string(),
workspace_id,
}),
}
},
@ -202,8 +198,8 @@ impl SnapshotControl {
/// Returns list of snapshots for given object_id in descending order of creation time.
pub async fn get_collab_snapshot_list(
&self,
workspace_id: &str,
oid: &str,
workspace_id: &Uuid,
oid: &Uuid,
) -> AppResult<AFSnapshotMetas> {
let snapshot_prefix = collab_snapshot_prefix(workspace_id, oid);
let resp = self
@ -233,8 +229,8 @@ impl SnapshotControl {
pub async fn get_snapshot(
&self,
workspace_id: &str,
object_id: &str,
workspace_id: Uuid,
object_id: Uuid,
snapshot_id: &i64,
) -> Result<SnapshotData, AppError> {
self
@ -244,11 +240,11 @@ impl SnapshotControl {
pub async fn get_latest_snapshot(
&self,
workspace_id: &str,
oid: &str,
workspace_id: Uuid,
oid: Uuid,
collab_type: CollabType,
) -> Result<Option<SnapshotData>, AppError> {
let snapshot_prefix = collab_snapshot_prefix(workspace_id, oid);
let snapshot_prefix = collab_snapshot_prefix(&workspace_id, &oid);
let mut resp = self.s3.list_dir(&snapshot_prefix, 1).await?;
if let Some(key) = resp.pop() {
let resp = self.s3.get_blob(&key).await?;
@ -259,19 +255,19 @@ impl SnapshotControl {
version: EncoderVersion::V1,
};
Ok(Some(SnapshotData {
object_id: oid.to_string(),
object_id: oid,
encoded_collab_v1: encoded_collab.encode_to_bytes()?,
workspace_id: workspace_id.to_string(),
workspace_id,
}))
} else {
let snapshot = get_latest_snapshot(oid, &collab_type, &self.pg_pool).await?;
let snapshot = get_latest_snapshot(&oid, &collab_type, &self.pg_pool).await?;
Ok(
snapshot
.and_then(|row| row.snapshot_meta)
.map(|meta| SnapshotData {
object_id: oid.to_string(),
object_id: oid,
encoded_collab_v1: meta.snapshot,
workspace_id: workspace_id.to_string(),
workspace_id,
}),
)
}
@ -279,8 +275,8 @@ impl SnapshotControl {
async fn latest_snapshot_time(
&self,
workspace_id: &str,
oid: &str,
workspace_id: &Uuid,
oid: &Uuid,
) -> Result<Option<DateTime<Utc>>, AppError> {
let snapshot_prefix = collab_snapshot_prefix(workspace_id, oid);
let mut resp = self.s3.list_dir(&snapshot_prefix, 1).await?;

View file

@ -250,17 +250,17 @@ pub async fn get_latest_collab_database_body(
pub async fn get_latest_collab_encoded(
collab_storage: &CollabAccessControlStorage,
collab_origin: GetCollabOrigin,
workspace_id: &str,
oid: &str,
workspace_id: Uuid,
object_id: Uuid,
collab_type: CollabType,
) -> Result<EncodedCollab, AppError> {
collab_storage
.get_encode_collab(
collab_origin,
QueryCollabParams {
workspace_id: workspace_id.to_string(),
workspace_id,
inner: QueryCollab {
object_id: oid.to_string(),
object_id,
collab_type,
},
},
@ -272,8 +272,8 @@ pub async fn get_latest_collab_encoded(
pub async fn batch_get_latest_collab_encoded(
collab_storage: &CollabAccessControlStorage,
collab_origin: GetCollabOrigin,
workspace_id: &str,
oid_list: &[String],
workspace_id: Uuid,
oid_list: &[Uuid],
collab_type: CollabType,
) -> Result<HashMap<String, EncodedCollab>, AppError> {
let uid = match collab_origin {
@ -283,8 +283,8 @@ pub async fn batch_get_latest_collab_encoded(
let queries: Vec<QueryCollab> = oid_list
.iter()
.map(|row_id| QueryCollab {
object_id: row_id.to_string(),
collab_type,
object_id: *row_id,
collab_type: collab_type.clone(),
})
.collect();
let query_collab_results = collab_storage
@ -319,18 +319,24 @@ pub async fn batch_get_latest_collab_encoded(
pub async fn get_latest_collab(
storage: &CollabAccessControlStorage,
origin: GetCollabOrigin,
workspace_id: &str,
oid: &str,
workspace_id: Uuid,
oid: Uuid,
collab_type: CollabType,
) -> Result<Collab, AppError> {
let ec = get_latest_collab_encoded(storage, origin, workspace_id, oid, collab_type).await?;
let collab: Collab = Collab::new_with_source(CollabOrigin::Server, oid, ec.into(), vec![], false)
.map_err(|e| {
AppError::Internal(anyhow::anyhow!(
"Failed to create collab from encoded collab: {:?}",
e
))
})?;
let collab: Collab = Collab::new_with_source(
CollabOrigin::Server,
&oid.to_string(),
ec.into(),
vec![],
false,
)
.map_err(|e| {
AppError::Internal(anyhow::anyhow!(
"Failed to create collab from encoded collab: {:?}",
e
))
})?;
Ok(collab)
}