Merge branch 'main' into feat/update-collab

This commit is contained in:
Zack Fu Zi Xiang 2024-12-19 23:09:59 +08:00
commit 1f83e48362
No known key found for this signature in database
5 changed files with 47 additions and 63 deletions

View file

@ -2,7 +2,7 @@ use crate::config::get_env_var;
use crate::indexer::metrics::EmbeddingMetrics;
use crate::indexer::vector::embedder::Embedder;
use crate::indexer::vector::open_ai;
use crate::indexer::IndexerProvider;
use crate::indexer::{Indexer, IndexerProvider};
use crate::thread_pool_no_abort::{ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
use actix::dev::Stream;
use anyhow::anyhow;
@ -129,6 +129,10 @@ impl IndexerScheduler {
true
}
pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool {
self.indexer_provider.is_indexing_enabled(collab_type)
}
fn create_embedder(&self) -> Result<Embedder, AppError> {
if self.config.openai_api_key.is_empty() {
return Err(AppError::AIServiceUnavailable(
@ -159,10 +163,16 @@ impl IndexerScheduler {
return Ok(());
}
let embedder = self.create_embedder()?;
let indexed_collab = indexed_collab.into();
let indexer = self
.indexer_provider
.indexer_for(&indexed_collab.collab_type);
if indexer.is_none() {
return Ok(());
}
let embedder = self.create_embedder()?;
let workspace_id = Uuid::parse_str(workspace_id)?;
let indexer_provider = self.indexer_provider.clone();
let tx = self.schedule_tx.clone();
let metrics = self.metrics.clone();
@ -178,7 +188,7 @@ impl IndexerScheduler {
return;
}
match process_collab(&embedder, &indexer_provider, &indexed_collab, &metrics) {
match process_collab(&embedder, indexer, &indexed_collab, &metrics) {
Ok(Some((tokens_used, contents))) => {
if let Err(err) = tx.send(EmbeddingRecord {
workspace_id,
@ -209,12 +219,18 @@ impl IndexerScheduler {
pub fn index_encoded_collabs(
&self,
workspace_id: &str,
indexed_collabs: Vec<IndexedCollab>,
mut indexed_collabs: Vec<IndexedCollab>,
) -> Result<(), AppError> {
if !self.index_enabled() {
return Ok(());
}
indexed_collabs.retain(|collab| self.is_indexing_enabled(&collab.collab_type));
if indexed_collabs.is_empty() {
return Ok(());
}
info!("indexing {} collabs", indexed_collabs.len());
let embedder = self.create_embedder()?;
let workspace_id = Uuid::parse_str(workspace_id)?;
let indexer_provider = self.indexer_provider.clone();
@ -226,6 +242,7 @@ impl IndexerScheduler {
let embeddings_list = indexed_collabs
.into_par_iter()
.filter_map(|collab| {
let indexer = indexer_provider.indexer_for(&collab.collab_type)?;
let task = ActiveTask::new(collab.object_id.clone());
let task_created_at = task.created_at;
active_task.insert(collab.object_id.clone(), task);
@ -234,7 +251,7 @@ impl IndexerScheduler {
if !should_embed(&active_task, &collab.object_id, task_created_at) {
return None;
}
process_collab(&embedder, &indexer_provider, &collab, &metrics).ok()
process_collab(&embedder, Some(indexer), &collab, &metrics).ok()
})
.ok()
})
@ -272,6 +289,10 @@ impl IndexerScheduler {
return Ok(());
}
if !self.is_indexing_enabled(collab_type) {
return Ok(());
}
let indexer = self
.indexer_provider
.indexer_for(collab_type)
@ -571,11 +592,11 @@ async fn batch_insert_records(
/// This function must be called within the rayon thread pool.
fn process_collab(
embdder: &Embedder,
indexer_provider: &IndexerProvider,
indexer: Option<Arc<dyn Indexer>>,
indexed_collab: &IndexedCollab,
metrics: &EmbeddingMetrics,
) -> Result<Option<(u32, Vec<AFCollabEmbeddedChunk>)>, AppError> {
if let Some(indexer) = indexer_provider.indexer_for(&indexed_collab.collab_type) {
if let Some(indexer) = indexer {
metrics.record_embed_count(1);
let encode_collab = EncodedCollab::decode_from_bytes(&indexed_collab.encoded_collab)?;
let collab = Collab::new_with_source(

View file

@ -52,4 +52,8 @@ impl IndexerProvider {
pub fn indexer_for(&self, collab_type: &CollabType) -> Option<Arc<dyn Indexer>> {
self.indexer_cache.get(collab_type).cloned()
}
pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool {
self.indexer_cache.contains_key(collab_type)
}
}

View file

@ -829,10 +829,17 @@ async fn batch_create_collab_handler(
.can_index_workspace(&workspace_id)
.await?
{
state.indexer_scheduler.index_encoded_collabs(
&workspace_id,
collab_params_list.iter().map(IndexedCollab::from).collect(),
)?;
let indexed_collabs: Vec<_> = collab_params_list
.iter()
.filter(|p| state.indexer_scheduler.is_indexing_enabled(&p.collab_type))
.map(IndexedCollab::from)
.collect();
if !indexed_collabs.is_empty() {
state
.indexer_scheduler
.index_encoded_collabs(&workspace_id, indexed_collabs)?;
}
}
let start = Instant::now();

View file

@ -294,54 +294,6 @@ async fn generate_chat_message_answer_test() {
assert!(!answer.is_empty());
}
#[tokio::test]
async fn create_chat_context_test() {
if !ai_test_enabled() {
return;
}
let test_client = TestClient::new_user_without_ws_conn().await;
let workspace_id = test_client.workspace_id().await;
let chat_id = uuid::Uuid::new_v4().to_string();
let params = CreateChatParams {
chat_id: chat_id.clone(),
name: "context chat".to_string(),
rag_ids: vec![],
};
test_client
.api_client
.create_chat(&workspace_id, params)
.await
.unwrap();
let content = "Lacus have lived in the US for five years".to_string();
let metadata = ChatMessageMetadata {
data: ChatRAGData::from_text(content),
id: chat_id.clone(),
name: "".to_string(),
source: "appflowy".to_string(),
extra: None,
};
let params = CreateChatMessageParams::new_user("Where Lacus live?").with_metadata(metadata);
let question = test_client
.api_client
.create_question(&workspace_id, &chat_id, params)
.await
.unwrap();
let answer = test_client
.api_client
.get_answer(&workspace_id, &chat_id, question.message_id)
.await
.unwrap();
println!("answer: {:?}", answer);
if answer.content.contains("United States") {
return;
}
assert!(answer.content.contains("US"));
}
// #[tokio::test]
// async fn update_chat_message_test() {
// if !ai_test_enabled() {

View file

@ -106,10 +106,10 @@ async fn chat_with_multiple_selected_source_test() {
&test_client,
&workspace_id,
&chat_id,
"When do we take off to Japan? Just tell me the date, and if you don't know, Just say you dont know",
"When do we take off to Japan? Just tell me the date, and if you don't know, Just say you dont know the date for the trip to Japan",
)
.await;
let expected_unknown_japan_answer = r#"I dont know"#;
let expected_unknown_japan_answer = r#"I dont know the date for your trip to Japan"#;
test_client
.assert_similarity(&workspace_id, &answer, expected_unknown_japan_answer, 0.7)
.await;
@ -168,7 +168,7 @@ async fn chat_with_multiple_selected_source_test() {
&test_client,
&workspace_id,
&chat_id,
"When do we take off to Japan? Just tell me the date, and if you don't know, Just say you dont know",
"When do we take off to Japan? Just tell me the date, and if you don't know, Just say you dont know the date for the trip to Japan",
)
.await;
test_client