chore: first phase of tests

Bartosz Sypytkowski 2025-03-21 08:41:22 +01:00
parent 2687127310
commit ec45ef23fe
16 changed files with 206 additions and 37 deletions

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\nSELECT\n w.settings['disable_search_indexing']::boolean as disable_search_indexing,\n CASE\n WHEN w.settings['disable_search_indexing']::boolean THEN\n FALSE\n ELSE\n EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.oid = $2)\n END as has_index\nFROM af_workspace w\nWHERE w.workspace_id = $1",
"query": "\nSELECT\n w.settings['disable_search_indexing']::boolean as disable_search_indexing,\n CASE\n WHEN w.settings['disable_search_indexing']::boolean THEN\n FALSE\n ELSE\n EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.oid = $2::uuid)\n END as has_index\nFROM af_workspace w\nWHERE w.workspace_id = $1",
"describe": {
"columns": [
{
@ -25,5 +25,5 @@
null
]
},
"hash": "1ec283e1071ab41653630a6e4e6570d30d8ed1622e137d692c428f6e44a8ae5d"
"hash": "2c0a776a787bc748857873b682d2fa3c549ffeaf767aa8ee05b09b3857505ded"
}

View file

@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT oid, indexed_at\n FROM af_collab\n WHERE oid = ANY (SELECT UNNEST($1::uuid[]))\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "oid",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "indexed_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"UuidArray"
]
},
"nullable": [
false,
true
]
},
"hash": "3865d921d76ac0d0eb16065738cddf82cb71945504116b0a04da759209b9c250"
}
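For orientation, a minimal sketch of how this cached statement might be called from Rust with sqlx. The function name and return shape are assumptions for illustration; the repository's real helper is get_collabs_indexed_at, whose exact signature is not shown in this diff.

use std::collections::HashMap;

use chrono::{DateTime, Utc};
use sqlx::PgPool;
use uuid::Uuid;

// Hypothetical helper mirroring the cached query above: fetch the last
// indexed_at timestamp for a batch of collab oids in a single round trip.
async fn indexed_at_for(
  pg_pool: &PgPool,
  oids: &[Uuid],
) -> Result<HashMap<Uuid, DateTime<Utc>>, sqlx::Error> {
  let rows = sqlx::query!(
    r#"
    SELECT oid, indexed_at
    FROM af_collab
    WHERE oid = ANY (SELECT UNNEST($1::uuid[]))
    "#,
    oids
  )
  .fetch_all(pg_pool)
  .await?;
  // indexed_at is nullable; keep only collabs that have been indexed at least once.
  Ok(
    rows
      .into_iter()
      .filter_map(|row| row.indexed_at.map(|ts| (row.oid, ts)))
      .collect(),
  )
}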

View file

@ -0,0 +1,40 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n ac.oid as object_id,\n ace.partition_key,\n ac.indexed_at,\n ace.updated_at\n FROM af_collab_embeddings ac\n JOIN af_collab ace ON ac.oid = ace.oid\n WHERE ac.oid = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "object_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "partition_key",
"type_info": "Int4"
},
{
"ordinal": 2,
"name": "indexed_at",
"type_info": "Timestamp"
},
{
"ordinal": 3,
"name": "updated_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false,
false,
false
]
},
"hash": "4fc0611c846f86be652d42eb8ae21a5da0353fe810856aaabe91d7963329d098"
}

View file

@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "\n DELETE FROM af_collab_embeddings e\n USING af_collab c\n WHERE e.oid = c.oid\n AND c.workspace_id = $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": []
},
"hash": "5c2d58bfdedbb1be71337a97d5ed5a2921f83dd549507b2834a4d2582d2c361b"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT oid, blob\n FROM af_collab\n WHERE oid = ANY($1) AND partition_key = $2 AND deleted_at IS NULL;\n ",
"query": "\n SELECT oid, blob\n FROM af_collab\n WHERE oid = ANY($1) AND deleted_at IS NULL;\n ",
"describe": {
"columns": [
{
@ -16,8 +16,7 @@
],
"parameters": {
"Left": [
"UuidArray",
"Int4"
"UuidArray"
]
},
"nullable": [
@ -25,5 +24,5 @@
false
]
},
"hash": "a7f47366a4016e10dfe9195f865ca0f0a2877738144afbd82844d75c4ea0ea8e"
"hash": "6935572cb23700243fbbd3dc382cdbf56edaadc4aab7855c237bce68e29414c0"
}

View file

@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE af_collab\n SET deleted_at = $2\n WHERE oid = $1;\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Timestamptz"
]
},
"nullable": []
},
"hash": "78a191e21a7e7a07eee88ed02c7fbf7035f908b8e4057f7ace1b3b5d433424fe"
}
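A minimal sketch of the soft delete this new cached query backs; the wrapper name is assumed, and in the real code the timestamp is likely supplied by the caller rather than taken from the clock here.

use chrono::Utc;
use sqlx::PgPool;
use uuid::Uuid;

// Hypothetical wrapper: the row is kept but stamped with a deletion time, so
// reads that filter on deleted_at IS NULL stop returning it.
async fn soft_delete_collab(pg_pool: &PgPool, oid: Uuid) -> Result<(), sqlx::Error> {
  sqlx::query(
    r#"
    UPDATE af_collab
    SET deleted_at = $2
    WHERE oid = $1;
    "#,
  )
  .bind(oid)
  .bind(Utc::now())
  .execute(pg_pool)
  .await?;
  Ok(())
}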

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)\n VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (oid, partition_key)\n DO UPDATE SET blob = $2, len = $3, owner_uid = $5 WHERE excluded.workspace_id = af_collab.workspace_id;\n ",
"query": "\n INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)\n VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (oid)\n DO UPDATE SET blob = $2, len = $3, owner_uid = $5 WHERE excluded.workspace_id = af_collab.workspace_id;\n ",
"describe": {
"columns": [],
"parameters": {
@ -15,5 +15,5 @@
},
"nullable": []
},
"hash": "435fb563d3f6da280483996dc1803cbea4a99f1136796c2aaeb20cf7bb897f5e"
"hash": "8724214da0311c43988035526454ed1747ebc89dc350ee45827381418f71313e"
}

View file

@ -0,0 +1,40 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n ac.oid as object_id,\n ace.partition_key,\n ac.indexed_at,\n ace.updated_at\n FROM af_collab_embeddings ac\n JOIN af_collab ace ON ac.oid = ace.oid\n WHERE ac.oid = ANY($1)\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "object_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "partition_key",
"type_info": "Int4"
},
{
"ordinal": 2,
"name": "indexed_at",
"type_info": "Timestamp"
},
{
"ordinal": 3,
"name": "updated_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"UuidArray"
]
},
"nullable": [
false,
false,
false,
false
]
},
"hash": "cbf1d3d9fdeb672eacd4b008879787bc1f0b22a554fb249d4e12a665d9767cbd"
}

View file

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)\n SELECT * FROM UNNEST($1::uuid[], $2::bytea[], $3::int[], $4::int[], $5::bigint[], $6::uuid[])\n ON CONFLICT (oid, partition_key)\n DO UPDATE SET blob = excluded.blob, len = excluded.len where af_collab.workspace_id = excluded.workspace_id\n ",
"query": "\n INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)\n SELECT * FROM UNNEST($1::uuid[], $2::bytea[], $3::int[], $4::int[], $5::bigint[], $6::uuid[])\n ON CONFLICT (oid)\n DO UPDATE SET blob = excluded.blob, len = excluded.len where af_collab.workspace_id = excluded.workspace_id\n ",
"describe": {
"columns": [],
"parameters": {
@ -15,5 +15,5 @@
},
"nullable": []
},
"hash": "e040e3e3a1821bbf892a7b1dced0a8cdbeb3bec493bc2ced8de0b6a1175f2c17"
"hash": "fbe0746688157bf563bd6a8fb707ef9553c6751c3dd214f461f2de087f8b29c4"
}

View file

@ -171,7 +171,7 @@ pub async fn insert_into_af_collab_bulk_for_user(
r#"
INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)
SELECT * FROM UNNEST($1::uuid[], $2::bytea[], $3::int[], $4::int[], $5::bigint[], $6::uuid[])
ON CONFLICT (oid, partition_key)
ON CONFLICT (oid)
DO UPDATE SET blob = excluded.blob, len = excluded.len where af_collab.workspace_id = excluded.workspace_id
"#,
&object_ids,
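The conflict target shrinking from (oid, partition_key) to (oid) suggests oid is now a UUID that identifies the row on its own. A minimal single-row sketch of the same upsert shape, using the runtime query API; the function name is illustrative, not the crate's.

use std::ops::DerefMut;

use sqlx::{Postgres, Transaction};
use uuid::Uuid;

// Hypothetical single-row variant of the bulk upsert above: conflicts are
// resolved on oid alone, and the WHERE guard leaves the existing row untouched
// if it belongs to a different workspace.
async fn upsert_one_collab(
  txn: &mut Transaction<'_, Postgres>,
  oid: Uuid,
  blob: &[u8],
  partition_key: i32,
  owner_uid: i64,
  workspace_id: Uuid,
) -> Result<(), sqlx::Error> {
  sqlx::query(
    r#"
    INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)
    VALUES ($1, $2, $3, $4, $5, $6)
    ON CONFLICT (oid)
    DO UPDATE SET blob = excluded.blob, len = excluded.len
    WHERE af_collab.workspace_id = excluded.workspace_id
    "#,
  )
  .bind(oid)
  .bind(blob)
  .bind(blob.len() as i32)
  .bind(partition_key)
  .bind(owner_uid)
  .bind(workspace_id)
  .execute(txn.deref_mut())
  .await?;
  Ok(())
}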

View file

@ -95,7 +95,6 @@ pub async fn upsert_collab_embeddings(
transaction: &mut Transaction<'_, Postgres>,
workspace_id: &Uuid,
object_id: &Uuid,
collab_type: CollabType,
tokens_used: u32,
records: Vec<AFCollabEmbeddedChunk>,
) -> Result<(), sqlx::Error> {
@ -105,10 +104,9 @@ pub async fn upsert_collab_embeddings(
object_id,
fragments.len()
);
sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment_v3[])"#)
sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4::af_fragment_v3[])"#)
.bind(*workspace_id)
.bind(object_id.to_string())
.bind(crate::collab::partition_key_from_collab_type(&collab_type))
.bind(tokens_used as i32)
.bind(fragments)
.execute(transaction.deref_mut())

View file

@ -464,7 +464,6 @@ pub(crate) async fn batch_insert_records(
&mut txn,
&record.workspace_id,
&record.object_id,
record.collab_type,
record.tokens_used,
record.contents,
)

View file

@ -51,4 +51,31 @@ create index if not exists idx_workspace_id_on_af_collab
on af_collab (workspace_id);
create index if not exists idx_af_collab_updated_at
on af_collab (updated_at);
on af_collab (updated_at);
create or replace procedure af_collab_embeddings_upsert(IN p_workspace_id uuid, IN p_oid text, IN p_tokens_used integer, IN p_fragments af_fragment_v3[])
language plpgsql
as
$$
BEGIN
DELETE FROM af_collab_embeddings WHERE oid = p_oid;
INSERT INTO af_collab_embeddings (fragment_id, oid, content_type, content, embedding, indexed_at, metadata, fragment_index, embedder_type)
SELECT
f.fragment_id,
p_oid,
f.content_type,
f.contents,
f.embedding,
NOW(),
f.metadata,
f.fragment_index,
f.embedder_type
FROM UNNEST(p_fragments) as f;
-- Update the usage tracking table
INSERT INTO af_workspace_ai_usage(created_at, workspace_id, search_requests, search_tokens_consumed, index_tokens_consumed)
VALUES (now()::date, p_workspace_id, 0, 0, p_tokens_used)
ON CONFLICT (created_at, workspace_id)
DO UPDATE SET index_tokens_consumed = af_workspace_ai_usage.index_tokens_consumed + p_tokens_used;
END
$$;
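The procedure first drops any existing embeddings for the object, re-inserts the supplied fragments, then charges the consumed tokens to the per-day workspace usage row. A minimal sketch of invoking it from Rust, mirroring the call site changed earlier in this commit; AFCollabEmbeddedChunk is the repository's existing fragment type, assumed here to encode as af_fragment_v3[].

use std::ops::DerefMut;

use sqlx::{Postgres, Transaction};
use uuid::Uuid;

// Hypothetical wrapper over the new 4-argument procedure.
async fn call_embeddings_upsert(
  txn: &mut Transaction<'_, Postgres>,
  workspace_id: Uuid,
  object_id: Uuid,
  tokens_used: i32,
  fragments: Vec<AFCollabEmbeddedChunk>,
) -> Result<(), sqlx::Error> {
  sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4::af_fragment_v3[])"#)
    .bind(workspace_id)
    .bind(object_id.to_string()) // p_oid is declared as text in the procedure
    .bind(tokens_used)
    .bind(fragments)
    .execute(txn.deref_mut())
    .await?;
  Ok(())
}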

View file

@ -51,6 +51,9 @@ pub enum ImportError {
#[error(transparent)]
Internal(#[from] anyhow::Error),
#[error(transparent)]
InvalidUuid(#[from] uuid::Error),
}
impl From<WorkerError> for ImportError {
@ -214,6 +217,16 @@ impl ImportError {
format!("Task ID: {} - Upload file too large: {} MB", task_id, file_size_in_mb),
)
}
ImportError::InvalidUuid(err) => {
(
format!(
"Task ID: {} - Identifier is not valid UUID: {}",
task_id,
err
),
format!("Task ID: {} - Identifier is not valid UUID", task_id),
)
}
}
}
}
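A minimal sketch (the function is hypothetical) of what the new #[from] conversion buys at call sites: a uuid::Error now propagates into ImportError with the ? operator instead of being wrapped by hand.

use uuid::Uuid;

// Hypothetical call site: Uuid::parse_str returns Result<Uuid, uuid::Error>,
// and the InvalidUuid(#[from] uuid::Error) variant lets `?` do the conversion.
fn parse_workspace_id(raw: &str) -> Result<Uuid, ImportError> {
  let workspace_id = Uuid::parse_str(raw)?;
  Ok(workspace_id)
}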

View file

@ -872,7 +872,7 @@ async fn process_unzip_file(
redis_client: &mut ConnectionManager,
s3_client: &Arc<dyn S3Client>,
) -> Result<(), ImportError> {
let workspace_id =
let _ =
Uuid::parse_str(&import_task.workspace_id).map_err(|err| ImportError::Internal(err.into()))?;
let notion_importer = NotionImporter::new(
import_task.uid,
@ -898,9 +898,10 @@ async fn process_unzip_file(
);
// 1. Open the workspace folder
let workspace_id = Uuid::parse_str(&imported.workspace_id)?;
let folder_collab = get_encode_collab_from_bytes(
&imported.workspace_id,
&imported.workspace_id,
&workspace_id,
&workspace_id,
&CollabType::Folder,
pg_pool,
s3_client,
@ -942,7 +943,7 @@ async fn process_unzip_file(
.imported_collabs
.into_iter()
.map(|imported_collab| CollabParams {
object_id: imported_collab.object_id,
object_id: imported_collab.object_id.parse().unwrap(),
collab_type: imported_collab.collab_type,
encoded_collab_v1: Bytes::from(imported_collab.encoded_collab.encode_to_bytes().unwrap()),
})
@ -971,13 +972,12 @@ async fn process_unzip_file(
"Failed to select workspace database storage id: {:?}",
err
))
})
.map(|id| id.to_string())?;
})?;
// 4. Edit workspace database collab and then encode workspace database collab
if !database_view_ids_by_database_id.is_empty() {
let w_db_collab = get_encode_collab_from_bytes(
&import_task.workspace_id,
&workspace_id,
&w_database_id,
&CollabType::WorkspaceDatabase,
pg_pool,
@ -985,7 +985,7 @@ async fn process_unzip_file(
)
.await?;
let mut w_database = WorkspaceDatabase::from_collab_doc_state(
&w_database_id,
&w_database_id.to_string(),
CollabOrigin::Server,
w_db_collab.into(),
)
@ -1003,7 +1003,7 @@ async fn process_unzip_file(
Ok(bytes) => {
if let Err(err) = redis_client
.set_ex::<String, Vec<u8>, Value>(
encode_collab_key(&w_database_id),
encode_collab_key(&w_database_id.to_string()),
bytes,
2592000, // WorkspaceDatabase => 1 month
)
@ -1026,7 +1026,7 @@ async fn process_unzip_file(
import_task.workspace_id
);
let w_database_collab_params = CollabParams {
object_id: w_database_id.clone(),
object_id: w_database_id,
collab_type: CollabType::WorkspaceDatabase,
encoded_collab_v1: Bytes::from(w_database_collab.encode_to_bytes().unwrap()),
};
@ -1066,7 +1066,7 @@ async fn process_unzip_file(
}
let folder_collab_params = CollabParams {
object_id: import_task.workspace_id.clone(),
object_id: workspace_id,
collab_type: CollabType::Folder,
encoded_collab_v1: Bytes::from(folder_collab.encode_to_bytes().unwrap()),
};
@ -1095,7 +1095,7 @@ async fn process_unzip_file(
insert_into_af_collab_bulk_for_user(
&mut transaction,
&import_task.uid,
&import_task.workspace_id,
workspace_id,
&collab_params_list,
)
.await
@ -1349,8 +1349,8 @@ async fn upload_file_to_s3(
}
async fn get_encode_collab_from_bytes(
workspace_id: &str,
object_id: &str,
workspace_id: &Uuid,
object_id: &Uuid,
collab_type: &CollabType,
pg_pool: &PgPool,
s3: &Arc<dyn S3Client>,
@ -1605,13 +1605,13 @@ async fn insert_meta_from_path(
})
}
fn collab_key(workspace_id: &str, object_id: &str) -> String {
fn collab_key(workspace_id: &Uuid, object_id: &Uuid) -> String {
format!(
"collabs/{}/{}/encoded_collab.v1.zstd",
workspace_id, object_id
)
}
fn encode_collab_key(object_id: &str) -> String {
fn encode_collab_key<T: Display>(object_id: T) -> String {
format!("encode_collab_v0:{}", object_id)
}
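A small self-contained sketch of why the key helper became generic over Display: callers can now pass a &Uuid directly or keep passing a String, and both render to the same key. The demo function below is illustrative only.

use std::fmt::Display;

use uuid::Uuid;

fn encode_collab_key<T: Display>(object_id: T) -> String {
  format!("encode_collab_v0:{}", object_id)
}

// Illustrative call sites: the Display bound accepts both a Uuid reference and
// an owned String, so a .to_string() just for the cache key is no longer required.
fn demo(w_database_id: Uuid) {
  let by_ref = encode_collab_key(&w_database_id);
  let by_string = encode_collab_key(w_database_id.to_string());
  assert_eq!(by_ref, by_string);
}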

View file

@ -1,5 +1,4 @@
use app_error::AppError;
use collab_entity::CollabType;
use database::index::get_collabs_indexed_at;
use indexer::collab_indexer::{Indexer, IndexerProvider};
use indexer::entity::EmbeddingRecord;
@ -133,10 +132,7 @@ async fn process_upcoming_tasks(
.collect();
tasks.retain(|task| !task.data.is_empty());
let collab_ids: Vec<(String, CollabType)> = tasks
.iter()
.map(|task| (task.object_id.clone(), task.collab_type))
.collect();
let collab_ids: Vec<_> = tasks.iter().map(|task| task.object_id).collect();
let indexed_collabs = get_collabs_indexed_at(&pg_pool, collab_ids)
.await