chore: search and chat (#999)

* chore: add test for search and chat

* chore: update test

* chore: update test

* chore: update ci

* chore: fix security audit

* chore: multiple core docker build

* chore: multiple core docker build

* chore: update ci

* chore: update model setting

* chore: test ci

* chore: use tiktoken to calculate token length

* chore: remove env

* chore: use spawn_blocking with condition

* chore: docs

* chore: clippy

* chore: clippy

* chore: docker logs

* chore: pass message id

* chore: clippy
Nathan.fooo 2024-11-16 14:52:12 +08:00 committed by GitHub
parent d0c212ff10
commit 655f13bc27
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 964 additions and 360 deletions

View file

@ -105,6 +105,12 @@ jobs:
# the wasm-pack headless tests will run on random ports, so we need to allow all origins
run: sed -i 's/http:\/\/127\.0\.0\.1:8000/http:\/\/127.0.0.1/g' nginx/nginx.conf
- name: Log in to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_HUB_USERNAME }}
password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}
- name: Run Docker-Compose
run: |
export APPFLOWY_WORKER_VERSION=${GITHUB_SHA}
@ -113,29 +119,20 @@ jobs:
docker compose -f docker-compose-ci.yml up -d
docker ps -a
container_id=$(docker ps --filter name=appflowy-cloud-ai-1 -q)
if [ -n "$container_id" ]; then
echo "Displaying logs for the AppFlowy-AI container..."
docker logs "$container_id"
else
echo "No running container found to display logs."
fi
- name: Install prerequisites
run: |
sudo apt-get update
sudo apt-get install protobuf-compiler
sudo apt-get install -y protobuf-compiler
- name: Run Tests
run: |
echo "Running tests for ${{ matrix.test_service }} with flags: ${{ matrix.test_cmd }}"
RUST_LOG="info" DISABLE_CI_TEST_LOG="true" cargo test ${{ matrix.test_cmd }}
- name: Run Tests from main branch
- name: Docker Logs
if: always()
run: |
git fetch origin main
git checkout main
RUST_LOG="info" DISABLE_CI_TEST_LOG="true" cargo test ${{ matrix.test_cmd }}
docker logs appflowy-cloud-ai-1
cleanup:
name: Cleanup Docker Images

View file

@ -95,6 +95,7 @@ jobs:
labels: ${{ steps.meta.outputs.labels }}
provenance: false
build-args: |
PROFILE=release
FEATURES=
- name: Logout from Docker Hub

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT rag_ids\n FROM af_chat\n WHERE chat_id = $1 AND deleted_at IS NULL\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "rag_ids",
"type_info": "Jsonb"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false
]
},
"hash": "dbc31936b3e79632f9c8bae449182274d9d75766bd9a5c383b96bd60e9c5c866"
}

Cargo.lock (generated): 47 changed lines
View file

@ -747,12 +747,13 @@ dependencies = [
"shared-entity",
"sqlx",
"thiserror",
"tiktoken-rs",
"tokio",
"tokio-stream",
"tokio-util",
"tracing",
"tracing-subscriber",
"unicode-segmentation",
"unicode-normalization",
"uuid",
"validator",
"workspace-template",
@ -3341,9 +3342,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
@ -3351,9 +3352,9 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
@ -3379,9 +3380,9 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-lite"
@ -3398,9 +3399,9 @@ dependencies = [
[[package]]
name = "futures-macro"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
@ -3409,15 +3410,15 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-timer"
@ -3427,9 +3428,9 @@ checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]]
name = "futures-util"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
@ -7170,6 +7171,22 @@ dependencies = [
"weezl",
]
[[package]]
name = "tiktoken-rs"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44075987ee2486402f0808505dd65692163d243a337fc54363d49afac41087f6"
dependencies = [
"anyhow",
"base64 0.21.7",
"bstr",
"fancy-regex 0.13.0",
"lazy_static",
"parking_lot 0.12.3",
"regex",
"rustc-hash",
]
[[package]]
name = "time"
version = "0.3.36"

View file

@ -301,6 +301,11 @@ codegen-units = 1
inherits = "release"
debug = true
[profile.ci]
inherits = "release"
opt-level = 2
lto = false # Disable Link-Time Optimization
[patch.crates-io]
# It's difficult to resolve different versions of the same crate used in the AppFlowy Frontend and the Client-API crate.
# So we use a patch to work around this issue.
@ -314,4 +319,5 @@ collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev
[features]
history = []
# Some AI test features are not available for self-hosted AppFlowy Cloud. Therefore, AI testing is disabled by default.
ai-test-enabled = ["client-api-test/ai-test-enabled"]

View file

@ -16,16 +16,19 @@ RUN apt update && apt install -y protobuf-compiler lld clang
# Specify a default value for FEATURES; it could be an empty string if no features are enabled by default
ARG FEATURES=""
ARG PROFILE="release"
COPY --from=planner /app/recipe.json recipe.json
# Build our project dependencies
ENV CARGO_BUILD_JOBS=4
RUN cargo chef cook --release --recipe-path recipe.json
COPY . .
ENV SQLX_OFFLINE true
# Build the project
RUN echo "Building with features: ${FEATURES}"
RUN cargo build --profile=release --features "${FEATURES}" --bin appflowy_cloud
RUN echo "Building with profile: ${PROFILE}, features: ${FEATURES}, "
RUN cargo build --profile=${PROFILE} --features "${FEATURES}" --bin appflowy_cloud
FROM debian:bookworm-slim AS runtime
WORKDIR /app

View file

@ -1,2 +1,2 @@
[advisories]
ignore = ["RUSTSEC-2024-0370"]
ignore = ["RUSTSEC-2024-0370", "RUSTSEC-2024-0384"]

View file

@ -4,6 +4,7 @@ APPFLOWY_DATABASE_URL=postgres://postgres:password@localhost:5432/postgres
APPFLOWY_ACCESS_CONTROL=true
APPFLOWY_WEBSOCKET_MAILBOX_SIZE=6000
APPFLOWY_DATABASE_MAX_CONNECTIONS=40
APPFLOWY_DOCUMENT_CONTENT_SPLIT_LEN=8000
# This file is used to set the environment variables for local development
# Copy this file to .env and change the values as needed

View file

@ -120,6 +120,7 @@ services:
dockerfile: Dockerfile
args:
FEATURES: ""
PROFILE: ci
image: appflowyinc/appflowy_cloud:${APPFLOWY_CLOUD_VERSION:-latest}
admin_frontend:
@ -138,7 +139,7 @@ services:
ai:
restart: on-failure
image: appflowyinc/appflowy_ai:${APPFLOWY_AI_VERSION:-latest}
image: appflowyinc/appflowy_ai_premium:${APPFLOWY_AI_VERSION:-latest}
ports:
- "5001:5001"
environment:
@ -147,6 +148,7 @@ services:
- LOCAL_AI_AWS_SECRET_ACCESS_KEY=${LOCAL_AI_AWS_SECRET_ACCESS_KEY}
- APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT}
- APPFLOWY_AI_DATABASE_URL=${APPFLOWY_AI_DATABASE_URL}
- APPFLOWY_AI_REDIS_URL=${APPFLOWY_REDIS_URI}
appflowy_worker:
restart: on-failure

View file

@ -3,7 +3,6 @@ pub mod gotrue;
#[cfg(feature = "gotrue_error")]
use crate::gotrue::GoTrueError;
use std::error::Error as StdError;
use std::string::FromUtf8Error;
#[cfg(feature = "appflowy_ai_error")]
@ -92,7 +91,7 @@ pub enum AppError {
#[error("{desc}: {err}")]
SqlxArgEncodingError {
desc: String,
err: Box<dyn StdError + 'static + Send + Sync>,
err: Box<dyn std::error::Error + 'static + Send + Sync>,
},
#[cfg(feature = "validation_error")]

View file

@ -1,8 +1,9 @@
use crate::dto::{
AIModel, ChatAnswer, ChatQuestion, CompleteTextResponse, CompletionType, CreateChatContext,
CustomPrompt, Document, EmbeddingRequest, EmbeddingResponse, LocalAIConfig, MessageData,
RepeatedLocalAIPackage, RepeatedRelatedQuestion, SearchDocumentsRequest, SummarizeRowResponse,
TranslateRowData, TranslateRowResponse,
AIModel, CalculateSimilarityParams, ChatAnswer, ChatQuestion, CompleteTextResponse,
CompletionType, CreateChatContext, CustomPrompt, Document, EmbeddingRequest, EmbeddingResponse,
LocalAIConfig, MessageData, RepeatedLocalAIPackage, RepeatedRelatedQuestion,
SearchDocumentsRequest, SimilarityResponse, SummarizeRowResponse, TranslateRowData,
TranslateRowResponse,
};
use crate::error::AIError;
@ -202,6 +203,7 @@ impl AppFlowyAIClient {
pub async fn send_question(
&self,
chat_id: &str,
question_id: i64,
content: &str,
model: &AIModel,
metadata: Option<Value>,
@ -211,6 +213,8 @@ impl AppFlowyAIClient {
data: MessageData {
content: content.to_string(),
metadata,
rag_ids: vec![],
message_id: Some(question_id.to_string()),
},
};
let url = format!("{}/chat/message", self.url);
@ -230,6 +234,7 @@ impl AppFlowyAIClient {
chat_id: &str,
content: &str,
metadata: Option<Value>,
rag_ids: Vec<String>,
model: &AIModel,
) -> Result<impl Stream<Item = Result<Bytes, AIError>>, AIError> {
let json = ChatQuestion {
@ -237,6 +242,8 @@ impl AppFlowyAIClient {
data: MessageData {
content: content.to_string(),
metadata,
rag_ids,
message_id: None,
},
};
let url = format!("{}/chat/message/stream", self.url);
@ -253,8 +260,10 @@ impl AppFlowyAIClient {
pub async fn stream_question_v2(
&self,
chat_id: &str,
question_id: i64,
content: &str,
metadata: Option<Value>,
rag_ids: Vec<String>,
model: &AIModel,
) -> Result<impl Stream<Item = Result<Bytes, AIError>>, AIError> {
let json = ChatQuestion {
@ -262,6 +271,8 @@ impl AppFlowyAIClient {
data: MessageData {
content: content.to_string(),
metadata,
rag_ids,
message_id: Some(question_id.to_string()),
},
};
let url = format!("{}/v2/chat/message/stream", self.url);
@ -323,6 +334,21 @@ impl AppFlowyAIClient {
.into_data()
}
pub async fn calculate_similarity(
&self,
params: CalculateSimilarityParams,
) -> Result<SimilarityResponse, AIError> {
let url = format!("{}/similarity", self.url);
let resp = self
.http_client(Method::POST, &url)?
.json(&params)
.send()
.await?;
AIResponse::<SimilarityResponse>::from_response(resp)
.await?
.into_data()
}
fn http_client(&self, method: Method, url: &str) -> Result<RequestBuilder, AIError> {
let request_builder = self.client.request(method, url);
Ok(request_builder)

View file

@ -23,6 +23,10 @@ pub struct MessageData {
pub content: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
#[serde(default)]
pub rag_ids: Vec<String>,
#[serde(default)]
pub message_id: Option<String>,
}
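Because the new fields are marked #[serde(default)], payloads from older clients still deserialize; a minimal sketch (assuming, as in the hunk above, that content is the only required field):
use appflowy_ai_client::dto::MessageData;
fn main() {
  // An old-style payload without rag_ids/message_id still parses.
  let data: MessageData = serde_json::from_str(r#"{"content":"hi"}"#).unwrap();
  assert!(data.rag_ids.is_empty());
  assert!(data.message_id.is_none());
}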
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -182,7 +186,7 @@ pub struct EmbeddingRequest {
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum EmbeddingsModel {
pub enum EmbeddingModel {
#[serde(rename = "text-embedding-3-small")]
TextEmbedding3Small,
#[serde(rename = "text-embedding-3-large")]
@ -191,12 +195,55 @@ pub enum EmbeddingsModel {
TextEmbeddingAda002,
}
impl Display for EmbeddingsModel {
impl EmbeddingModel {
pub fn supported_models() -> &'static [&'static str] {
&[
"text-embedding-ada-002",
"text-embedding-3-small",
"text-embedding-3-large",
]
}
pub fn max_token(&self) -> usize {
match self {
EmbeddingModel::TextEmbeddingAda002 => 8191,
EmbeddingModel::TextEmbedding3Large => 8191,
EmbeddingModel::TextEmbedding3Small => 8191,
}
}
pub fn default_dimensions(&self) -> i32 {
match self {
EmbeddingModel::TextEmbeddingAda002 => 1536,
EmbeddingModel::TextEmbedding3Large => 3072,
EmbeddingModel::TextEmbedding3Small => 1536,
}
}
pub fn name(&self) -> &'static str {
match self {
EmbeddingModel::TextEmbeddingAda002 => "text-embedding-ada-002",
EmbeddingModel::TextEmbedding3Large => "text-embedding-3-large",
EmbeddingModel::TextEmbedding3Small => "text-embedding-3-small",
}
}
pub fn from_name(name: &str) -> Option<Self> {
match name {
"text-embedding-ada-002" => Some(EmbeddingModel::TextEmbeddingAda002),
"text-embedding-3-large" => Some(EmbeddingModel::TextEmbedding3Large),
"text-embedding-3-small" => Some(EmbeddingModel::TextEmbedding3Small),
_ => None,
}
}
}
impl Display for EmbeddingModel {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
EmbeddingsModel::TextEmbedding3Small => write!(f, "text-embedding-3-small"),
EmbeddingsModel::TextEmbedding3Large => write!(f, "text-embedding-3-large"),
EmbeddingsModel::TextEmbeddingAda002 => write!(f, "text-embedding-ada-002"),
EmbeddingModel::TextEmbedding3Small => write!(f, "text-embedding-3-small"),
EmbeddingModel::TextEmbedding3Large => write!(f, "text-embedding-3-large"),
EmbeddingModel::TextEmbeddingAda002 => write!(f, "text-embedding-ada-002"),
}
}
}
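A minimal sketch of how these helpers compose (the pick_model wrapper is illustrative, not part of the diff):
use appflowy_ai_client::dto::EmbeddingModel;
// Resolve a model from its serialized name, falling back to the small model.
fn pick_model(name: &str) -> EmbeddingModel {
  EmbeddingModel::from_name(name).unwrap_or(EmbeddingModel::TextEmbedding3Small)
}
fn main() {
  let model = pick_model("text-embedding-3-small");
  // name() agrees with the Display impl above.
  assert_eq!(model.name(), model.to_string());
  assert_eq!(model.default_dimensions(), 1536);
  assert!(EmbeddingModel::supported_models().contains(&model.name()));
}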
@ -320,3 +367,15 @@ pub struct CustomPrompt {
pub system: String,
pub user: Option<String>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CalculateSimilarityParams {
pub workspace_id: String,
pub input: String,
pub expected: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SimilarityResponse {
pub score: f64,
}
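A sketch of the wire format these DTOs produce (values are illustrative):
use appflowy_ai_client::dto::CalculateSimilarityParams;
fn main() {
  let params = CalculateSimilarityParams {
    workspace_id: "w1".to_string(),
    input: "an answer produced by the model".to_string(),
    expected: "the reference answer".to_string(),
  };
  // Serializes to {"workspace_id":"w1","input":"...","expected":"..."}.
  println!("{}", serde_json::to_string(&params).unwrap());
}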

View file

@ -14,7 +14,7 @@ async fn create_chat_context_test() {
};
client.create_chat_text_context(context).await.unwrap();
let resp = client
.send_question(&chat_id, "Where I live?", &AIModel::GPT4oMini, None)
.send_question(&chat_id, 1, "Where I live?", &AIModel::GPT4oMini, None)
.await
.unwrap();
// response will be something like:

View file

@ -1,7 +1,7 @@
use crate::appflowy_ai_client;
use appflowy_ai_client::dto::{
EmbeddingEncodingFormat, EmbeddingInput, EmbeddingRequest, EmbeddingsModel,
EmbeddingEncodingFormat, EmbeddingInput, EmbeddingModel, EmbeddingRequest,
};
#[tokio::test]
@ -9,10 +9,10 @@ async fn embedding_test() {
let client = appflowy_ai_client();
let request = EmbeddingRequest {
input: EmbeddingInput::String("hello world".to_string()),
model: EmbeddingsModel::TextEmbedding3Small.to_string(),
model: EmbeddingModel::TextEmbedding3Small.to_string(),
chunk_size: 1000,
encoding_format: EmbeddingEncodingFormat::Float,
dimensions: 1536,
dimensions: EmbeddingModel::TextEmbedding3Small.default_dimensions(),
};
let result = client.embeddings(request).await.unwrap();
assert!(result.total_tokens > 0);

View file

@ -11,7 +11,7 @@ async fn qa_test() {
client.health_check().await.unwrap();
let chat_id = uuid::Uuid::new_v4().to_string();
let resp = client
.send_question(&chat_id, "I feel hungry", &AIModel::GPT4o, None)
.send_question(&chat_id, 1, "I feel hungry", &AIModel::GPT4o, None)
.await
.unwrap();
assert!(!resp.content.is_empty());
@ -30,7 +30,7 @@ async fn stop_stream_test() {
client.health_check().await.unwrap();
let chat_id = uuid::Uuid::new_v4().to_string();
let mut stream = client
.stream_question(&chat_id, "I feel hungry", None, &AIModel::GPT4oMini)
.stream_question(&chat_id, "I feel hungry", None, vec![], &AIModel::GPT4oMini)
.await
.unwrap();
@ -52,7 +52,14 @@ async fn stream_test() {
client.health_check().await.expect("Health check failed");
let chat_id = uuid::Uuid::new_v4().to_string();
let stream = client
.stream_question_v2(&chat_id, "I feel hungry", None, &AIModel::GPT4oMini)
.stream_question_v2(
&chat_id,
1,
"I feel hungry",
None,
vec![],
&AIModel::GPT4oMini,
)
.await
.expect("Failed to initiate question stream");

View file

@ -31,7 +31,10 @@ use uuid::Uuid;
#[cfg(feature = "collab-sync")]
use client_api::collab_sync::{SinkConfig, SyncObject, SyncPlugin};
use client_api::entity::id::user_awareness_object_id;
use client_api::entity::{PublishCollabItem, PublishCollabMetadata, QueryWorkspaceMember};
use client_api::entity::{
PublishCollabItem, PublishCollabMetadata, QueryWorkspaceMember, QuestionStream,
QuestionStreamValue,
};
use client_api::ws::{WSClient, WSClientConfig};
use database_entity::dto::{
AFAccessLevel, AFRole, AFSnapshotMeta, AFSnapshotMetas, AFUserProfile, AFUserWorkspaceInfo,
@ -845,24 +848,21 @@ impl TestClient {
#[allow(unused_variables)]
pub async fn create_collab_with_data(
&mut self,
object_id: String,
workspace_id: &str,
object_id: &str,
collab_type: CollabType,
encoded_collab_v1: Option<EncodedCollab>,
encoded_collab_v1: EncodedCollab,
) -> Result<(), AppResponseError> {
// Subscribe to object
let origin = CollabOrigin::Client(CollabClient::new(self.uid().await, self.device_id.clone()));
let collab = match encoded_collab_v1 {
None => Collab::new_with_origin(origin.clone(), &object_id, vec![], false),
Some(data) => Collab::new_with_source(
origin.clone(),
&object_id,
DataSource::DocStateV1(data.doc_state.to_vec()),
vec![],
false,
)
.unwrap(),
};
let collab = Collab::new_with_source(
origin.clone(),
object_id,
DataSource::DocStateV1(encoded_collab_v1.doc_state.to_vec()),
vec![],
false,
)
.unwrap();
let encoded_collab_v1 = collab
.encode_collab_v1(|collab| collab_type.validate_require_data(collab))
@ -873,7 +873,7 @@ impl TestClient {
self
.api_client
.create_collab(CreateCollabParams {
object_id: object_id.clone(),
object_id: object_id.to_string(),
encoded_collab_v1,
collab_type: collab_type.clone(),
workspace_id: workspace_id.to_string(),
@ -1167,3 +1167,16 @@ pub async fn get_collab_json_from_server(
.unwrap()
.to_json_value()
}
pub async fn collect_answer(mut stream: QuestionStream) -> String {
let mut answer = String::new();
while let Some(value) = stream.next().await {
match value.unwrap() {
QuestionStreamValue::Answer { value } => {
answer.push_str(&value);
},
QuestionStreamValue::Metadata { .. } => {},
}
}
answer
}

View file

@ -9,7 +9,10 @@ use futures_core::{ready, Stream};
use pin_project::pin_project;
use reqwest::Method;
use serde_json::Value;
use shared_entity::dto::ai_dto::{RepeatedRelatedQuestion, STREAM_ANSWER_KEY, STREAM_METADATA_KEY};
use shared_entity::dto::ai_dto::{
CalculateSimilarityParams, RepeatedRelatedQuestion, SimilarityResponse, STREAM_ANSWER_KEY,
STREAM_METADATA_KEY,
};
use shared_entity::response::{AppResponse, AppResponseError};
use std::pin::Pin;
use std::task::{Context, Poll};
@ -215,6 +218,26 @@ impl Client {
.await?
.into_data()
}
pub async fn calculate_similarity(
&self,
params: CalculateSimilarityParams,
) -> Result<SimilarityResponse, AppResponseError> {
let url = format!(
"{}/api/ai/{}/calculate_similarity",
self.base_url, &params.workspace_id
);
let resp = self
.http_client_with_auth(Method::POST, &url)
.await?
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<SimilarityResponse>::from_response(resp)
.await?
.into_data()
}
}
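A minimal usage sketch for the new method (assuming an already authenticated Client; the helper function is illustrative):
use client_api::Client;
use shared_entity::dto::ai_dto::CalculateSimilarityParams;
use shared_entity::response::AppResponseError;
// Score how close an AI answer is to a reference text within a workspace.
async fn answer_similarity(
  client: &Client,
  workspace_id: &str,
  answer: &str,
  expected: &str,
) -> Result<f64, AppResponseError> {
  let params = CalculateSimilarityParams {
    workspace_id: workspace_id.to_string(),
    input: answer.to_string(),
    expected: expected.to_string(),
  };
  // POSTs to {base_url}/api/ai/{workspace_id}/calculate_similarity.
  Ok(client.calculate_similarity(params).await?.score)
}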
#[pin_project]

View file

@ -33,7 +33,6 @@ pub async fn insert_chat(
)));
}
let rag_ids = json!(params.rag_ids);
sqlx::query!(
r#"
INSERT INTO af_chat (chat_id, name, workspace_id, rag_ids)
@ -145,6 +144,25 @@ pub async fn select_chat<'a, E: Executor<'a, Database = Postgres>>(
}
}
pub async fn select_chat_rag_ids<'a, E: Executor<'a, Database = Postgres>>(
executor: E,
chat_id: &str,
) -> Result<Vec<String>, AppError> {
let chat_id = Uuid::from_str(chat_id)?;
let row = sqlx::query!(
r#"
SELECT rag_ids
FROM af_chat
WHERE chat_id = $1 AND deleted_at IS NULL
"#,
&chat_id,
)
.fetch_one(executor)
.await?;
let rag_ids = serde_json::from_value::<Vec<String>>(row.rag_ids).unwrap_or_default();
Ok(rag_ids)
}
pub async fn insert_answer_message_with_transaction(
transaction: &mut Transaction<'_, Postgres>,
author: ChatAuthor,

View file

@ -87,8 +87,9 @@ lazy_static = "1.4.0"
itertools = "0.12.0"
validator = "0.16.1"
rayon.workspace = true
unicode-segmentation = "1.9.0"
tiktoken-rs = "0.6.0"
[dev-dependencies]
rand = "0.8.5"
workspace-template.workspace = true
unicode-normalization = "0.1.24"

View file

@ -134,7 +134,7 @@ where
let lock = collab.read().await;
if let Some(indexer) = &self.indexer {
match indexer.embedding_params(&lock) {
match indexer.embedding_params(&lock).await {
Ok(embedding_params) => {
drop(lock); // we no longer need the lock
match indexer.embeddings(embedding_params).await {

View file

@ -4,36 +4,44 @@ use anyhow::anyhow;
use async_trait::async_trait;
use collab::preclude::Collab;
use crate::indexer::{DocumentDataExt, Indexer};
use app_error::AppError;
use appflowy_ai_client::client::AppFlowyAIClient;
use appflowy_ai_client::dto::{
EmbeddingEncodingFormat, EmbeddingInput, EmbeddingOutput, EmbeddingRequest, EmbeddingsModel,
EmbeddingEncodingFormat, EmbeddingInput, EmbeddingModel, EmbeddingOutput, EmbeddingRequest,
};
use collab_document::document::DocumentBody;
use collab_document::error::DocumentError;
use collab_entity::CollabType;
use database_entity::dto::{AFCollabEmbeddingParams, AFCollabEmbeddings, EmbeddingContentType};
use unicode_segmentation::UnicodeSegmentation;
use uuid::Uuid;
use crate::indexer::{DocumentDataExt, Indexer};
use tiktoken_rs::CoreBPE;
use tracing::trace;
use uuid::Uuid;
pub struct DocumentIndexer {
ai_client: AppFlowyAIClient,
tokenizer: Arc<CoreBPE>,
embedding_model: EmbeddingModel,
}
impl DocumentIndexer {
/// We assume that every token is ~4 bytes. We're going to split document content into fragments
/// of ~2000 tokens each.
pub const DOC_CONTENT_SPLIT: usize = 8000;
pub fn new(ai_client: AppFlowyAIClient) -> Arc<Self> {
Arc::new(Self { ai_client })
let tokenizer = tiktoken_rs::cl100k_base().unwrap();
Arc::new(Self {
ai_client,
tokenizer: Arc::new(tokenizer),
embedding_model: EmbeddingModel::TextEmbedding3Small,
})
}
}
#[async_trait]
impl Indexer for DocumentIndexer {
fn embedding_params(&self, collab: &Collab) -> Result<Vec<AFCollabEmbeddingParams>, AppError> {
async fn embedding_params(
&self,
collab: &Collab,
) -> Result<Vec<AFCollabEmbeddingParams>, AppError> {
let object_id = collab.object_id().to_string();
let document = DocumentBody::from_collab(collab).ok_or_else(|| {
anyhow!(
@ -46,12 +54,15 @@ impl Indexer for DocumentIndexer {
match result {
Ok(document_data) => {
let content = document_data.to_plain_text();
create_embedding_params(
let max_tokens = self.embedding_model.default_dimensions() as usize;
create_embedding(
object_id,
content,
CollabType::Document,
Self::DOC_CONTENT_SPLIT,
max_tokens,
self.tokenizer.clone(),
)
.await
},
Err(err) => {
if matches!(err, DocumentError::NoRequiredData) {
@ -80,12 +91,17 @@ impl Indexer for DocumentIndexer {
.ai_client
.embeddings(EmbeddingRequest {
input: EmbeddingInput::StringArray(contents),
model: EmbeddingsModel::TextEmbedding3Small.to_string(),
chunk_size: (Self::DOC_CONTENT_SPLIT / 4) as i32,
model: EmbeddingModel::TextEmbedding3Small.to_string(),
chunk_size: 2000,
encoding_format: EmbeddingEncodingFormat::Float,
dimensions: 1536,
dimensions: EmbeddingModel::TextEmbedding3Small.default_dimensions(),
})
.await?;
trace!(
"[Embedding] request {} embeddings, received {} embeddings",
params.len(),
resp.data.len()
);
for embedding in resp.data {
let param = &mut params[embedding.index as usize];
@ -112,335 +128,322 @@ impl Indexer for DocumentIndexer {
}))
}
}
#[inline]
fn create_embedding_params(
/// ## Execution Time Comparison Results
///
/// The following results were observed when running `execution_time_comparison_tests`:
///
/// | Content Size (chars) | Direct Time (ms) | spawn_blocking Time (ms) |
/// |-----------------------|------------------|--------------------------|
/// | 500 | 1 | 1 |
/// | 1000 | 2 | 2 |
/// | 2000 | 5 | 5 |
/// | 5000 | 11 | 11 |
/// | 20000 | 49 | 48 |
///
/// ## Guidelines for Using `spawn_blocking`
///
/// - **Short Tasks (< 1 ms)**:
/// Use direct execution on the async runtime. The minimal execution time has negligible impact.
///
/// - **Moderate Tasks (1–10 ms)**:
/// - For infrequent or low-concurrency tasks, direct execution is acceptable.
/// - For frequent or high-concurrency tasks, consider using `spawn_blocking` to avoid delays.
///
/// - **Long Tasks (> 10 ms)**:
/// Always offload to a blocking thread with `spawn_blocking` to maintain runtime efficiency and responsiveness.
///
/// Related blog:
/// https://tokio.rs/blog/2020-04-preemption
/// https://ryhl.io/blog/async-what-is-blocking/
async fn create_embedding(
object_id: String,
content: String,
collab_type: CollabType,
max_content_len: usize,
max_tokens: usize,
tokenizer: Arc<CoreBPE>,
) -> Result<Vec<AFCollabEmbeddingParams>, AppError> {
let split_contents = if content.len() < 500 {
split_text_by_max_tokens(content, max_tokens, tokenizer.as_ref())?
} else {
tokio::task::spawn_blocking(move || {
split_text_by_max_tokens(content, max_tokens, tokenizer.as_ref())
})
.await??
};
Ok(
split_contents
.into_iter()
.map(|content| AFCollabEmbeddingParams {
fragment_id: Uuid::new_v4().to_string(),
object_id: object_id.clone(),
collab_type: collab_type.clone(),
content_type: EmbeddingContentType::PlainText,
content,
embedding: None,
})
.collect(),
)
}
fn split_text_by_max_tokens(
content: String,
max_tokens: usize,
tokenizer: &CoreBPE,
) -> Result<Vec<String>, AppError> {
if content.is_empty() {
return Ok(vec![]);
}
// Helper function to create AFCollabEmbeddingParams
fn create_param(
fragment_id: String,
object_id: &str,
collab_type: &CollabType,
content: String,
) -> AFCollabEmbeddingParams {
AFCollabEmbeddingParams {
fragment_id,
object_id: object_id.to_string(),
collab_type: collab_type.clone(),
content_type: EmbeddingContentType::PlainText,
content,
embedding: None,
}
let token_ids = tokenizer.encode_ordinary(&content);
let total_tokens = token_ids.len();
if total_tokens <= max_tokens {
return Ok(vec![content]);
}
if content.len() <= max_content_len {
// Content is short enough; return as a single fragment
let param = create_param(object_id.clone(), &object_id, &collab_type, content);
return Ok(vec![param]);
}
// Content is longer than max_content_len; need to split
let mut result = Vec::with_capacity(1 + content.len() / max_content_len);
let mut fragment = String::with_capacity(max_content_len);
let mut current_len = 0;
for grapheme in content.graphemes(true) {
let grapheme_len = grapheme.len();
if current_len + grapheme_len > max_content_len {
if !fragment.is_empty() {
// Move the fragment to avoid cloning
result.push(create_param(
Uuid::new_v4().to_string(),
&object_id,
&collab_type,
std::mem::take(&mut fragment),
));
}
current_len = 0;
// Check if the grapheme itself is longer than max_content_len
if grapheme_len > max_content_len {
// Push the grapheme as a fragment on its own
result.push(create_param(
Uuid::new_v4().to_string(),
&object_id,
&collab_type,
grapheme.to_string(),
));
continue;
let mut chunks = Vec::new();
let mut start_idx = 0;
while start_idx < total_tokens {
let mut end_idx = (start_idx + max_tokens).min(total_tokens);
let mut decoded = false;
// Try to decode the chunk, adjust end_idx if decoding fails
while !decoded {
let token_chunk = &token_ids[start_idx..end_idx];
// Attempt to decode the current chunk
match tokenizer.decode(token_chunk.to_vec()) {
Ok(chunk_text) => {
chunks.push(chunk_text);
start_idx = end_idx;
decoded = true;
},
Err(_) => {
// If we can extend the chunk, do so
if end_idx < total_tokens {
end_idx += 1;
} else if start_idx + 1 < total_tokens {
// Skip the problematic token at start_idx
start_idx += 1;
end_idx = (start_idx + max_tokens).min(total_tokens);
} else {
// Cannot decode any further, break to avoid infinite loop
start_idx = total_tokens;
break;
}
},
}
}
fragment.push_str(grapheme);
current_len += grapheme_len;
}
// Add the last fragment if it's not empty
if !fragment.is_empty() {
result.push(create_param(
object_id.clone(),
&object_id,
&collab_type,
fragment,
));
}
Ok(result)
Ok(chunks)
}
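Following the guidelines above, a sketch of the calling pattern that mirrors create_embedding (it assumes split_text_by_max_tokens is visible to the caller and that AppError implements From<JoinError>, as the .await?? above implies; the 500-byte threshold comes from the code above, and max_tokens would typically be the model's token limit, e.g. EmbeddingModel::TextEmbedding3Small.max_token()):
use std::sync::Arc;
use app_error::AppError;
use tiktoken_rs::CoreBPE;
// Split content into embeddable chunks, offloading long inputs to a blocking thread.
async fn split_for_indexing(
  content: String,
  tokenizer: Arc<CoreBPE>,
  max_tokens: usize,
) -> Result<Vec<String>, AppError> {
  if content.len() < 500 {
    // Short input: tokenization finishes in ~1 ms, run it directly on the runtime.
    split_text_by_max_tokens(content, max_tokens, tokenizer.as_ref())
  } else {
    // Long input: keep the async runtime responsive via spawn_blocking.
    // The `?` after .await converts a JoinError into AppError.
    tokio::task::spawn_blocking(move || {
      split_text_by_max_tokens(content, max_tokens, tokenizer.as_ref())
    })
    .await?
  }
}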
#[cfg(test)]
mod tests {
use crate::indexer::document_indexer::create_embedding_params;
use collab_entity::CollabType;
use crate::indexer::document_indexer::split_text_by_max_tokens;
use tiktoken_rs::cl100k_base;
#[test]
fn test_split_at_non_utf8() {
let object_id = "test_object".to_string();
let collab_type = CollabType::Document;
let max_content_len = 10; // Small number for testing
let max_tokens = 10; // Small number for testing
// Content with multibyte characters (emojis)
let content = "Hello 😃 World 🌍! This is a test 🚀.".to_string();
let params = create_embedding_params(
object_id.clone(),
content.clone(),
collab_type.clone(),
max_content_len,
)
.unwrap();
let tokenizer = cl100k_base().unwrap();
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
// Ensure that we didn't split in the middle of a multibyte character
for param in params {
assert!(param.content.is_char_boundary(0));
assert!(param.content.is_char_boundary(param.content.len()));
for content in params {
assert!(content.is_char_boundary(0));
assert!(content.is_char_boundary(content.len()));
}
}
#[test]
fn test_exact_boundary_split() {
let object_id = "test_object".to_string();
let collab_type = CollabType::Document;
let max_content_len = 5; // Set to 5 for testing
let max_tokens = 5; // Set to 5 tokens for testing
let content = "The quick brown fox jumps over the lazy dog".to_string();
let tokenizer = cl100k_base().unwrap();
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
// Content length is exactly a multiple of max_content_len
let content = "abcdefghij".to_string(); // 10 characters
let params = create_embedding_params(
object_id.clone(),
content.clone(),
collab_type.clone(),
max_content_len,
)
.unwrap();
assert_eq!(params.len(), 2);
assert_eq!(params[0].content, "abcde");
assert_eq!(params[1].content, "fghij");
let total_tokens = tokenizer.encode_ordinary(&content).len();
let expected_fragments = (total_tokens + max_tokens - 1) / max_tokens;
assert_eq!(params.len(), expected_fragments);
}
#[test]
fn test_content_shorter_than_max_len() {
let object_id = "test_object".to_string();
let collab_type = CollabType::Document;
let max_content_len = 100;
let max_tokens = 100;
let content = "Short content".to_string();
let params = create_embedding_params(
object_id.clone(),
content.clone(),
collab_type.clone(),
max_content_len,
)
.unwrap();
let tokenizer = cl100k_base().unwrap();
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
assert_eq!(params.len(), 1);
assert_eq!(params[0].content, content);
assert_eq!(params[0], content);
}
#[test]
fn test_empty_content() {
let object_id = "test_object".to_string();
let collab_type = CollabType::Document;
let max_content_len = 10;
let max_tokens = 10;
let content = "".to_string();
let params = create_embedding_params(
object_id.clone(),
content.clone(),
collab_type.clone(),
max_content_len,
)
.unwrap();
let tokenizer = cl100k_base().unwrap();
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
assert_eq!(params.len(), 0);
}
#[test]
fn test_content_with_only_multibyte_characters() {
let object_id = "test_object".to_string();
let collab_type = CollabType::Document;
let max_content_len = 4; // Small number for testing
// Each emoji is 4 bytes in UTF-8
let max_tokens = 1; // Set to 1 token for testing
let content = "😀😃😄😁😆".to_string();
let tokenizer = cl100k_base().unwrap();
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
let params = create_embedding_params(
object_id.clone(),
content.clone(),
collab_type.clone(),
max_content_len,
)
.unwrap();
assert_eq!(params.len(), 5);
let expected_contents = vec!["😀", "😃", "😄", "😁", "😆"];
for (param, expected) in params.iter().zip(expected_contents.iter()) {
assert_eq!(param.content, *expected);
let emojis: Vec<String> = content.chars().map(|c| c.to_string()).collect();
for (param, emoji) in params.iter().zip(emojis.iter()) {
assert_eq!(param, emoji);
}
}
#[test]
fn test_split_with_combining_characters() {
let object_id = "test_object".to_string();
let collab_type = CollabType::Document;
let max_content_len = 5; // Small number for testing
// String with combining characters (e.g., letters with accents)
let max_tokens = 1; // Set to 1 token for testing
let content = "a\u{0301}e\u{0301}i\u{0301}o\u{0301}u\u{0301}".to_string(); // "áéíóú"
let tokenizer = cl100k_base().unwrap();
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
let params = create_embedding_params(
object_id.clone(),
content.clone(),
collab_type.clone(),
max_content_len,
)
.unwrap();
let total_tokens = tokenizer.encode_ordinary(&content).len();
assert_eq!(params.len(), total_tokens);
assert_eq!(params.len(), 5);
let expected_contents = vec!["", "", "", "", ""];
for (param, expected) in params.iter().zip(expected_contents.iter()) {
assert_eq!(param.content, *expected);
}
let reconstructed_content = params.join("");
assert_eq!(reconstructed_content, content);
}
#[test]
fn test_large_content() {
let object_id = "test_object".to_string();
let collab_type = CollabType::Document;
let max_content_len = 1000;
// Generate a large content string
let max_tokens = 1000;
let content = "a".repeat(5000); // 5000 characters
let tokenizer = cl100k_base().unwrap();
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
let params = create_embedding_params(
object_id.clone(),
content.clone(),
collab_type.clone(),
max_content_len,
)
.unwrap();
assert_eq!(params.len(), 5); // 5000 / 1000 = 5
for param in params {
assert_eq!(param.content.len(), 1000);
}
let total_tokens = tokenizer.encode_ordinary(&content).len();
let expected_fragments = (total_tokens + max_tokens - 1) / max_tokens;
assert_eq!(params.len(), expected_fragments);
}
#[test]
fn test_non_ascii_characters() {
let object_id = "test_object".to_string();
let collab_type = CollabType::Document;
let max_content_len = 5;
// Non-ASCII characters: "áéíóú"
let max_tokens = 2;
let content = "áéíóú".to_string();
let tokenizer = cl100k_base().unwrap();
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
let params = create_embedding_params(
object_id.clone(),
content.clone(),
collab_type.clone(),
max_content_len,
)
.unwrap();
let total_tokens = tokenizer.encode_ordinary(&content).len();
let expected_fragments = (total_tokens + max_tokens - 1) / max_tokens;
assert_eq!(params.len(), expected_fragments);
// Content should be split into two fragments
assert_eq!(params.len(), 3);
assert_eq!(params[0].content, "áé");
assert_eq!(params[1].content, "íó");
assert_eq!(params[2].content, "ú");
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
}
#[test]
fn test_content_with_leading_and_trailing_whitespace() {
let object_id = "test_object".to_string();
let collab_type = CollabType::Document;
let max_content_len = 5;
let max_tokens = 3;
let content = " abcde ".to_string();
let tokenizer = cl100k_base().unwrap();
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
let params = create_embedding_params(
object_id.clone(),
content.clone(),
collab_type.clone(),
max_content_len,
)
.unwrap();
let total_tokens = tokenizer.encode_ordinary(&content).len();
let expected_fragments = (total_tokens + max_tokens - 1) / max_tokens;
assert_eq!(params.len(), expected_fragments);
// Content should include leading and trailing whitespace
assert_eq!(params.len(), 2);
assert_eq!(params[0].content, " abc");
assert_eq!(params[1].content, "de ");
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
}
#[test]
fn test_content_with_multiple_zero_width_joiners() {
let object_id = "test_object".to_string();
let collab_type = CollabType::Document;
let max_content_len = 10;
// Complex emoji sequence with multiple zero-width joiners
let max_tokens = 1;
let content = "👩‍👩‍👧‍👧👨‍👨‍👦‍👦".to_string();
let tokenizer = cl100k_base().unwrap();
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
let params = create_embedding_params(
object_id.clone(),
content.clone(),
collab_type.clone(),
max_content_len,
)
.unwrap();
// Each complex emoji should be treated as a single grapheme
assert_eq!(params.len(), 2);
assert_eq!(params[0].content, "👩‍👩‍👧‍👧");
assert_eq!(params[1].content, "👨‍👨‍👦‍👦");
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
}
#[test]
fn test_content_with_long_combining_sequences() {
let object_id = "test_object".to_string();
let collab_type = CollabType::Document;
let max_content_len = 5;
let max_tokens = 1;
let content = "a\u{0300}\u{0301}\u{0302}\u{0303}\u{0304}".to_string();
let tokenizer = cl100k_base().unwrap();
let params = split_text_by_max_tokens(content.clone(), max_tokens, &tokenizer).unwrap();
// Character with multiple combining marks
let content = "a\u{0300}\u{0301}\u{0302}\u{0303}\u{0304}".to_string(); // a with multiple accents
let params = create_embedding_params(
object_id.clone(),
content.clone(),
collab_type.clone(),
max_content_len,
)
.unwrap();
// The entire combining sequence should be in one fragment
assert_eq!(params.len(), 1);
assert_eq!(params[0].content, content);
let reconstructed_content: String = params.concat();
assert_eq!(reconstructed_content, content);
}
}
// #[cfg(test)]
// mod execution_time_comparison_tests {
// use crate::indexer::document_indexer::split_text_by_max_tokens;
// use rand::distributions::Alphanumeric;
// use rand::{thread_rng, Rng};
// use std::sync::Arc;
// use std::time::Instant;
// use tiktoken_rs::{cl100k_base, CoreBPE};
//
// #[tokio::test]
// async fn test_execution_time_comparison() {
// let tokenizer = Arc::new(cl100k_base().unwrap());
// let max_tokens = 100;
//
// let sizes = vec![500, 1000, 2000, 5000, 20000]; // Content sizes to test
// for size in sizes {
// let content = generate_random_string(size);
//
// // Measure direct execution time
// let direct_time = measure_direct_execution(content.clone(), max_tokens, &tokenizer);
//
// // Measure spawn_blocking execution time
// let spawn_blocking_time =
// measure_spawn_blocking_execution(content, max_tokens, Arc::clone(&tokenizer)).await;
//
// println!(
// "Content Size: {} | Direct Time: {}ms | spawn_blocking Time: {}ms",
// size, direct_time, spawn_blocking_time
// );
// }
// }
//
// // Measure direct execution time
// fn measure_direct_execution(content: String, max_tokens: usize, tokenizer: &CoreBPE) -> u128 {
// let start = Instant::now();
// split_text_by_max_tokens(content, max_tokens, tokenizer).unwrap();
// start.elapsed().as_millis()
// }
//
// // Measure `spawn_blocking` execution time
// async fn measure_spawn_blocking_execution(
// content: String,
// max_tokens: usize,
// tokenizer: Arc<CoreBPE>,
// ) -> u128 {
// let start = Instant::now();
// tokio::task::spawn_blocking(move || {
// split_text_by_max_tokens(content, max_tokens, tokenizer.as_ref()).unwrap()
// })
// .await
// .unwrap();
// start.elapsed().as_millis()
// }
//
// pub fn generate_random_string(len: usize) -> String {
// let rng = thread_rng();
// rng
// .sample_iter(&Alphanumeric)
// .take(len)
// .map(char::from)
// .collect()
// }
// }

View file

@ -1,7 +1,6 @@
mod document_indexer;
mod ext;
mod provider;
pub use document_indexer::DocumentIndexer;
pub use ext::DocumentDataExt;
pub use provider::*;

View file

@ -26,7 +26,10 @@ use database_entity::dto::{AFCollabEmbeddingParams, AFCollabEmbeddings, CollabPa
#[async_trait]
pub trait Indexer: Send + Sync {
fn embedding_params(&self, collab: &Collab) -> Result<Vec<AFCollabEmbeddingParams>, AppError>;
async fn embedding_params(
&self,
collab: &Collab,
) -> Result<Vec<AFCollabEmbeddingParams>, AppError>;
async fn embeddings(
&self,
@ -46,7 +49,7 @@ pub trait Indexer: Send + Sync {
false,
)
.map_err(|err| AppError::Internal(err.into()))?;
let embedding_params = self.embedding_params(&collab)?;
let embedding_params = self.embedding_params(&collab).await?;
self.embeddings(embedding_params).await
}
}

View file

@ -5,7 +5,8 @@ use actix_web::web::{Data, Json};
use actix_web::{web, HttpRequest, HttpResponse, Scope};
use app_error::AppError;
use appflowy_ai_client::dto::{
CompleteTextResponse, LocalAIConfig, TranslateRowParams, TranslateRowResponse,
CalculateSimilarityParams, CompleteTextResponse, LocalAIConfig, SimilarityResponse,
TranslateRowParams, TranslateRowResponse,
};
use futures_util::{stream, TryStreamExt};
@ -25,6 +26,9 @@ pub fn ai_completion_scope() -> Scope {
.service(web::resource("/summarize_row").route(web::post().to(summarize_row_handler)))
.service(web::resource("/translate_row").route(web::post().to(translate_row_handler)))
.service(web::resource("/local/config").route(web::get().to(local_ai_config_handler)))
.service(
web::resource("/calculate_similarity").route(web::post().to(calculate_similarity_handler)),
)
}
async fn complete_text_handler(
@ -163,3 +167,18 @@ async fn local_ai_config_handler(
.map_err(|err| AppError::AIServiceUnavailable(err.to_string()))?;
Ok(AppResponse::Ok().with_data(config).into())
}
#[instrument(level = "debug", skip_all, err)]
async fn calculate_similarity_handler(
state: web::Data<AppState>,
payload: web::Json<CalculateSimilarityParams>,
) -> actix_web::Result<Json<AppResponse<SimilarityResponse>>> {
let params = payload.into_inner();
let response = state
.ai_client
.calculate_similarity(params)
.await
.map_err(|err| AppError::AIServiceUnavailable(err.to_string()))?;
Ok(AppResponse::Ok().with_data(response).into())
}

View file

@ -95,7 +95,6 @@ async fn create_chat_handler(
) -> actix_web::Result<JsonAppResponse<()>> {
let workspace_id = path.into_inner();
let params = payload.into_inner();
trace!("create new chat: {:?}", params);
create_chat(&state.pg_pool, params, &workspace_id).await?;
Ok(AppResponse::Ok().into())
}
@ -242,10 +241,11 @@ async fn answer_stream_handler(
let (_workspace_id, chat_id, question_id) = path.into_inner();
let (content, metadata) =
chat::chat_ops::select_chat_message_content(&state.pg_pool, question_id).await?;
let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &chat_id).await?;
let ai_model = ai_model_from_header(&req);
match state
.ai_client
.stream_question(&chat_id, &content, Some(metadata), &ai_model)
.stream_question(&chat_id, &content, Some(metadata), rag_ids, &ai_model)
.await
{
Ok(answer_stream) => {
@ -275,10 +275,25 @@ async fn answer_stream_v2_handler(
let (_workspace_id, chat_id, question_id) = path.into_inner();
let (content, metadata) =
chat::chat_ops::select_chat_message_content(&state.pg_pool, question_id).await?;
let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &chat_id).await?;
let ai_model = ai_model_from_header(&req);
trace!(
"[Chat] stream answer for chat: {}, question: {}, rag_ids: {:?}",
chat_id,
content,
rag_ids
);
match state
.ai_client
.stream_question_v2(&chat_id, &content, Some(metadata), &ai_model)
.stream_question_v2(
&chat_id,
question_id,
&content,
Some(metadata),
rag_ids,
&ai_model,
)
.await
{
Ok(answer_stream) => {

View file

@ -17,7 +17,7 @@ use shared_entity::dto::chat_dto::{
CreateChatParams, GetChatMessageParams, RepeatedChatMessage, UpdateChatMessageContentParams,
};
use sqlx::PgPool;
use tracing::{error, info};
use tracing::{error, info, trace};
use appflowy_ai_client::dto::AIModel;
use validator::Validate;
@ -28,6 +28,7 @@ pub(crate) async fn create_chat(
workspace_id: &str,
) -> Result<(), AppError> {
params.validate()?;
trace!("[Chat] create chat {:?}", params);
let mut txn = pg_pool.begin().await?;
insert_chat(&mut txn, workspace_id, params).await?;
@ -60,7 +61,13 @@ pub async fn update_chat_message(
// TODO(nathan): query the metadata from the database
let new_answer = ai_client
.send_question(&params.chat_id, &params.content, &ai_model, None)
.send_question(
&params.chat_id,
params.message_id,
&params.content,
&ai_model,
None,
)
.await?;
let _answer = insert_answer_message(
pg_pool,
@ -85,7 +92,13 @@ pub async fn generate_chat_message_answer(
let (content, metadata) =
chat::chat_ops::select_chat_message_content(pg_pool, question_message_id).await?;
let new_answer = ai_client
.send_question(chat_id, &content, &ai_model, Some(metadata))
.send_question(
chat_id,
question_message_id,
&content,
&ai_model,
Some(metadata),
)
.await?;
info!("new_answer: {:?}", new_answer);
@ -174,7 +187,7 @@ pub async fn create_chat_message_stream(
match params.message_type {
ChatMessageType::System => {}
ChatMessageType::User => {
let answer = match ai_client.send_question(&chat_id, &params.content, &ai_model, Some(json!(params.metadata))).await {
let answer = match ai_client.send_question(&chat_id, question_id, &params.content, &ai_model, Some(json!(params.metadata))).await {
Ok(response) => response,
Err(err) => {
error!("Failed to send question to AI: {}", err);

View file

@ -2,7 +2,7 @@ use crate::api::metrics::RequestMetrics;
use app_error::ErrorCode;
use appflowy_ai_client::client::AppFlowyAIClient;
use appflowy_ai_client::dto::{
EmbeddingEncodingFormat, EmbeddingInput, EmbeddingOutput, EmbeddingRequest, EmbeddingsModel,
EmbeddingEncodingFormat, EmbeddingInput, EmbeddingModel, EmbeddingOutput, EmbeddingRequest,
};
use database::index::{search_documents, SearchDocumentParams};
@ -25,10 +25,10 @@ pub async fn search_document(
let embeddings = ai_client
.embeddings(EmbeddingRequest {
input: EmbeddingInput::String(request.query.clone()),
model: EmbeddingsModel::TextEmbedding3Small.to_string(),
model: EmbeddingModel::TextEmbedding3Small.to_string(),
chunk_size: 500,
encoding_format: EmbeddingEncodingFormat::Float,
dimensions: 1536,
dimensions: EmbeddingModel::TextEmbedding3Small.default_dimensions(),
})
.await
.map_err(|e| AppResponseError::new(ErrorCode::Internal, e.to_string()))?;
@ -64,7 +64,7 @@ pub async fn search_document(
user_id: uid,
workspace_id,
limit: request.limit.unwrap_or(10) as i32,
preview: request.preview_size.unwrap_or(180) as i32,
preview: request.preview_size.unwrap_or(500) as i32,
embedding,
},
total_tokens,

View file

@ -37,24 +37,6 @@ async fn get_collab_response_compatible_test() {
assert_eq!(collab_resp.encode_collab, encode_collab);
}
#[tokio::test]
#[should_panic]
async fn create_collab_workspace_id_equal_to_object_id_test() {
let mut test_client = TestClient::new_user().await;
let workspace_id = test_client.workspace_id().await;
// Only the object with [CollabType::Folder] can have the same object_id as workspace_id. But
// it should use create workspace API
test_client
.create_collab_with_data(
workspace_id.clone(),
&workspace_id,
CollabType::Unknown,
None,
)
.await
.unwrap()
}
#[tokio::test]
async fn batch_insert_collab_with_empty_payload_test() {
let mut test_client = TestClient::new_user().await;

View file

@ -0,0 +1,54 @@
# AppFlowy Values
## Mission Driven
- Our mission is to enable everyone to unleash the potential and achieve more with secure workplace tools.
- We are true believers in open source—a fundamentally superior approach to achieve the mission.
- We actively lead and support the AppFlowy open-source community, where a diverse group of people is empowered to
contribute to the common good.
- We think strategically, make wise decisions, and act accordingly, with an eye toward what's sustainable in the long
run and not what's convenient in the moment.
## Aim High and Iterate
1. We strive for excellence with a growth mindset.
2. We dream big, start small, and move fast.
3. We take smaller steps and ship smaller, simpler features.
4. We don't wait, but instead iterate and work as part of the community.
5. We focus on results over process and prioritize progress over perfection.
## Transparency
1. We make information about AppFlowy public by default unless there is a compelling reason not to.
2. We are straightforward and kind with ourselves and each other.
- We surface issues constructively and proactively.
- We say “why” and provide sufficient context for our actions rather than just disclosing the “what.”
## Collaboration
> We pride ourselves on being a great team.
>
> We foster collaboration, value diversity and inclusion, and encourage sharing.
>
> We thrive as individuals within the context of our team and succeed together.
>
> We play very effectively with people of diverse backgrounds and cultures.
>
> We make time to help each other in pursuit of our common goals.
>
## Honesty
We are honest with ourselves.
We admit mistakes freely and openly.
We provide candid, helpful, timely feedback to colleagues with respect, regardless of their status or whether they
disagree with us.
We are vulnerable in search of truth and don't defend our point just to win over others.

View file

@ -0,0 +1,54 @@
Kathryn's Journey to Becoming a Tennis Player
Kathryn's love for tennis began on a warm summer day when she was eight years old. She stumbled across a local park
where players were volleying back and forth. The sound of the ball hitting the racket and the sheer energy of the game
captivated her. That evening, she begged her parents for a tennis racket, and the very next weekend, she was on the
court for the first time.
Learning the Basics
Kathryn's first lessons were clumsy but full of enthusiasm. She struggled with her serves, missed easy shots, and often
hit the ball over the fence. But every mistake made her more determined to improve. Her first coach, Mr. Evans, taught
her the fundamentals—how to grip the racket, the importance of footwork, and how to keep her eye on the ball. “Tennis is
about focus and persistence,” he would say, and Kathryn took that advice to heart.
By the time she was 12, Kathryn was playing in local junior tournaments. At first, she lost more matches than she won,
but she never let the defeats discourage her. “Every loss teaches you something,” she told herself. Gradually, her
skills improved, and she started to win.
The Turning Point
As Kathryn entered high school, her passion for tennis only grew stronger. She spent hours after school practicing her
backhand and perfecting her serve. She joined her school's tennis team, where she met her new coach, Ms. Carter. Unlike
her earlier coaches, Ms. Carter focused on strategy and mental toughness.
“Kathryn, tennis isn't just physical. It's a mental game too,” she said one day after a tough match. “You need to stay
calm under pressure and think a few steps ahead of your opponent.”
That advice changed everything for Kathryn. She began analyzing her matches, understanding her opponents' patterns, and
using strategy to outplay them. By her senior year, she was the captain of her team and had won several regional
championships.
Chasing the Dream
After high school, Kathryn decided to pursue tennis seriously. She joined a competitive training academy, where the
practices were grueling, and the competition was fierce. There were times she doubted herself, especially after losing
matches to stronger players. But her love for the game kept her going.
Her coaches helped her refine her technique, adding finesse to her volleys and power to her forehand. She also learned
to play smarter, conserving energy during long matches and capitalizing on her opponents' weaknesses.
Becoming a Player
By the time Kathryn was in her early 20s, she was competing in national tournaments. She wasn't the biggest name on the
court, but her hard work and persistence earned her respect. Each match was a chance to learn, grow, and prove herself.
She eventually won her first title at a mid-level tournament, a moment she would never forget. Standing on the podium,
holding the trophy, she realized how far she had come—from the little girl who couldn't hit a serve to a tennis player
with real potential.
A Life of Tennis
Today, Kathryn continues to play with the same passion she had when she first picked up a racket. She travels to
tournaments, trains every day, and inspires young players to follow their dreams. For her, tennis is more than a
sport—it's a lifelong journey of growth, persistence, and joy.

View file

@ -0,0 +1,125 @@
# *The Five Dysfunctions of a Team* by Patrick Lencioni
*The Five Dysfunctions of a Team* by Patrick Lencioni is a compelling exploration of team dynamics and the common
pitfalls that undermine successful collaboration. Through the lens of a fictional story about a Silicon Valley startup,
DecisionTech, and its CEO Kathryn Petersen, Lencioni provides a practical framework to address and resolve issues that
commonly disrupt team cohesion and performance. Below is a chapter-by-chapter look at the book's content, capturing its
essential lessons and actionable insights.
---
## Part I: Underachievement
In this introductory section, we meet Kathryn Petersen, the newly appointed CEO of DecisionTech, a struggling Silicon
Valley startup with a dysfunctional executive team. Kathryn steps into a role where the team is plagued by poor
communication, lack of trust, and weak commitment.
Lencioni uses this setup to introduce readers to the core problems affecting team productivity and morale. Kathryn
realizes that the team's challenges are deeply rooted in its dynamics rather than surface-level operational issues.
Through her initial observations, she identifies that turning around the team will require addressing foundational
issues like trust, respect, and open communication.
---
## Part II: Lighting the Fire
To start addressing these issues, Kathryn organizes an offsite meeting in Napa Valley. This setting becomes a
transformative space where Kathryn pushes the team to be present, vulnerable, and engaged. Her goal is to build trust, a
critical foundation for any team.
Kathryn leads exercises that reveal personal histories, enabling the team members to see each other beyond their
professional roles. She also introduces the idea of constructive conflict, encouraging open discussion about
disagreements and differing opinions. Despite the discomfort this causes for some team members who are used to
individualistic work styles, Kathryn emphasizes that trust and openness are crucial for effective teamwork.
---
## Part III: Heavy Lifting
With initial trust in place, Kathryn shifts her focus to accountability and responsibility. This part highlights the
challenges team members face when taking ownership of collective goals.
Kathryn holds the team to high standards, stressing the importance of addressing issues directly instead of avoiding
them. This section also examines the role of healthy conflict as a mechanism for growth, as team members begin to hold
each other accountable for their contributions. Through challenging conversations, they tackle topics like performance
expectations and role clarity. Kathryn's persistence helps the team understand that embracing accountability is
essential for progress, even if it leads to uncomfortable discussions.
---
## Part IV: Traction
By this stage, Kathryn reinforces the team's commitment to shared goals. The team starts experiencing the tangible
benefits of improved trust and open conflict. Accountability has now become an expected part of their routine, and
meetings are increasingly productive.
As they move towards achieving measurable results, the focus shifts from individual successes to collective
achievements. Kathryn ensures that each member appreciates the value of prioritizing team success over personal gain.
Through this unified approach, the team's motivation and performance visibly improve, demonstrating the power of
cohesive collaboration.
---
## The Model: Overcoming the Five Dysfunctions
Lencioni introduces a model that identifies the five key dysfunctions of a team and provides strategies to overcome
them:
1. **Absence of Trust**
The lack of trust prevents team members from being vulnerable and open with each other. Lencioni suggests exercises
that encourage personal sharing to build this essential foundation.
2. **Fear of Conflict**
Teams that avoid conflict miss out on critical discussions that lead to better decision-making. Lencioni recommends
fostering a safe environment where team members feel comfortable challenging each other's ideas without fear of
reprisal.
3. **Lack of Commitment**
Without clarity and buy-in, team decisions become fragmented. Leaders should ensure everyone understands and agrees
on goals to achieve genuine commitment.
4. **Avoidance of Accountability**
When team members don't hold each other accountable, performance suffers. Regular check-ins and peer accountability
encourage responsibility and consistency.
5. **Inattention to Results**
Prioritizing individual goals over collective outcomes dilutes team success. Aligning rewards and recognition with
team achievements helps refocus efforts on shared objectives.
---
## Understanding and Overcoming Each Dysfunction
Each dysfunction is further broken down with practical strategies:
- **Building Trust**
Kathryn's personal history exercise is one example of building trust. By sharing backgrounds and opening up, team
members foster a culture of vulnerability and connection.
- **Encouraging Conflict**
Constructive conflict allows ideas to be challenged and strengthened. Kathryn's insistence on open debate helps the
team reach better, more robust decisions.
- **Ensuring Commitment**
Lencioni highlights the importance of clarity and alignment, which Kathryn reinforces by facilitating discussions that
ensure all team members are on the same page about their goals.
- **Embracing Accountability**
Accountability becomes ingrained as team members regularly check in with each other, creating a culture of mutual
responsibility and high standards.
- **Focusing on Results**
Kathryn's focus on collective achievements over individual successes aligns with Lencioni's advice to reward team
efforts, ensuring the entire group works toward a shared purpose.
---
## Final Thoughts
*The Five Dysfunctions of a Team* illustrates the importance of cohesive team behavior and effective leadership in
overcoming common organizational challenges. Through Kathryn's story, Lencioni provides a practical roadmap for leaders
and teams to diagnose and address dysfunctions, ultimately fostering an environment where trust, accountability, and
shared goals drive performance.
This book remains a valuable resource for anyone seeking to understand and improve team dynamics, with lessons that
apply well beyond the workplace.

View file

@ -1,13 +1,140 @@
use std::path::PathBuf;
use std::time::Duration;
use appflowy_ai_client::dto::CalculateSimilarityParams;
use client_api_test::{collect_answer, TestClient};
use collab::preclude::Collab;
use collab_document::document::Document;
use collab_document::importer::md_importer::MDImporter;
use collab_entity::CollabType;
use shared_entity::dto::chat_dto::{CreateChatMessageParams, CreateChatParams};
use tokio::time::sleep;
use client_api_test::TestClient;
use workspace_template::document::getting_started::getting_started_document_data;
#[tokio::test]
async fn test_embedding_when_create_document() {
let mut test_client = TestClient::new_user().await;
let workspace_id = test_client.workspace_id().await;
let object_id_1 = uuid::Uuid::new_v4().to_string();
let the_five_dysfunctions_of_a_team =
create_document_collab(&object_id_1, "the_five_dysfunctions_of_a_team.md").await;
let encoded_collab = the_five_dysfunctions_of_a_team.encode_collab().unwrap();
test_client
.create_collab_with_data(
&workspace_id,
&object_id_1,
CollabType::Document,
encoded_collab,
)
.await
.unwrap();
let object_id_2 = uuid::Uuid::new_v4().to_string();
let tennis_player = create_document_collab(&object_id_2, "kathryn_tennis_story.md").await;
let encoded_collab = tennis_player.encode_collab().unwrap();
test_client
.create_collab_with_data(
&workspace_id,
&object_id_2,
CollabType::Document,
encoded_collab,
)
.await
.unwrap();
let search_resp = test_client
.api_client
.search_documents(&workspace_id, "Kathryn", 5, 100)
.await
.unwrap();
// The number of returned documents is affected by the max token size used when splitting the document
// into chunks.
assert_eq!(search_resp.len(), 2);
if ai_test_enabled() {
let previews = search_resp
.iter()
.map(|item| item.preview.clone().unwrap())
.collect::<Vec<String>>()
.join("\n");
let params = CalculateSimilarityParams {
workspace_id: workspace_id.clone(),
input: previews,
expected: r#"
"Kathryns Journey to Becoming a Tennis Player Kathryns love for tennis began on a warm summer day w
yn decided to pursue tennis seriously. She joined a competitive training academy, where the
practice
mwork. Part III: Heavy Lifting With initial trust in place, Kathryn shifts her focus to accountabili
s ideas without fear of
reprisal. Lack of Commitment Without clarity and buy-in, team decisions bec
The Five Dysfunctions of a Team by Patrick Lencioni The Five Dysfunctions of a Team by Patrick Lenci"
"#
.to_string(),
};
let score = test_client
.api_client
.calculate_similarity(params)
.await
.unwrap()
.score;
assert!(
score > 0.85,
"preview score should greater than 0.85, but got: {}",
score
);
// Create a chat to ask questions related to The Five Dysfunctions of a Team.
let chat_id = uuid::Uuid::new_v4().to_string();
let params = CreateChatParams {
chat_id: chat_id.clone(),
name: "chat with the five dysfunctions of a team".to_string(),
rag_ids: vec![object_id_1],
};
test_client
.api_client
.create_chat(&workspace_id, params)
.await
.unwrap();
let params = CreateChatMessageParams::new_user("Tell me about Kathryn concisely?");
let question = test_client
.api_client
.create_question(&workspace_id, &chat_id, params)
.await
.unwrap();
let answer_stream = test_client
.api_client
.stream_answer_v2(&workspace_id, &chat_id, question.message_id)
.await
.unwrap();
let answer = collect_answer(answer_stream).await;
let params = CalculateSimilarityParams {
workspace_id,
input: answer,
expected: r#"
Kathryn Petersen is the newly appointed CEO of DecisionTech, a struggling Silicon Valley startup.
She steps into a role facing a dysfunctional executive team characterized by poor communication,
lack of trust, and weak commitment. Throughout the narrative, Kathryn focuses on addressing
foundational team issues by fostering trust, encouraging open conflict, and promoting accountability,
ultimately leading her team toward improved collaboration and performance.
"#
.to_string(),
};
let score = test_client
.api_client
.calculate_similarity(params)
.await
.unwrap()
.score;
assert!(score > 0.9, "score: {}", score);
}
}
#[ignore]
#[tokio::test]
async fn test_document_indexing_and_search() {
@ -56,3 +183,18 @@ async fn test_document_indexing_and_search() {
let preview = item.preview.clone().unwrap();
assert!(preview.contains("Welcome to AppFlowy"));
}
async fn create_document_collab(document_id: &str, file_name: &str) -> Document {
let file_path = PathBuf::from(format!("tests/search/asset/{}", file_name));
let md = std::fs::read_to_string(file_path).unwrap();
let importer = MDImporter::new(None);
let document_data = importer.import(document_id, md).unwrap();
Document::create(document_id, document_data).unwrap()
}
pub fn ai_test_enabled() -> bool {
if cfg!(feature = "ai-test-enabled") {
return true;
}
false
}