chore: mid-way through migrating collab object ids (oid) from text to uuid (work in progress)

This commit is contained in:
Bartosz Sypytkowski 2025-03-19 08:20:41 +01:00
parent 87a12b37f5
commit 6e6e25fb14
18 changed files with 75 additions and 269 deletions

View file

@ -6,7 +6,7 @@
{
"ordinal": 0,
"name": "oid",
"type_info": "Text"
"type_info": "Uuid"
}
],
"parameters": {

View file

@ -1,30 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n updated_at as updated_at,\n oid as row_id\n FROM af_collab_database_row\n WHERE workspace_id = $1\n AND oid = ANY($2)\n AND updated_at > $3\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "updated_at",
"type_info": "Timestamptz"
},
{
"ordinal": 1,
"name": "row_id",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Uuid",
"TextArray",
"Timestamptz"
]
},
"nullable": [
false,
false
]
},
"hash": "1331f64dbbf63fc694e3358aefd2bdc4b3bcff64eda36420acde1a948884239d"
}

View file

@ -11,7 +11,7 @@
],
"parameters": {
"Left": [
"Text",
"Uuid",
"Int4"
]
},

View file

@ -1,30 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\nSELECT\n w.settings['disable_search_indexing']::boolean as disable_search_indexing,\n CASE\n WHEN w.settings['disable_search_indexing']::boolean THEN\n FALSE\n ELSE\n EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = $3 AND m.oid = $2)\n END as has_index\nFROM af_workspace w\nWHERE w.workspace_id = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "disable_search_indexing",
"type_info": "Bool"
},
{
"ordinal": 1,
"name": "has_index",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Uuid",
"Text",
"Int4"
]
},
"nullable": [
null,
null
]
},
"hash": "773aac7e401c3e6c04d1dc8ea412b9678b7227832a3487270d724f623072fe89"
}

View file

@ -1,15 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE af_collab\n SET deleted_at = $2\n WHERE oid = $1;\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Timestamptz"
]
},
"nullable": []
},
"hash": "78a191e21a7e7a07eee88ed02c7fbf7035f908b8e4057f7ace1b3b5d433424fe"
}

View file

@ -1,20 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id)\n SELECT * FROM UNNEST($1::uuid[], $2::bytea[], $3::int[], $4::int[], $5::int[], $6::bigint[], $7::uuid[])\n ON CONFLICT (oid, partition_key)\n DO UPDATE SET blob = excluded.blob, len = excluded.len, encrypt = excluded.encrypt where af_collab.workspace_id = excluded.workspace_id\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"ByteaArray",
"Int4Array",
"Int4Array",
"Int4Array",
"Int8Array",
"UuidArray"
]
},
"nullable": []
},
"hash": "8df42aa8353a5fa510c0ab23412daebb263e8cf57b62e838460882cbf09cd551"
}

View file

@ -1,41 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n ac.oid AS object_id,\n ac.partition_key,\n ac.indexed_at,\n ace.updated_at\n FROM af_collab_embeddings ac\n JOIN af_collab ace\n ON ac.oid = ace.oid\n AND ac.partition_key = ace.partition_key\n WHERE ac.oid = $1 AND ac.partition_key = $2\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "object_id",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "partition_key",
"type_info": "Int4"
},
{
"ordinal": 2,
"name": "indexed_at",
"type_info": "Timestamp"
},
{
"ordinal": 3,
"name": "updated_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Text",
"Int4"
]
},
"nullable": [
false,
false,
false,
false
]
},
"hash": "968c7a6f13255220b3d497d9a1edb181b062747d6463e400158cfdc753a82c5b"
}

View file

@ -6,7 +6,7 @@
{
"ordinal": 0,
"name": "oid",
"type_info": "Text"
"type_info": "Uuid"
},
{
"ordinal": 1,
@ -16,7 +16,7 @@
],
"parameters": {
"Left": [
"TextArray",
"UuidArray",
"Int4"
]
},

View file

@ -1,20 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id)\n VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (oid, partition_key)\n DO UPDATE SET blob = $2, len = $3, encrypt = $5, owner_uid = $6 WHERE excluded.workspace_id = af_collab.workspace_id;\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Bytea",
"Int4",
"Int4",
"Int4",
"Int8",
"Uuid"
]
},
"nullable": []
},
"hash": "c62e3c19160fdbcf2ef7bc2c85ec012f628d593c8b2eba5e6ef3ba313045a696"
}

