chore: reduce number of SQL statements executed when inserting/updating collab (#1022)

* chore: reduce number of SQL statements executed when inserting new collab

* chore: add workspace id check

* chore: fix previous query

* chore: upsert collabs and embeddings as stored procedures

* chore: fix clippy errors

* chore: fix stored procedure to compare workspaces correctly
This commit is contained in:
Bartosz Sypytkowski 2024-11-27 06:00:00 +01:00 committed by GitHub
parent 3806a959b4
commit 4fb6df5c14
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 139 additions and 250 deletions

View file

@ -1,16 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_collab_member (uid, oid, permission_id)\n VALUES ($1, $2, $3)\n ON CONFLICT (uid, oid)\n DO UPDATE\n SET permission_id = excluded.permission_id;\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Text",
"Int4"
]
},
"nullable": []
},
"hash": "1f3943b7aa640f6a3eeaf1301fd51137e21580ef13982a95d634c40070094779"
}

View file

@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "CALL af_collab_upsert($1, $2, $3, $4, $5, $6)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Int4",
"Int8",
"Int4",
"Bytea"
]
},
"nullable": []
},
"hash": "7e413ca3430f3270e9f903c8bc2151a5f354cd951c20dbd6261d65dbdb503cbc"
}

View file

@ -1,22 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT workspace_id FROM af_collab WHERE oid = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "workspace_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
},
"hash": "9dd4d1e9fc5684558f765200c88eaf0dc5a4bd4d8bfa621ad1dc85010c965728"
}

View file

@ -1,14 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM af_collab_embeddings WHERE oid = $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text"
]
},
"nullable": []
},
"hash": "b7d2c2d32d4b221ed8c74d6549a9aa0e03922fcdddddb1c473eb9496b7bb9721"
}

View file

@ -1,19 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE af_collab SET blob = $3, len = $4, encrypt = $5, owner_uid = $6 WHERE oid = $1 AND partition_key = $2;",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int4",
"Bytea",
"Int4",
"Int4",
"Int8"
]
},
"nullable": []
},
"hash": "d4ce35ee25927744b46eb4c4a4b6b710669f8059b77e107b60993c939112e3a0"
}

View file

@ -1,20 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT rp.permission_id\n FROM af_role_permissions rp\n JOIN af_roles ON rp.role_id = af_roles.id\n WHERE af_roles.name = 'Owner';\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "permission_id",
"type_info": "Int4"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "d6708c221831093b03524bd6b73974f6f41db00b5f58f47ad7180d068e273406"
}

View file

@ -1,20 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id)VALUES ($1, $2, $3, $4, $5, $6, $7)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Bytea",
"Int4",
"Int4",
"Int4",
"Int8",
"Uuid"
]
},
"nullable": []
},
"hash": "e226a33d5e5db0b155b5245e05293072f0e16cb4e25840b064bf00469f8adcf4"
}

View file

