add docs and tests for embedding-related code (#1328)

* chore: add docs and tests

* chore: add test

* chore: fix typo

* chore: clippy
Nathan.fooo 2025-04-10 23:12:53 +08:00 committed by GitHub
parent f300884dde
commit 3e65dd1014
22 changed files with 917 additions and 182 deletions


@ -0,0 +1,63 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n fragment_id,\n content_type,\n content,\n embedding as \"embedding!: Option<Vector>\",\n metadata,\n fragment_index,\n embedder_type\n FROM af_collab_embeddings\n WHERE oid = $1\n ORDER BY fragment_index\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "fragment_id",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "content_type",
"type_info": "Int4"
},
{
"ordinal": 2,
"name": "content",
"type_info": "Text"
},
{
"ordinal": 3,
"name": "embedding!: Option<Vector>",
"type_info": {
"Custom": {
"name": "vector",
"kind": "Simple"
}
}
},
{
"ordinal": 4,
"name": "metadata",
"type_info": "Jsonb"
},
{
"ordinal": 5,
"name": "fragment_index",
"type_info": "Int4"
},
{
"ordinal": 6,
"name": "embedder_type",
"type_info": "Int2"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false,
true,
true,
true,
true,
true
]
},
"hash": "865fe86df6d04f8abb6d477af13f8a2392a742f4027d99c290f0f156df48be07"
}

Cargo.lock (generated), 111 changed lines

@ -1009,6 +1009,18 @@ dependencies = [
"tracing",
]
[[package]]
name = "auto_enums"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c170965892137a3a9aeb000b4524aa3cc022a310e709d848b6e1cdce4ab4781"
dependencies = [
"derive_utils",
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
name = "autocfg"
version = "1.3.0"
@ -1990,8 +2002,8 @@ dependencies = [
"serde_json",
"serde_repr",
"sha2",
"strum",
"strum_macros",
"strum 0.25.0",
"strum_macros 0.25.3",
"thiserror 1.0.63",
"tokio",
"tokio-stream",
@ -2367,6 +2379,15 @@ version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
[[package]]
name = "core_maths"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77745e017f5edba1a9c1d854f6f3a52dac8a12dd5af5d2f54aecf61e43d80d30"
dependencies = [
"libm",
]
[[package]]
name = "cpufeatures"
version = "0.2.12"
@ -2747,6 +2768,17 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "derive_utils"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccfae181bab5ab6c5478b2ccb69e4c68a02f8c3ec72f6616bfec9dbc599d2ee0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
name = "digest"
version = "0.10.7"
@ -2820,9 +2852,9 @@ checksum = "3a68a4904193147e0a8dec3314640e6db742afd5f6e634f428a6af230d9b3591"
[[package]]
name = "either"
version = "1.13.0"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
dependencies = [
"serde",
]
@ -3858,6 +3890,28 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "icu_segmenter"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a717725612346ffc2d7b42c94b820db6908048f39434504cb130e8b46256b0de"
dependencies = [
"core_maths",
"displaydoc",
"icu_collections",
"icu_locid",
"icu_provider",
"icu_segmenter_data",
"utf8_iter",
"zerovec",
]
[[package]]
name = "icu_segmenter_data"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1e52775179941363cc594e49ce99284d13d6948928d8e72c755f55e98caa1eb"
[[package]]
name = "ident_case"
version = "1.0.1"
@ -3935,6 +3989,7 @@ dependencies = [
"serde",
"serde_json",
"sqlx",
"text-splitter",
"thiserror 1.0.63",
"tiktoken-rs",
"tokio",
@ -4031,6 +4086,15 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.11"
@ -6862,6 +6926,15 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
[[package]]
name = "strum"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32"
dependencies = [
"strum_macros 0.27.1",
]
[[package]]
name = "strum_macros"
version = "0.25.3"
@ -6875,6 +6948,19 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "strum_macros"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8"
dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.90",
]
[[package]]
name = "subtle"
version = "2.6.1"
@ -6997,6 +7083,23 @@ dependencies = [
"utf-8",
]
[[package]]
name = "text-splitter"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8130aecc3b7938ce3ea387d7615eca92bd4f702a5adc0548ba930a9c039dda4"
dependencies = [
"ahash 0.8.11",
"auto_enums",
"either",
"icu_provider",
"icu_segmenter",
"itertools 0.14.0",
"memchr",
"strum 0.27.1",
"thiserror 2.0.12",
]
[[package]]
name = "thin-vec"
version = "0.2.13"


@ -38,8 +38,8 @@ use client_api_entity::{
};
use client_api_entity::{GotrueTokenResponse, UpdateGotrueUserParams, User};
use semver::Version;
use shared_entity::dto::auth_dto::SignInTokenResponse;
use shared_entity::dto::auth_dto::UpdateUserParams;
use shared_entity::dto::auth_dto::{SignInPasswordResponse, SignInTokenResponse};
use shared_entity::dto::workspace_dto::WorkspaceSpaceUsage;
use shared_entity::response::{AppResponse, AppResponseError};
use std::sync::atomic::{AtomicBool, Ordering};
@ -253,7 +253,7 @@ impl Client {
&self,
email: &str,
password: &str,
) -> Result<GotrueTokenResponse, AppResponseError> {
) -> Result<SignInPasswordResponse, AppResponseError> {
let response = self
.gotrue_client
.token(&Grant::Password(PasswordGrant {
@ -261,9 +261,12 @@ impl Client {
password: password.to_owned(),
}))
.await?;
let _ = self.verify_token_cloud(&response.access_token).await?;
let is_new = self.verify_token_cloud(&response.access_token).await?;
self.token.write().set(response.clone());
Ok(response)
Ok(SignInPasswordResponse {
gotrue_response: response,
is_new,
})
}
/// Sign in with magic link


@ -763,16 +763,26 @@ pub struct AFCollabEmbeddedChunk {
pub object_id: Uuid,
pub content_type: EmbeddingContentType,
pub content: Option<String>,
/// The semantic embedding vector for the content.
/// - Defaults to `None`.
/// - Will remain `None` if `content` is missing.
pub embedding: Option<Vec<f32>>,
pub metadata: serde_json::Value,
pub fragment_index: i32,
pub embedded_type: i16,
}
impl AFCollabEmbeddedChunk {
pub fn mark_as_duplicate(&mut self) {
self.embedding = None;
self.content = None;
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct AFCollabEmbeddings {
pub tokens_consumed: u32,
pub params: Vec<AFCollabEmbeddedChunk>,
pub chunks: Vec<AFCollabEmbeddedChunk>,
}
/// Type of content stored by the embedding.


@ -5,6 +5,7 @@ use database_entity::dto::{AFCollabEmbeddedChunk, IndexingStatus, QueryCollab, Q
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use pgvector::Vector;
use serde_json::json;
use sqlx::pool::PoolConnection;
use sqlx::postgres::{PgHasArrayType, PgTypeInfo};
use sqlx::{Error, Executor, Postgres, Transaction};
@ -61,14 +62,14 @@ WHERE w.workspace_id = $1"#,
#[derive(sqlx::Type)]
#[sqlx(type_name = "af_fragment_v3", no_pg_array)]
struct Fragment {
fragment_id: String,
content_type: i32,
contents: Option<String>,
embedding: Option<Vector>,
metadata: serde_json::Value,
fragment_index: i32,
embedded_type: i16,
pub struct Fragment {
pub fragment_id: String,
pub content_type: i32,
pub contents: Option<String>,
pub embedding: Option<Vector>,
pub metadata: serde_json::Value,
pub fragment_index: i32,
pub embedded_type: i16,
}
impl From<AFCollabEmbeddedChunk> for Fragment {
@ -96,9 +97,9 @@ pub async fn upsert_collab_embeddings(
workspace_id: &Uuid,
object_id: &Uuid,
tokens_used: u32,
records: Vec<AFCollabEmbeddedChunk>,
chunks: Vec<AFCollabEmbeddedChunk>,
) -> Result<(), sqlx::Error> {
let fragments = records.into_iter().map(Fragment::from).collect::<Vec<_>>();
let fragments = chunks.into_iter().map(Fragment::from).collect::<Vec<_>>();
tracing::trace!(
"[Embedding] upsert {} {} fragments, fragment ids: {:?}",
object_id,
@ -118,6 +119,48 @@ pub async fn upsert_collab_embeddings(
Ok(())
}
pub async fn get_collab_embedding_fragment<'a, E>(
tx: E,
object_id: &Uuid,
) -> Result<Vec<Fragment>, sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
let rows = sqlx::query!(
r#"
SELECT
fragment_id,
content_type,
content,
embedding as "embedding!: Option<Vector>",
metadata,
fragment_index,
embedder_type
FROM af_collab_embeddings
WHERE oid = $1
ORDER BY fragment_index
"#,
object_id
)
.fetch_all(tx)
.await?;
let fragments = rows
.into_iter()
.map(|row| Fragment {
fragment_id: row.fragment_id,
content_type: row.content_type,
contents: row.content,
embedding: row.embedding,
metadata: row.metadata.unwrap_or_else(|| json!({})),
fragment_index: row.fragment_index.unwrap_or(0),
embedded_type: row.embedder_type.unwrap_or(0),
})
.collect();
Ok(fragments)
}
pub async fn get_collab_embedding_fragment_ids<'a, E>(
tx: E,
collab_ids: Vec<Uuid>,


@ -35,4 +35,5 @@ redis = { workspace = true, features = [
secrecy = { workspace = true, features = ["serde"] }
reqwest.workspace = true
twox-hash = { version = "2.1.0", features = ["xxhash64"] }
async-openai = "0.28.0"
async-openai = "0.28.0"
text-splitter = { version = "0.25.1" }

libs/indexer/README.md (new file), 228 added lines

@ -0,0 +1,228 @@
## Embedding Architecture (Do not edit, AI generated)
The indexing system consists of several interconnected components:
```mermaid
graph TD
A[Document Content] --> B[Indexer]
B --> C[Text Chunking]
C --> D[Embedding Generation]
D --> E[Database Storage]
F[Scheduler] --> B
G[Environment Configuration] --> F
H[Redis Queue] <--> F
I[IndexerProvider] --> B
```
### Key Components
1. **Indexer**: Interface that defines methods for creating and embedding content chunks
2. **DocumentIndexer**: Implementation of Indexer specifically for document content
3. **IndexerProvider**: Factory that resolves the appropriate Indexer based on content type
4. **Scheduler**: Manages embedding tasks and coordinates the embedding process
5. **AFEmbedder**: Provides embedding capabilities through OpenAI or Azure OpenAI APIs
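
For orientation, the `Indexer` interface looks roughly like the sketch below. This is a simplified paraphrase (the `embed` signature matches `DocumentIndexer`; the chunking method's exact signature may differ in the actual trait in `libs/indexer`):

```rust
// Simplified sketch of the Indexer interface, not the verbatim trait definition.
#[async_trait]
pub trait Indexer: Send + Sync {
  /// Split a collab's text into embeddable chunks (no vectors attached yet).
  fn create_embedded_chunks_from_text(
    &self,
    object_id: Uuid,
    paragraphs: Vec<String>,
    model: EmbeddingModel,
  ) -> Result<Vec<AFCollabEmbeddedChunk>, AppError>;

  /// Send the chunks to the embedder and return them with embeddings filled in.
  async fn embed(
    &self,
    embedder: &AFEmbedder,
    chunks: Vec<AFCollabEmbeddedChunk>,
  ) -> Result<Option<AFCollabEmbeddings>, AppError>;
}
```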
## Embedding Process Flow
### 1. Content Preparation
There are multiple paths that can trigger embedding generation in the system. All of them ultimately process document
content, but they differ in how they are initiated and handled:
```mermaid
flowchart TD
A[Document Creation/Update] --> B{Embedding Path}
B -->|Immediate Indexing| C[index_collab_immediately]
B -->|Background Indexing| D[index_pending_collab_one/index_pending_collabs]
B -->|Redis Stream| E[read_background_embed_tasks]
B -->|Batch Processing| F[index_workspace]
C --> G[embed_immediately]
D --> H[embed_in_background]
E --> I[generate_embeddings_loop]
F --> J[index_then_write_embedding_to_disk]
G --> K[Indexer Processing]
H --> L[Redis Queue]
I --> K
J --> K
L --> I
K --> M[create_embedded_chunks_from_collab/Text]
M --> N[embed]
N --> O[Database Storage]
```
#### Path 1: Immediate Indexing
When a document is created or updated and requires immediate indexing:
```mermaid
sequenceDiagram
participant App as AppFlowy
participant Scheduler as IndexerScheduler
participant Indexer as DocumentIndexer
participant Embedder as AFEmbedder
participant DB as Database
App ->> Scheduler: index_collab_immediately(workspace_id, object_id, collab, collab_type)
Scheduler ->> Scheduler: embed_immediately(UnindexedCollabTask)
Scheduler ->> Indexer: create_embedded_chunks_from_text(object_id, paragraphs, model)
Indexer ->> Indexer: Split text into chunks
Indexer ->> Embedder: embed(chunks)
Embedder ->> DB: Store embeddings via batch_insert_records
```
#### Path 2: Background Indexing
For non-urgent indexing, tasks are queued for background processing:
```mermaid
sequenceDiagram
participant App as AppFlowy
participant Scheduler as IndexerScheduler
participant Redis as Redis Queue
participant Worker as Background Worker
participant Indexer as DocumentIndexer
participant DB as Database
App ->> Scheduler: index_pending_collab_one/index_pending_collabs
Scheduler ->> Scheduler: embed_in_background(tasks)
Scheduler ->> Redis: add_background_embed_task(tasks)
Worker ->> Redis: read_background_embed_tasks()
Worker ->> Worker: generate_embeddings_loop
Worker ->> Indexer: create_embedded_chunks_from_text
Indexer ->> Worker: Return chunks
Worker ->> DB: batch_insert_records
```
#### Path 3: Batch Processing
For processing multiple unindexed documents at once, typically used for initial indexing or catch-up processing:
```mermaid
sequenceDiagram
participant System as System Process
participant IndexProcess as index_workspace
participant Storage as CollabStorage
participant Indexer as DocumentIndexer
participant DB as Database
System ->> IndexProcess: index_workspace(scheduler, workspace_id)
IndexProcess ->> DB: stream_unindexed_collabs
IndexProcess ->> Storage: get_encode_collab
IndexProcess ->> IndexProcess: index_then_write_embedding_to_disk
IndexProcess ->> Indexer: create_embeddings(embedder, provider, collabs)
Indexer ->> DB: batch_insert_records
```
In all these paths, the content goes through similar processing steps:
1. Document content extraction (paragraphs from document)
2. Text chunking (grouping paragraphs into manageable chunks)
3. Embedding generation via the AI service
4. Storage in the database
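
Before a chunk reaches step 3, the scheduler also checks whether a fragment with the same `fragment_id` is already stored for that collab and, if so, calls `AFCollabEmbeddedChunk::mark_as_duplicate` so the chunk is neither re-embedded nor re-written. A condensed sketch of that step (the helper name and the `HashSet` type are illustrative; the real logic lives in `generate_embeddings_loop` and `compute_embedding_records`):

```rust
use std::collections::HashSet;

// Hypothetical helper mirroring the deduplication done by the scheduler.
fn skip_already_embedded(
  chunks: &mut [AFCollabEmbeddedChunk],
  existing_fragment_ids: &HashSet<String>,
) {
  for chunk in chunks.iter_mut() {
    if existing_fragment_ids.contains(&chunk.fragment_id) {
      // Clears both content and embedding; see AFCollabEmbeddedChunk::mark_as_duplicate.
      chunk.mark_as_duplicate();
    }
  }
}
```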
### 2. Chunking Strategy
Documents are broken into manageable chunks for effective embedding:
1. The system extracts paragraphs from the document
2. Paragraphs are grouped into chunks of roughly 8000 characters by default (configurable via `APPFLOWY_EMBEDDING_CHUNK_SIZE`, with overlap controlled by `APPFLOWY_EMBEDDING_CHUNK_OVERLAP`)
3. A consistent hash is generated for each chunk to avoid duplicate processing
4. Each chunk is prepared as an `AFCollabEmbeddedChunk` with metadata
```mermaid
graph LR
A[Full Document] --> B[Extract Paragraphs]
B --> C[Group Paragraphs]
C --> D[Generate Content Hash]
D --> E[Create Embedded Chunks]
```
### 3. Embedding Generation
The actual embedding creation happens via OpenAI or Azure's API:
1. Chunks are sent to the embedding service (OpenAI or Azure)
2. The API returns vectors for each chunk
3. Vectors are associated with their original chunks
4. Complete embeddings are stored in the database
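
In code, this is a thin wrapper around the `async-openai` request and response types: the chunk contents are batched into a single request and each returned vector is written back onto the chunk at the matching index. A condensed sketch, paraphrasing `DocumentIndexer::embed` (error handling and the request/response count check are omitted):

```rust
// Condensed sketch of the embed step, not the verbatim implementation.
// `contents` holds the text of the non-empty chunks; `valid_indices` maps each
// entry back to its position in `chunks`.
let request = CreateEmbeddingRequestArgs::default()
  .model(embedder.model().name())
  .input(EmbeddingInput::StringArray(contents))
  .build()?;

let resp = embedder.async_embed(request).await?;
// The API returns one embedding per input, tagged with its input index.
for embedding in resp.data {
  let chunk_idx = valid_indices[embedding.index as usize];
  chunks[chunk_idx].embedding = Some(embedding.embedding);
}
```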
## Technical Implementation
### Fragment Processing
```rust
pub fn split_text_into_chunks(
  object_id: Uuid,
  paragraphs: Vec<String>,
  embedding_model: EmbeddingModel,
  chunk_size: usize,
  overlap: usize,
) -> Result<Vec<AFCollabEmbeddedChunk>, AppError> {
  // Group paragraphs into chunks of roughly `chunk_size` bytes (default 8000),
  // splitting oversized paragraphs with `overlap` bytes of shared context
  let split_contents = group_paragraphs_by_max_content_len(paragraphs, chunk_size, overlap);
  // Create metadata shared by all chunks of this document
  let metadata = json!({
    "id": object_id,
    "source": "appflowy",
    "name": "document"
  });
  // Track seen fragments to avoid duplicates
  let mut seen = std::collections::HashSet::new();
  let mut chunks = Vec::new();
  // Process each content chunk
  for (index, content) in split_contents.into_iter().enumerate() {
    // Generate consistent hash for deduplication
    let consistent_hash = Hasher::oneshot(0, content.as_bytes());
    let fragment_id = format!("{:x}", consistent_hash);
    // Only add new fragments
    if seen.insert(fragment_id.clone()) {
      chunks.push(AFCollabEmbeddedChunk {
        fragment_id,
        object_id,
        content_type: EmbeddingContentType::PlainText,
        content: Some(content),
        embedding: None,
        metadata: metadata.clone(),
        fragment_index: index as i32,
        embedded_type: 0,
      });
    }
  }
  Ok(chunks)
}
```
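
Because the fragment id is a hash of the chunk content, re-splitting a document after an edit only changes ids for chunks whose text actually changed; unchanged chunks keep their ids and can be skipped when re-embedding. A small usage example (mirroring the `collab_embed_test` SQL test; the chunk size and overlap values are illustrative):

```rust
// Usage sketch: fragment ids stay stable for untouched chunks.
let chunks_before = split_text_into_chunks(
  doc_id,
  paragraphs.clone(),
  EmbeddingModel::TextEmbedding3Small,
  500,
  100,
)?;

paragraphs[0].push_str(" some edited text");
let chunks_after = split_text_into_chunks(
  doc_id,
  paragraphs.clone(),
  EmbeddingModel::TextEmbedding3Small,
  500,
  100,
)?;

// When the edited paragraph lands in its own chunk, only that chunk's
// fragment_id changes; the following chunk keeps its id.
assert_ne!(chunks_before[0].fragment_id, chunks_after[0].fragment_id);
assert_eq!(chunks_before[1].fragment_id, chunks_after[1].fragment_id);
```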
## Embedding Storage
Embeddings are stored in the database with their associated metadata:
```mermaid
erDiagram
COLLAB_EMBEDDING ||--o{ EMBEDDING_FRAGMENT: contains
COLLAB_EMBEDDING {
uuid object_id
uuid workspace_id
int collab_type
int tokens_used
timestamp indexed_at
}
EMBEDDING_FRAGMENT {
string fragment_id
uuid object_id
string content_type
vec embedding
json metadata
int fragment_index
}
```
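
On the database side, `upsert_collab_embeddings` writes the chunks as `af_fragment_v3` rows (replacing stale fragments for the collab) and `get_collab_embedding_fragment` reads them back ordered by `fragment_index`. A minimal round-trip sketch, adapted from the `sql_test::util` helpers (assumes a `PgPool` and an existing collab row):

```rust
// Minimal round-trip sketch using the helpers in database::index.
let mut txn = pg_pool.begin().await?;
upsert_collab_embeddings(&mut txn, &workspace_id, &doc_id, tokens_used, chunks).await?;
txn.commit().await?;

// Fragments come back ordered by fragment_index.
let fragments = get_collab_embedding_fragment(&pg_pool, &doc_id).await?;
assert_eq!(fragments.len(), expected_chunk_count);
```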
## Configuration
The embedding system is configurable through environment variables:
- `APPFLOWY_INDEXER_ENABLED`: Enable/disable the indexing system (default: `true`)
- `APPFLOWY_INDEXER_SCHEDULER_NUM_THREAD`: Number of threads for processing (default: `50`)
- `AI_OPENAI_API_KEY`: OpenAI API key for embeddings
- `AI_AZURE_OPENAI_API_KEY`, `AI_AZURE_OPENAI_API_BASE`, `AI_AZURE_OPENAI_API_VERSION`: Azure OpenAI configuration
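- `APPFLOWY_EMBEDDING_CHUNK_SIZE`: Maximum byte length of a chunk sent for embedding (default: `8000`)
- `APPFLOWY_EMBEDDING_CHUNK_OVERLAP`: Byte overlap between consecutive chunks when an oversized paragraph is split (default: `500`)

The two chunking settings are read in `DocumentIndexer` and fall back to their defaults when unset or unparsable, roughly like this:

```rust
// How the chunking settings are resolved (paraphrased from DocumentIndexer).
let chunk_size = get_env_var("APPFLOWY_EMBEDDING_CHUNK_SIZE", "8000")
  .parse::<usize>()
  .unwrap_or(8000);
let overlap = get_env_var("APPFLOWY_EMBEDDING_CHUNK_OVERLAP", "500")
  .parse::<usize>()
  .unwrap_or(500);
```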


@ -8,8 +8,8 @@ use async_openai::types::{CreateEmbeddingRequestArgs, EmbeddingInput, EncodingFo
use async_trait::async_trait;
use collab::preclude::Collab;
use collab_document::document::DocumentBody;
use collab_entity::CollabType;
use database_entity::dto::{AFCollabEmbeddedChunk, AFCollabEmbeddings, EmbeddingContentType};
use infra::env_util::get_env_var;
use serde_json::json;
use tracing::{debug, trace, warn};
use twox_hash::xxhash64::Hasher;
@ -50,22 +50,42 @@ impl Indexer for DocumentIndexer {
return Ok(vec![]);
}
split_text_into_chunks(object_id, paragraphs, CollabType::Document, model)
// Group paragraphs into chunks of roughly `APPFLOWY_EMBEDDING_CHUNK_SIZE` characters (default 8000).
split_text_into_chunks(
object_id,
paragraphs,
model,
get_env_var("APPFLOWY_EMBEDDING_CHUNK_SIZE", "8000")
.parse::<usize>()
.unwrap_or(8000),
get_env_var("APPFLOWY_EMBEDDING_CHUNK_OVERLAP", "500")
.parse::<usize>()
.unwrap_or(500),
)
}
async fn embed(
&self,
embedder: &AFEmbedder,
mut content: Vec<AFCollabEmbeddedChunk>,
mut chunks: Vec<AFCollabEmbeddedChunk>,
) -> Result<Option<AFCollabEmbeddings>, AppError> {
if content.is_empty() {
let mut valid_indices = Vec::new();
for (i, chunk) in chunks.iter().enumerate() {
if let Some(ref content) = chunk.content {
if !content.is_empty() {
valid_indices.push(i);
}
}
}
if valid_indices.is_empty() {
return Ok(None);
}
let contents: Vec<_> = content
.iter()
.map(|fragment| fragment.content.clone().unwrap_or_default())
.collect();
let mut contents = Vec::with_capacity(valid_indices.len());
for &i in &valid_indices {
contents.push(chunks[i].content.as_ref().unwrap().to_owned());
}
let request = CreateEmbeddingRequestArgs::default()
.model(embedder.model().name())
@ -78,30 +98,39 @@ impl Indexer for DocumentIndexer {
let resp = embedder.async_embed(request).await?;
trace!(
"[Embedding] request {} embeddings, received {} embeddings",
content.len(),
"[Embedding] requested {} embeddings, received {} embeddings",
valid_indices.len(),
resp.data.len()
);
if resp.data.len() != valid_indices.len() {
return Err(AppError::Unhandled(format!(
"Mismatch in number of embeddings requested and received: {} vs {}",
valid_indices.len(),
resp.data.len()
)));
}
for embedding in resp.data {
let param = &mut content[embedding.index as usize];
if param.content.is_some() {
param.embedding = Some(embedding.embedding);
}
let chunk_idx = valid_indices[embedding.index as usize];
chunks[chunk_idx].embedding = Some(embedding.embedding);
}
Ok(Some(AFCollabEmbeddings {
tokens_consumed: resp.usage.total_tokens,
params: content,
chunks,
}))
}
}
fn split_text_into_chunks(
pub fn split_text_into_chunks(
object_id: Uuid,
paragraphs: Vec<String>,
collab_type: CollabType,
embedding_model: EmbeddingModel,
chunk_size: usize,
overlap: usize,
) -> Result<Vec<AFCollabEmbeddedChunk>, AppError> {
// We only support text-embedding-3-small for now.
debug_assert!(matches!(
embedding_model,
EmbeddingModel::TextEmbedding3Small
@ -110,13 +139,11 @@ fn split_text_into_chunks(
if paragraphs.is_empty() {
return Ok(vec![]);
}
// Group paragraphs into chunks of roughly 8000 characters.
let split_contents = group_paragraphs_by_max_content_len(paragraphs, 8000);
let split_contents = group_paragraphs_by_max_content_len(paragraphs, chunk_size, overlap);
let metadata = json!({
"id": object_id,
"source": "appflowy",
"name": "document",
"collab_type": collab_type
});
let mut seen = std::collections::HashSet::new();


@ -15,7 +15,7 @@ pub struct EmbeddingRecord {
pub object_id: Uuid,
pub collab_type: CollabType,
pub tokens_used: u32,
pub contents: Vec<AFCollabEmbeddedChunk>,
pub chunks: Vec<AFCollabEmbeddedChunk>,
}
impl EmbeddingRecord {
@ -25,7 +25,7 @@ impl EmbeddingRecord {
object_id,
collab_type,
tokens_used: 0,
contents: vec![],
chunks: vec![],
}
}
}


@ -15,7 +15,6 @@ use database::index::{
get_collab_embedding_fragment_ids, update_collab_indexed_at, upsert_collab_embeddings,
};
use database::workspace::select_workspace_settings;
use database_entity::dto::AFCollabEmbeddedChunk;
use infra::env_util::get_env_var;
use redis::aio::ConnectionManager;
use serde::{Deserialize, Serialize};
@ -350,18 +349,12 @@ async fn generate_embeddings_loop(
if let Some(fragment_ids) = existing_embeddings.get(&record.object_id) {
for chunk in chunks.iter_mut() {
if fragment_ids.contains(&chunk.fragment_id) {
// we already had an embedding for this chunk
chunk.content = None;
chunk.embedding = None;
chunk.mark_as_duplicate();
}
}
}
join_set.spawn(async move {
if is_collab_embedded_chunks_empty(&chunks) {
return Ok(None);
}
let result = indexer.embed(&embedder, chunks).await;
match result {
Ok(Some(embeddings)) => {
@ -370,7 +363,7 @@ async fn generate_embeddings_loop(
object_id: record.object_id,
collab_type: record.collab_type,
tokens_used: embeddings.tokens_consumed,
contents: embeddings.params,
chunks: embeddings.chunks,
};
Ok(Some(record))
},
@ -493,7 +486,7 @@ pub(crate) async fn batch_insert_records(
&record.workspace_id,
&record.object_id,
record.tokens_used,
record.contents,
record.chunks,
)
.await?;
}
@ -545,9 +538,3 @@ impl UnindexedData {
}
}
}
#[inline]
/// All chunks are empty if all of them have no content
pub fn is_collab_embedded_chunks_empty(chunks: &[AFCollabEmbeddedChunk]) -> bool {
chunks.iter().all(|chunk| chunk.content.is_none())
}


@ -22,6 +22,14 @@ use tokio::task::JoinSet;
use tracing::{error, info, trace};
use uuid::Uuid;
/// # Index the given workspace
///
/// Continuously processes and creates embeddings for unindexed collabs in a specified workspace.
///
/// This function runs in an infinite loop until a connection to the database cannot be established
/// for an extended period. It streams unindexed collabs from the database in batches, processes them
/// to create embeddings, and writes those embeddings back to the database.
///
#[allow(dead_code)]
pub(crate) async fn index_workspace(scheduler: Arc<IndexerScheduler>, workspace_id: Uuid) {
let mut retry_delay = Duration::from_secs(2);
@ -51,16 +59,16 @@ pub(crate) async fn index_workspace(scheduler: Arc<IndexerScheduler>, workspace_
continue;
}
index_then_write_embedding_to_disk(&scheduler, std::mem::take(&mut unindexed_collabs)).await;
_index_then_write_embedding_to_disk(&scheduler, std::mem::take(&mut unindexed_collabs)).await;
}
if !unindexed_collabs.is_empty() {
index_then_write_embedding_to_disk(&scheduler, unindexed_collabs).await;
_index_then_write_embedding_to_disk(&scheduler, unindexed_collabs).await;
}
}
}
async fn index_then_write_embedding_to_disk(
async fn _index_then_write_embedding_to_disk(
scheduler: &Arc<IndexerScheduler>,
unindexed_collabs: Vec<UnindexedCollab>,
) {
@ -80,7 +88,7 @@ async fn index_then_write_embedding_to_disk(
.collect::<Vec<_>>();
match get_collab_embedding_fragment_ids(&scheduler.pg_pool, object_ids).await {
Ok(existing_embeddings) => {
let embeddings = create_embeddings(
let embeddings = _create_embeddings(
embedder,
&scheduler.indexer_provider,
unindexed_collabs,
@ -156,7 +164,7 @@ async fn stream_unindexed_collabs(
})
.boxed()
}
async fn create_embeddings(
async fn _create_embeddings(
embedder: AFEmbedder,
indexer_provider: &Arc<IndexerProvider>,
unindexed_records: Vec<UnindexedCollab>,
@ -177,13 +185,13 @@ async fn create_embeddings(
let embedder = embedder.clone();
if let Some(indexer) = indexer_provider.indexer_for(record.collab_type) {
join_set.spawn(async move {
match indexer.embed(&embedder, record.contents).await {
match indexer.embed(&embedder, record.chunks).await {
Ok(embeddings) => embeddings.map(|embeddings| EmbeddingRecord {
workspace_id: record.workspace_id,
object_id: record.object_id,
collab_type: record.collab_type,
tokens_used: embeddings.tokens_consumed,
contents: embeddings.params,
chunks: embeddings.chunks,
}),
Err(err) => {
error!("Failed to embed collab: {}", err);
@ -242,8 +250,7 @@ fn compute_embedding_records(
if let Some(existing_embeddings) = existing_embeddings.get(&unindexed.object_id) {
for chunk in chunks.iter_mut() {
if existing_embeddings.contains(&chunk.fragment_id) {
chunk.content = None; // mark as already embedded
chunk.embedding = None;
chunk.mark_as_duplicate();
}
}
}
@ -252,7 +259,7 @@ fn compute_embedding_records(
object_id: unindexed.object_id,
collab_type: unindexed.collab_type,
tokens_used: 0,
contents: chunks,
chunks,
})
})
.collect()


@ -3,8 +3,9 @@ use appflowy_ai_client::dto::EmbeddingModel;
use async_openai::config::{AzureConfig, Config, OpenAIConfig};
use async_openai::types::{CreateEmbeddingRequest, CreateEmbeddingResponse};
use async_openai::Client;
use text_splitter::{ChunkConfig, TextSplitter};
use tiktoken_rs::CoreBPE;
use tracing::trace;
use tracing::{trace, warn};
pub const OPENAI_EMBEDDINGS_URL: &str = "https://api.openai.com/v1/embeddings";
@ -135,26 +136,56 @@ pub fn split_text_by_max_tokens(
Ok(chunks)
}
/// Groups a list of paragraphs into chunks that fit within a specified maximum content length.
///
/// Takes a vector of paragraph strings and combines them into larger chunks,
/// ensuring that each chunk's total byte length does not exceed the given `context_size`.
/// Paragraphs are concatenated directly without additional separators. If a single paragraph
/// exceeds the `context_size`, it is split into overlapping chunks by a `TextSplitter`
/// configured with the given `overlap`.
///
/// # Arguments
/// * `paragraphs` - A vector of strings, where each string represents a paragraph.
/// * `context_size` - The maximum byte length allowed for each chunk. If it is smaller than
///   `overlap`, it is raised to `2 * overlap`.
/// * `overlap` - The number of bytes shared between consecutive chunks of an oversized paragraph.
///
/// # Returns
/// A vector of strings, where each string is a chunk of concatenated paragraphs that fits
/// within the `context_size`. If the input is empty, returns an empty vector.
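/// # Example
/// ```ignore
/// // Two 4-byte paragraphs exactly fill an 8-byte chunk; the third starts a new one
/// // (mirrors the `test_exact_fit` case below).
/// let chunks = group_paragraphs_by_max_content_len(
///   vec!["AAAA".to_string(), "BBBB".to_string(), "CCCC".to_string()],
///   8,
///   2,
/// );
/// assert_eq!(chunks, vec!["AAAABBBB".to_string(), "CCCC".to_string()]);
/// ```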
pub fn group_paragraphs_by_max_content_len(
paragraphs: Vec<String>,
max_content_len: usize,
mut context_size: usize,
overlap: usize,
) -> Vec<String> {
if paragraphs.is_empty() {
return vec![];
}
let mut result = Vec::new();
let mut current = String::new();
let mut current = String::with_capacity(context_size.min(4096));
if overlap > context_size {
warn!("context_size is smaller than overlap, which may lead to unexpected behavior.");
context_size = 2 * overlap;
}
let chunk_config = ChunkConfig::new(context_size)
.with_overlap(overlap)
.unwrap();
let splitter = TextSplitter::new(chunk_config);
for paragraph in paragraphs {
if paragraph.len() + current.len() > max_content_len {
// if we add the paragraph to the current content, it will exceed the limit
// so we push the current content to the result set and start a new chunk
let accumulated = std::mem::replace(&mut current, paragraph);
if !accumulated.is_empty() {
result.push(accumulated);
if current.len() + paragraph.len() > context_size {
if !current.is_empty() {
result.push(std::mem::take(&mut current));
}
if paragraph.len() > context_size {
let paragraph_chunks = splitter.chunks(&paragraph);
result.extend(paragraph_chunks.map(String::from));
} else {
current.push_str(&paragraph);
}
} else {
// add the paragraph to the current chunk
// Add paragraph to current chunk
current.push_str(&paragraph);
}
}
@ -185,7 +216,7 @@ mod tests {
assert!(content.is_char_boundary(content.len()));
}
let params = group_paragraphs_by_max_content_len(vec![content], max_tokens);
let params = group_paragraphs_by_max_content_len(vec![content], max_tokens, 500);
for content in params {
assert!(content.is_char_boundary(0));
assert!(content.is_char_boundary(content.len()));
@ -222,7 +253,7 @@ mod tests {
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
assert_eq!(params.len(), 0);
let params = group_paragraphs_by_max_content_len(params, max_tokens);
let params = group_paragraphs_by_max_content_len(params, max_tokens, 500);
assert_eq!(params.len(), 0);
}
@ -237,11 +268,6 @@ mod tests {
for (param, emoji) in params.iter().zip(emojis.iter()) {
assert_eq!(param, emoji);
}
let params = group_paragraphs_by_max_content_len(params, max_tokens);
for (param, emoji) in params.iter().zip(emojis.iter()) {
assert_eq!(param, emoji);
}
}
#[test]
@ -256,7 +282,7 @@ mod tests {
let reconstructed_content = params.join("");
assert_eq!(reconstructed_content, content);
let params = group_paragraphs_by_max_content_len(params, max_tokens);
let params = group_paragraphs_by_max_content_len(params, max_tokens, 500);
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
}
@ -286,7 +312,7 @@ mod tests {
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
let params = group_paragraphs_by_max_content_len(params, max_tokens);
let params = group_paragraphs_by_max_content_len(params, max_tokens, 500);
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
}
@ -304,7 +330,7 @@ mod tests {
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
let params = group_paragraphs_by_max_content_len(params, max_tokens);
let params = group_paragraphs_by_max_content_len(params, max_tokens, 10);
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
}
@ -318,7 +344,7 @@ mod tests {
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
let params = group_paragraphs_by_max_content_len(params, max_tokens);
let params = group_paragraphs_by_max_content_len(params, max_tokens, 10);
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
}
@ -332,72 +358,144 @@ mod tests {
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
let params = group_paragraphs_by_max_content_len(params, max_tokens);
let params = group_paragraphs_by_max_content_len(params, max_tokens, 10);
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
}
}
// #[cfg(test)]
// mod execution_time_comparison_tests {
// use crate::indexer::document_indexer::split_text_by_max_tokens;
// use rand::distributions::Alphanumeric;
// use rand::{thread_rng, Rng};
// use std::sync::Arc;
// use std::time::Instant;
// use tiktoken_rs::{cl100k_base, CoreBPE};
//
// #[tokio::test]
// async fn test_execution_time_comparison() {
// let tokenizer = Arc::new(cl100k_base().unwrap());
// let max_tokens = 100;
//
// let sizes = vec![500, 1000, 2000, 5000, 20000]; // Content sizes to test
// for size in sizes {
// let content = generate_random_string(size);
//
// // Measure direct execution time
// let direct_time = measure_direct_execution(content.clone(), max_tokens, &tokenizer);
//
// // Measure spawn_blocking execution time
// let spawn_blocking_time =
// measure_spawn_blocking_execution(content, max_tokens, Arc::clone(&tokenizer)).await;
//
// println!(
// "Content Size: {} | Direct Time: {}ms | spawn_blocking Time: {}ms",
// size, direct_time, spawn_blocking_time
// );
// }
// }
//
// // Measure direct execution time
// fn measure_direct_execution(content: String, max_tokens: usize, tokenizer: &CoreBPE) -> u128 {
// let start = Instant::now();
// split_text_by_max_tokens(content, max_tokens, tokenizer).unwrap();
// start.elapsed().as_millis()
// }
//
// // Measure `spawn_blocking` execution time
// async fn measure_spawn_blocking_execution(
// content: String,
// max_tokens: usize,
// tokenizer: Arc<CoreBPE>,
// ) -> u128 {
// let start = Instant::now();
// tokio::task::spawn_blocking(move || {
// split_text_by_max_tokens(content, max_tokens, tokenizer.as_ref()).unwrap()
// })
// .await
// .unwrap();
// start.elapsed().as_millis()
// }
//
// pub fn generate_random_string(len: usize) -> String {
// let rng = thread_rng();
// rng
// .sample_iter(&Alphanumeric)
// .take(len)
// .map(char::from)
// .collect()
// }
// }
#[test]
fn test_multiple_paragraphs_single_chunk() {
let paragraphs = vec![
"First paragraph.".to_string(),
"Second paragraph.".to_string(),
"Third paragraph.".to_string(),
];
let result = group_paragraphs_by_max_content_len(paragraphs, 100, 5);
assert_eq!(result.len(), 1);
assert_eq!(
result[0],
"First paragraph.Second paragraph.Third paragraph."
);
}
#[test]
fn test_large_paragraph_splitting() {
// Create a paragraph larger than context size
let large_paragraph = "A".repeat(50);
let paragraphs = vec![
"Small paragraph.".to_string(),
large_paragraph.clone(),
"Another small one.".to_string(),
];
let result = group_paragraphs_by_max_content_len(paragraphs, 30, 10);
// Expected: "Small paragraph." as one chunk, then multiple chunks for the large paragraph,
// then "Another small one." as the final chunk
assert!(result.len() > 3); // At least 4 chunks (1 + at least 2 for large + 1)
assert_eq!(result[0], "Small paragraph.");
assert!(result[1].starts_with("A"));
// Check that all chunks of the large paragraph together contain the original text
let large_chunks = &result[1..result.len() - 1];
let reconstructed = large_chunks.join("");
// Due to overlaps, the reconstructed text might be longer
assert!(large_paragraph.len() <= reconstructed.len());
assert!(reconstructed.chars().all(|c| c == 'A'));
}
#[test]
fn test_overlap_larger_than_context_size() {
let paragraphs = vec![
"First paragraph.".to_string(),
"Second very long paragraph that needs to be split.".to_string(),
];
// Overlap larger than context size
let result = group_paragraphs_by_max_content_len(paragraphs, 10, 20);
// Check that the function didn't panic and produced reasonable output
assert!(!result.is_empty());
assert_eq!(result[0], "First paragraph.");
assert_eq!(result[1], "Second very long paragraph that needs to");
assert_eq!(result[2], "that needs to be split.");
assert!(result.iter().all(|chunk| chunk.len() <= 40));
}
#[test]
fn test_exact_fit() {
let paragraph1 = "AAAA".to_string(); // 4 bytes
let paragraph2 = "BBBB".to_string(); // 4 bytes
let paragraph3 = "CCCC".to_string(); // 4 bytes
let paragraphs = vec![paragraph1, paragraph2, paragraph3];
// Context size exactly fits 2 paragraphs
let result = group_paragraphs_by_max_content_len(paragraphs, 8, 2);
assert_eq!(result.len(), 2);
assert_eq!(result[0], "AAAABBBB");
assert_eq!(result[1], "CCCC");
}
#[test]
fn test_edit_paragraphs() {
// Create initial paragraphs and convert them to Strings.
let mut paragraphs = vec![
"Rust is a multiplayer survival game developed by Facepunch Studios,",
"Rust is a modern, system-level programming language designed with a focus on performance, safety, and concurrency. ",
"Rust as a Natural Process (Oxidation) refers to the chemical reaction that occurs when metals, primarily iron, come into contact with oxygen and moisture (water) over time, leading to the formation of iron oxide, commonly known as rust. This process is a form of oxidation, where a substance reacts with oxygen in the air or water, resulting in the degradation of the metal.",
]
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
// Confirm the original lengths of the paragraphs.
assert_eq!(paragraphs[0].len(), 67);
assert_eq!(paragraphs[1].len(), 115);
assert_eq!(paragraphs[2].len(), 374);
// First grouping of paragraphs with a context size of 200 and overlap 100.
// Expecting 4 chunks based on the original paragraphs.
let result = group_paragraphs_by_max_content_len(paragraphs.clone(), 200, 100);
assert_eq!(result.len(), 4);
// Edit paragraph 0 by appending more text, simulating a content update.
paragraphs.get_mut(0).unwrap().push_str(
" first released in early access in December 2013 and fully launched in February 2018",
);
// After edit, confirm that paragraph 0's length is updated.
assert_eq!(paragraphs[0].len(), 151);
// Group paragraphs again after the edit.
// The change should cause a shift in grouping, expecting 5 chunks now.
let result_2 = group_paragraphs_by_max_content_len(paragraphs.clone(), 200, 100);
assert_eq!(result_2.len(), 5);
// Verify that parts of the original grouping (later chunks) remain the same:
// The third chunk from the first run should equal the fourth from the second run.
assert_eq!(result[2], result_2[3]);
// The fourth chunk from the first run should equal the fifth from the second run.
assert_eq!(result[3], result_2[4]);
// Edit paragraph 1 by appending extra text, simulating another update.
paragraphs
.get_mut(1)
.unwrap()
.push_str("It was created by Mozilla.");
// Group paragraphs once again after the second edit.
let result_3 = group_paragraphs_by_max_content_len(paragraphs.clone(), 200, 100);
assert_eq!(result_3.len(), 5);
// Confirm that the grouping for the unchanged parts is still consistent:
// The first chunk from the previous grouping (before editing paragraph 1) stays the same.
assert_eq!(result_2[0], result_3[0]);
// Similarly, the third chunk (index 2) remains unchanged.
assert_eq!(result_2[2], result_3[2]);
// The fourth chunk in both groupings should still be identical.
assert_eq!(result_2[3], result_3[3]);
// And the fifth chunk in both groupings is compared for consistency.
assert_eq!(result_2[4], result_3[4]);
}
}


@ -55,9 +55,9 @@ impl UpdateUserParams {
}
}
#[derive(serde::Deserialize, serde::Serialize)]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct SignInPasswordResponse {
pub access_token_resp: GotrueTokenResponse,
pub gotrue_response: GotrueTokenResponse,
pub is_new: bool,
}


@ -22,8 +22,7 @@ async fn wasm_sign_in_success() {
assert!(res.is_ok());
let val = res.unwrap();
assert!(val);
assert!(val.is_new);
}
#[wasm_bindgen_test]


@ -8,9 +8,7 @@ use indexer::queue::{
ack_task, default_indexer_group_option, ensure_indexer_consumer_group,
read_background_embed_tasks,
};
use indexer::scheduler::{
is_collab_embedded_chunks_empty, spawn_pg_write_embeddings, UnindexedCollabTask, UnindexedData,
};
use indexer::scheduler::{spawn_pg_write_embeddings, UnindexedCollabTask, UnindexedData};
use indexer::thread_pool::ThreadPoolNoAbort;
use indexer::vector::embedder::{AFEmbedder, AzureConfig, OpenAIConfig};
use indexer::vector::open_ai;
@ -197,17 +195,13 @@ async fn process_upcoming_tasks(
}
}
join_set.spawn(async move {
if is_collab_embedded_chunks_empty(&chunks) {
return Ok::<_, AppError>(None);
}
let embeddings = indexer.embed(&embedder, chunks).await?;
Ok(embeddings.map(|embeddings| EmbeddingRecord {
Ok::<_, AppError>(embeddings.map(|embeddings| EmbeddingRecord {
workspace_id: task.workspace_id,
object_id: task.object_id,
collab_type: task.collab_type,
tokens_used: embeddings.tokens_consumed,
contents: embeddings.params,
chunks: embeddings.chunks,
}))
});
}


@ -1,4 +1,4 @@
use crate::sql_test::util::{setup_db, test_create_user};
use crate::sql_test::util::{create_test_user, setup_db};
use database::chat::chat_ops::{
delete_chat, get_all_chat_messages, insert_chat, insert_question_message, select_chat,
select_chat_messages, select_chat_settings, update_chat_settings,
@ -19,7 +19,7 @@ async fn chat_crud_test(pool: PgPool) {
let user_uuid = uuid::Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = test_create_user(&pool, user_uuid, &email, &name)
let user = create_test_user(&pool, user_uuid, &email, &name)
.await
.unwrap();
@ -72,7 +72,7 @@ async fn chat_message_crud_test(pool: PgPool) {
let user_uuid = uuid::Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = test_create_user(&pool, user_uuid, &email, &name)
let user = create_test_user(&pool, user_uuid, &email, &name)
.await
.unwrap();
@ -195,7 +195,7 @@ async fn chat_setting_test(pool: PgPool) {
let user_uuid = uuid::Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = test_create_user(&pool, user_uuid, &email, &name)
let user = create_test_user(&pool, user_uuid, &email, &name)
.await
.unwrap();
let workspace_id = user.workspace_id;


@ -0,0 +1,126 @@
use crate::sql_test::util::{
create_test_collab_document, create_test_user, select_all_fragments, setup_db, upsert_test_chunks,
};
use appflowy_ai_client::dto::EmbeddingModel;
use indexer::collab_indexer::split_text_into_chunks;
use sqlx::PgPool;
// Book content broken into logical chunks for testing
const TEST_CHUNKS: [&str; 6] = [
"The Five Dysfunctions of a Team",
"Part I: Underachievement - Introduces Kathryn Petersen, the newly appointed CEO of DecisionTech, a struggling Silicon Valley startup with a dysfunctional executive team.",
"Part II: Lighting the Fire - Kathryn organizes an offsite meeting to build trust and introduce constructive conflict, encouraging open discussion about disagreements.",
"Part IV: Traction - The team experiences benefits of improved trust and open conflict, with accountability becoming routine and meetings increasingly productive.",
"The Model identifies five key dysfunctions: Absence of Trust, Fear of Conflict, Lack of Commitment, Avoidance of Accountability, and Inattention to Results.",
"The book provides practical strategies for building trust, encouraging conflict, ensuring commitment, embracing accountability, and focusing on collective results."
];
#[sqlx::test(migrations = false)]
async fn insert_collab_embedding_fragment_test(pool: PgPool) {
setup_db(&pool).await.unwrap();
let mut paragraphs = TEST_CHUNKS
.iter()
.map(|&s| s.to_string())
.collect::<Vec<_>>();
let user_uuid = uuid::Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = create_test_user(&pool, user_uuid, &email, &name)
.await
.unwrap();
let doc_id = uuid::Uuid::new_v4();
let workspace_id = user.workspace_id;
create_test_collab_document(&pool, &user.uid, &workspace_id, &doc_id).await;
let chunks_1 = split_text_into_chunks(
doc_id,
paragraphs.clone(),
EmbeddingModel::TextEmbedding3Small,
500,
100,
)
.unwrap();
upsert_test_chunks(&pool, &workspace_id, &doc_id, chunks_1.clone()).await;
let fragments_1 = select_all_fragments(&pool, &doc_id).await;
assert_eq!(chunks_1.len(), fragments_1.len());
// Simulate editing the first paragraph.
paragraphs[0].push_str(" by Patrick Lencioni is a compelling exploration of team dynamics and the common pitfalls that undermine successful collaboration.");
let chunks_2 = split_text_into_chunks(
doc_id,
paragraphs.clone(),
EmbeddingModel::TextEmbedding3Small,
500,
100,
)
.unwrap();
// After the edit, the first paragraph's fragment will be different, but the second will be the same.
assert_ne!(chunks_1[0].fragment_id, chunks_2[0].fragment_id);
assert_eq!(chunks_1[1].fragment_id, chunks_2[1].fragment_id);
assert_eq!(chunks_2.len(), 2);
// Simulate inserting a new paragraph
paragraphs.insert(3, "Part III: Heavy Lifting - Focuses on accountability and responsibility, with Kathryn holding the team to high standards and addressing issues directly.".to_string(),);
let chunks_3 = split_text_into_chunks(
doc_id,
paragraphs.clone(),
EmbeddingModel::TextEmbedding3Small,
500,
100,
)
.unwrap();
// After inserting a new paragraph, the second fragment will be different, but the first will remain the same.
assert_eq!(chunks_2[0].fragment_id, chunks_3[0].fragment_id);
assert_ne!(chunks_2[1].fragment_id, chunks_3[1].fragment_id);
assert_eq!(chunks_3.len(), 3);
}
#[sqlx::test(migrations = false)]
async fn test_embed_over_context_size(pool: PgPool) {
setup_db(&pool).await.unwrap();
let user_uuid = uuid::Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = create_test_user(&pool, user_uuid, &email, &name)
.await
.unwrap();
let doc_id = uuid::Uuid::new_v4();
let workspace_id = user.workspace_id;
create_test_collab_document(&pool, &user.uid, &workspace_id, &doc_id).await;
let content= "The Five Dysfunctions of a Team Part I: Underachievement - Introduces Kathryn Petersen, the newly appointed CEO of DecisionTech, a struggling Silicon Valley startup with a dysfunctional executive team. Part II: Lighting the Fire - Kathryn organizes an offsite meeting to build trust and introduce constructive conflict, encouraging open discussion about disagreements. Part IV: Traction - The team experiences benefits of improved trust and open conflict, with accountability becoming routine and meetings increasingly productive. The Model identifies five key dysfunctions: Absence of Trust, Fear of Conflict, Lack of Commitment, Avoidance of Accountability, and Inattention to Results. The book provides practical strategies for building trust, encouraging conflict, ensuring commitment, embracing accountability, and focusing on collective results.";
let chunks = split_text_into_chunks(
doc_id,
vec![content.to_string()],
EmbeddingModel::TextEmbedding3Small,
300,
100,
)
.unwrap();
assert_eq!(chunks.len(), 5);
upsert_test_chunks(&pool, &workspace_id, &doc_id, chunks.clone()).await;
let fragments = select_all_fragments(&pool, &doc_id).await;
assert_eq!(chunks.len(), fragments.len());
// Replace the content with a new one. It will cause all existing fragments to be deleted.
let content = "Hello world!";
let chunks = split_text_into_chunks(
doc_id,
vec![content.to_string()],
EmbeddingModel::TextEmbedding3Small,
300,
100,
)
.unwrap();
upsert_test_chunks(&pool, &workspace_id, &doc_id, chunks.clone()).await;
let fragments = select_all_fragments(&pool, &doc_id).await;
assert_eq!(fragments.len(), 1);
}


@ -1,4 +1,4 @@
use crate::sql_test::util::{setup_db, test_create_user};
use crate::sql_test::util::{create_test_user, setup_db};
use collab_entity::CollabType;
use database::history::ops::{
get_latest_snapshot, get_latest_snapshot_state, get_snapshot_meta_list, insert_history,
@ -14,7 +14,7 @@ async fn insert_snapshot_test(pool: PgPool) {
let user_uuid = Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = test_create_user(&pool, user_uuid, &email, &name)
let user = create_test_user(&pool, user_uuid, &email, &name)
.await
.unwrap();


@ -1,4 +1,5 @@
mod chat_test;
mod collab_embed_test;
mod history_test;
pub(crate) mod util;
mod workspace_test;


@ -1,3 +1,9 @@
use bytes::Bytes;
use collab_document::document_data::default_document_collab_data;
use collab_entity::CollabType;
use database::collab::insert_into_af_collab;
use database::index::{get_collab_embedding_fragment, upsert_collab_embeddings, Fragment};
use database_entity::dto::{AFCollabEmbeddedChunk, CollabParams};
use lazy_static::lazy_static;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
@ -47,7 +53,7 @@ lazy_static! {
pub static ref ID_GEN: RwLock<Snowflake> = RwLock::new(Snowflake::new(1));
}
pub async fn test_create_user(
pub async fn create_test_user(
pool: &PgPool,
user_uuid: Uuid,
email: &str,
@ -62,6 +68,43 @@ pub async fn test_create_user(
Ok(TestUser { uid, workspace_id })
}
pub async fn create_test_collab_document(
pg_pool: &PgPool,
uid: &i64,
workspace_id: &Uuid,
doc_id: &Uuid,
) {
let document = default_document_collab_data(&doc_id.to_string()).unwrap();
let params = CollabParams {
object_id: *doc_id,
encoded_collab_v1: Bytes::from(document.encode_to_bytes().unwrap()),
collab_type: CollabType::Document,
};
let mut txn = pg_pool.begin().await.unwrap();
insert_into_af_collab(&mut txn, uid, workspace_id, &params)
.await
.unwrap();
txn.commit().await.unwrap();
}
pub async fn upsert_test_chunks(
pg: &PgPool,
workspace_id: &Uuid,
doc_id: &Uuid,
chunks: Vec<AFCollabEmbeddedChunk>,
) {
let mut txn = pg.begin().await.unwrap();
upsert_collab_embeddings(&mut txn, workspace_id, doc_id, 0, chunks.clone())
.await
.unwrap();
txn.commit().await.unwrap();
}
pub async fn select_all_fragments(pg: &PgPool, object_id: &Uuid) -> Vec<Fragment> {
get_collab_embedding_fragment(pg, object_id).await.unwrap()
}
#[derive(Clone)]
pub struct TestUser {
pub uid: i64,


@ -1,4 +1,4 @@
use crate::sql_test::util::{generate_random_bytes, setup_db, test_create_user};
use crate::sql_test::util::{create_test_user, generate_random_bytes, setup_db};
use collab_entity::CollabType;
use database::collab::{
@ -15,7 +15,7 @@ async fn insert_collab_sql_test(pool: PgPool) {
let user_uuid = uuid::Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = test_create_user(&pool, user_uuid, &email, &name)
let user = create_test_user(&pool, user_uuid, &email, &name)
.await
.unwrap();
@ -66,7 +66,7 @@ async fn insert_bulk_collab_sql_test(pool: PgPool) {
let user_uuid = uuid::Uuid::new_v4();
let name = user_uuid.to_string();
let email = format!("{}@appflowy.io", name);
let user = test_create_user(&pool, user_uuid, &email, &name)
let user = create_test_user(&pool, user_uuid, &email, &name)
.await
.unwrap();
@ -133,7 +133,7 @@ async fn test_bulk_insert_empty_collab_list(pool: PgPool) {
setup_db(&pool).await.unwrap();
let user_uuid = uuid::Uuid::new_v4();
let user = test_create_user(&pool, user_uuid, "test@appflowy.io", "test_user")
let user = create_test_user(&pool, user_uuid, "test@appflowy.io", "test_user")
.await
.unwrap();
@ -155,7 +155,7 @@ async fn test_bulk_insert_duplicate_oid_partition_key(pool: PgPool) {
setup_db(&pool).await.unwrap();
let user_uuid = uuid::Uuid::new_v4();
let user = test_create_user(&pool, user_uuid, "test@appflowy.io", "test_user")
let user = create_test_user(&pool, user_uuid, "test@appflowy.io", "test_user")
.await
.unwrap();
@ -194,7 +194,7 @@ async fn test_batch_insert_comparison(pool: PgPool) {
setup_db(&pool).await.unwrap();
let user_uuid = uuid::Uuid::new_v4();
let user = test_create_user(&pool, user_uuid, "test@appflowy.io", "test_user")
let user = create_test_user(&pool, user_uuid, "test@appflowy.io", "test_user")
.await
.unwrap();


@ -51,7 +51,8 @@ async fn sign_in_success() {
let is_new = c
.sign_in_password(&registered_user.email, &registered_user.password)
.await
.unwrap();
.unwrap()
.is_new;
assert!(is_new);
assert!(c
.token()
@ -73,7 +74,8 @@ async fn sign_in_success() {
let is_new = c
.sign_in_password(&registered_user.email, &registered_user.password)
.await
.unwrap();
.unwrap()
.is_new;
assert!(!is_new);
// workspaces should be the same