chore: add metadata column to save embedding info (#1086)

Nathan.fooo 2024-12-18 22:48:25 +08:00 committed by GitHub
parent 7ff9a923bb
commit e758f18d75
5 changed files with 53 additions and 64 deletions

View file

@@ -746,44 +746,7 @@ pub struct AFCollabEmbeddedChunk {
   pub content_type: EmbeddingContentType,
   pub content: String,
   pub embedding: Option<Vec<f32>>,
-}
-
-impl AFCollabEmbeddedChunk {
-  pub fn from_proto(proto: &proto::collab::CollabEmbeddingsParams) -> Result<Self, EntityError> {
-    let collab_type_proto = proto::collab::CollabType::try_from(proto.collab_type).unwrap();
-    let collab_type = CollabType::from_proto(&collab_type_proto);
-    let content_type_proto =
-      proto::collab::EmbeddingContentType::try_from(proto.content_type).unwrap();
-    let content_type = EmbeddingContentType::from_proto(content_type_proto)?;
-    let embedding = if proto.embedding.is_empty() {
-      None
-    } else {
-      Some(proto.embedding.clone())
-    };
-    Ok(Self {
-      fragment_id: proto.fragment_id.clone(),
-      object_id: proto.object_id.clone(),
-      collab_type,
-      content_type,
-      content: proto.content.clone(),
-      embedding,
-    })
-  }
-
-  pub fn to_proto(&self) -> proto::collab::CollabEmbeddingsParams {
-    proto::collab::CollabEmbeddingsParams {
-      fragment_id: self.fragment_id.clone(),
-      object_id: self.object_id.clone(),
-      collab_type: self.collab_type.to_proto() as i32,
-      content_type: self.content_type.to_proto() as i32,
-      content: self.content.clone(),
-      embedding: self.embedding.clone().unwrap_or_default(),
-    }
-  }
-
-  pub fn to_protobuf_bytes(&self) -> Vec<u8> {
-    self.to_proto().encode_to_vec()
-  }
+  pub metadata: serde_json::Value,
 }
 
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
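
With the proto conversion helpers gone, callers build `AFCollabEmbeddedChunk` directly, now including `metadata`. A minimal sketch of what that looks like (field values are placeholders; the field list is inferred from the removed `from_proto` plus the new field):

use serde_json::json;

// Placeholder values throughout; not taken from the repo.
let chunk = AFCollabEmbeddedChunk {
  fragment_id: "fragment-1".to_string(),
  object_id: "object-1".to_string(),
  collab_type: CollabType::Document,
  content_type: EmbeddingContentType::PlainText,
  content: "chunk text".to_string(),
  embedding: None,
  metadata: json!({ "source": "appflowy" }),
};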
@@ -792,28 +755,6 @@ pub struct AFCollabEmbeddings {
   pub params: Vec<AFCollabEmbeddedChunk>,
 }
-
-impl AFCollabEmbeddings {
-  pub fn from_proto(proto: proto::collab::CollabEmbeddings) -> Result<Self, EntityError> {
-    let mut params = vec![];
-    for param in proto.embeddings {
-      params.push(AFCollabEmbeddedChunk::from_proto(&param)?);
-    }
-    Ok(Self {
-      tokens_consumed: proto.tokens_consumed,
-      params,
-    })
-  }
-
-  pub fn to_proto(&self) -> proto::collab::CollabEmbeddings {
-    let embeddings: Vec<proto::collab::CollabEmbeddingsParams> =
-      self.params.iter().map(|param| param.to_proto()).collect();
-    proto::collab::CollabEmbeddings {
-      tokens_consumed: self.tokens_consumed,
-      embeddings,
-    }
-  }
-}
 
 /// Type of content stored by the embedding.
 /// Currently only plain text of the document is supported.
 /// In the future, we might support other kinds, e.g. PDF, images, or image-extracted text.

View file

@@ -57,12 +57,13 @@ WHERE w.workspace_id = $1"#,
 }
 
 #[derive(sqlx::Type)]
-#[sqlx(type_name = "af_fragment", no_pg_array)]
+#[sqlx(type_name = "af_fragment_v2", no_pg_array)]
 struct Fragment {
   fragment_id: String,
   content_type: i32,
   contents: String,
   embedding: Option<Vector>,
+  metadata: serde_json::Value,
 }
 
 impl From<AFCollabEmbeddedChunk> for Fragment {
@@ -72,13 +73,14 @@ impl From<AFCollabEmbeddedChunk> for Fragment {
       content_type: value.content_type as i32,
       contents: value.content,
       embedding: value.embedding.map(Vector::from),
+      metadata: value.metadata,
     }
   }
 }
 
 impl PgHasArrayType for Fragment {
   fn array_type_info() -> PgTypeInfo {
-    PgTypeInfo::with_name("af_fragment[]")
+    PgTypeInfo::with_name("af_fragment_v2[]")
   }
 }
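
For context on the two renames in this file: `#[sqlx(type_name = ...)]` ties `Fragment` to the Postgres composite type, and the manual `PgHasArrayType` impl is what lets a `Vec<Fragment>` bind as a single `af_fragment_v2[]` argument, so both names must move in lockstep. A minimal sketch of the pattern with a hypothetical composite type:

use sqlx::postgres::{PgHasArrayType, PgTypeInfo};

// Hypothetical composite, mirroring `CREATE TYPE my_pair_v1 AS (a INT4, b TEXT)`.
#[derive(sqlx::Type)]
#[sqlx(type_name = "my_pair_v1", no_pg_array)]
struct MyPair {
  a: i32,
  b: String,
}

// Hand-written array mapping; `no_pg_array` told the derive not to generate one.
impl PgHasArrayType for MyPair {
  fn array_type_info() -> PgTypeInfo {
    PgTypeInfo::with_name("my_pair_v1[]")
  }
}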
@@ -96,7 +98,7 @@ pub async fn upsert_collab_embeddings(
   let collab_type = records[0].collab_type.clone();
   let fragments = records.into_iter().map(Fragment::from).collect::<Vec<_>>();
 
-  sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment[])"#)
+  sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment_v2[])"#)
     .bind(*workspace_id)
     .bind(object_id)
     .bind(crate::collab::partition_key_from_collab_type(&collab_type))
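
The hunk ends at the context boundary, so the trailing binds are not shown. A hedged sketch of the full call shape, with the last two binds and the executor as assumptions (`tokens_used` and `pool` are hypothetical names):

sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment_v2[])"#)
  .bind(*workspace_id)
  .bind(object_id)
  .bind(crate::collab::partition_key_from_collab_type(&collab_type))
  .bind(tokens_used as i32) // hypothetical: matches the procedure's p_tokens_used INT
  .bind(fragments)          // Vec<Fragment>; binds as af_fragment_v2[] via PgHasArrayType
  .execute(pool)            // hypothetical executor; could equally be a transaction
  .await?;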

View file

@@ -0,0 +1,42 @@
+-- Add migration script here
+ALTER TABLE af_collab_embeddings
+  ADD COLUMN metadata JSONB DEFAULT '{}'::jsonb;
+
+CREATE TYPE af_fragment_v2 AS (
+  fragment_id TEXT,
+  content_type INT,
+  contents TEXT,
+  embedding VECTOR(1536),
+  metadata JSONB
+);
+
+CREATE OR REPLACE PROCEDURE af_collab_embeddings_upsert(
+  IN p_workspace_id UUID,
+  IN p_oid TEXT,
+  IN p_partition_key INT,
+  IN p_tokens_used INT,
+  IN p_fragments af_fragment_v2[]
+)
+LANGUAGE plpgsql
+AS $$
+BEGIN
+  DELETE FROM af_collab_embeddings WHERE oid = p_oid;
+  INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at, metadata)
+  SELECT
+    f.fragment_id,
+    p_oid,
+    p_partition_key,
+    f.content_type,
+    f.contents,
+    f.embedding,
+    NOW(),
+    f.metadata
+  FROM UNNEST(p_fragments) as f;
+
+  -- Update the usage tracking table
+  INSERT INTO af_workspace_ai_usage(created_at, workspace_id, search_requests, search_tokens_consumed, index_tokens_consumed)
+  VALUES (now()::date, p_workspace_id, 0, 0, p_tokens_used)
+  ON CONFLICT (created_at, workspace_id)
+  DO UPDATE SET index_tokens_consumed = af_workspace_ai_usage.index_tokens_consumed + p_tokens_used;
+END
+$$;
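
Once the migration runs, the metadata travels with each fragment into `af_collab_embeddings`. A minimal sketch of reading it back with sqlx (assumes a `PgPool` and sqlx's `json` feature; the query is written for illustration, not taken from this repo):

use sqlx::PgPool;

// (fragment_id, metadata) rows for one collab object.
async fn fragment_metadata(
  pool: &PgPool,
  oid: &str,
) -> Result<Vec<(String, serde_json::Value)>, sqlx::Error> {
  sqlx::query_as(r#"SELECT fragment_id, metadata FROM af_collab_embeddings WHERE oid = $1"#)
    .bind(oid)
    .fetch_all(pool)
    .await
}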

View file

@@ -12,6 +12,7 @@ use collab_document::document::DocumentBody;
 use collab_document::error::DocumentError;
 use collab_entity::CollabType;
 use database_entity::dto::{AFCollabEmbeddedChunk, AFCollabEmbeddings, EmbeddingContentType};
+use serde_json::json;
 use tracing::trace;
 use uuid::Uuid;
@@ -106,6 +107,8 @@ fn split_text_into_chunks(
   // We assume that every token is ~4 bytes. We're going to split document content into fragments
   // of ~2000 tokens each.
   let split_contents = split_text_by_max_content_len(content, 8000)?;
+  let metadata =
+    json!({"id": object_id, "source": "appflowy", "name": "document", "collab_type": collab_type });
   Ok(
     split_contents
       .into_iter()
@@ -116,6 +119,7 @@ fn split_text_into_chunks(
         content_type: EmbeddingContentType::PlainText,
         content,
         embedding: None,
+        metadata: metadata.clone(),
       })
       .collect(),
   )
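
Note that one metadata value is built per document and then cloned into every chunk. A sketch of the shape it carries (the `object_id` is a placeholder, and the exact `collab_type` representation depends on `CollabType`'s `Serialize` impl):

use serde_json::json;

let object_id = "00000000-0000-0000-0000-000000000000"; // placeholder
let metadata = json!({
  "id": object_id,
  "source": "appflowy",
  "name": "document",
  "collab_type": "Document", // illustrative; actual value comes from the CollabType parameter
});
assert_eq!(metadata["source"], "appflowy");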

View file

@@ -2397,7 +2397,7 @@ async fn collab_full_sync_handler(
     uid,
     device_id,
     connect_at: timestamp(),
-    session_id: uuid::Uuid::new_v4().to_string(),
+    session_id: Uuid::new_v4().to_string(),
     app_version,
   };