View file

@ -1,14 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n DELETE FROM af_collab_embeddings e\n USING af_collab c\n WHERE e.oid = c.oid\n AND e.partition_key = c.partition_key\n AND c.workspace_id = $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": []
},
"hash": "cbe43eb734e0afd865a7c1082ba7ded940d66320270d5ab4431271dd50a9d50b"
}

View file

@ -1,41 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n ac.oid AS object_id,\n ac.partition_key,\n ac.indexed_at,\n ace.updated_at\n FROM af_collab_embeddings ac\n JOIN af_collab ace\n ON ac.oid = ace.oid\n AND ac.partition_key = ace.partition_key\n WHERE ac.oid = ANY($1) AND ac.partition_key = ANY($2)\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "object_id",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "partition_key",
"type_info": "Int4"
},
{
"ordinal": 2,
"name": "indexed_at",
"type_info": "Timestamp"
},
{
"ordinal": 3,
"name": "updated_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"TextArray",
"Int4Array"
]
},
"nullable": [
false,
false,
false,
false
]
},
"hash": "cdbbea42600d61b6541808867397de29e2e5df569faefa098254f1afd3aa662d"
}

View file

@ -6,7 +6,7 @@
"parameters": {
"Left": [
"Timestamptz",
"Text",
"Uuid",
"Int4"
]
},

View file

@ -11,7 +11,7 @@
],
"parameters": {
"Left": [
"Text"
"Uuid"
]
},
"nullable": [

View file

@ -6,7 +6,7 @@
{
"ordinal": 0,
"name": "oid",
"type_info": "Text"
"type_info": "Uuid"
},
{
"ordinal": 1,
@ -26,7 +26,7 @@
],
"parameters": {
"Left": [
"Text",
"Uuid",
"Int4"
]
},

View file

@ -11,7 +11,7 @@
{
"ordinal": 1,
"name": "oid",
"type_info": "Text"
"type_info": "Uuid"
},
{
"ordinal": 2,

View file

@ -1,29 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT oid, indexed_at\n FROM af_collab\n WHERE (oid, partition_key) = ANY (\n SELECT UNNEST($1::text[]), UNNEST($2::int[])\n )\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "oid",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "indexed_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"TextArray",
"Int4Array"
]
},
"nullable": [
false,
true
]
},
"hash": "f8c909517885cb30e3f7d573edf47138f90ea9c5fa73eb927cc5487c3d9ad0be"
}

View file