@ -18,7 +18,7 @@ use sqlx::{Error, Executor, PgPool, Postgres, Row, Transaction};
use std::collections::HashMap;
use std::fmt::Debug;
use std::{ops::DerefMut, str::FromStr};
use tracing::{error, event, instrument};
use tracing::{error, instrument};
use uuid::Uuid;
/// Inserts a new row into the `af_collab` table or updates an existing row if it matches the
@ -55,109 +55,33 @@ pub async fn insert_into_af_collab(
let encrypt = 0;
let partition_key = crate::collab::partition_key_from_collab_type(&params.collab_type);
let workspace_id = Uuid::from_str(workspace_id)?;
let existing_workspace_id: Option<Uuid> = sqlx::query_scalar!(
"SELECT workspace_id FROM af_collab WHERE oid = $1",
&params.object_id
)
.fetch_optional(tx.deref_mut())
.await?;
event!(
tracing::Level::TRACE,
tracing::trace!(
"upsert collab:{}, len:{}",
params.object_id,
params.encoded_collab_v1.len(),
);
// If the collab already exists, update the row with the new data.
// In most cases, the workspace_id should be the same as the existing one. Comparing the workspace_id
// is a safety check to prevent a user from inserting a row with an existing object_id but a different
// workspace_id.
match existing_workspace_id {
Some(existing_workspace_id) => {
if existing_workspace_id == workspace_id {
sqlx::query!(
"UPDATE af_collab \
SET blob = $3, len = $4, encrypt = $5, owner_uid = $6 WHERE oid = $1 AND partition_key = $2;",
params.object_id,
partition_key,
params.encoded_collab_v1.as_ref(),
params.encoded_collab_v1.len() as i32,
encrypt,
uid,
)
.execute(tx.deref_mut())
.await.map_err(|err| {
AppError::Internal(anyhow!(
"Update af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}. error: {:?}",
workspace_id, uid, params.object_id, params.collab_type, err,
))
})?;
} else {
return Err(AppError::Internal(anyhow!(
"workspace_id is not match. expect workspace_id:{}, but receive:{}",
existing_workspace_id,
workspace_id
)));
}
},
None => {
// If the collab doesn't exist, insert a new row into the `af_collab` table and add a corresponding
// entry to the `af_collab_member` table.
let permission_id: i32 = sqlx::query_scalar!(
r#"
SELECT rp.permission_id
FROM af_role_permissions rp
JOIN af_roles ON rp.role_id = af_roles.id
WHERE af_roles.name = 'Owner';
"#
)
.fetch_one(tx.deref_mut())
.await?;
sqlx::query!(
r#"
INSERT INTO af_collab_member (uid, oid, permission_id)
VALUES ($1, $2, $3)
ON CONFLICT (uid, oid)
DO UPDATE
SET permission_id = excluded.permission_id;
"#,
uid,
params.object_id,
permission_id
)
.execute(tx.deref_mut())
.await
.map_err(|err| {
AppError::Internal(anyhow!(
"Insert af_collab_member failed: {}:{}:{}. error details:{:?}",
uid,
params.object_id,
permission_id,
err
))
})?;
sqlx::query!(
"INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id)\
VALUES ($1, $2, $3, $4, $5, $6, $7)",
params.object_id,
params.encoded_collab_v1.as_ref(),
params.encoded_collab_v1.len() as i32,
partition_key,
encrypt,
uid,
workspace_id,
)
.execute(tx.deref_mut())
.await.map_err(|err| {
AppError::Internal(anyhow!(
"Insert new af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}. payload len:{} error: {:?}",
workspace_id, uid, params.object_id, params.collab_type, params.encoded_collab_v1.len(), err,
))
})?;
},
}
sqlx::query!(
r#"CALL af_collab_upsert($1, $2, $3, $4, $5, $6)"#,
workspace_id,
params.object_id,
partition_key,
*uid,
encrypt,
params.encoded_collab_v1.as_ref(),
)
.execute(tx.deref_mut())
.await
.map_err(|err| {
AppError::Internal(anyhow!(
"Update af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}. error: {:?}",
workspace_id,
uid,
params.object_id,
params.collab_type,
err,
))
})?;
Ok(())
}

View file

@ -63,7 +63,13 @@ impl CollabDiskCache {
em.tokens_consumed
);
let workspace_id = Uuid::parse_str(workspace_id)?;
upsert_collab_embeddings(transaction, &workspace_id, em.tokens_consumed, &em.params).await?;
upsert_collab_embeddings(
transaction,
&workspace_id,
em.tokens_consumed,
em.params.clone(),
)
.await?;
} else if params.collab_type == CollabType::Document {
tracing::info!("no embeddings to save for collab {}", params.object_id);
}

View file

