chore: adjust chunk size (#1334)

* chore: adjust chunk size

* chore: deduplicate search results

* chore: fix test

* chore: update logs

* chore: separate search and summary

* chore: fix test
Nathan.fooo 2025-04-12 22:22:17 +08:00 committed by GitHub
parent c76d2f0b4b
commit ef40f8faa4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 309 additions and 195 deletions

View file

@ -90,7 +90,7 @@ jobs:
- name: Replace values in .env
run: |
# log level
sed -i 's|RUST_LOG=.*|RUST_LOG=trace|' .env
sed -i 's|RUST_LOG=.*|RUST_LOG='appflowy_cloud=trace,appflowy_worker=trace,database=trace,indexer=trace'|' .env
sed -i 's|GOTRUE_SMTP_USER=.*|GOTRUE_SMTP_USER=${{ secrets.CI_GOTRUE_SMTP_USER }}|' .env
sed -i 's|GOTRUE_SMTP_PASS=.*|GOTRUE_SMTP_PASS=${{ secrets.CI_GOTRUE_SMTP_PASS }}|' .env
sed -i 's|GOTRUE_SMTP_ADMIN_EMAIL=.*|GOTRUE_SMTP_ADMIN_EMAIL=${{ secrets.CI_GOTRUE_SMTP_ADMIN_EMAIL }}|' .env
@ -101,6 +101,9 @@ jobs:
sed -i 's|APPFLOWY_MAILER_SMTP_USERNAME=.*|APPFLOWY_MAILER_SMTP_USERNAME=${{ secrets.CI_GOTRUE_SMTP_USER }}|' .env
sed -i 's|APPFLOWY_MAILER_SMTP_PASSWORD=.*|APPFLOWY_MAILER_SMTP_PASSWORD=${{ secrets.CI_GOTRUE_SMTP_PASS }}|' .env
sed -i 's|AI_OPENAI_API_KEY=.*|AI_OPENAI_API_KEY=${{ secrets.CI_OPENAI_API_KEY }}|' .env
sed -i 's|AI_OPENAI_API_SUMMARY_MODEL=.*|AI_OPENAI_API_SUMMARY_MODEL="gpt-4o-mini"|' .env
sed -i 's|APPFLOWY_EMBEDDING_CHUNK_SIZE=.*|APPFLOWY_EMBEDDING_CHUNK_SIZE=500|' .env
sed -i 's|APPFLOWY_EMBEDDING_CHUNK_OVERLAP=.*|APPFLOWY_EMBEDDING_CHUNK_OVERLAP=50|' .env
sed -i 's|AI_ANTHROPIC_API_KEY=.*|AI_ANTHROPIC_API_KEY=${{ secrets.CI_AI_ANTHROPIC_API_KEY }}|' .env
sed -i "s|AI_AWS_ACCESS_KEY_ID=.*|AI_AWS_ACCESS_KEY_ID=${{ secrets.LOCAL_AI_AWS_ACCESS_KEY_ID }}|" .env
sed -i "s|AI_AWS_SECRET_ACCESS_KEY=.*|AI_AWS_SECRET_ACCESS_KEY=${{ secrets.LOCAL_AI_AWS_SECRET_ACCESS_KEY }}|" .env

View file

@ -177,7 +177,10 @@ NGINX_TLS_PORT=443
# Standard OpenAI API:
# Set your API key here if you are using the standard OpenAI API.
AI_OPENAI_API_KEY=
# If no summary model is provided, there will be no search summary when using AI search.
AI_OPENAI_API_SUMMARY_MODEL=
APPFLOWY_EMBEDDING_CHUNK_SIZE=1000
APPFLOWY_EMBEDDING_CHUNK_OVERLAP=200
# Azure-hosted OpenAI API:
# If you're using a self-hosted OpenAI API via Azure, leave AI_OPENAI_API_KEY empty
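
The two chunking knobs work together: each new chunk starts chunk_size minus overlap units after the previous one. A minimal Rust sketch of that arithmetic (illustrative only; the real splitter in DocumentIndexer works on paragraphs and token budgets rather than raw lengths):

// Illustrative only: how chunk size and overlap determine how many chunks a
// given amount of content produces.
fn chunk_count(total_len: usize, chunk_size: usize, overlap: usize) -> usize {
    let stride = chunk_size.saturating_sub(overlap).max(1);
    if total_len <= chunk_size {
        1
    } else {
        // one full chunk, then one more per stride of remaining content
        1 + (total_len - chunk_size).div_ceil(stride)
    }
}

fn main() {
    // With APPFLOWY_EMBEDDING_CHUNK_SIZE=1000 and APPFLOWY_EMBEDDING_CHUNK_OVERLAP=200,
    // consecutive chunks advance by 800 units and share 200 units of context.
    assert_eq!(chunk_count(2600, 1000, 200), 3);
}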

View file

@ -120,7 +120,10 @@ CLOUDFLARE_TUNNEL_TOKEN=
# Standard OpenAI API:
# Set your API key here if you are using the standard OpenAI API.
AI_OPENAI_API_KEY=
AI_OPENAI_API_SUMMARY_MODEL=
# If no summary model is provided, there will be no search summary when using AI search.
AI_OPENAI_API_SUMMARY_MODEL="gpt-4o-mini"
APPFLOWY_EMBEDDING_CHUNK_SIZE=500
APPFLOWY_EMBEDDING_CHUNK_OVERLAP=50
# Azure-hosted OpenAI API:
# If you're using a self-hosted OpenAI API via Azure, leave AI_OPENAI_API_KEY empty

View file

@ -190,6 +190,9 @@ pub enum AppError {
#[error("{0}")]
InvalidBlock(String),
#[error("{0}")]
FeatureNotAvailable(String),
}
impl AppError {
@ -272,6 +275,7 @@ impl AppError {
AppError::ApplyUpdateError(_) => ErrorCode::ApplyUpdateError,
AppError::ActionTimeout(_) => ErrorCode::ActionTimeout,
AppError::InvalidBlock(_) => ErrorCode::InvalidBlock,
AppError::FeatureNotAvailable(_) => ErrorCode::FeatureNotAvailable,
}
}
}
@ -448,6 +452,7 @@ pub enum ErrorCode {
InvalidBlock = 1064,
RequestTimeout = 1065,
AIResponseError = 1066,
FeatureNotAvailable = 1067,
}
impl ErrorCode {

View file

@ -44,7 +44,7 @@ use database_entity::dto::{
SnapshotData,
};
use shared_entity::dto::ai_dto::CalculateSimilarityParams;
use shared_entity::dto::search_dto::SearchResult;
use shared_entity::dto::search_dto::SearchDocumentResponseItem;
use shared_entity::dto::workspace_dto::{
BlobMetadata, CollabResponse, EmbeddedCollabQuery, PublishedDuplicate, WorkspaceMemberChangeset,
WorkspaceMemberInvitation, WorkspaceSpaceUsage,
@ -589,7 +589,7 @@ impl TestClient {
workspace_id: &Uuid,
query: Vec<EmbeddedCollabQuery>,
) -> Vec<AFCollabEmbedInfo> {
let timeout_duration = Duration::from_secs(30);
let timeout_duration = Duration::from_secs(60);
let poll_interval = Duration::from_millis(2000);
let poll_fut = async {
loop {
@ -598,7 +598,7 @@ impl TestClient {
.batch_get_collab_embed_info(workspace_id, query.clone())
.await
{
Ok(items) if items.len() == query.len() => return Ok::<_, Error>(items),
Ok(items) if items.len() >= query.len() => return Ok::<_, Error>(items),
_ => tokio::time::sleep(poll_interval).await,
}
}
@ -608,7 +608,7 @@ impl TestClient {
match timeout(timeout_duration, poll_fut).await {
Ok(Ok(items)) => items,
Ok(Err(e)) => panic!("Test failed: {}", e),
Err(_) => panic!("Test failed: Timeout after 30 seconds."),
Err(_) => panic!("Test failed: Timeout after 60 seconds. {:?}", query),
}
}
@ -643,16 +643,16 @@ impl TestClient {
limit: u32,
preview: u32,
score_limit: Option<f32>,
) -> SearchResult {
) -> Vec<SearchDocumentResponseItem> {
timeout(Duration::from_secs(30), async {
loop {
let response = self
.api_client
.search_documents_v2(workspace_id, query, limit, preview, score_limit)
.search_documents(workspace_id, query, limit, preview, score_limit)
.await
.unwrap();
if response.items.is_empty() {
if response.is_empty() {
tokio::time::sleep(Duration::from_millis(1500)).await;
continue;
} else {

View file

@ -1286,11 +1286,3 @@ fn extract_request_id(resp: &reqwest::Response) -> Option<String> {
.get("x-request-id")
.map(|v| v.to_str().unwrap_or("invalid").to_string())
}
pub(crate) fn log_request_id(resp: &reqwest::Response) {
if let Some(request_id) = resp.headers().get("x-request-id") {
info!("request_id: {:?}", request_id);
} else {
debug!("request_id: not found");
}
}

View file

@ -1,4 +1,3 @@
use crate::http::log_request_id;
use crate::{process_response_data, process_response_error, Client};
use app_error::AppError;
@ -188,7 +187,6 @@ impl Client {
.await?
.send()
.await?;
log_request_id(&resp);
match resp.status() {
StatusCode::OK => {

View file

@ -1,4 +1,3 @@
use crate::http::log_request_id;
use crate::{process_response_data, process_response_error, Client};
use app_error::AppError;
@ -156,7 +155,6 @@ impl Client {
app_err
}
})?;
log_request_id(&resp);
let stream = AppResponse::<serde_json::Value>::json_response_stream(resp).await?;
Ok(QuestionStream::new(stream))
}
@ -178,7 +176,6 @@ impl Client {
.json(&query)
.send()
.await?;
log_request_id(&resp);
let stream = AppResponse::<serde_json::Value>::json_response_stream(resp).await?;
Ok(QuestionStream::new(stream))
}

View file

@ -1,5 +1,4 @@
use crate::entity::CollabType;
use crate::http::log_request_id;
use crate::{
blocking_brotli_compress, brotli_compress, process_response_data, process_response_error, Client,
};
@ -469,6 +468,23 @@ impl Client {
.map(|data| data.0)
}
pub async fn force_generate_collab_embeddings(
&self,
workspace_id: &Uuid,
object_id: &Uuid,
) -> Result<(), AppResponseError> {
let url = format!(
"{}/api/workspace/{workspace_id}/collab/{object_id}/generate-embedding",
self.base_url
);
let resp = self
.http_client_with_auth(Method::POST, &url)
.await?
.send()
.await?;
process_response_error(resp).await
}
pub async fn collab_full_sync(
&self,
workspace_id: &Uuid,
@ -508,7 +524,6 @@ impl Client {
.body(Bytes::from(encoded_payload))
.send()
.await?;
log_request_id(&resp);
if resp.status().is_success() {
let body = resp.bytes().await?;
let decompressed_body = zstd::decode_all(Cursor::new(body))?;

View file

@ -1,43 +1,24 @@
use app_error::ErrorCode;
use reqwest::Method;
use shared_entity::dto::search_dto::{SearchDocumentResponseItem, SearchResult};
use shared_entity::dto::search_dto::{
SearchDocumentResponseItem, SearchResult, SearchSummaryResult, SummarySearchResultRequest,
};
use shared_entity::response::AppResponseError;
use uuid::Uuid;
use crate::{process_response_data, Client};
impl Client {
pub async fn search_documents(
&self,
workspace_id: &Uuid,
query: &str,
limit: u32,
preview_size: u32,
) -> Result<Vec<SearchDocumentResponseItem>, AppResponseError> {
let query = serde_urlencoded::to_string([
("query", query),
("limit", &limit.to_string()),
("preview_size", &preview_size.to_string()),
])
.map_err(|err| AppResponseError::new(ErrorCode::InvalidRequest, err.to_string()))?;
let url = format!("{}/api/search/{workspace_id}?{query}", self.base_url);
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.send()
.await?;
process_response_data::<Vec<SearchDocumentResponseItem>>(resp).await
}
/// High score means more relevant
pub async fn search_documents_v2<T: Into<Option<f32>>>(
/// If `score` is `None`, it will use the score from the server. High score means more relevant.
/// score range is 0.0 to 1.0
pub async fn search_documents<T: Into<Option<f32>>>(
&self,
workspace_id: &Uuid,
query: &str,
limit: u32,
preview_size: u32,
score: T,
) -> Result<SearchResult, AppResponseError> {
) -> Result<Vec<SearchDocumentResponseItem>, AppResponseError> {
let mut raw_query = Vec::with_capacity(4);
raw_query.push(("query", query.to_string()));
raw_query.push(("limit", limit.to_string()));
@ -50,12 +31,35 @@ impl Client {
let query = serde_urlencoded::to_string(raw_query)
.map_err(|err| AppResponseError::new(ErrorCode::InvalidRequest, err.to_string()))?;
let url = format!("{}/api/search/v2/{workspace_id}?{query}", self.base_url);
let url = format!("{}/api/search/{workspace_id}?{query}", self.base_url);
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.send()
.await?;
process_response_data::<SearchResult>(resp).await
process_response_data::<Vec<SearchDocumentResponseItem>>(resp).await
}
/// High score means more relevant
pub async fn generate_search_summary(
&self,
workspace_id: &Uuid,
query: &str,
search_results: Vec<SearchResult>,
) -> Result<SearchSummaryResult, AppResponseError> {
let payload = SummarySearchResultRequest {
query: query.to_string(),
search_results,
only_context: true,
};
let url = format!("{}/api/search/{workspace_id}/summary", self.base_url);
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.json(&payload)
.send()
.await?;
process_response_data::<SearchSummaryResult>(resp).await
}
}
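
A minimal usage sketch of the split API, assuming an authenticated Client named client, a workspace_id: Uuid, and an async caller that can propagate AppResponseError (the query string and limits are placeholders):

use shared_entity::dto::search_dto::SearchResult;

// Step 1: plain vector search; the score threshold is optional (None uses the server default).
let items = client
    .search_documents(&workspace_id, "appflowy", 10, 200, Some(0.4))
    .await?;

// Step 2: summaries are requested separately, from the results of step 1.
let result = client
    .generate_search_summary(
        &workspace_id,
        "appflowy",
        items.iter().map(SearchResult::from).collect(),
    )
    .await?;
println!("received {} summaries", result.summaries.len());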

View file

@ -26,22 +26,31 @@ pub async fn search_documents<'a, E: Executor<'a, Database = Postgres>>(
search_tokens_consumed = af_workspace_ai_usage.search_tokens_consumed + $6
RETURNING workspace_id
)
SELECT
em.oid AS object_id,
collab.workspace_id,
collab.partition_key AS collab_type,
em.content_type,
em.content AS content,
LEFT(em.content, $4) AS content_preview,
u.name AS created_by,
collab.created_at AS created_at,
em.embedding <=> $3 AS distance
FROM af_collab_embeddings em
JOIN af_collab collab ON em.oid = collab.oid
SELECT
collab.oid AS object_id,
collab.workspace_id,
collab.partition_key AS collab_type,
em.content_type,
em.content,
LEFT(em.content, $4) AS content_preview,
u.name AS created_by,
collab.created_at AS created_at,
em.embedding <=> $3 AS distance
FROM af_collab collab
JOIN LATERAL (
-- Fetch the most relevant embedding per collab.oid
SELECT *
FROM af_collab_embeddings
WHERE oid = collab.oid
ORDER BY embedding <=> $3 -- Use vector index for sorting
LIMIT 1 -- Only keep the top result
) em ON true
JOIN af_user u ON collab.owner_uid = u.uid
WHERE collab.workspace_id = $2 AND (collab.oid = ANY($7::uuid[]))
ORDER BY em.embedding <=> $3
LIMIT $5
WHERE
collab.workspace_id = $2
AND collab.oid = ANY($7::uuid[])
ORDER BY distance
LIMIT $5;
"#,
)
.bind(params.user_id)

View file

@ -11,7 +11,7 @@ use collab_document::document::DocumentBody;
use database_entity::dto::{AFCollabEmbeddedChunk, AFCollabEmbeddings, EmbeddingContentType};
use infra::env_util::get_env_var;
use serde_json::json;
use tracing::{debug, trace, warn};
use tracing::{debug, error, trace, warn};
use twox_hash::xxhash64::Hasher;
use uuid::Uuid;
@ -55,12 +55,12 @@ impl Indexer for DocumentIndexer {
object_id,
paragraphs,
model,
get_env_var("APPFLOWY_EMBEDDING_CHUNK_SIZE", "8000")
get_env_var("APPFLOWY_EMBEDDING_CHUNK_SIZE", "1000")
.parse::<usize>()
.unwrap_or(8000),
get_env_var("APPFLOWY_EMBEDDING_CHUNK_OVERLAP", "500")
.unwrap_or(1000),
get_env_var("APPFLOWY_EMBEDDING_CHUNK_OVERLAP", "200")
.parse::<usize>()
.unwrap_or(500),
.unwrap_or(200),
)
}
@ -96,14 +96,12 @@ impl Indexer for DocumentIndexer {
.map_err(|err| AppError::Unhandled(err.to_string()))?;
let resp = embedder.async_embed(request).await?;
trace!(
"[Embedding] requested {} embeddings, received {} embeddings",
valid_indices.len(),
resp.data.len()
);
if resp.data.len() != valid_indices.len() {
error!(
"[Embedding] requested {} embeddings, received {} embeddings",
valid_indices.len(),
resp.data.len()
);
return Err(AppError::Unhandled(format!(
"Mismatch in number of embeddings requested and received: {} vs {}",
valid_indices.len(),
@ -123,6 +121,13 @@ impl Indexer for DocumentIndexer {
}
}
/// chunk_size:
/// Small Chunks (50-256 tokens): Best for precision-focused tasks (e.g., Q&A, technical docs) where specific details matter.
/// Medium Chunks (256-1,024 tokens): Ideal for balanced tasks like RAG or contextual search, providing enough context without noise.
/// Large Chunks (1,024-2,048 tokens): Suited for analysis or thematic tasks where broad understanding is key.
///
/// overlap:
/// Add 10-20% overlap for larger chunks (e.g., 50-100 tokens for 512-token chunks) to preserve context across boundaries.
pub fn split_text_into_chunks(
object_id: Uuid,
paragraphs: Vec<String>,
@ -170,5 +175,13 @@ pub fn split_text_into_chunks(
);
}
}
trace!(
"[Embedding] Created {} chunks for object_id `{}`, chunk_size: {}, overlap: {}",
chunks.len(),
object_id,
chunk_size,
overlap
);
Ok(chunks)
}
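
A quick way to exercise the splitter directly with the new defaults, sketched under the assumption that the third argument is the embedding model seen at the call site above and that the caller can return AppError:

let object_id = Uuid::new_v4();
let paragraphs = vec![
    "AppFlowy is an open-source project.".to_string(),
    "It is an alternative to tools like Notion.".to_string(),
];
let chunks = split_text_into_chunks(
    object_id,
    paragraphs,
    EmbeddingModel::default_model(),
    1000, // APPFLOWY_EMBEDDING_CHUNK_SIZE default
    200,  // APPFLOWY_EMBEDDING_CHUNK_OVERLAP default
)?;
assert!(!chunks.is_empty());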

View file

@ -426,12 +426,11 @@ pub async fn spawn_pg_write_embeddings(
break;
}
trace!("[Embedding] pg received {} embeddings to write", n);
let start = Instant::now();
let records = buf.drain(..n).collect::<Vec<_>>();
for record in records.iter() {
debug!(
"[Embedding] generate collab:{} embeddings, tokens used: {}",
"[Embedding] pg write collab:{} embeddings, tokens used: {}",
record.object_id, record.tokens_used
);
}
@ -449,7 +448,6 @@ pub async fn spawn_pg_write_embeddings(
match result {
Ok(_) => {
trace!("[Embedding] save {} embeddings to disk", n);
metrics.record_write_embedding_time(start.elapsed().as_millis());
},
Err(err) => {

View file

@ -95,7 +95,7 @@ const SYSTEM_PROMPT: &str = r#"
You are a strict, context-based question answering assistant.
Provide an answer with appropriate metadata in JSON format.
For each answer, include:
1. The answer text
1. The answer text. It must be a concise and highly precise summary.
2. Metadata with empty JSON map
3. A numeric relevance score (0.0 to 1.0):
- 1.0: Completely relevant.
@ -106,7 +106,7 @@ const ONLY_CONTEXT_SYSTEM_PROMPT: &str = r#"
You are a strict, context-bound question answering assistant. Answer solely based on the text provided below. If the context lacks sufficient information for a confident response, reply with an empty answer.
Your response must include:
1. The answer text.
1. The answer text. It must be a concise and highly precise summary.
2. Metadata extracted from the context. (If the answer is not relevant, return an empty JSON Map.)
3. A numeric score (0.0 to 1.0) indicating the answer's relevance to the user's question:
- 1.0: Completely relevant.
@ -116,7 +116,7 @@ Your response must include:
Do not reference or use any information beyond what is provided in the context.
"#;
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct LLMDocument {
pub content: String,
pub metadata: Value,
@ -158,7 +158,7 @@ pub async fn summarize_documents<C: Config>(
let response_format = ResponseFormat::JsonSchema {
json_schema: ResponseFormatJsonSchema {
description: Some("A response containing a list of answers, each with the answer text, metadata extracted from context, and relevance score".to_string()),
name: "ChatResponse".into(),
name: "SummarySearchResponse".into(),
schema: Some(schema_value),
strict: Some(true),
},

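For orientation, a hypothetical model response that satisfies the contract the prompts describe (answer text, metadata map, relevance score); the content, id, and score below are placeholders, not real output:

use serde_json::json;

// Hypothetical structured output; the real schema is generated from the response types.
let example = json!({
    "summaries": [{
        "content": "AppFlowy is an open-source alternative to Notion.",
        "metadata": { "id": "00000000-0000-0000-0000-000000000000", "source": "appflowy", "name": "document" },
        "score": 0.9
    }]
});
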
View file

@ -17,22 +17,39 @@ pub struct SearchDocumentRequest {
#[serde(skip_serializing_if = "Option::is_none")]
pub preview_size: Option<u32>,
#[serde(default = "default_only_context")]
pub only_context: bool,
#[serde(default = "default_search_score_limit")]
pub score: f64,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SearchResult {
pub object_id: Uuid,
pub content: String,
}
impl From<&SearchDocumentResponseItem> for SearchResult {
fn from(value: &SearchDocumentResponseItem) -> Self {
Self {
object_id: value.object_id,
content: value.content.clone(),
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SummarySearchResultRequest {
pub query: String,
pub search_results: Vec<SearchResult>,
pub only_context: bool,
}
fn default_search_score_limit() -> f64 {
// Higher score means better match.
0.4
}
fn default_only_context() -> bool {
true
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Summary {
pub content: String,
@ -43,9 +60,8 @@ pub struct Summary {
/// Response array element for the collab vector search query.
/// See: [SearchDocumentRequest].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SearchResult {
pub struct SearchSummaryResult {
pub summaries: Vec<Summary>,
pub items: Vec<SearchDocumentResponseItem>,
}
/// Response array element for the collab vector search query.
@ -63,6 +79,9 @@ pub struct SearchDocumentResponseItem {
/// Type of the content to be presented in preview field. This is a hint what
/// kind of content was used to match the user query ie. document plain text, pdf attachment etc.
pub content_type: Option<SearchContentType>,
/// Content of the document. This is a full content of the document, not just a preview.
#[serde(default)]
pub content: String,
/// First N characters of the indexed content matching the user query. It doesn't have to contain
/// the user query itself.
pub preview: Option<String>,

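A small sketch of how the summary request body can be built from these DTOs (the query and content values are placeholders; serialization follows the derives above):

use shared_entity::dto::search_dto::{SearchResult, SummarySearchResultRequest};
use uuid::Uuid;

let body = SummarySearchResultRequest {
    query: "Kathryn tennis".to_string(),
    search_results: vec![SearchResult {
        object_id: Uuid::new_v4(),
        content: "Kathryn plays tennis every weekend.".to_string(),
    }],
    only_context: true,
};
println!("{}", serde_json::to_string_pretty(&body).unwrap());
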
View file

@ -1,26 +1,28 @@
use crate::biz::search::{search_document, summary_search_results};
use crate::state::AppState;
use access_control::act::Action;
use actix_web::web::{Data, Query};
use actix_web::web::{Data, Json, Query};
use actix_web::{web, Scope};
use async_openai::config::{AzureConfig, OpenAIConfig};
use authentication::jwt::Authorization;
use llm_client::chat::{AITool, AzureOpenAIChat, OpenAIChat};
use shared_entity::dto::search_dto::{
SearchDocumentRequest, SearchDocumentResponseItem, SearchResult,
SearchDocumentRequest, SearchDocumentResponseItem, SearchSummaryResult,
SummarySearchResultRequest,
};
use shared_entity::response::{AppResponse, JsonAppResponse};
use uuid::Uuid;
use crate::biz::search::search_document;
use crate::state::AppState;
pub fn search_scope() -> Scope {
web::scope("/api/search")
.service(web::resource("{workspace_id}").route(web::get().to(document_search)))
.service(web::resource("/v2/{workspace_id}").route(web::get().to(document_search_v2)))
.service(web::resource("{workspace_id}").route(web::get().to(document_search_handler)))
.service(
web::resource("/{workspace_id}/summary").route(web::get().to(summary_search_results_handler)),
)
}
#[tracing::instrument(skip(state, auth, payload), err)]
async fn document_search(
async fn document_search_handler(
auth: Authorization,
path: web::Path<Uuid>,
payload: Query<SearchDocumentRequest>,
@ -43,19 +45,18 @@ async fn document_search(
workspace_id,
request,
metrics,
None,
)
.await?;
Ok(AppResponse::Ok().with_data(resp.items).into())
Ok(AppResponse::Ok().with_data(resp).into())
}
#[tracing::instrument(skip(state, auth, payload), err)]
async fn document_search_v2(
async fn summary_search_results_handler(
auth: Authorization,
path: web::Path<Uuid>,
payload: Query<SearchDocumentRequest>,
payload: Json<SummarySearchResultRequest>,
state: Data<AppState>,
) -> actix_web::Result<JsonAppResponse<SearchResult>> {
) -> actix_web::Result<JsonAppResponse<SearchSummaryResult>> {
let workspace_id = path.into_inner();
let request = payload.into_inner();
let user_uuid = auth.uuid()?;
@ -64,20 +65,10 @@ async fn document_search_v2(
.workspace_access_control
.enforce_action(&uid, &workspace_id, Action::Read)
.await?;
let metrics = &*state.metrics.request_metrics;
let ai_tool = create_ai_tool(&state.config.azure_ai_config, &state.config.open_ai_config);
let resp = search_document(
&state.pg_pool,
&state.collab_access_control_storage,
&state.indexer_scheduler,
uid,
workspace_id,
request,
metrics,
ai_tool,
)
.await?;
Ok(AppResponse::Ok().with_data(resp).into())
let result = summary_search_results(ai_tool, request).await?;
Ok(AppResponse::Ok().with_data(result).into())
}
pub fn create_ai_tool(

View file

@ -37,7 +37,9 @@ use actix_web::{web, HttpResponse, ResponseError, Scope};
use actix_web::{HttpRequest, Result};
use anyhow::{anyhow, Context};
use app_error::{AppError, ErrorCode};
use appflowy_collaborate::actix_ws::entities::{ClientHttpStreamMessage, ClientHttpUpdateMessage};
use appflowy_collaborate::actix_ws::entities::{
ClientGenerateEmbeddingMessage, ClientHttpStreamMessage, ClientHttpUpdateMessage,
};
use authentication::jwt::{Authorization, OptionalUserUuid, UserUuid};
use bytes::BytesMut;
use chrono::{DateTime, Duration, Utc};
@ -169,6 +171,10 @@ pub fn workspace_scope() -> Scope {
web::resource("/{workspace_id}/collab/{object_id}/embed-info")
.route(web::get().to(get_collab_embed_info_handler)),
)
.service(
web::resource("/{workspace_id}/collab/{object_id}/generate-embedding")
.route(web::get().to(force_generate_collab_embedding_handler)),
)
.service(
web::resource("/{workspace_id}/collab/embed-info/list")
.route(web::post().to(batch_get_collab_embed_info_handler)),
@ -2712,6 +2718,20 @@ async fn get_collab_embed_info_handler(
Ok(Json(AppResponse::Ok().with_data(info)))
}
async fn force_generate_collab_embedding_handler(
path: web::Path<(Uuid, Uuid)>,
server: Data<RealtimeServerAddr>,
) -> Result<Json<AppResponse<()>>> {
let (workspace_id, object_id) = path.into_inner();
let request = ClientGenerateEmbeddingMessage {
workspace_id,
object_id,
return_tx: None,
};
let _ = server.try_send(request);
Ok(Json(AppResponse::Ok()))
}
#[instrument(level = "debug", skip_all)]
async fn batch_get_collab_embed_info_handler(
state: Data<AppState>,
@ -2824,6 +2844,7 @@ async fn collab_full_sync_handler(
let encoded = tokio::task::spawn_blocking(move || zstd::encode_all(Cursor::new(data), 3))
.await
.map_err(|err| AppError::Internal(anyhow!("Failed to compress data: {}", err)))??;
Ok(HttpResponse::Ok().body(encoded))
},
Ok(None) => Ok(HttpResponse::InternalServerError().finish()),

View file

@ -15,7 +15,8 @@ use infra::env_util::get_env_var;
use llm_client::chat::{AITool, LLMDocument};
use serde_json::json;
use shared_entity::dto::search_dto::{
SearchContentType, SearchDocumentRequest, SearchDocumentResponseItem, SearchResult, Summary,
SearchContentType, SearchDocumentRequest, SearchDocumentResponseItem, SearchSummaryResult,
Summary, SummarySearchResultRequest,
};
use sqlx::PgPool;
use std::collections::HashSet;
@ -71,6 +72,7 @@ fn populate_searchable_view_ids(
);
}
}
#[allow(clippy::too_many_arguments)]
pub async fn search_document(
pg_pool: &PgPool,
@ -80,8 +82,7 @@ pub async fn search_document(
workspace_uuid: Uuid,
request: SearchDocumentRequest,
metrics: &RequestMetrics,
ai_tool: Option<AITool>,
) -> Result<SearchResult, AppError> {
) -> Result<Vec<SearchDocumentResponseItem>, AppError> {
// Set up the embedding model and create an embedding request.
let default_model = EmbeddingModel::default_model();
let embeddings_request = CreateEmbeddingRequestArgs::default()
@ -142,63 +143,21 @@ pub async fn search_document(
};
trace!(
"[Search] user_id: {}, workspace_id: {}, limit: {}, score: {:?}, keyword: {}",
params.user_id,
params.workspace_id,
"[Search] query: {}, limit: {}, score: {:?}, workspace: {}",
request.query,
params.limit,
params.score,
request.query,
params.workspace_id,
);
// Perform document search.
let results = search_documents(pg_pool, params, total_tokens).await?;
trace!(
"[Search] user {} search request in workspace {} returned {} results for query: `{}`",
uid,
workspace_uuid,
"[Search] query:{}, got {} results",
request.query,
results.len(),
request.query
);
let mut summaries = Vec::new();
if !results.is_empty() {
if let Some(ai_chat) = ai_tool {
let model_name = get_env_var("AI_OPENAI_API_SUMMARY_MODEL", "gpt-4o-mini");
trace!("using {} model to summarize search results", model_name);
let llm_docs: Vec<LLMDocument> = results
.iter()
.map(|result| {
LLMDocument::new(
result.content.clone(),
json!({
"id": result.object_id,
"source": "appflowy",
"name": "document",
}),
)
})
.collect();
match ai_chat
.summary_documents(&request.query, &model_name, &llm_docs, request.only_context)
.await
{
Ok(resp) => {
trace!("AI summary search document response: {:?}", resp);
summaries = resp
.summaries
.into_iter()
.map(|s| Summary {
content: s.content,
metadata: s.metadata,
score: s.score,
})
.collect();
},
Err(err) => error!("AI summary search document failed, error: {:?}", err),
}
}
}
// Build and return the search result, mapping each document to its response item.
let items = results
.into_iter()
@ -210,8 +169,72 @@ pub async fn search_document(
preview: Some(item.content.chars().take(preview_size as usize).collect()),
created_by: item.created_by,
created_at: item.created_at,
content: item.content,
})
.collect();
Ok(SearchResult { summaries, items })
Ok(items)
}
pub async fn summary_search_results(
ai_tool: Option<AITool>,
request: SummarySearchResultRequest,
) -> Result<SearchSummaryResult, AppError> {
if request.search_results.is_empty() {
return Ok(SearchSummaryResult { summaries: vec![] });
}
if ai_tool.is_none() {
return Err(AppError::FeatureNotAvailable(
"AI tool is not available".to_string(),
));
}
let ai_tool = ai_tool.unwrap();
let model_name = get_env_var("AI_OPENAI_API_SUMMARY_MODEL", "gpt-4o-mini");
let mut summaries = Vec::new();
trace!(
"[Search] use {} model to summarize search results",
model_name
);
let SummarySearchResultRequest {
query,
search_results,
only_context,
} = request;
let llm_docs: Vec<LLMDocument> = search_results
.into_iter()
.map(|result| {
LLMDocument::new(
result.content,
json!({
"id": result.object_id,
"source": "appflowy",
"name": "document",
}),
)
})
.collect();
match ai_tool
.summary_documents(&query, &model_name, &llm_docs, only_context)
.await
{
Ok(resp) => {
trace!("AI summary search document response: {:?}", resp);
summaries = resp
.summaries
.into_iter()
.map(|s| Summary {
content: s.content,
metadata: s.metadata,
score: s.score,
})
.collect();
},
Err(err) => error!("AI summary search document failed, error: {:?}", err),
}
Ok(SearchSummaryResult { summaries })
}

View file

@ -76,7 +76,7 @@ async fn chat_with_multiple_selected_source_test() {
.await;
// create chat
let chat_id = uuid::Uuid::new_v4().to_string();
let chat_id = Uuid::new_v4().to_string();
let params = CreateChatParams {
chat_id: chat_id.clone(),
name: "my first chat".to_string(),

View file

@ -9,7 +9,6 @@ async fn chat_with_search_result_simple() {
if !ai_test_enabled() {
return;
}
load_env();
let (open_ai_config, azure_config) = get_open_ai_config();
let ai_chat = create_ai_tool(&azure_config, &open_ai_config).unwrap();
let model_name = "gpt-4o-mini";

View file

@ -94,9 +94,9 @@ async fn document_full_sync_then_search_test() {
let local_plain_text = local_document.document.paragraphs().join("");
assert_eq!(local_plain_text, remote_plain_text);
let search_result = test_client
let items = test_client
.wait_unit_get_search_result(&workspace_id, "workflows", 1, 200, Some(0.3))
.await;
assert_eq!(search_result.items.len(), 1);
assert_eq!(search_result.items[0].preview, Some("AppFlowy is an open-source project.It is an alternative to tools like Notion.AppFlowy provides full control of your data.The project is built using Flutter for the frontend.Rust powers AppFlowy's back".to_string()));
assert_eq!(items.len(), 1);
assert_eq!(items[0].preview, Some("AppFlowy is an open-source project.It is an alternative to tools like Notion.AppFlowy provides full control of your data.The project is built using Flutter for the frontend.Rust powers AppFlowy's back".to_string()));
}

View file

@ -9,6 +9,7 @@ use collab_document::importer::md_importer::MDImporter;
use collab_entity::CollabType;
use collab_folder::ViewLayout;
use shared_entity::dto::chat_dto::{CreateChatMessageParams, CreateChatParams};
use shared_entity::dto::search_dto::SearchResult;
use tokio::time::sleep;
use uuid::Uuid;
use workspace_template::document::getting_started::getting_started_document_data;
@ -42,15 +43,28 @@ async fn test_embedding_when_create_document() {
.await;
// Test Search
let search_resp = test_client
.wait_unit_get_search_result(&workspace_id, "Kathryn tennis", 5, 100, Some(0.4))
let query = "Kathryn tennis";
let items = test_client
.wait_unit_get_search_result(&workspace_id, query, 5, 100, Some(0.4))
.await;
// The number of returned documents affected by the max token size when splitting the document
// into chunks.
assert_eq!(search_resp.items.len(), 1);
assert_eq!(search_resp.summaries.len(), 1);
let previews = search_resp
.items
assert_eq!(items.len(), 2);
// Test search summary
let result = test_client
.api_client
.generate_search_summary(
&workspace_id,
query,
items.iter().map(SearchResult::from).collect(),
)
.await
.unwrap();
dbg!("search summary: {}", &result);
assert!(!result.summaries.is_empty());
let previews = items
.iter()
.map(|item| item.preview.clone().unwrap())
.collect::<Vec<String>>()
@ -67,13 +81,24 @@ async fn test_embedding_when_create_document() {
.await;
// Test irrelevant search
let search_resp = test_client
let query = "Hello world";
let items = test_client
.api_client
.search_documents_v2(&workspace_id, "Hello world", 5, 100, Some(0.4))
.search_documents(&workspace_id, query, 5, 100, Some(0.4))
.await
.unwrap();
assert!(search_resp.items.is_empty());
assert!(search_resp.summaries.is_empty());
assert!(items.is_empty());
let result = test_client
.api_client
.generate_search_summary(
&workspace_id,
query,
items.into_iter().map(|v| SearchResult::from(&v)).collect(),
)
.await
.unwrap();
assert!(result.summaries.is_empty());
// Simulate when user click search result to open the document and then chat with it.
let answer = create_chat_and_ask_question(
@ -86,12 +111,8 @@ async fn test_embedding_when_create_document() {
.await;
let expected_answer = r#"
Kathryn Petersen is the newly appointed CEO of DecisionTech, a struggling Silicon Valley startup.
She steps into a role facing a dysfunctional executive team characterized by poor communication,
lack of trust, and weak commitment. Throughout the narrative, Kathryn focuses on addressing
foundational team issues by fostering trust, encouraging open conflict, and promoting accountability,
ultimately leading her team toward improved collaboration and performance.
"#;
Kathryn Petersen is the newly appointed CEO of DecisionTech, a struggling Silicon Valley startup featured in Patrick Lencioni's book "The Five Dysfunctions of a Team." She faces the challenge of leading a dysfunctional executive team characterized by poor communication, lack of trust, and weak commitment. Her role involves addressing these issues to improve team dynamics and overall performance within the company.
"#;
calculate_similarity_and_assert(
&mut test_client,
@ -137,7 +158,7 @@ async fn test_document_indexing_and_search() {
// document should get automatically indexed after opening if it wasn't indexed before
let search_resp = test_client
.api_client
.search_documents(&workspace_id, "Appflowy", 1, 20)
.search_documents(&workspace_id, "Appflowy", 1, 20, None)
.await
.unwrap();
assert_eq!(search_resp.len(), 1);