@ -49,7 +49,6 @@ pub async fn insert_into_af_collab(
workspace_id: &str,
params: &CollabParams,
) -> Result<(), AppError> {
let encrypt = 0;
let partition_key = crate::collab::partition_key_from_collab_type(&params.collab_type);
let workspace_id = Uuid::from_str(workspace_id)?;
tracing::trace!(
@ -60,15 +59,14 @@ pub async fn insert_into_af_collab(
sqlx::query!(
r#"
INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id)
VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (oid, partition_key)
DO UPDATE SET blob = $2, len = $3, encrypt = $5, owner_uid = $6 WHERE excluded.workspace_id = af_collab.workspace_id;
INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (oid, partition_key)
DO UPDATE SET blob = $2, len = $3, owner_uid = $5 WHERE excluded.workspace_id = af_collab.workspace_id;
"#,
params.object_id,
params.encoded_collab_v1.as_ref(),
params.encoded_collab_v1.len() as i32,
partition_key,
encrypt,
uid,
workspace_id,
)
@ -175,16 +173,15 @@ pub async fn insert_into_af_collab_bulk_for_user(
// Bulk insert into `af_collab` for the provided collab params
sqlx::query!(
r#"
INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id)
SELECT * FROM UNNEST($1::uuid[], $2::bytea[], $3::int[], $4::int[], $5::int[], $6::bigint[], $7::uuid[])
INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)
SELECT * FROM UNNEST($1::uuid[], $2::bytea[], $3::int[], $4::int[], $5::bigint[], $6::uuid[])
ON CONFLICT (oid, partition_key)
DO UPDATE SET blob = excluded.blob, len = excluded.len, encrypt = excluded.encrypt where af_collab.workspace_id = excluded.workspace_id
DO UPDATE SET blob = excluded.blob, len = excluded.len where af_collab.workspace_id = excluded.workspace_id
"#,
&object_ids,
&blobs,
&lengths,
&partition_keys,
&vec![encrypt; object_ids.len()],
&uids,
&workspace_ids
)
@ -552,7 +549,7 @@ pub async fn select_last_updated_database_row_ids(
SELECT
updated_at as updated_at,
oid as row_id
FROM af_collab_database_row
FROM af_collab
WHERE workspace_id = $1
AND oid = ANY($2)
AND updated_at > $3
@ -569,32 +566,28 @@ pub async fn select_last_updated_database_row_ids(
pub async fn select_collab_embed_info<'a, E>(
tx: E,
object_id: &str,
collab_type: CollabType,
) -> Result<Option<AFCollabEmbedInfo>, sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
tracing::info!(
"select_collab_embed_info: object_id: {}, collab_type: {:?}",
object_id,
collab_type
object_id
);
let partition_key = partition_key_from_collab_type(&collab_type);
let record = sqlx::query!(
r#"
SELECT
ac.oid AS object_id,
ac.partition_key,
ac.object_id,
ace.partition_key,
ac.indexed_at,
ace.updated_at
FROM af_collab_embeddings ac
JOIN af_collab ace
ON ac.oid = ace.oid
ON ac.object_id = ace.oid
AND ac.partition_key = ace.partition_key
WHERE ac.oid = $1 AND ac.partition_key = $2
WHERE ac.object_id = $1
"#,
object_id,
partition_key
object_id
)
.fetch_optional(tx)
.await?;

View file

@ -0,0 +1,53 @@
-- Migration: rebuild af_collab as a single table keyed by a uuid oid, and
-- re-key af_collab_embeddings on a uuid object_id that references it.

-- create new uniform collab table
-- NOTE(review): oid alone is the primary key of the new table. Any upsert that
-- still targets ON CONFLICT (oid, partition_key) has no matching unique index
-- here and must be switched to ON CONFLICT (oid) -- confirm against callers.
create table af_collab_temp
(
    oid           uuid                                               not null primary key,
    workspace_id  uuid                                               not null references public.af_workspace on delete cascade,
    owner_uid     bigint                                             not null,
    partition_key integer                                            not null,
    len           integer,
    blob          bytea                                              not null,
    deleted_at    timestamp with time zone,
    created_at    timestamp with time zone default CURRENT_TIMESTAMP,
    updated_at    timestamp with time zone default CURRENT_TIMESTAMP not null,
    indexed_at    timestamp with time zone
);

-- copy data from all collab partitions to new collab table
-- NOTE(review): this aborts the migration if any existing oid is not a valid
-- uuid literal, or if the same oid appears under more than one partition_key
-- (primary key violation) -- verify the data before running.
insert into af_collab_temp (oid, workspace_id, owner_uid, partition_key, len, blob, deleted_at, created_at, updated_at, indexed_at)
select oid::uuid as oid,
       workspace_id,
       owner_uid,
       partition_key,
       len,
       blob,
       deleted_at,
       created_at,
       updated_at,
       indexed_at
from af_collab;

-- modify embeddings to make use of new uuid columns
-- the column is added as nullable first so existing rows can be backfilled
alter table af_collab_embeddings
    add column object_id uuid null;

-- intentionally unfiltered: every existing embeddings row is backfilled
update af_collab_embeddings
set object_id = oid::uuid;

alter table af_collab_embeddings
    alter column object_id set not null;

alter table af_collab_embeddings
    drop column oid,
    drop column partition_key;

-- replace af_collab table
drop table af_collab;
alter table af_collab_temp
    rename to af_collab;

-- rebind embeddings foreign key to new af_collab table
-- on delete cascade: af_collab rows are themselves removed by cascade when
-- their workspace is deleted; without a delete action on this key such a
-- delete would be rejected while embeddings still reference the collab.
alter table af_collab_embeddings
    add constraint fk_af_collab_embeddings_af_collab
        foreign key (object_id) references af_collab (oid) on delete cascade;

create index ix_af_collab_embeddings_oid
    on af_collab_embeddings (object_id);

-- add trigger for af_collab.updated_at
-- created only after the bulk copy above, so the copied rows were inserted
-- without this trigger firing
create trigger set_updated_at
    before insert or update
    on af_collab
    for each row
execute procedure update_updated_at_column();

-- add remaining indexes to new af_collab table
create index if not exists idx_workspace_id_on_af_collab
    on af_collab (workspace_id);

create index if not exists idx_af_collab_updated_at
    on af_collab (updated_at);