@ -2,6 +2,7 @@ use std::ops::DerefMut;
use collab_entity::CollabType;
use pgvector::Vector;
use sqlx::postgres::{PgHasArrayType, PgTypeInfo};
use sqlx::{Error, Executor, Postgres, Transaction};
use uuid::Uuid;
@ -58,52 +59,52 @@ WHERE w.workspace_id = $1"#,
}
}
#[derive(sqlx::Type)]
#[sqlx(type_name = "af_fragment", no_pg_array)]
struct Fragment {
fragment_id: String,
content_type: i32,
contents: String,
embedding: Option<Vector>,
}
impl From<AFCollabEmbeddingParams> for Fragment {
fn from(value: AFCollabEmbeddingParams) -> Self {
Fragment {
fragment_id: value.fragment_id,
content_type: value.content_type as i32,
contents: value.content,
embedding: value.embedding.map(Vector::from),
}
}
}
impl PgHasArrayType for Fragment {
fn array_type_info() -> PgTypeInfo {
PgTypeInfo::with_name("af_fragment[]")
}
}
pub async fn upsert_collab_embeddings(
tx: &mut Transaction<'_, sqlx::Postgres>,
workspace_id: &Uuid,
tokens_used: u32,
records: &[AFCollabEmbeddingParams],
records: Vec<AFCollabEmbeddingParams>,
) -> Result<(), sqlx::Error> {
if tokens_used > 0 {
sqlx::query(r#"
INSERT INTO af_workspace_ai_usage(created_at, workspace_id, search_requests, search_tokens_consumed, index_tokens_consumed)
VALUES (now()::date, $1, 0, 0, $2)
ON CONFLICT (created_at, workspace_id) DO UPDATE
SET index_tokens_consumed = af_workspace_ai_usage.index_tokens_consumed + $2"#,
)
.bind(workspace_id)
.bind(tokens_used as i64)
.execute(tx.deref_mut())
.await?;
if records.is_empty() {
return Ok(());
}
let object_id = records[0].object_id.clone();
let collab_type = records[0].collab_type.clone();
if !records.is_empty() {
// replace existing collab embeddings
remove_collab_embeddings(tx, &records[0].object_id).await?;
for r in records {
sqlx::query(
r#"INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
ON CONFLICT (fragment_id) DO UPDATE SET content_type = $4, content = $5, embedding = $6, indexed_at = NOW()"#,
)
.bind(&r.fragment_id)
.bind(&r.object_id)
.bind(crate::collab::partition_key_from_collab_type(&r.collab_type))
.bind(r.content_type as i32)
.bind(&r.content)
.bind(r.embedding.clone().map(Vector::from))
.execute(tx.deref_mut())
.await?;
}
}
Ok(())
}
let fragments = records.into_iter().map(Fragment::from).collect::<Vec<_>>();
pub async fn remove_collab_embeddings(
tx: &mut Transaction<'_, sqlx::Postgres>,
object_id: &str,
) -> Result<(), sqlx::Error> {
sqlx::query!("DELETE FROM af_collab_embeddings WHERE oid = $1", object_id)
sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment[])"#)
.bind(*workspace_id)
.bind(object_id)
.bind(crate::collab::partition_key_from_collab_type(&collab_type))
.bind(tokens_used as i32)
.bind(fragments)
.execute(tx.deref_mut())
.await?;
Ok(())

View file

@ -0,0 +1,50 @@
-- Upserts a single collab row and ensures the acting user holds the Owner
-- permission on it, replacing what was previously several client round-trips
-- with one server-side procedure call.
CREATE OR REPLACE PROCEDURE af_collab_upsert(
IN p_workspace_id UUID,
IN p_oid TEXT,
IN p_partition_key INT,
IN p_uid BIGINT,
IN p_encrypt INT,
IN p_blob BYTEA
)
LANGUAGE plpgsql
AS $$
BEGIN
-- Insert the collab; on (oid, partition_key) conflict, overwrite
-- blob/len/encrypt/owner_uid only when the stored row already belongs to the
-- caller's workspace (the WHERE on DO UPDATE guards against hijacking an
-- existing oid from another workspace).
-- NOTE(review): on a workspace mismatch this silently updates nothing instead
-- of raising an error — confirm callers do not depend on a surfaced failure.
INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id)
VALUES (p_oid, p_blob, LENGTH(p_blob), p_partition_key, p_encrypt, p_uid, p_workspace_id)
ON CONFLICT (oid, partition_key)
DO UPDATE SET blob = p_blob, len = LENGTH(p_blob), encrypt = p_encrypt, owner_uid = p_uid WHERE excluded.workspace_id = af_collab.workspace_id;
-- Grant (or refresh) the Owner-level permission for the acting user on this
-- object; the permission id is resolved from the 'Owner' role at call time.
-- NOTE(review): this grant executes even when the collab write above was
-- skipped by the workspace check — verify that is intended.
INSERT INTO af_collab_member (uid, oid, permission_id)
SELECT p_uid, p_oid, rp.permission_id
FROM af_role_permissions rp
JOIN af_roles ON rp.role_id = af_roles.id
WHERE af_roles.name = 'Owner'
ON CONFLICT (uid, oid)
DO UPDATE SET permission_id = excluded.permission_id;
END
$$;
-- Composite type for one embedded document fragment; passed as an array to
-- af_collab_embeddings_upsert. Embedding dimension is fixed at 1536 — TODO
-- confirm this matches the embedding model used by the indexer.
CREATE TYPE af_fragment AS (fragment_id TEXT, content_type INT, contents TEXT, embedding VECTOR(1536));
-- Replaces the full set of embedding fragments for one collab object and
-- records the workspace's index-token spend, all in a single procedure call.
CREATE OR REPLACE PROCEDURE af_collab_embeddings_upsert(
IN p_workspace_id UUID,
IN p_oid TEXT,
IN p_partition_key INT,
IN p_tokens_used INT,
IN p_fragments af_fragment[]
)
LANGUAGE plpgsql
AS $$
BEGIN
-- Full replace: drop every fragment previously indexed for this oid before
-- inserting the new set.
DELETE FROM af_collab_embeddings WHERE oid = p_oid;
-- Expand the fragment array into one row per fragment, all stamped NOW().
INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at)
SELECT f.fragment_id, p_oid, p_partition_key, f.content_type, f.contents, f.embedding, NOW()
FROM UNNEST(p_fragments) as f;
-- Accumulate per-day index token usage (one row per workspace per day).
-- NOTE(review): this upserts a usage row even when p_tokens_used = 0, whereas
-- the previous client code only wrote usage when tokens_used > 0 — confirm
-- the extra zero-valued rows are acceptable.
INSERT INTO af_workspace_ai_usage(created_at, workspace_id, search_requests, search_tokens_consumed, index_tokens_consumed)
VALUES (now()::date, p_workspace_id, 0, 0, p_tokens_used)
ON CONFLICT (created_at, workspace_id)
DO UPDATE SET index_tokens_consumed = af_workspace_ai_usage.index_tokens_consumed + p_tokens_used;
END
$$;

View file

@ -163,7 +163,7 @@ impl IndexerProvider {
&mut tx,
&workspace_id,
embeddings.tokens_consumed,
&embeddings.params,
embeddings.params,
)
.await?;
tx.commit().await?;