chore: working on query conversions

Bartosz Sypytkowski 2025-03-19 12:37:49 +01:00
parent 6e6e25fb14
commit c2e14a9639
9 changed files with 133 additions and 67 deletions

View file

@@ -0,0 +1,29 @@
{
"db_name": "PostgreSQL",
"query": "\nSELECT\n w.settings['disable_search_indexing']::boolean as disable_search_indexing,\n CASE\n WHEN w.settings['disable_search_indexing']::boolean THEN\n FALSE\n ELSE\n EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.oid = $2)\n END as has_index\nFROM af_workspace w\nWHERE w.workspace_id = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "disable_search_indexing",
"type_info": "Bool"
},
{
"ordinal": 1,
"name": "has_index",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Uuid",
"Uuid"
]
},
"nullable": [
null,
null
]
},
"hash": "1ec283e1071ab41653630a6e4e6570d30d8ed1622e137d692c428f6e44a8ae5d"
}

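The JSON file above (and the three like it below) are sqlx's offline query cache, regenerated with `cargo sqlx prepare`; each entry is keyed by a hash of the exact SQL text, so every query rewritten in this commit gets a fresh file. One detail worth noting: `nullable` is `[null, null]` because sqlx cannot prove nullability for computed columns, so both fields surface as `Option<bool>` at the call site. A minimal consuming sketch, assuming a `pg_pool` handle and default-to-false handling (not code from this commit):

// Sketch only: mirrors the cached statement above. Both computed columns
// come back as Option<bool> because their nullability is unknown to sqlx.
let row = sqlx::query!(
    r#"
SELECT
  w.settings['disable_search_indexing']::boolean as disable_search_indexing,
  CASE
    WHEN w.settings['disable_search_indexing']::boolean THEN FALSE
    ELSE EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.oid = $2)
  END as has_index
FROM af_workspace w
WHERE w.workspace_id = $1"#,
    workspace_id, // &Uuid, as in the get_index_status diff below
    object_id     // &Uuid
)
.fetch_one(pg_pool)
.await?;
let disable_indexing = row.disable_search_indexing.unwrap_or(false);
let has_index = row.has_index.unwrap_or(false);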
View file

@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)\n VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (oid, partition_key)\n DO UPDATE SET blob = $2, len = $3, owner_uid = $5 WHERE excluded.workspace_id = af_collab.workspace_id;\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Bytea",
"Int4",
"Int4",
"Int8",
"Uuid"
]
},
"nullable": []
},
"hash": "435fb563d3f6da280483996dc1803cbea4a99f1136796c2aaeb20cf7bb897f5e"
}

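The `WHERE excluded.workspace_id = af_collab.workspace_id` guard on this upsert means a conflicting oid owned by a different workspace is left untouched: the insert conflicts, the conditional DO UPDATE filters the row out, and zero rows are affected. A hedged sketch of surfacing that at a call site (variable names and the error choice are assumptions, not this commit's code):

// Sketch: a conflicting oid that belongs to another workspace slips through
// the conditional DO UPDATE, leaving rows_affected() == 0.
let res = sqlx::query!(
    r#"
INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (oid, partition_key)
DO UPDATE SET blob = $2, len = $3, owner_uid = $5
WHERE excluded.workspace_id = af_collab.workspace_id"#,
    oid,            // Uuid
    blob,           // &[u8]
    blob_len,       // i32
    partition_key,  // i32
    owner_uid,      // i64
    workspace_id    // Uuid
)
.execute(pg_pool)
.await?;
if res.rows_affected() == 0 {
    // assumed handling: the oid already exists under a different workspace
    return Err(sqlx::Error::RowNotFound);
}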
View file

@@ -0,0 +1,30 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n updated_at as updated_at,\n oid as row_id\n FROM af_collab\n WHERE workspace_id = $1\n AND oid = ANY($2)\n AND updated_at > $3\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "updated_at",
"type_info": "Timestamptz"
},
{
"ordinal": 1,
"name": "row_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Uuid",
"UuidArray",
"Timestamptz"
]
},
"nullable": [
false,
false
]
},
"hash": "6ca2a2fa10d5334183d98176998d41f36948fe5624e290a32d0b50bc9fb256bf"
}

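This entry supports delta-style reads: given a workspace, a candidate id set, and a cutoff, only rows updated after the cutoff come back, and both columns are provably non-null (`nullable: [false, false]`), so no Options appear. A call-site sketch under assumed names (`RowUpdate`, `pg_pool`, `row_ids`, `since`):

// Assumed result shape for the cached query above; query_as! matches by name.
struct RowUpdate {
    row_id: uuid::Uuid,
    updated_at: chrono::DateTime<chrono::Utc>,
}

let updates: Vec<RowUpdate> = sqlx::query_as!(
    RowUpdate,
    r#"
SELECT updated_at as updated_at, oid as row_id
FROM af_collab
WHERE workspace_id = $1 AND oid = ANY($2) AND updated_at > $3"#,
    workspace_id, // Uuid
    &row_ids,     // &[Uuid]
    since         // DateTime<Utc>
)
.fetch_all(pg_pool)
.await?;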
View file

@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)\n SELECT * FROM UNNEST($1::uuid[], $2::bytea[], $3::int[], $4::int[], $5::bigint[], $6::uuid[])\n ON CONFLICT (oid, partition_key)\n DO UPDATE SET blob = excluded.blob, len = excluded.len where af_collab.workspace_id = excluded.workspace_id\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"UuidArray",
"ByteaArray",
"Int4Array",
"Int4Array",
"Int8Array",
"UuidArray"
]
},
"nullable": []
},
"hash": "e040e3e3a1821bbf892a7b1dced0a8cdbeb3bec493bc2ced8de0b6a1175f2c17"
}

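The bulk upsert collapses N inserts into one round trip by zipping six same-length arrays back into rows with UNNEST. Multi-array UNNEST pads shorter arrays with NULLs rather than failing, which would then trip NOT NULL constraints; that is why `insert_into_af_collab_bulk_for_user` below fills every vector in lockstep. The pattern in miniature (binding names assumed):

// Sketch: UNNEST over six same-length arrays yields one row per element,
// and the columns align positionally with the INSERT column list.
sqlx::query!(
    r#"
INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)
SELECT * FROM UNNEST($1::uuid[], $2::bytea[], $3::int[], $4::int[], $5::bigint[], $6::uuid[])
ON CONFLICT (oid, partition_key)
DO UPDATE SET blob = excluded.blob, len = excluded.len
WHERE af_collab.workspace_id = excluded.workspace_id"#,
    &object_ids,     // Vec<Uuid>
    &blobs,          // Vec<Vec<u8>>
    &lengths,        // Vec<i32>
    &partition_keys, // Vec<i32>
    &owner_uids,     // Vec<i64>
    &workspace_ids   // Vec<Uuid>
)
.execute(pg_pool)
.await?;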
View file

@@ -282,13 +282,11 @@ impl Deref for QueryCollabParams {
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct QueryCollab {
#[validate(custom(function = "validate_not_empty_str"))]
pub object_id: String,
pub collab_type: CollabType,
}
impl QueryCollab {
pub fn new<T: ToString>(object_id: T, collab_type: CollabType) -> Self {
let object_id = object_id.to_string();
pub fn new(object_id: String, collab_type: CollabType) -> Self {
Self {
object_id,
collab_type,

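Dropping the `T: ToString` generic from `QueryCollab::new` pushes the conversion to the caller, which keeps the allocation visible and makes the wider String-to-Uuid migration in this commit mechanical at each call site. Roughly, call sites change like this (a sketch; `CollabType::Document` is a variant from the same codebase but illustrative here):

// Before this commit: any ToString was accepted, hiding a conversion.
let q = QueryCollab::new(object_uuid, CollabType::Document);
// After: the caller converts explicitly; the From<CollabId> impl later
// in this commit does exactly this with value.object_id.to_string().
let q = QueryCollab::new(object_uuid.to_string(), CollabType::Document);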
View file

@@ -57,13 +57,14 @@ pub async fn insert_into_af_collab(
params.encoded_collab_v1.len(),
);
let oid = params.object_id.parse::<Uuid>()?;
sqlx::query!(
r#"
INSERT INTO af_collab (oid, blob, len, partition_key, owner_uid, workspace_id)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (oid, partition_key)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (oid)
DO UPDATE SET blob = $2, len = $3, owner_uid = $5 WHERE excluded.workspace_id = af_collab.workspace_id;
"#,
params.object_id,
oid,
params.encoded_collab_v1.as_ref(),
params.encoded_collab_v1.len() as i32,
partition_key,
@@ -152,13 +153,13 @@ pub async fn insert_into_af_collab_bulk_for_user(
// Insert values into `af_collab` tables in bulk
let len = collab_params_list.len();
let mut object_ids: Vec<Uuid> = Vec::with_capacity(len);
let mut object_ids = Vec::with_capacity(len);
let mut blobs: Vec<Vec<u8>> = Vec::with_capacity(len);
let mut lengths: Vec<i32> = Vec::with_capacity(len);
let mut partition_keys: Vec<i32> = Vec::with_capacity(len);
let mut visited = HashSet::with_capacity(collab_params_list.len());
for params in collab_params_list {
let oid = Uuid::from_str(&params.object_id)?;
let oid = params.object_id.parse::<Uuid>()?;
if visited.insert(oid) {
let partition_key = partition_key_from_collab_type(&params.collab_type);
object_ids.push(oid);
@@ -202,7 +203,7 @@ pub async fn insert_into_af_collab_bulk_for_user(
pub async fn select_blob_from_af_collab<'a, E>(
conn: E,
collab_type: &CollabType,
object_id: &str,
object_id: &Uuid,
) -> Result<Vec<u8>, sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
@@ -224,7 +225,7 @@ where
#[inline]
pub async fn select_collab_meta_from_af_collab<'a, E>(
conn: E,
object_id: &str,
object_id: &Uuid,
collab_type: &CollabType,
) -> Result<Option<AFCollabRowMeta>, sqlx::Error>
where
@@ -260,16 +261,14 @@ pub async fn batch_select_collab_blob(
}
for (collab_type, mut object_ids) in object_ids_by_collab_type.into_iter() {
let partition_key = partition_key_from_collab_type(&collab_type);
let par_results: Result<Vec<QueryCollabData>, sqlx::Error> = sqlx::query_as!(
QueryCollabData,
r#"
SELECT oid, blob
FROM af_collab
WHERE oid = ANY($1) AND partition_key = $2 AND deleted_at IS NULL;
WHERE oid = ANY($1) AND deleted_at IS NULL;
"#,
&object_ids,
partition_key,
&object_ids
)
.fetch_all(pg_pool)
.await;
@@ -570,22 +569,17 @@ pub async fn select_collab_embed_info<'a, E>(
where
E: Executor<'a, Database = Postgres>,
{
tracing::info!(
"select_collab_embed_info: object_id: {}, collab_type: {:?}",
object_id
);
tracing::info!("select_collab_embed_info: object_id: {}", object_id);
let record = sqlx::query!(
r#"
SELECT
ac.object_id,
ac.oid as object_id,
ace.partition_key,
ac.indexed_at,
ace.updated_at
FROM af_collab_embeddings ac
JOIN af_collab ace
ON ac.object_id = ace.oid
AND ac.partition_key = ace.partition_key
WHERE ac.object_id = $1
JOIN af_collab ace ON ac.oid = ace.oid
WHERE ac.oid = $1
"#,
object_id
)
@@ -608,37 +602,24 @@ pub async fn batch_select_collab_embed<'a, E>(
where
E: Executor<'a, Database = Postgres>,
{
let collab_types: Vec<CollabType> = embedded_collab
.iter()
.map(|query| query.collab_type)
.collect();
let object_ids: Vec<String> = embedded_collab
.into_iter()
.map(|query| query.object_id)
.collect();
// Collect the partition keys for each collab_type
let partition_keys: Vec<i32> = collab_types
.iter()
.map(partition_key_from_collab_type)
.collect();
// Execute the query to fetch all matching rows
let records = sqlx::query!(
r#"
SELECT
ac.oid AS object_id,
ac.partition_key,
ac.oid as object_id,
ace.partition_key,
ac.indexed_at,
ace.updated_at
FROM af_collab_embeddings ac
JOIN af_collab ace
ON ac.oid = ace.oid
AND ac.partition_key = ace.partition_key
WHERE ac.oid = ANY($1) AND ac.partition_key = ANY($2)
JOIN af_collab ace ON ac.oid = ace.oid
WHERE ac.oid = ANY($1)
"#,
&object_ids,
&partition_keys
&object_ids
)
.fetch_all(executor)
.await?;

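The thread running through this file: object ids now cross function boundaries as `&Uuid` instead of `&str`, and `partition_key` drops out of the lookups because `oid` alone identifies a row after the migration. The resulting boundary pattern, sketched with the signatures from this diff (the caller shape and `pg_pool` are assumptions):

// Sketch: parse the string id once at the edge, then pass &Uuid downward.
let oid: uuid::Uuid = params.object_id.parse()?;
let blob = select_blob_from_af_collab(pg_pool, &params.collab_type, &oid).await?;
let meta = select_collab_meta_from_af_collab(pg_pool, &oid, &params.collab_type).await?;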
View file

@@ -15,8 +15,7 @@ use uuid::Uuid;
pub async fn get_index_status<'a, E>(
tx: E,
workspace_id: &Uuid,
object_id: &str,
partition_key: i32,
object_id: &Uuid,
) -> Result<IndexingStatus, sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
@@ -29,13 +28,12 @@ SELECT
WHEN w.settings['disable_search_indexing']::boolean THEN
FALSE
ELSE
EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = $3 AND m.oid = $2)
EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.oid = $2::uuid)
END as has_index
FROM af_workspace w
WHERE w.workspace_id = $1"#,
workspace_id,
object_id,
partition_key
object_id
)
.fetch_one(tx)
.await;
@@ -150,7 +148,7 @@ pub async fn stream_collabs_without_embeddings(
pub async fn update_collab_indexed_at<'a, E>(
tx: E,
object_id: &str,
object_id: &Uuid,
collab_type: &CollabType,
indexed_at: DateTime<Utc>,
) -> Result<(), Error>
@@ -176,26 +174,18 @@ where
pub async fn get_collabs_indexed_at<'a, E>(
executor: E,
collab_ids: Vec<(String, CollabType)>,
) -> Result<HashMap<String, DateTime<Utc>>, Error>
oids: Vec<Uuid>,
) -> Result<HashMap<Uuid, DateTime<Utc>>, Error>
where
E: Executor<'a, Database = Postgres>,
{
let (oids, partition_keys): (Vec<String>, Vec<i32>) = collab_ids
.into_iter()
.map(|(object_id, collab_type)| (object_id, partition_key_from_collab_type(&collab_type)))
.unzip();
let result = sqlx::query!(
r#"
SELECT oid, indexed_at
FROM af_collab
WHERE (oid, partition_key) = ANY (
SELECT UNNEST($1::text[]), UNNEST($2::int[])
)
WHERE oid = ANY (SELECT UNNEST($1::uuid[]))
"#,
&oids,
&partition_keys
&oids
)
.fetch_all(executor)
.await?;
@@ -209,7 +199,7 @@ where
None
}
})
.collect::<HashMap<String, DateTime<Utc>>>();
.collect::<HashMap<Uuid, DateTime<Utc>>>();
Ok(map)
}
@@ -217,7 +207,7 @@ where
pub struct CollabId {
pub collab_type: CollabType,
pub workspace_id: Uuid,
pub object_id: String,
pub object_id: Uuid,
}
impl From<CollabId> for QueryCollabParams {
@@ -225,7 +215,7 @@ impl From<CollabId> for QueryCollabParams {
QueryCollabParams {
workspace_id: value.workspace_id.to_string(),
inner: QueryCollab {
object_id: value.object_id,
object_id: value.object_id.to_string(),
collab_type: value.collab_type,
},
}

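`get_collabs_indexed_at` no longer needs `(String, CollabType)` pairs: with `partition_key` gone from the predicate, the collab type plays no part in the lookup, so the function takes bare `Uuid`s and keys its result by `Uuid`. A hedged call-site sketch (collection names are assumptions):

// Sketch: one query answers "indexed when?" for the whole batch.
let oids: Vec<uuid::Uuid> = pending.iter().map(|c| c.oid).collect();
let indexed_at = get_collabs_indexed_at(pg_pool, oids.clone()).await?;
for oid in &oids {
    if !indexed_at.contains_key(oid) {
        // never indexed: schedule this collab for embedding
    }
}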
View file

@@ -1051,7 +1051,6 @@ pub async fn upsert_workspace_settings(
DELETE FROM af_collab_embeddings e
USING af_collab c
WHERE e.oid = c.oid
AND e.partition_key = c.partition_key
AND c.workspace_id = $1
"#,
workspace_id

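With `oid` unique on its own, the cleanup join needs a single equality; the deleted `partition_key` predicate only existed to complete the old composite key. The same DELETE with the affected-row count surfaced, as a sketch (pool handle and log line are assumptions):

// Sketch: USING joins embeddings to their collabs, scoped to one workspace.
let res = sqlx::query!(
    r#"
DELETE FROM af_collab_embeddings e
USING af_collab c
WHERE e.oid = c.oid AND c.workspace_id = $1"#,
    workspace_id
)
.execute(pg_pool)
.await?;
tracing::debug!("removed {} embedding rows", res.rows_affected());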
View file

@@ -27,6 +27,7 @@ alter table af_collab_embeddings
alter table af_collab_embeddings
drop column oid,
drop column partition_key;
alter table af_collab_embeddings rename column object_id to oid;
-- replace af_collab table
drop table af_collab;
@@ -35,15 +36,15 @@ alter table af_collab_temp rename to af_collab;
-- rebind embeddings foreign key to new af_collab table
alter table af_collab_embeddings
add constraint fk_af_collab_embeddings_af_collab
foreign key (object_id) references af_collab(oid);
create index ix_af_collab_embeddings_oid on af_collab_embeddings(object_id);
foreign key (oid) references af_collab(oid);
create index ix_af_collab_embeddings_oid on af_collab_embeddings(oid);
-- add trigger for af_collab.updated_at
create trigger set_updated_at
before insert or update
on af_collab
for each row
execute procedure update_updated_at_column();
-- add remaining indexes to new af_collab table
create index if not exists idx_workspace_id_on_af_collab
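Net effect of the migration on af_collab_embeddings: drop the old (oid, partition_key) pair, rename object_id to oid, and rebind the foreign key and index to the single-column key. A hedged post-migration smoke test, assuming a `pg_pool` handle (orphans are impossible once the FK holds; this only checks the rebind took):

// Sketch: count embeddings whose oid no longer resolves to a collab;
// a healthy foreign-key rebind leaves zero.
let orphans: Option<i64> = sqlx::query_scalar!(
    r#"
SELECT count(*) FROM af_collab_embeddings e
LEFT JOIN af_collab c ON e.oid = c.oid
WHERE c.oid IS NULL"#
)
.fetch_one(pg_pool)
.await?;
assert_eq!(orphans.unwrap_or(0), 0);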