From 3181b17d60d90b2cbe2b8bd3120624045dc8493b Mon Sep 17 00:00:00 2001
From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com>
Date: Mon, 7 Apr 2025 14:44:22 +0800
Subject: [PATCH] chore: set deployment id for azure embedding (#1322)

* chore: fix audit

* chore: update audit config

* chore: fix azure embedding

* chore: adjust ai config

* fix: do not generate embedding when all chunk content is empty
---
 Cargo.lock                                  |  8 +++---
 deny.toml                                   |  6 ++++-
 deploy.env                                  | 10 +++++++
 dev.env                                     | 10 +++++++
 libs/appflowy-ai-client/src/dto.rs          |  9 +++++++
 libs/gotrue-entity/Cargo.toml               |  1 +
 .../src/collab_indexer/document_indexer.rs  | 12 +++++++--
 libs/indexer/src/metrics.rs                 |  2 +-
 libs/indexer/src/scheduler.rs               | 16 +++++++++---
 libs/indexer/src/vector/embedder.rs         | 23 +++++++++++++---
 libs/indexer/src/vector/open_ai.rs          | 12 ++++++++-
 services/appflowy-worker/src/application.rs |  6 ++---
 .../src/indexer_worker/worker.rs            | 21 ++++++++++-----
 src/application.rs                          |  5 ++--
 src/biz/search/ops.rs                       |  4 +--
 tests/ai_test/local_ai_test.rs              | 26 -------------------
 16 files changed, 115 insertions(+), 56 deletions(-)
 delete mode 100644 tests/ai_test/local_ai_test.rs

diff --git a/Cargo.lock b/Cargo.lock
index 4437c966..d645e906 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4594,9 +4594,9 @@ dependencies = [
 
 [[package]]
 name = "openssl"
-version = "0.10.66"
+version = "0.10.72"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1"
+checksum = "fedfea7d58a1f73118430a55da6a286e7b044961736ce96a16a17068ea25e5da"
 dependencies = [
  "bitflags 2.6.0",
  "cfg-if",
@@ -4635,9 +4635,9 @@ dependencies = [
 
 [[package]]
 name = "openssl-sys"
-version = "0.9.103"
+version = "0.9.107"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6"
+checksum = "8288979acd84749c744a9014b4382d42b8f7b2592847b5afb2ed29e5d16ede07"
 dependencies = [
  "cc",
  "libc",
diff --git a/deny.toml b/deny.toml
index fc1986e7..db4d97b2 100644
--- a/deny.toml
+++ b/deny.toml
@@ -1,2 +1,6 @@
 [advisories]
-ignore = ["RUSTSEC-2024-0384"]
+ignore = [
+    "RUSTSEC-2024-0384",
+    "RUSTSEC-2025-0012",
+    "RUSTSEC-2024-0436",
+]
diff --git a/deploy.env b/deploy.env
index 8672fc14..76f8e435 100644
--- a/deploy.env
+++ b/deploy.env
@@ -174,7 +174,17 @@ NGINX_PORT=80
 NGINX_TLS_PORT=443
 
 # AppFlowy AI
+# Standard OpenAI API:
+# Set your API key here if you are using the standard OpenAI API.
 AI_OPENAI_API_KEY=
+
+# Azure-hosted OpenAI API:
+# If you're using an Azure-hosted OpenAI API, leave AI_OPENAI_API_KEY empty
+# and set the following Azure-specific variables instead. If both are set, the standard OpenAI API will be used.
+AI_AZURE_OPENAI_API_KEY=
+AI_AZURE_OPENAI_API_BASE=
+AI_AZURE_OPENAI_API_VERSION=
+
 AI_ANTHROPIC_API_KEY=
 AI_SERVER_PORT=5001
 AI_SERVER_HOST=ai
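For reference, a filled-in Azure section follows the shape below. These values are placeholders for illustration, not values from this patch; use your own resource endpoint, key, and API version:

  AI_AZURE_OPENAI_API_KEY=your-azure-api-key
  AI_AZURE_OPENAI_API_BASE=https://your-resource-name.openai.azure.com
  AI_AZURE_OPENAI_API_VERSION=2024-02-15-preview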
diff --git a/dev.env b/dev.env
index 776c071e..42e34f81 100644
--- a/dev.env
+++ b/dev.env
@@ -117,7 +117,17 @@ GF_SECURITY_ADMIN_PASSWORD=password
 CLOUDFLARE_TUNNEL_TOKEN=
 
 # AppFlowy AI
+# Standard OpenAI API:
+# Set your API key here if you are using the standard OpenAI API.
 AI_OPENAI_API_KEY=
+
+# Azure-hosted OpenAI API:
+# If you're using an Azure-hosted OpenAI API, leave AI_OPENAI_API_KEY empty
+# and set the following Azure-specific variables instead. If both are set, the standard OpenAI API will be used.
+AI_AZURE_OPENAI_API_KEY=
+AI_AZURE_OPENAI_API_BASE=
+AI_AZURE_OPENAI_API_VERSION=
+
 AI_ANTHROPIC_API_KEY=
 AI_SERVER_PORT=5001
 AI_SERVER_HOST=localhost
diff --git a/libs/appflowy-ai-client/src/dto.rs b/libs/appflowy-ai-client/src/dto.rs
index 2befc654..82d7718c 100644
--- a/libs/appflowy-ai-client/src/dto.rs
+++ b/libs/appflowy-ai-client/src/dto.rs
@@ -231,6 +231,15 @@ pub enum EmbeddingModel {
 }
 
 impl EmbeddingModel {
+  /// Returns the default embedding model used in this system.
+  ///
+  /// This model is hardcoded and used to generate embeddings whose dimensions are
+  /// reflected in the PostgreSQL database schema. Changing the default model may
+  /// require a migration to create a new table with the appropriate dimensions.
+  pub fn default_model() -> Self {
+    EmbeddingModel::TextEmbedding3Small
+  }
+
   pub fn supported_models() -> &'static [&'static str] {
     &[
       "text-embedding-ada-002",
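The new default_model() helper becomes the single source of truth for both the model name and its dimensions. A minimal sketch of how a request is built against it, mirroring the call sites updated later in this patch (document_indexer.rs and ops.rs):

  let model = EmbeddingModel::default_model();
  let request = CreateEmbeddingRequestArgs::default()
    .model(model.to_string())
    .input(EmbeddingInput::String("example query".to_string()))
    .encoding_format(EncodingFormat::Float)
    // Must match the vector dimensions baked into the PostgreSQL schema.
    .dimensions(model.default_dimensions())
    .build()
    .map_err(|err| AppError::Unhandled(err.to_string()))?;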
Skipping embedding.", + object_id + ); + + return Ok(vec![]); + } split_text_into_chunks(object_id, paragraphs, CollabType::Document, model) } @@ -63,7 +71,7 @@ impl Indexer for DocumentIndexer { .model(embedder.model().name()) .input(EmbeddingInput::StringArray(contents)) .encoding_format(EncodingFormat::Float) - .dimensions(EmbeddingModel::TextEmbedding3Small.default_dimensions()) + .dimensions(EmbeddingModel::default_model().default_dimensions()) .build() .map_err(|err| AppError::Unhandled(err.to_string()))?; diff --git a/libs/indexer/src/metrics.rs b/libs/indexer/src/metrics.rs index d75b7602..c3b2906a 100644 --- a/libs/indexer/src/metrics.rs +++ b/libs/indexer/src/metrics.rs @@ -73,7 +73,7 @@ impl EmbeddingMetrics { } pub fn record_gen_embedding_time(&self, num: u32, millis: u128) { - tracing::info!("[Embedding]: index {} collabs cost: {}ms", num, millis); + tracing::trace!("[Embedding]: index {} collabs cost: {}ms", num, millis); self.gen_embeddings_time_histogram.observe(millis as f64); } } diff --git a/libs/indexer/src/scheduler.rs b/libs/indexer/src/scheduler.rs index 969a1f9f..f39386e5 100644 --- a/libs/indexer/src/scheduler.rs +++ b/libs/indexer/src/scheduler.rs @@ -15,6 +15,7 @@ use database::index::{ get_collab_embedding_fragment_ids, update_collab_indexed_at, upsert_collab_embeddings, }; use database::workspace::select_workspace_settings; +use database_entity::dto::AFCollabEmbeddedChunk; use infra::env_util::get_env_var; use redis::aio::ConnectionManager; use serde::{Deserialize, Serialize}; @@ -355,8 +356,9 @@ async fn generate_embeddings_loop( } } } + join_set.spawn(async move { - if chunks.is_empty() { + if is_collab_embedded_chunks_empty(&chunks) { return Ok(None); } @@ -398,7 +400,9 @@ async fn generate_embeddings_loop( error!("Failed to send embedding record: {}", err); } }, - Ok(None) => debug!("No embedding for collab"), + Ok(None) => trace!( + "[Embedding] Did found existing embeddings. Skip generate embedding for collab" + ), Err(err) => { metrics.record_failed_embed_count(1); warn!( @@ -429,7 +433,7 @@ pub async fn spawn_pg_write_embeddings( break; } - trace!("[Embedding] received {} embeddings to write", n); + trace!("[Embedding] pg received {} embeddings to write", n); let start = Instant::now(); let records = buf.drain(..n).collect::>(); for record in records.iter() { @@ -541,3 +545,9 @@ impl UnindexedData { } } } + +#[inline] +/// All chunks are empty if all of them have no content +pub fn is_collab_embedded_chunks_empty(chunks: &[AFCollabEmbeddedChunk]) -> bool { + chunks.iter().all(|chunk| chunk.content.is_none()) +} diff --git a/libs/indexer/src/vector/embedder.rs b/libs/indexer/src/vector/embedder.rs index cd4d8f0c..95c21f33 100644 --- a/libs/indexer/src/vector/embedder.rs +++ b/libs/indexer/src/vector/embedder.rs @@ -8,6 +8,7 @@ pub use async_openai::types::{ EncodingFormat, }; use infra::env_util::get_env_var_opt; +use tracing::{info, warn}; #[derive(Debug, Clone)] pub enum AFEmbedder { @@ -27,15 +28,31 @@ impl AFEmbedder { } pub fn model(&self) -> EmbeddingModel { - EmbeddingModel::TextEmbedding3Small + EmbeddingModel::default_model() } } -pub fn open_ai_config() -> Option { +pub fn get_open_ai_config() -> (Option, Option) { + let open_ai_config = open_ai_config(); + let azure_ai_config = azure_open_ai_config(); + + if open_ai_config.is_some() { + info!("Using official OpenAI API"); + if azure_ai_config.is_some() { + warn!("Both OpenAI and Azure OpenAI API keys are set. 
Using OpenAI API."); + } + return (open_ai_config, None); + } + + info!("Using Azure OpenAI API"); + (None, azure_ai_config) +} + +fn open_ai_config() -> Option { get_env_var_opt("AI_OPENAI_API_KEY").map(|v| OpenAIConfig::default().with_api_key(v)) } -pub fn azure_open_ai_config() -> Option { +fn azure_open_ai_config() -> Option { let azure_open_ai_api_key = get_env_var_opt("AI_AZURE_OPENAI_API_KEY")?; let azure_open_ai_api_base = get_env_var_opt("AI_AZURE_OPENAI_API_BASE")?; let azure_open_ai_api_version = get_env_var_opt("AI_AZURE_OPENAI_API_VERSION")?; diff --git a/libs/indexer/src/vector/open_ai.rs b/libs/indexer/src/vector/open_ai.rs index 4aefda8e..0aff012b 100644 --- a/libs/indexer/src/vector/open_ai.rs +++ b/libs/indexer/src/vector/open_ai.rs @@ -1,8 +1,10 @@ use app_error::AppError; +use appflowy_ai_client::dto::EmbeddingModel; use async_openai::config::{AzureConfig, Config, OpenAIConfig}; use async_openai::types::{CreateEmbeddingRequest, CreateEmbeddingResponse}; use async_openai::Client; use tiktoken_rs::CoreBPE; +use tracing::trace; pub const OPENAI_EMBEDDINGS_URL: &str = "https://api.openai.com/v1/embeddings"; @@ -27,7 +29,9 @@ pub struct AzureOpenAIEmbedder { } impl AzureOpenAIEmbedder { - pub fn new(config: AzureConfig) -> Self { + pub fn new(mut config: AzureConfig) -> Self { + // Make sure your Azure AI service support the model + config = config.with_deployment_id(EmbeddingModel::default_model().to_string()); let client = Client::with_config(config); Self { client } } @@ -37,6 +41,12 @@ pub async fn async_embed( client: &Client, request: CreateEmbeddingRequest, ) -> Result { + trace!( + "async embed with request: model:{:?}, dimension:{:?}, api_base:{}", + request.model, + request.dimensions, + client.config().api_base() + ); let response = client .embeddings() .create(request) diff --git a/services/appflowy-worker/src/application.rs b/services/appflowy-worker/src/application.rs index 88bb1fad..68ba1ac5 100644 --- a/services/appflowy-worker/src/application.rs +++ b/services/appflowy-worker/src/application.rs @@ -21,7 +21,7 @@ use axum::response::IntoResponse; use axum::routing::get; use indexer::metrics::EmbeddingMetrics; use indexer::thread_pool::ThreadPoolNoAbortBuilder; -use indexer::vector::embedder::{azure_open_ai_config, open_ai_config}; +use indexer::vector::embedder::get_open_ai_config; use infra::env_util::get_env_var; use mailer::sender::Mailer; use secrecy::ExposeSecret; @@ -132,9 +132,7 @@ pub async fn create_app(listener: TcpListener, config: Config) -> Result<(), Err .unwrap(), ); - let open_ai_config = open_ai_config(); - let azure_ai_config = azure_open_ai_config(); - + let (open_ai_config, azure_ai_config) = get_open_ai_config(); let indexer_config = BackgroundIndexerConfig { enable: appflowy_collaborate::config::get_env_var("APPFLOWY_INDEXER_ENABLED", "true") .parse::() diff --git a/services/appflowy-worker/src/indexer_worker/worker.rs b/services/appflowy-worker/src/indexer_worker/worker.rs index 66a0a304..119d8e0f 100644 --- a/services/appflowy-worker/src/indexer_worker/worker.rs +++ b/services/appflowy-worker/src/indexer_worker/worker.rs @@ -8,7 +8,9 @@ use indexer::queue::{ ack_task, default_indexer_group_option, ensure_indexer_consumer_group, read_background_embed_tasks, }; -use indexer::scheduler::{spawn_pg_write_embeddings, UnindexedCollabTask, UnindexedData}; +use indexer::scheduler::{ + is_collab_embedded_chunks_empty, spawn_pg_write_embeddings, UnindexedCollabTask, UnindexedData, +}; use indexer::thread_pool::ThreadPoolNoAbort; use 
diff --git a/services/appflowy-worker/src/application.rs b/services/appflowy-worker/src/application.rs
index 88bb1fad..68ba1ac5 100644
--- a/services/appflowy-worker/src/application.rs
+++ b/services/appflowy-worker/src/application.rs
@@ -21,7 +21,7 @@ use axum::response::IntoResponse;
 use axum::routing::get;
 use indexer::metrics::EmbeddingMetrics;
 use indexer::thread_pool::ThreadPoolNoAbortBuilder;
-use indexer::vector::embedder::{azure_open_ai_config, open_ai_config};
+use indexer::vector::embedder::get_open_ai_config;
 use infra::env_util::get_env_var;
 use mailer::sender::Mailer;
 use secrecy::ExposeSecret;
@@ -132,9 +132,7 @@ pub async fn create_app(listener: TcpListener, config: Config) -> Result<(), Err
       .unwrap(),
   );
 
-  let open_ai_config = open_ai_config();
-  let azure_ai_config = azure_open_ai_config();
-
+  let (open_ai_config, azure_ai_config) = get_open_ai_config();
   let indexer_config = BackgroundIndexerConfig {
     enable: appflowy_collaborate::config::get_env_var("APPFLOWY_INDEXER_ENABLED", "true")
       .parse::<bool>()
diff --git a/services/appflowy-worker/src/indexer_worker/worker.rs b/services/appflowy-worker/src/indexer_worker/worker.rs
index 66a0a304..119d8e0f 100644
--- a/services/appflowy-worker/src/indexer_worker/worker.rs
+++ b/services/appflowy-worker/src/indexer_worker/worker.rs
@@ -8,7 +8,9 @@ use indexer::queue::{
   ack_task, default_indexer_group_option, ensure_indexer_consumer_group,
   read_background_embed_tasks,
 };
-use indexer::scheduler::{spawn_pg_write_embeddings, UnindexedCollabTask, UnindexedData};
+use indexer::scheduler::{
+  is_collab_embedded_chunks_empty, spawn_pg_write_embeddings, UnindexedCollabTask, UnindexedData,
+};
 use indexer::thread_pool::ThreadPoolNoAbort;
 use indexer::vector::embedder::{AFEmbedder, AzureConfig, OpenAIConfig};
 use indexer::vector::open_ai;
@@ -195,14 +197,18 @@ async fn process_upcoming_tasks(
       }
     }
       join_set.spawn(async move {
-        let embeddings = indexer.embed(&embedder, chunks).await.ok()?;
-        embeddings.map(|embeddings| EmbeddingRecord {
+        if is_collab_embedded_chunks_empty(&chunks) {
+          return Ok::<_, AppError>(None);
+        }
+
+        let embeddings = indexer.embed(&embedder, chunks).await?;
+        Ok(embeddings.map(|embeddings| EmbeddingRecord {
           workspace_id: task.workspace_id,
           object_id: task.object_id,
           collab_type: task.collab_type,
           tokens_used: embeddings.tokens_consumed,
           contents: embeddings.params,
-        })
+        }))
       });
     }
   }
@@ -210,8 +216,11 @@ async fn process_upcoming_tasks(
   while let Some(Ok(result)) = join_set.join_next().await {
     match result {
-      None => metrics.record_failed_embed_count(1),
-      Some(record) => {
+      Err(_) => {
+        metrics.record_failed_embed_count(1);
+      },
+      Ok(None) => {},
+      Ok(Some(record)) => {
         metrics.record_embed_count(1);
         trace!(
           "[Background Embedding] send {} embedding record to write task",
diff --git a/src/application.rs b/src/application.rs
index 6100e427..b768698d 100644
--- a/src/application.rs
+++ b/src/application.rs
@@ -45,7 +45,7 @@ use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
 use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
 use indexer::collab_indexer::IndexerProvider;
 use indexer::scheduler::{IndexerConfiguration, IndexerScheduler};
-use indexer::vector::embedder::{azure_open_ai_config, open_ai_config};
+use indexer::vector::embedder::get_open_ai_config;
 use infra::env_util::get_env_var;
 use mailer::sender::Mailer;
 use snowflake::Snowflake;
@@ -299,8 +299,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<
-  let open_ai_config = open_ai_config();
-  let azure_ai_config = azure_open_ai_config();
+  let (open_ai_config, azure_ai_config) = get_open_ai_config();
   let embedder_config = IndexerConfiguration {
     enable: get_env_var("APPFLOWY_INDEXER_ENABLED", "true")
       .parse::<bool>()
diff --git a/src/biz/search/ops.rs b/src/biz/search/ops.rs
index e52e0bd2..151c2733 100644
--- a/src/biz/search/ops.rs
+++ b/src/biz/search/ops.rs
@@ -80,10 +80,10 @@ pub async fn search_document(
   metrics: &RequestMetrics,
 ) -> Result<Vec<SearchDocumentResponseItem>, AppError> {
   let embeddings_request = CreateEmbeddingRequestArgs::default()
-    .model(EmbeddingModel::TextEmbedding3Small.to_string())
+    .model(EmbeddingModel::default_model().to_string())
     .input(EmbeddingInput::String(request.query.clone()))
     .encoding_format(EncodingFormat::Float)
-    .dimensions(EmbeddingModel::TextEmbedding3Small.default_dimensions())
+    .dimensions(EmbeddingModel::default_model().default_dimensions())
     .build()
     .map_err(|err| AppError::Unhandled(err.to_string()))?;
diff --git a/tests/ai_test/local_ai_test.rs b/tests/ai_test/local_ai_test.rs
deleted file mode 100644
index 06ff8983..00000000
--- a/tests/ai_test/local_ai_test.rs
+++ /dev/null
@@ -1,26 +0,0 @@
-use client_api_test::{local_ai_test_enabled, TestClient};
-
-#[tokio::test]
-async fn get_local_ai_config_test() {
-  if !local_ai_test_enabled() {
-    return;
-  }
-  let test_client = TestClient::new_user().await;
-  let workspace_id = test_client.workspace_id().await;
-  let config = test_client
-    .api_client
-    .get_local_ai_config(&workspace_id, "macos")
-    .await
-    .unwrap();
-  {
-    assert!(!config.models.is_empty());
-    assert!(!config.models[0].embedding_model.download_url.is_empty());
-    assert!(config.models[0].embedding_model.file_size > 10);
-    assert!(!config.models[0].chat_model.download_url.is_empty());
-    assert!(config.models[0].chat_model.file_size > 10);
-
-    assert!(!config.plugin.version.is_empty());
-    assert!(!config.plugin.url.is_empty());
-    println!("config: {:?}", config);
-  }
-}
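For orientation, both application.rs call sites feed the resolved pair into their indexer configuration, which then picks an embedder with OpenAI taking precedence. A sketch of that selection, using hypothetical variant and constructor names (the AFEmbedder variants and OpenAIEmbedder::new are not shown in this patch):

  fn create_embedder(
    open_ai_config: Option<OpenAIConfig>,
    azure_ai_config: Option<AzureConfig>,
  ) -> Option<AFEmbedder> {
    if let Some(config) = open_ai_config {
      // Mirrors get_open_ai_config: the official OpenAI API wins when both are set.
      return Some(AFEmbedder::OpenAI(open_ai::OpenAIEmbedder::new(config)));
    }
    azure_ai_config.map(|config| AFEmbedder::AzureOpenAI(open_ai::AzureOpenAIEmbedder::new(config)))
  }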