Merge pull request #7777 from AppFlowy-IO/prepare_auto_sync_chat

refactor: save chat and chat message
This commit is contained in:
Nathan.fooo 2025-04-18 15:31:23 +08:00 committed by GitHub
commit f6e3290aa4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
35 changed files with 613 additions and 409 deletions

View file

@ -435,7 +435,7 @@ class ChatBloc extends Bloc<ChatEvent, ChatState> {
messageType: ChatMessageTypePB.User,
questionStreamPort: Int64(questionStream.nativePort),
answerStreamPort: Int64(answerStream!.nativePort),
metadata: await metadataPBFromMetadata(metadata),
//metadata: await metadataPBFromMetadata(metadata),
);
if (format != null) {
payload.format = format.toPB();

View file

@ -493,7 +493,7 @@ checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
[[package]]
name = "app-error"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"anyhow",
"bincode",
@ -513,7 +513,7 @@ dependencies = [
[[package]]
name = "appflowy-ai-client"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"anyhow",
"bytes",
@ -1159,7 +1159,7 @@ dependencies = [
[[package]]
name = "client-api"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"again",
"anyhow",
@ -1214,7 +1214,7 @@ dependencies = [
[[package]]
name = "client-api-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"collab-entity",
"collab-rt-entity",
@ -1227,7 +1227,7 @@ dependencies = [
[[package]]
name = "client-websocket"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"futures-channel",
"futures-util",
@ -1499,7 +1499,7 @@ dependencies = [
[[package]]
name = "collab-rt-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"anyhow",
"bincode",
@ -1521,7 +1521,7 @@ dependencies = [
[[package]]
name = "collab-rt-protocol"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"anyhow",
"async-trait",
@ -1969,7 +1969,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "database-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"bincode",
"bytes",
@ -3427,7 +3427,7 @@ dependencies = [
[[package]]
name = "gotrue"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"anyhow",
"getrandom 0.2.10",
@ -3442,7 +3442,7 @@ dependencies = [
[[package]]
name = "gotrue-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"app-error",
"jsonwebtoken",
@ -4066,7 +4066,7 @@ dependencies = [
[[package]]
name = "infra"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"anyhow",
"bytes",
@ -6658,7 +6658,7 @@ dependencies = [
[[package]]
name = "shared-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72a71205ebb3ec227b44ed48473abe4f1c7663e8#72a71205ebb3ec227b44ed48473abe4f1c7663e8"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6c6f1c5cc3ce2161c247f63f6132361da8dc0a32#6c6f1c5cc3ce2161c247f63f6132361da8dc0a32"
dependencies = [
"anyhow",
"app-error",

View file

@ -107,8 +107,8 @@ af-local-ai = { version = "0.1" }
# Run the script.add_workspace_members:
# scripts/tool/update_client_api_rev.sh new_rev_id
# ⚠️⚠️⚠️️
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "72a71205ebb3ec227b44ed48473abe4f1c7663e8" }
client-api-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "72a71205ebb3ec227b44ed48473abe4f1c7663e8" }
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "6c6f1c5cc3ce2161c247f63f6132361da8dc0a32" }
client-api-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "6c6f1c5cc3ce2161c247f63f6132361da8dc0a32" }
[profile.dev]
opt-level = 0

View file

@ -27,7 +27,6 @@ async fn af_cloud_create_chat_message_test() {
&Uuid::from_str(&chat_id).unwrap(),
&format!("hello world {}", i),
ChatMessageType::System,
&[],
)
.await
.unwrap();
@ -83,7 +82,6 @@ async fn af_cloud_load_remote_system_message_test() {
&Uuid::from_str(&chat_id).unwrap(),
&format!("hello server {}", i),
ChatMessageType::System,
&[],
)
.await
.unwrap();
@ -93,10 +91,8 @@ async fn af_cloud_load_remote_system_message_test() {
.notification_sender
.subscribe::<ChatMessageListPB>(&chat_id, ChatNotification::DidLoadLatestChatMessage);
// Previous messages were created by the server, so there are no messages in the local cache.
// It will try to load messages in the background.
let all = test.load_next_message(&chat_id, 5, None).await;
assert!(all.messages.is_empty());
assert_eq!(all.messages.len(), 5);
// Wait for the messages to be loaded.
let next_back_five = receive_with_timeout(rx, Duration::from_secs(60))
@ -121,7 +117,6 @@ async fn af_cloud_load_remote_system_message_test() {
let first_five_messages = receive_with_timeout(rx, Duration::from_secs(60))
.await
.unwrap();
assert!(!first_five_messages.has_more);
assert_eq!(first_five_messages.messages[0].content, "hello server 4");
assert_eq!(first_five_messages.messages[1].content, "hello server 3");
assert_eq!(first_five_messages.messages[2].content, "hello server 2");

View file

@ -2,8 +2,8 @@ use event_integration_test::user_event::use_localhost_af_cloud;
use event_integration_test::EventIntegrationTest;
use flowy_ai_pub::cloud::MessageCursor;
use flowy_ai_pub::persistence::{
insert_chat_messages, select_chat_messages, select_message, select_message_content,
select_message_where_match_reply_message_id, total_message_count, ChatMessageTable,
select_answer_where_match_reply_message_id, select_chat_messages, select_message,
select_message_content, total_message_count, upsert_chat_messages, ChatMessageTable,
};
use uuid::Uuid;
@ -31,6 +31,7 @@ async fn chat_message_table_insert_select_test() {
author_id: "user_1".to_string(),
reply_message_id: None,
metadata: None,
is_sync: false,
},
ChatMessageTable {
message_id: message_id_2,
@ -41,11 +42,12 @@ async fn chat_message_table_insert_select_test() {
author_id: "ai".to_string(),
reply_message_id: Some(message_id_1),
metadata: Some(r#"{"source": "test"}"#.to_string()),
is_sync: false,
},
];
// Test insert_chat_messages
let result = insert_chat_messages(db_conn, &messages);
let result = upsert_chat_messages(db_conn, &messages);
assert!(
result.is_ok(),
"Failed to insert chat messages: {:?}",
@ -105,11 +107,12 @@ async fn chat_message_table_cursor_test() {
author_id: "user_1".to_string(),
reply_message_id: None,
metadata: None,
is_sync: false,
});
}
// Insert messages
insert_chat_messages(db_conn, &messages).unwrap();
upsert_chat_messages(db_conn, &messages).unwrap();
// Test MessageCursor::Offset
let db_conn = test.user_manager.db_connection(uid).unwrap();
@ -173,6 +176,7 @@ async fn chat_message_total_count_test() {
author_id: "user_1".to_string(),
reply_message_id: None,
metadata: None,
is_sync: false,
},
ChatMessageTable {
message_id: 1002,
@ -183,11 +187,12 @@ async fn chat_message_total_count_test() {
author_id: "ai".to_string(),
reply_message_id: None,
metadata: None,
is_sync: false,
},
];
// Insert messages
insert_chat_messages(db_conn, &messages).unwrap();
upsert_chat_messages(db_conn, &messages).unwrap();
// Test total_message_count
let db_conn = test.user_manager.db_connection(uid).unwrap();
@ -205,9 +210,10 @@ async fn chat_message_total_count_test() {
author_id: "user_1".to_string(),
reply_message_id: None,
metadata: None,
is_sync: false,
};
insert_chat_messages(db_conn, &[additional_message]).unwrap();
upsert_chat_messages(db_conn, &[additional_message]).unwrap();
// Verify count increased
let db_conn = test.user_manager.db_connection(uid).unwrap();
@ -242,10 +248,11 @@ async fn chat_message_select_message_test() {
author_id: "user_1".to_string(),
reply_message_id: None,
metadata: Some(r#"{"test_key": "test_value"}"#.to_string()),
is_sync: false,
};
// Insert message
insert_chat_messages(db_conn, &[message]).unwrap();
upsert_chat_messages(db_conn, &[message]).unwrap();
// Test select_message
let db_conn = test.user_manager.db_connection(uid).unwrap();
@ -294,10 +301,11 @@ async fn chat_message_select_content_test() {
author_id: "user_1".to_string(),
reply_message_id: None,
metadata: None,
is_sync: false,
};
// Insert message
insert_chat_messages(db_conn, &[message]).unwrap();
upsert_chat_messages(db_conn, &[message]).unwrap();
// Test select_message_content
let db_conn = test.user_manager.db_connection(uid).unwrap();
@ -334,6 +342,7 @@ async fn chat_message_reply_test() {
author_id: "user_1".to_string(),
reply_message_id: None,
metadata: None,
is_sync: false,
};
let answer = ChatMessageTable {
@ -345,14 +354,15 @@ async fn chat_message_reply_test() {
author_id: "ai".to_string(),
reply_message_id: Some(question_id), // Link to question
metadata: None,
is_sync: false,
};
// Insert messages
insert_chat_messages(db_conn, &[question, answer]).unwrap();
upsert_chat_messages(db_conn, &[question, answer]).unwrap();
// Test select_message_where_match_reply_message_id
let db_conn = test.user_manager.db_connection(uid).unwrap();
let result = select_message_where_match_reply_message_id(db_conn, &chat_id, question_id).unwrap();
let result = select_answer_where_match_reply_message_id(db_conn, &chat_id, question_id).unwrap();
assert!(result.is_some());
let reply = result.unwrap();
@ -362,7 +372,7 @@ async fn chat_message_reply_test() {
// Test with non-existent reply relation
let db_conn = test.user_manager.db_connection(uid).unwrap();
let no_reply = select_message_where_match_reply_message_id(
let no_reply = select_answer_where_match_reply_message_id(
db_conn, &chat_id, 9999, // Non-existent question ID
)
.unwrap();
@ -372,7 +382,7 @@ async fn chat_message_reply_test() {
// Test with wrong chat_id
let db_conn = test.user_manager.db_connection(uid).unwrap();
let wrong_chat =
select_message_where_match_reply_message_id(db_conn, "wrong_chat_id", question_id).unwrap();
select_answer_where_match_reply_message_id(db_conn, "wrong_chat_id", question_id).unwrap();
assert!(wrong_chat.is_none());
}
@ -399,10 +409,11 @@ async fn chat_message_upsert_test() {
author_id: "user_1".to_string(),
reply_message_id: None,
metadata: None,
is_sync: false,
};
// Insert message
insert_chat_messages(db_conn, &[message]).unwrap();
upsert_chat_messages(db_conn, &[message]).unwrap();
// Check original content
let db_conn = test.user_manager.db_connection(uid).unwrap();
@ -420,10 +431,11 @@ async fn chat_message_upsert_test() {
author_id: "user_1".to_string(),
reply_message_id: Some(1000), // Added reply ID
metadata: Some(r#"{"updated": true}"#.to_string()),
is_sync: false,
};
// Upsert message
insert_chat_messages(db_conn, &[updated_message]).unwrap();
upsert_chat_messages(db_conn, &[updated_message]).unwrap();
// Verify update
let db_conn = test.user_manager.db_connection(uid).unwrap();
@ -474,11 +486,12 @@ async fn chat_message_select_with_large_dataset() {
} else {
None
},
is_sync: false,
});
}
// Insert all 100 messages
insert_chat_messages(db_conn, &messages).unwrap();
upsert_chat_messages(db_conn, &messages).unwrap();
// Verify total count
let db_conn = test.user_manager.db_connection(uid).unwrap();

View file

@ -6,8 +6,8 @@ pub use client_api::entity::ai_dto::{
};
pub use client_api::entity::billing_dto::SubscriptionPlan;
pub use client_api::entity::chat_dto::{
ChatMessage, ChatMessageMetadata, ChatMessageType, ChatRAGData, ChatSettings, ContextLoader,
MessageCursor, RepeatedChatMessage, UpdateChatParams,
ChatMessage, ChatMessageType, ChatRAGData, ChatSettings, ContextLoader, MessageCursor,
RepeatedChatMessage, UpdateChatParams,
};
pub use client_api::entity::QuestionStreamValue;
pub use client_api::entity::*;
@ -95,7 +95,6 @@ pub trait ChatCloudService: Send + Sync + 'static {
chat_id: &Uuid,
message: &str,
message_type: ChatMessageType,
metadata: &[ChatMessageMetadata],
) -> Result<ChatMessage, FlowyError>;
async fn create_answer(
@ -111,7 +110,7 @@ pub trait ChatCloudService: Send + Sync + 'static {
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
message_id: i64,
question_id: i64,
format: ResponseFormat,
ai_model: Option<AIModel>,
) -> Result<StreamAnswer, FlowyError>;
@ -120,7 +119,7 @@ pub trait ChatCloudService: Send + Sync + 'static {
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
question_message_id: i64,
question_id: i64,
) -> Result<ChatMessage, FlowyError>;
async fn get_chat_messages(

View file

@ -1,4 +1,5 @@
use crate::cloud::MessageCursor;
use client_api::entity::chat_dto::ChatMessage;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::upsert::excluded;
use flowy_sqlite::{
@ -21,9 +22,40 @@ pub struct ChatMessageTable {
pub author_id: String,
pub reply_message_id: Option<i64>,
pub metadata: Option<String>,
pub is_sync: bool,
}
impl ChatMessageTable {
pub fn from_message(chat_id: String, message: ChatMessage, is_sync: bool) -> Self {
ChatMessageTable {
message_id: message.message_id,
chat_id,
content: message.content,
created_at: message.created_at.timestamp(),
author_type: message.author.author_type as i64,
author_id: message.author.author_id.to_string(),
reply_message_id: message.reply_message_id,
metadata: Some(serde_json::to_string(&message.metadata).unwrap_or_default()),
is_sync,
}
}
}
pub fn insert_chat_messages(
pub fn update_chat_message_is_sync(
mut conn: DBConnection,
chat_id_val: &str,
message_id_val: i64,
is_sync_val: bool,
) -> FlowyResult<()> {
diesel::update(chat_message_table::table)
.filter(chat_message_table::chat_id.eq(chat_id_val))
.filter(chat_message_table::message_id.eq(message_id_val))
.set(chat_message_table::is_sync.eq(is_sync_val))
.execute(&mut *conn)?;
Ok(())
}
pub fn upsert_chat_messages(
mut conn: DBConnection,
new_messages: &[ChatMessageTable],
) -> FlowyResult<()> {
@ -143,7 +175,7 @@ pub fn select_message_content(
Ok(message)
}
pub fn select_message_where_match_reply_message_id(
pub fn select_answer_where_match_reply_message_id(
mut conn: DBConnection,
chat_id: &str,
answer_message_id_val: i64,

View file

@ -7,7 +7,10 @@ use flowy_sqlite::{
schema::{chat_table, chat_table::dsl},
AsChangeset, DBConnection, ExpressionMethods, Identifiable, Insertable, QueryResult, Queryable,
};
use lib_infra::util::timestamp;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
#[derive(Clone, Default, Queryable, Insertable, Identifiable)]
#[diesel(table_name = chat_table)]
@ -18,6 +21,23 @@ pub struct ChatTable {
pub name: String,
pub metadata: String,
pub rag_ids: Option<String>,
pub is_sync: bool,
}
impl ChatTable {
pub fn new(chat_id: String, metadata: Value, rag_ids: Vec<Uuid>, is_sync: bool) -> Self {
let rag_ids = rag_ids.iter().map(|v| v.to_string()).collect::<Vec<_>>();
let metadata = serialize_chat_metadata(&metadata);
let rag_ids = Some(serialize_rag_ids(&rag_ids));
Self {
chat_id,
created_at: timestamp(),
name: "".to_string(),
metadata,
rag_ids,
is_sync,
}
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
@ -49,27 +69,7 @@ pub struct ChatTableChangeset {
pub name: Option<String>,
pub metadata: Option<String>,
pub rag_ids: Option<String>,
}
impl ChatTableChangeset {
pub fn from_metadata(metadata: ChatTableMetadata) -> Self {
ChatTableChangeset {
chat_id: Default::default(),
metadata: serde_json::to_string(&metadata).ok(),
name: None,
rag_ids: None,
}
}
pub fn from_rag_ids(rag_ids: Vec<String>) -> Self {
ChatTableChangeset {
chat_id: Default::default(),
// Serialize the Vec<String> to a JSON array string
rag_ids: Some(serde_json::to_string(&rag_ids).unwrap_or_default()),
name: None,
metadata: None,
}
}
pub is_sync: Option<bool>,
}
pub fn serialize_rag_ids(rag_ids: &[String]) -> String {
@ -107,6 +107,7 @@ pub fn upsert_chat(mut conn: DBConnection, new_chat: &ChatTable) -> QueryResult<
chat_table::name.eq(excluded(chat_table::name)),
chat_table::metadata.eq(excluded(chat_table::metadata)),
chat_table::rag_ids.eq(excluded(chat_table::rag_ids)),
chat_table::is_sync.eq(excluded(chat_table::is_sync)),
))
.execute(&mut *conn)
}
@ -120,6 +121,16 @@ pub fn update_chat(
Ok(affected_row)
}
pub fn update_chat_is_sync(
mut conn: DBConnection,
chat_id_val: &str,
is_sync_val: bool,
) -> QueryResult<usize> {
diesel::update(dsl::chat_table.filter(chat_table::chat_id.eq(chat_id_val)))
.set(chat_table::is_sync.eq(is_sync_val))
.execute(&mut *conn)
}
pub fn read_chat(mut conn: DBConnection, chat_id_val: &str) -> QueryResult<ChatTable> {
let row = dsl::chat_table
.filter(chat_table::chat_id.eq(chat_id_val))

View file

@ -4,10 +4,8 @@ use crate::entities::{
FilePB, PredefinedFormatPB, RepeatedRelatedQuestionPB, StreamMessageParams,
};
use crate::local_ai::controller::{LocalAIController, LocalAISetting};
use crate::middleware::chat_service_mw::AICloudServiceMiddleware;
use flowy_ai_pub::persistence::{
read_chat_metadata, serialize_chat_metadata, serialize_rag_ids, upsert_chat, ChatTable,
};
use crate::middleware::chat_service_mw::ChatServiceMiddleware;
use flowy_ai_pub::persistence::read_chat_metadata;
use std::collections::HashMap;
use dashmap::DashMap;
@ -72,7 +70,7 @@ struct ServerModelsCache {
pub const GLOBAL_ACTIVE_MODEL_KEY: &str = "global_active_model";
pub struct AIManager {
pub cloud_service_wm: Arc<AICloudServiceMiddleware>,
pub cloud_service_wm: Arc<ChatServiceMiddleware>,
pub user_service: Arc<dyn AIUserService>,
pub external_service: Arc<dyn AIExternalService>,
chats: Arc<DashMap<Uuid, Arc<Chat>>>,
@ -97,7 +95,7 @@ impl AIManager {
});
let external_service = Arc::new(query_service);
let cloud_service_wm = Arc::new(AICloudServiceMiddleware::new(
let cloud_service_wm = Arc::new(ChatServiceMiddleware::new(
user_service.clone(),
chat_cloud_service,
local_ai.clone(),
@ -226,13 +224,6 @@ impl AIManager {
.unwrap_or_default();
info!("[Chat] create chat with rag_ids: {:?}", rag_ids);
save_chat(
self.user_service.sqlite_connection(*uid)?,
chat_id,
"",
rag_ids.iter().map(|v| v.to_string()).collect(),
json!({}),
)?;
self
.cloud_service_wm
.create_chat(uid, &workspace_id, chat_id, rag_ids, "", json!({}))
@ -730,28 +721,9 @@ async fn sync_chat_documents(
Ok(())
}
fn save_chat(
conn: DBConnection,
chat_id: &Uuid,
name: &str,
rag_ids: Vec<String>,
metadata: serde_json::Value,
) -> FlowyResult<()> {
let row = ChatTable {
chat_id: chat_id.to_string(),
created_at: timestamp(),
name: name.to_string(),
metadata: serialize_chat_metadata(&metadata),
rag_ids: Some(serialize_rag_ids(&rag_ids)),
};
upsert_chat(conn, &row)?;
Ok(())
}
async fn refresh_chat_setting(
user_service: &Arc<dyn AIUserService>,
cloud_service: &Arc<AICloudServiceMiddleware>,
cloud_service: &Arc<ChatServiceMiddleware>,
store_preferences: &Arc<KVStorePreferences>,
chat_id: &Uuid,
) -> FlowyResult<ChatSettings> {

View file

@ -3,7 +3,7 @@ use crate::entities::{
ChatMessageErrorPB, ChatMessageListPB, ChatMessagePB, PredefinedFormatPB,
RepeatedRelatedQuestionPB, StreamMessageParams,
};
use crate::middleware::chat_service_mw::AICloudServiceMiddleware;
use crate::middleware::chat_service_mw::ChatServiceMiddleware;
use crate::notification::{chat_notification_builder, ChatNotification};
use crate::stream_message::StreamMessage;
use allo_isolate::Isolate;
@ -11,7 +11,7 @@ use flowy_ai_pub::cloud::{
AIModel, ChatCloudService, ChatMessage, MessageCursor, QuestionStreamValue, ResponseFormat,
};
use flowy_ai_pub::persistence::{
insert_chat_messages, select_chat_messages, select_message_where_match_reply_message_id,
select_answer_where_match_reply_message_id, select_chat_messages, upsert_chat_messages,
ChatMessageTable,
};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
@ -35,7 +35,7 @@ pub struct Chat {
chat_id: Uuid,
uid: i64,
user_service: Arc<dyn AIUserService>,
chat_service: Arc<AICloudServiceMiddleware>,
chat_service: Arc<ChatServiceMiddleware>,
prev_message_state: Arc<RwLock<PrevMessageState>>,
latest_message_id: Arc<AtomicI64>,
stop_stream: Arc<AtomicBool>,
@ -47,7 +47,7 @@ impl Chat {
uid: i64,
chat_id: Uuid,
user_service: Arc<dyn AIUserService>,
chat_service: Arc<AICloudServiceMiddleware>,
chat_service: Arc<ChatServiceMiddleware>,
) -> Chat {
Chat {
uid,
@ -76,11 +76,10 @@ impl Chat {
preferred_ai_model: Option<AIModel>,
) -> Result<ChatMessagePB, FlowyError> {
trace!(
"[Chat] stream chat message: chat_id={}, message={}, message_type={:?}, metadata={:?}, format={:?}",
"[Chat] stream chat message: chat_id={}, message={}, message_type={:?}, format={:?}",
self.chat_id,
params.message,
params.message_type,
params.metadata,
params.format,
);
@ -105,7 +104,6 @@ impl Chat {
&self.chat_id,
&params.message,
params.message_type.clone(),
&[],
)
.await
.map_err(|err| {
@ -117,16 +115,8 @@ impl Chat {
.send(StreamMessage::MessageId(question.message_id).to_string())
.await;
if let Err(err) = self
.chat_service
.index_message_metadata(&self.chat_id, &params.metadata, &mut question_sink)
.await
{
error!("Failed to index file: {}", err);
}
// Save message to disk
save_and_notify_message(uid, &self.chat_id, &self.user_service, question.clone())?;
notify_message(&self.chat_id, question.clone())?;
let format = params.format.clone().map(Into::into).unwrap_or_default();
self.stream_response(
params.answer_stream_port,
@ -185,7 +175,7 @@ impl Chat {
&self,
answer_stream_port: i64,
answer_stream_buffer: Arc<Mutex<StringBuffer>>,
uid: i64,
_uid: i64,
workspace_id: Uuid,
question_id: i64,
format: ResponseFormat,
@ -194,7 +184,6 @@ impl Chat {
let stop_stream = self.stop_stream.clone();
let chat_id = self.chat_id;
let cloud_service = self.chat_service.clone();
let user_service = self.user_service.clone();
tokio::spawn(async move {
let mut answer_sink = IsolateSink::new(Isolate::new(answer_stream_port));
match cloud_service
@ -309,7 +298,7 @@ impl Chat {
metadata,
)
.await?;
save_and_notify_message(uid, &chat_id, &user_service, answer)?;
notify_message(&chat_id, answer)?;
Ok::<(), FlowyError>(())
});
}
@ -442,6 +431,7 @@ impl Chat {
user_service.sqlite_connection(uid)?,
&chat_id,
resp.messages.clone(),
true,
) {
error!("Failed to save chat:{} messages: {}", chat_id, err);
}
@ -492,7 +482,7 @@ impl Chat {
let conn = self.user_service.sqlite_connection(self.uid)?;
let local_result =
select_message_where_match_reply_message_id(conn, &chat_id.to_string(), answer_message_id)?
select_answer_where_match_reply_message_id(conn, &chat_id.to_string(), answer_message_id)?
.map(|message| message.message_id);
if let Some(message_id) = local_result {
@ -543,7 +533,7 @@ impl Chat {
.get_answer(&workspace_id, &self.chat_id, question_message_id)
.await?;
save_and_notify_message(self.uid, &self.chat_id, &self.user_service, answer.clone())?;
notify_message(&self.chat_id, answer.clone())?;
let pb = ChatMessagePB::from(answer);
Ok(pb)
}
@ -614,6 +604,7 @@ fn save_chat_message_disk(
conn: DBConnection,
chat_id: &Uuid,
messages: Vec<ChatMessage>,
is_sync: bool,
) -> FlowyResult<()> {
let records = messages
.into_iter()
@ -626,9 +617,10 @@ fn save_chat_message_disk(
author_id: message.author.author_id.to_string(),
reply_message_id: message.reply_message_id,
metadata: Some(serde_json::to_string(&message.metadata).unwrap_or_default()),
is_sync,
})
.collect::<Vec<_>>();
insert_chat_messages(conn, &records)?;
upsert_chat_messages(conn, &records)?;
Ok(())
}
@ -665,18 +657,8 @@ impl StringBuffer {
}
}
pub(crate) fn save_and_notify_message(
uid: i64,
chat_id: &Uuid,
user_service: &Arc<dyn AIUserService>,
message: ChatMessage,
) -> Result<(), FlowyError> {
pub(crate) fn notify_message(chat_id: &Uuid, message: ChatMessage) -> Result<(), FlowyError> {
trace!("[Chat] save answer: answer={:?}", message);
save_chat_message_disk(
user_service.sqlite_connection(uid)?,
chat_id,
vec![message.clone()],
)?;
let pb = ChatMessagePB::from(message);
chat_notification_builder(chat_id, ChatNotification::DidReceiveChatMessage)
.payload(pb)

View file

@ -2,9 +2,8 @@ use crate::local_ai::controller::LocalAISetting;
use crate::local_ai::resource::PendingResource;
use af_plugin::core::plugin::RunningState;
use flowy_ai_pub::cloud::{
AIModel, ChatMessage, ChatMessageMetadata, ChatMessageType, CompletionMessage, LLMModel,
OutputContent, OutputLayout, RelatedQuestion, RepeatedChatMessage, RepeatedRelatedQuestion,
ResponseFormat,
AIModel, ChatMessage, ChatMessageType, CompletionMessage, LLMModel, OutputContent, OutputLayout,
RelatedQuestion, RepeatedChatMessage, RepeatedRelatedQuestion, ResponseFormat,
};
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use lib_infra::validator_fn::required_not_empty_str;
@ -71,9 +70,6 @@ pub struct StreamChatPayloadPB {
#[pb(index = 6, one_of)]
pub format: Option<PredefinedFormatPB>,
#[pb(index = 7)]
pub metadata: Vec<ChatMessageMetaPB>,
}
#[derive(Default, Debug)]
@ -84,7 +80,6 @@ pub struct StreamMessageParams {
pub answer_stream_port: i64,
pub question_stream_port: i64,
pub format: Option<PredefinedFormatPB>,
pub metadata: Vec<ChatMessageMetadata>,
}
#[derive(Default, ProtoBuf, Validate, Clone, Debug)]

View file

@ -2,16 +2,13 @@ use crate::ai_manager::{AIManager, GLOBAL_ACTIVE_MODEL_KEY};
use crate::completion::AICompletion;
use crate::entities::*;
use crate::util::ai_available_models_key;
use flowy_ai_pub::cloud::{
AIModel, ChatMessageMetadata, ChatMessageType, ChatRAGData, ContextLoader,
};
use flowy_ai_pub::cloud::{AIModel, ChatMessageType};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataResult};
use std::fs;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use tracing::trace;
use uuid::Uuid;
use validator::Validate;
@ -37,7 +34,6 @@ pub(crate) async fn stream_chat_message_handler(
answer_stream_port,
question_stream_port,
format,
metadata,
} = data;
let message_type = match message_type {
@ -45,32 +41,6 @@ pub(crate) async fn stream_chat_message_handler(
ChatMessageTypePB::User => ChatMessageType::User,
};
let metadata = metadata
.into_iter()
.map(|metadata| {
let (content_type, content_len) = match metadata.loader_type {
ContextLoaderTypePB::Txt => (ContextLoader::Text, metadata.data.len()),
ContextLoaderTypePB::Markdown => (ContextLoader::Markdown, metadata.data.len()),
ContextLoaderTypePB::PDF => (ContextLoader::PDF, 0),
ContextLoaderTypePB::UnknownLoaderType => (ContextLoader::Unknown, 0),
};
ChatMessageMetadata {
data: ChatRAGData {
content: metadata.data,
content_type,
size: content_len as i64,
},
id: metadata.id,
name: metadata.name.clone(),
source: metadata.source,
extra: None,
}
})
.collect::<Vec<_>>();
trace!("Stream chat message with metadata: {:?}", metadata);
let chat_id = Uuid::from_str(&chat_id)?;
let params = StreamMessageParams {
chat_id,
@ -79,7 +49,6 @@ pub(crate) async fn stream_chat_message_handler(
answer_stream_port,
question_stream_port,
format,
metadata,
};
let ai_manager = upgrade_ai_manager(ai_manager)?;

View file

@ -12,6 +12,7 @@ pub mod local_ai;
mod middleware;
pub mod notification;
pub mod offline;
mod protobuf;
mod stream_message;
mod util;

View file

@ -6,7 +6,6 @@ use crate::notification::{
};
use af_plugin::manager::PluginManager;
use anyhow::Error;
use flowy_ai_pub::cloud::{ChatMessageMetadata, ContextLoader};
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::kv::KVStorePreferences;
use futures::Sink;
@ -21,9 +20,8 @@ use arc_swap::ArcSwapOption;
use futures_util::SinkExt;
use lib_infra::util::get_operating_system;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::{Arc, Weak};
use tokio::select;
use tokio_stream::StreamExt;
@ -383,58 +381,59 @@ impl LocalAIController {
Ok(enabled)
}
#[instrument(level = "debug", skip_all)]
pub async fn index_message_metadata(
&self,
chat_id: &Uuid,
metadata_list: &[ChatMessageMetadata],
index_process_sink: &mut (impl Sink<String> + Unpin),
) -> FlowyResult<()> {
if !self.is_enabled() {
info!("[AI Plugin] local ai is disabled, skip indexing");
return Ok(());
}
for metadata in metadata_list {
let mut file_metadata = HashMap::new();
file_metadata.insert("id".to_string(), json!(&metadata.id));
file_metadata.insert("name".to_string(), json!(&metadata.name));
file_metadata.insert("source".to_string(), json!(&metadata.source));
let file_path = Path::new(&metadata.data.content);
if !file_path.exists() {
return Err(
FlowyError::record_not_found().with_context(format!("File not found: {:?}", file_path)),
);
}
info!(
"[AI Plugin] embed file: {:?}, with metadata: {:?}",
file_path, file_metadata
);
match &metadata.data.content_type {
ContextLoader::Unknown => {
error!(
"[AI Plugin] unsupported content type: {:?}",
metadata.data.content_type
);
},
ContextLoader::Text | ContextLoader::Markdown | ContextLoader::PDF => {
self
.process_index_file(
chat_id,
file_path.to_path_buf(),
&file_metadata,
index_process_sink,
)
.await?;
},
}
}
Ok(())
}
// #[instrument(level = "debug", skip_all)]
// pub async fn index_message_metadata(
// &self,
// chat_id: &Uuid,
// metadata_list: &[ChatMessageMetadata],
// index_process_sink: &mut (impl Sink<String> + Unpin),
// ) -> FlowyResult<()> {
// if !self.is_enabled() {
// info!("[AI Plugin] local ai is disabled, skip indexing");
// return Ok(());
// }
//
// for metadata in metadata_list {
// let mut file_metadata = HashMap::new();
// file_metadata.insert("id".to_string(), json!(&metadata.id));
// file_metadata.insert("name".to_string(), json!(&metadata.name));
// file_metadata.insert("source".to_string(), json!(&metadata.source));
//
// let file_path = Path::new(&metadata.data.content);
// if !file_path.exists() {
// return Err(
// FlowyError::record_not_found().with_context(format!("File not found: {:?}", file_path)),
// );
// }
// info!(
// "[AI Plugin] embed file: {:?}, with metadata: {:?}",
// file_path, file_metadata
// );
//
// match &metadata.data.content_type {
// ContextLoader::Unknown => {
// error!(
// "[AI Plugin] unsupported content type: {:?}",
// metadata.data.content_type
// );
// },
// ContextLoader::Text | ContextLoader::Markdown | ContextLoader::PDF => {
// self
// .process_index_file(
// chat_id,
// file_path.to_path_buf(),
// &file_metadata,
// index_process_sink,
// )
// .await?;
// },
// }
// }
//
// Ok(())
// }
#[allow(dead_code)]
async fn process_index_file(
&self,
chat_id: &Uuid,

View file

@ -9,33 +9,32 @@ use flowy_ai_pub::persistence::select_message_content;
use std::collections::HashMap;
use flowy_ai_pub::cloud::{
AIModel, AppErrorCode, AppResponseError, ChatCloudService, ChatMessage, ChatMessageMetadata,
ChatMessageType, ChatSettings, CompleteTextParams, CompletionStream, MessageCursor, ModelList,
RelatedQuestion, RepeatedChatMessage, RepeatedRelatedQuestion, ResponseFormat, StreamAnswer,
StreamComplete, UpdateChatParams,
AIModel, AppErrorCode, AppResponseError, ChatCloudService, ChatMessage, ChatMessageType,
ChatSettings, CompleteTextParams, CompletionStream, MessageCursor, ModelList, RelatedQuestion,
RepeatedChatMessage, RepeatedRelatedQuestion, ResponseFormat, StreamAnswer, StreamComplete,
UpdateChatParams,
};
use flowy_error::{FlowyError, FlowyResult};
use futures::{stream, Sink, StreamExt, TryStreamExt};
use futures::{stream, StreamExt, TryStreamExt};
use lib_infra::async_trait::async_trait;
use crate::local_ai::stream_util::QuestionStream;
use crate::stream_message::StreamMessage;
use flowy_storage_pub::storage::StorageService;
use futures_util::SinkExt;
use serde_json::{json, Value};
use std::path::Path;
use std::sync::{Arc, Weak};
use tracing::{info, trace};
use uuid::Uuid;
pub struct AICloudServiceMiddleware {
pub struct ChatServiceMiddleware {
cloud_service: Arc<dyn ChatCloudService>,
user_service: Arc<dyn AIUserService>,
local_ai: Arc<LocalAIController>,
#[allow(dead_code)]
storage_service: Weak<dyn StorageService>,
}
impl AICloudServiceMiddleware {
impl ChatServiceMiddleware {
pub fn new(
user_service: Arc<dyn AIUserService>,
cloud_service: Arc<dyn ChatCloudService>,
@ -50,34 +49,6 @@ impl AICloudServiceMiddleware {
}
}
pub async fn index_message_metadata(
&self,
chat_id: &Uuid,
metadata_list: &[ChatMessageMetadata],
index_process_sink: &mut (impl Sink<String> + Unpin),
) -> Result<(), FlowyError> {
if metadata_list.is_empty() {
return Ok(());
}
if self.local_ai.is_enabled() {
let _ = index_process_sink
.send(StreamMessage::IndexStart.to_string())
.await;
let result = self
.local_ai
.index_message_metadata(chat_id, metadata_list, index_process_sink)
.await;
let _ = index_process_sink
.send(StreamMessage::IndexEnd.to_string())
.await;
result?
} else if let Some(_storage_service) = self.storage_service.upgrade() {
//
}
Ok(())
}
fn get_message_content(&self, message_id: i64) -> FlowyResult<String> {
let uid = self.user_service.user_id()?;
let conn = self.user_service.sqlite_connection(uid)?;
@ -106,7 +77,7 @@ impl AICloudServiceMiddleware {
}
#[async_trait]
impl ChatCloudService for AICloudServiceMiddleware {
impl ChatCloudService for ChatServiceMiddleware {
async fn create_chat(
&self,
uid: &i64,
@ -128,11 +99,10 @@ impl ChatCloudService for AICloudServiceMiddleware {
chat_id: &Uuid,
message: &str,
message_type: ChatMessageType,
metadata: &[ChatMessageMetadata],
) -> Result<ChatMessage, FlowyError> {
self
.cloud_service
.create_question(workspace_id, chat_id, message, message_type, metadata)
.create_question(workspace_id, chat_id, message, message_type)
.await
}
@ -154,7 +124,7 @@ impl ChatCloudService for AICloudServiceMiddleware {
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
message_id: i64,
question_id: i64,
format: ResponseFormat,
ai_model: Option<AIModel>,
) -> Result<StreamAnswer, FlowyError> {
@ -166,7 +136,7 @@ impl ChatCloudService for AICloudServiceMiddleware {
info!("stream_answer use model: {:?}", ai_model);
if use_local_ai {
if self.local_ai.is_running() {
let content = self.get_message_content(message_id)?;
let content = self.get_message_content(question_id)?;
match self
.local_ai
.stream_question(
@ -191,7 +161,7 @@ impl ChatCloudService for AICloudServiceMiddleware {
} else {
self
.cloud_service
.stream_answer(workspace_id, chat_id, message_id, format, ai_model)
.stream_answer(workspace_id, chat_id, question_id, format, ai_model)
.await
}
}
@ -200,10 +170,10 @@ impl ChatCloudService for AICloudServiceMiddleware {
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
question_message_id: i64,
question_id: i64,
) -> Result<ChatMessage, FlowyError> {
if self.local_ai.is_running() {
let content = self.get_message_content(question_message_id)?;
let content = self.get_message_content(question_id)?;
match self
.local_ai
.ask_question(&chat_id.to_string(), &content)
@ -212,7 +182,7 @@ impl ChatCloudService for AICloudServiceMiddleware {
Ok(answer) => {
let message = self
.cloud_service
.create_answer(workspace_id, chat_id, &answer, question_message_id, None)
.create_answer(workspace_id, chat_id, &answer, question_id, None)
.await?;
Ok(message)
},
@ -224,7 +194,7 @@ impl ChatCloudService for AICloudServiceMiddleware {
} else {
self
.cloud_service
.get_answer(workspace_id, chat_id, question_message_id)
.get_answer(workspace_id, chat_id, question_id)
.await
}
}

View file

@ -0,0 +1 @@
pub mod offline_message_sync;

View file

@ -0,0 +1,258 @@
use crate::ai_manager::AIUserService;
use flowy_ai_pub::cloud::{
AIModel, ChatCloudService, ChatMessage, ChatMessageType, ChatSettings, CompleteTextParams,
MessageCursor, ModelList, RepeatedChatMessage, RepeatedRelatedQuestion, ResponseFormat,
StreamAnswer, StreamComplete, UpdateChatParams,
};
use flowy_ai_pub::persistence::{
update_chat_is_sync, update_chat_message_is_sync, upsert_chat, upsert_chat_messages,
ChatMessageTable, ChatTable,
};
use flowy_error::FlowyError;
use lib_infra::async_trait::async_trait;
use serde_json::Value;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use uuid::Uuid;
pub struct AutoSyncChatService {
cloud_service: Arc<dyn ChatCloudService>,
user_service: Arc<dyn AIUserService>,
}
impl AutoSyncChatService {
pub fn new(
cloud_service: Arc<dyn ChatCloudService>,
user_service: Arc<dyn AIUserService>,
) -> Self {
Self {
cloud_service,
user_service,
}
}
async fn upsert_message(
&self,
chat_id: &Uuid,
message: ChatMessage,
is_sync: bool,
) -> Result<(), FlowyError> {
let uid = self.user_service.user_id()?;
let conn = self.user_service.sqlite_connection(uid)?;
let row = ChatMessageTable::from_message(chat_id.to_string(), message, is_sync);
upsert_chat_messages(conn, &[row])?;
Ok(())
}
#[allow(dead_code)]
async fn update_message_is_sync(
&self,
chat_id: &Uuid,
message_id: i64,
) -> Result<(), FlowyError> {
let uid = self.user_service.user_id()?;
let conn = self.user_service.sqlite_connection(uid)?;
update_chat_message_is_sync(conn, &chat_id.to_string(), message_id, true)?;
Ok(())
}
}
#[async_trait]
impl ChatCloudService for AutoSyncChatService {
async fn create_chat(
&self,
uid: &i64,
workspace_id: &Uuid,
chat_id: &Uuid,
rag_ids: Vec<Uuid>,
name: &str,
metadata: Value,
) -> Result<(), FlowyError> {
let conn = self.user_service.sqlite_connection(*uid)?;
let chat = ChatTable::new(
chat_id.to_string(),
metadata.clone(),
rag_ids.clone(),
false,
);
upsert_chat(conn, &chat)?;
if self
.cloud_service
.create_chat(uid, workspace_id, chat_id, rag_ids, name, metadata)
.await
.is_ok()
{
let conn = self.user_service.sqlite_connection(*uid)?;
update_chat_is_sync(conn, &chat_id.to_string(), true)?;
}
Ok(())
}
async fn create_question(
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
message: &str,
message_type: ChatMessageType,
) -> Result<ChatMessage, FlowyError> {
let message = self
.cloud_service
.create_question(workspace_id, chat_id, message, message_type)
.await?;
self.upsert_message(chat_id, message.clone(), true).await?;
// TODO: implement background sync
// self
// .update_message_is_sync(chat_id, message.message_id)
// .await?;
Ok(message)
}
async fn create_answer(
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
message: &str,
question_id: i64,
metadata: Option<Value>,
) -> Result<ChatMessage, FlowyError> {
let message = self
.cloud_service
.create_answer(workspace_id, chat_id, message, question_id, metadata)
.await?;
// TODO: implement background sync
self.upsert_message(chat_id, message.clone(), true).await?;
Ok(message)
}
async fn stream_answer(
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
question_id: i64,
format: ResponseFormat,
ai_model: Option<AIModel>,
) -> Result<StreamAnswer, FlowyError> {
self
.cloud_service
.stream_answer(workspace_id, chat_id, question_id, format, ai_model)
.await
}
async fn get_answer(
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
question_id: i64,
) -> Result<ChatMessage, FlowyError> {
let message = self
.cloud_service
.get_answer(workspace_id, chat_id, question_id)
.await?;
// TODO: implement background sync
self.upsert_message(chat_id, message.clone(), true).await?;
Ok(message)
}
async fn get_chat_messages(
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
offset: MessageCursor,
limit: u64,
) -> Result<RepeatedChatMessage, FlowyError> {
self
.cloud_service
.get_chat_messages(workspace_id, chat_id, offset, limit)
.await
}
async fn get_question_from_answer_id(
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
answer_message_id: i64,
) -> Result<ChatMessage, FlowyError> {
self
.cloud_service
.get_question_from_answer_id(workspace_id, chat_id, answer_message_id)
.await
}
async fn get_related_message(
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
message_id: i64,
ai_model: Option<AIModel>,
) -> Result<RepeatedRelatedQuestion, FlowyError> {
self
.cloud_service
.get_related_message(workspace_id, chat_id, message_id, ai_model)
.await
}
async fn stream_complete(
&self,
workspace_id: &Uuid,
params: CompleteTextParams,
ai_model: Option<AIModel>,
) -> Result<StreamComplete, FlowyError> {
self
.cloud_service
.stream_complete(workspace_id, params, ai_model)
.await
}
async fn embed_file(
&self,
workspace_id: &Uuid,
file_path: &Path,
chat_id: &Uuid,
metadata: Option<HashMap<String, Value>>,
) -> Result<(), FlowyError> {
self
.cloud_service
.embed_file(workspace_id, file_path, chat_id, metadata)
.await
}
async fn get_chat_settings(
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
) -> Result<ChatSettings, FlowyError> {
// TODO: implement background sync
self
.cloud_service
.get_chat_settings(workspace_id, chat_id)
.await
}
async fn update_chat_settings(
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
params: UpdateChatParams,
) -> Result<(), FlowyError> {
// TODO: implement background sync
self
.cloud_service
.update_chat_settings(workspace_id, chat_id, params)
.await
}
async fn get_available_models(&self, workspace_id: &Uuid) -> Result<ModelList, FlowyError> {
self.cloud_service.get_available_models(workspace_id).await
}
async fn get_workspace_default_model(&self, workspace_id: &Uuid) -> Result<String, FlowyError> {
self
.cloud_service
.get_workspace_default_model(workspace_id)
.await
}
}

View file

@ -1,5 +1,6 @@
use std::fmt::Display;
#[allow(dead_code)]
pub enum StreamMessage {
MessageId(i64),
IndexStart,

View file

@ -14,9 +14,9 @@ use flowy_ai_pub::cloud::search_dto::{
SearchDocumentResponseItem, SearchResult, SearchSummaryResult,
};
use flowy_ai_pub::cloud::{
AIModel, ChatCloudService, ChatMessage, ChatMessageMetadata, ChatMessageType, ChatSettings,
CompleteTextParams, MessageCursor, ModelList, RepeatedChatMessage, ResponseFormat, StreamAnswer,
StreamComplete, UpdateChatParams,
AIModel, ChatCloudService, ChatMessage, ChatMessageType, ChatSettings, CompleteTextParams,
MessageCursor, ModelList, RepeatedChatMessage, ResponseFormat, StreamAnswer, StreamComplete,
UpdateChatParams,
};
use flowy_database_pub::cloud::{
DatabaseAIService, DatabaseCloudService, DatabaseSnapshot, EncodeCollabByOid, SummaryRowContent,
@ -683,13 +683,12 @@ impl ChatCloudService for ServerProvider {
chat_id: &Uuid,
message: &str,
message_type: ChatMessageType,
metadata: &[ChatMessageMetadata],
) -> Result<ChatMessage, FlowyError> {
let message = message.to_string();
self
.get_server()?
.chat_service()
.create_question(workspace_id, chat_id, &message, message_type, metadata)
.create_question(workspace_id, chat_id, &message, message_type)
.await
}
@ -712,14 +711,14 @@ impl ChatCloudService for ServerProvider {
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
message_id: i64,
question_id: i64,
format: ResponseFormat,
ai_model: Option<AIModel>,
) -> Result<StreamAnswer, FlowyError> {
let server = self.get_server()?;
server
.chat_service()
.stream_answer(workspace_id, chat_id, message_id, format, ai_model)
.stream_answer(workspace_id, chat_id, question_id, format, ai_model)
.await
}
@ -768,12 +767,12 @@ impl ChatCloudService for ServerProvider {
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
question_message_id: i64,
question_id: i64,
) -> Result<ChatMessage, FlowyError> {
let server = self.get_server();
server?
.chat_service()
.get_answer(workspace_id, chat_id, question_message_id)
.get_answer(workspace_id, chat_id, question_id)
.await
}

View file

@ -8,7 +8,7 @@ use flowy_error::{FlowyError, FlowyResult};
use flowy_folder::manager::FolderManager;
use flowy_search::folder::indexer::FolderIndexManagerImpl;
use flowy_search::services::manager::SearchManager;
use flowy_server::af_cloud::define::ServerUser;
use flowy_server::af_cloud::define::LoginUserService;
use std::path::PathBuf;
use std::sync::{Arc, Weak};
use std::time::Duration;
@ -336,7 +336,7 @@ impl ServerUserImpl {
}
#[async_trait]
impl ServerUser for ServerUserImpl {
impl LoginUserService for ServerUserImpl {
fn workspace_id(&self) -> FlowyResult<Uuid> {
self.upgrade_user()?.workspace_id()
}

View file

@ -2,24 +2,19 @@ use crate::AppFlowyCoreConfig;
use af_plugin::manager::PluginManager;
use arc_swap::ArcSwapOption;
use dashmap::DashMap;
use flowy_ai::ai_manager::AIUserService;
use flowy_ai::local_ai::controller::LocalAIController;
use flowy_error::{FlowyError, FlowyResult};
use flowy_server::af_cloud::define::ServerUser;
use flowy_server::af_cloud::define::{AIUserServiceImpl, LoginUserService};
use flowy_server::af_cloud::AppFlowyCloudServer;
use flowy_server::local_server::LocalServer;
use flowy_server::{AppFlowyEncryption, AppFlowyServer, EncryptionImpl};
use flowy_server_pub::AuthenticatorType;
use flowy_sqlite::kv::KVStorePreferences;
use flowy_sqlite::DBConnection;
use flowy_user_pub::entities::*;
use lib_infra::async_trait::async_trait;
use serde_repr::*;
use std::fmt::{Display, Formatter};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::{Arc, Weak};
use uuid::Uuid;
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
@ -61,7 +56,7 @@ pub struct ServerProvider {
/// The authenticator type of the user.
authenticator: AtomicU8,
user: Arc<dyn ServerUser>,
user: Arc<dyn LoginUserService>,
pub(crate) uid: Arc<ArcSwapOption<i64>>,
pub local_ai: Arc<LocalAIController>,
}
@ -71,7 +66,7 @@ impl ServerProvider {
config: AppFlowyCoreConfig,
server: Server,
store_preferences: Weak<KVStorePreferences>,
server_user: impl ServerUser + 'static,
server_user: impl LoginUserService + 'static,
) -> Self {
let user = Arc::new(server_user);
let encryption = EncryptionImpl::new(None);
@ -182,28 +177,3 @@ pub fn current_server_type() -> Server {
AuthenticatorType::AppFlowyCloud => Server::AppFlowyCloud,
}
}
struct AIUserServiceImpl(Arc<dyn ServerUser>);
#[async_trait]
impl AIUserService for AIUserServiceImpl {
fn user_id(&self) -> Result<i64, FlowyError> {
self.0.user_id()
}
async fn is_local_model(&self) -> FlowyResult<bool> {
self.0.is_local_mode().await
}
fn workspace_id(&self) -> Result<Uuid, FlowyError> {
self.0.workspace_id()
}
fn sqlite_connection(&self, uid: i64) -> Result<DBConnection, FlowyError> {
self.0.get_sqlite_db(uid)
}
fn application_root_dir(&self) -> Result<PathBuf, FlowyError> {
self.0.application_root_dir()
}
}

View file

@ -1,7 +1,9 @@
use flowy_ai::ai_manager::AIUserService;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
use lib_infra::async_trait::async_trait;
use std::path::PathBuf;
use std::sync::Arc;
use uuid::Uuid;
pub const USER_SIGN_IN_URL: &str = "sign_in_url";
@ -11,7 +13,7 @@ pub const USER_DEVICE_ID: &str = "device_id";
/// Represents a user that is currently using the server.
#[async_trait]
pub trait ServerUser: Send + Sync {
pub trait LoginUserService: Send + Sync {
/// different user might return different workspace id.
fn workspace_id(&self) -> FlowyResult<Uuid>;
@ -21,3 +23,28 @@ pub trait ServerUser: Send + Sync {
fn get_sqlite_db(&self, uid: i64) -> Result<DBConnection, FlowyError>;
fn application_root_dir(&self) -> Result<PathBuf, FlowyError>;
}
pub struct AIUserServiceImpl(pub Arc<dyn LoginUserService>);
#[async_trait]
impl AIUserService for AIUserServiceImpl {
fn user_id(&self) -> Result<i64, FlowyError> {
self.0.user_id()
}
async fn is_local_model(&self) -> FlowyResult<bool> {
self.0.is_local_mode().await
}
fn workspace_id(&self) -> Result<Uuid, FlowyError> {
self.0.workspace_id()
}
fn sqlite_connection(&self, uid: i64) -> Result<DBConnection, FlowyError> {
self.0.get_sqlite_db(uid)
}
fn application_root_dir(&self) -> Result<PathBuf, FlowyError> {
self.0.application_root_dir()
}
}

View file

@ -8,8 +8,8 @@ use client_api::entity::chat_dto::{
RepeatedChatMessage,
};
use flowy_ai_pub::cloud::{
AIModel, ChatCloudService, ChatMessage, ChatMessageMetadata, ChatMessageType, ChatSettings,
ModelList, StreamAnswer, StreamComplete, UpdateChatParams,
AIModel, ChatCloudService, ChatMessage, ChatMessageType, ChatSettings, ModelList, StreamAnswer,
StreamComplete, UpdateChatParams,
};
use flowy_error::FlowyError;
use futures_util::{StreamExt, TryStreamExt};
@ -20,12 +20,12 @@ use std::path::Path;
use tracing::trace;
use uuid::Uuid;
pub(crate) struct AFCloudChatCloudServiceImpl<T> {
pub(crate) struct CloudChatServiceImpl<T> {
pub inner: T,
}
#[async_trait]
impl<T> ChatCloudService for AFCloudChatCloudServiceImpl<T>
impl<T> ChatCloudService for CloudChatServiceImpl<T>
where
T: AFServer,
{
@ -59,7 +59,6 @@ where
chat_id: &Uuid,
message: &str,
message_type: ChatMessageType,
metadata: &[ChatMessageMetadata],
) -> Result<ChatMessage, FlowyError> {
let chat_id = chat_id.to_string();
let try_get_client = self.inner.try_get_client();
@ -132,15 +131,11 @@ where
&self,
workspace_id: &Uuid,
chat_id: &Uuid,
question_message_id: i64,
question_id: i64,
) -> Result<ChatMessage, FlowyError> {
let try_get_client = self.inner.try_get_client();
let resp = try_get_client?
.get_answer(
workspace_id,
chat_id.to_string().as_str(),
question_message_id,
)
.get_answer(workspace_id, chat_id.to_string().as_str(), question_id)
.await
.map_err(FlowyError::from)?;
Ok(resp)

View file

@ -1,5 +1,5 @@
#![allow(unused_variables)]
use crate::af_cloud::define::ServerUser;
use crate::af_cloud::define::LoginUserService;
use crate::af_cloud::impls::util::check_request_workspace_id_is_match;
use crate::af_cloud::AFServer;
use client_api::entity::ai_dto::{
@ -23,7 +23,7 @@ use uuid::Uuid;
pub(crate) struct AFCloudDatabaseCloudServiceImpl<T> {
pub inner: T,
pub user: Arc<dyn ServerUser>,
pub logged_user: Arc<dyn LoginUserService>,
}
#[async_trait]
@ -40,7 +40,7 @@ where
workspace_id: &Uuid,
) -> Result<Option<EncodedCollab>, FlowyError> {
let try_get_client = self.inner.try_get_client();
let cloned_user = self.user.clone();
let cloned_user = self.logged_user.clone();
let params = QueryCollabParams {
workspace_id: *workspace_id,
inner: QueryCollab::new(*object_id, collab_type),
@ -95,7 +95,7 @@ where
workspace_id: &Uuid,
) -> Result<EncodeCollabByOid, FlowyError> {
let try_get_client = self.inner.try_get_client();
let cloned_user = self.user.clone();
let cloned_user = self.logged_user.clone();
let client = try_get_client?;
let params = object_ids
.into_iter()

View file

@ -13,13 +13,13 @@ use std::sync::Arc;
use tracing::instrument;
use uuid::Uuid;
use crate::af_cloud::define::ServerUser;
use crate::af_cloud::define::LoginUserService;
use crate::af_cloud::impls::util::check_request_workspace_id_is_match;
use crate::af_cloud::AFServer;
pub(crate) struct AFCloudDocumentCloudServiceImpl<T> {
pub inner: T,
pub user: Arc<dyn ServerUser>,
pub logged_user: Arc<dyn LoginUserService>,
}
#[async_trait]
@ -49,7 +49,7 @@ where
check_request_workspace_id_is_match(
workspace_id,
&self.user,
&self.logged_user,
format!("get document doc state:{}", document_id),
)?;
@ -85,7 +85,7 @@ where
.to_vec();
check_request_workspace_id_is_match(
workspace_id,
&self.user,
&self.logged_user,
format!("Get {} document", document_id),
)?;
let collab = Collab::new_with_source(

View file

@ -22,13 +22,13 @@ use flowy_folder_pub::cloud::{
use flowy_folder_pub::entities::PublishPayload;
use lib_infra::async_trait::async_trait;
use crate::af_cloud::define::ServerUser;
use crate::af_cloud::define::LoginUserService;
use crate::af_cloud::impls::util::check_request_workspace_id_is_match;
use crate::af_cloud::AFServer;
pub(crate) struct AFCloudFolderCloudServiceImpl<T> {
pub inner: T,
pub user: Arc<dyn ServerUser>,
pub logged_user: Arc<dyn LoginUserService>,
}
#[async_trait]
@ -91,7 +91,7 @@ where
) -> Result<Option<FolderData>, FlowyError> {
let uid = *uid;
let try_get_client = self.inner.try_get_client();
let cloned_user = self.user.clone();
let cloned_user = self.logged_user.clone();
let params = QueryCollabParams {
workspace_id: *workspace_id,
inner: QueryCollab::new(*workspace_id, CollabType::Folder),
@ -131,7 +131,7 @@ where
object_id: &Uuid,
) -> Result<Vec<u8>, FlowyError> {
let try_get_client = self.inner.try_get_client();
let cloned_user = self.user.clone();
let cloned_user = self.logged_user.clone();
let params = QueryCollabParams {
workspace_id: *workspace_id,
inner: QueryCollab::new(*object_id, collab_type),

View file

@ -31,7 +31,7 @@ use lib_infra::async_trait::async_trait;
use lib_infra::box_any::BoxAny;
use uuid::Uuid;
use crate::af_cloud::define::{ServerUser, USER_SIGN_IN_URL};
use crate::af_cloud::define::{LoginUserService, USER_SIGN_IN_URL};
use crate::af_cloud::impls::user::dto::{
af_update_from_update_params, from_af_workspace_member, to_af_role, user_profile_from_af_profile,
};
@ -44,14 +44,14 @@ use super::dto::{from_af_workspace_invitation_status, to_workspace_invitation_st
pub(crate) struct AFCloudUserAuthServiceImpl<T> {
server: T,
user_change_recv: ArcSwapOption<tokio::sync::mpsc::Receiver<UserUpdate>>,
user: Arc<dyn ServerUser>,
user: Arc<dyn LoginUserService>,
}
impl<T> AFCloudUserAuthServiceImpl<T> {
pub(crate) fn new(
server: T,
user_change_recv: tokio::sync::mpsc::Receiver<UserUpdate>,
user: Arc<dyn ServerUser>,
user: Arc<dyn LoginUserService>,
) -> Self {
Self {
server,

View file

@ -1,4 +1,4 @@
use crate::af_cloud::define::ServerUser;
use crate::af_cloud::define::LoginUserService;
use flowy_error::{FlowyError, FlowyResult};
use std::sync::Arc;
use tracing::warn;
@ -9,7 +9,7 @@ use uuid::Uuid;
/// This ensures that the operation is being performed in the correct workspace context, enhancing security.
pub fn check_request_workspace_id_is_match(
expected_workspace_id: &Uuid,
user: &Arc<dyn ServerUser>,
user: &Arc<dyn LoginUserService>,
action: impl AsRef<str>,
) -> FlowyResult<()> {
let actual_workspace_id = user.workspace_id()?;

View file

@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::af_cloud::define::ServerUser;
use crate::af_cloud::define::{AIUserServiceImpl, LoginUserService};
use anyhow::Error;
use arc_swap::ArcSwap;
use client_api::collab_sync::ServerCollabMessage;
@ -24,6 +24,11 @@ use flowy_storage_pub::cloud::StorageCloudService;
use flowy_user_pub::cloud::{UserCloudService, UserUpdate};
use flowy_user_pub::entities::UserTokenState;
use crate::af_cloud::impls::{
AFCloudDatabaseCloudServiceImpl, AFCloudDocumentCloudServiceImpl, AFCloudFileStorageServiceImpl,
AFCloudFolderCloudServiceImpl, AFCloudUserAuthServiceImpl, CloudChatServiceImpl,
};
use flowy_ai::offline::offline_message_sync::AutoSyncChatService;
use rand::Rng;
use semver::Version;
use tokio::select;
@ -34,11 +39,6 @@ use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use uuid::Uuid;
use crate::af_cloud::impls::{
AFCloudChatCloudServiceImpl, AFCloudDatabaseCloudServiceImpl, AFCloudDocumentCloudServiceImpl,
AFCloudFileStorageServiceImpl, AFCloudFolderCloudServiceImpl, AFCloudUserAuthServiceImpl,
};
use crate::AppFlowyServer;
use super::impls::AFCloudSearchCloudServiceImpl;
@ -53,7 +53,7 @@ pub struct AppFlowyCloudServer {
network_reachable: Arc<AtomicBool>,
pub device_id: String,
ws_client: Arc<WSClient>,
user: Arc<dyn ServerUser>,
logged_user: Arc<dyn LoginUserService>,
}
impl AppFlowyCloudServer {
@ -62,7 +62,7 @@ impl AppFlowyCloudServer {
enable_sync: bool,
mut device_id: String,
client_version: Version,
user: Arc<dyn ServerUser>,
auth_user_service: Arc<dyn LoginUserService>,
) -> Self {
// The device id can't be empty, so we generate a new one if it is.
if device_id.is_empty() {
@ -100,7 +100,7 @@ impl AppFlowyCloudServer {
network_reachable,
device_id,
ws_client,
user,
logged_user: auth_user_service,
}
}
@ -187,7 +187,7 @@ impl AppFlowyServer for AppFlowyCloudServer {
Arc::new(AFCloudUserAuthServiceImpl::new(
server,
rx,
self.user.clone(),
self.logged_user.clone(),
))
}
@ -197,7 +197,7 @@ impl AppFlowyServer for AppFlowyCloudServer {
};
Arc::new(AFCloudFolderCloudServiceImpl {
inner: server,
user: self.user.clone(),
logged_user: self.logged_user.clone(),
})
}
@ -207,7 +207,7 @@ impl AppFlowyServer for AppFlowyCloudServer {
};
Arc::new(AFCloudDatabaseCloudServiceImpl {
inner: server,
user: self.user.clone(),
logged_user: self.logged_user.clone(),
})
}
@ -217,7 +217,7 @@ impl AppFlowyServer for AppFlowyCloudServer {
};
Some(Arc::new(AFCloudDatabaseCloudServiceImpl {
inner: server,
user: self.user.clone(),
logged_user: self.logged_user.clone(),
}))
}
@ -227,7 +227,7 @@ impl AppFlowyServer for AppFlowyCloudServer {
};
Arc::new(AFCloudDocumentCloudServiceImpl {
inner: server,
user: self.user.clone(),
logged_user: self.logged_user.clone(),
})
}
@ -235,7 +235,11 @@ impl AppFlowyServer for AppFlowyCloudServer {
let server = AFServerImpl {
client: self.get_client(),
};
Arc::new(AFCloudChatCloudServiceImpl { inner: server })
Arc::new(AutoSyncChatService::new(
Arc::new(CloudChatServiceImpl { inner: server }),
Arc::new(AIUserServiceImpl(self.logged_user.clone())),
))
}
fn subscribe_ws_state(&self) -> Option<WSConnectStateReceiver> {

View file

@ -1,4 +1,4 @@
use crate::af_cloud::define::ServerUser;
use crate::af_cloud::define::LoginUserService;
use chrono::{TimeZone, Utc};
use client_api::entity::ai_dto::RepeatedRelatedQuestion;
use client_api::entity::CompletionStream;
@ -6,14 +6,15 @@ use flowy_ai::local_ai::controller::LocalAIController;
use flowy_ai::local_ai::stream_util::QuestionStream;
use flowy_ai_pub::cloud::chat_dto::{ChatAuthor, ChatAuthorType};
use flowy_ai_pub::cloud::{
AIModel, AppErrorCode, AppResponseError, ChatCloudService, ChatMessage, ChatMessageMetadata,
ChatMessageType, ChatSettings, CompleteTextParams, MessageCursor, ModelList, RelatedQuestion,
RepeatedChatMessage, ResponseFormat, StreamAnswer, StreamComplete, UpdateChatParams,
AIModel, AppErrorCode, AppResponseError, ChatCloudService, ChatMessage, ChatMessageType,
ChatSettings, CompleteTextParams, MessageCursor, ModelList, RelatedQuestion, RepeatedChatMessage,
ResponseFormat, StreamAnswer, StreamComplete, UpdateChatParams,
};
use flowy_ai_pub::persistence::{
deserialize_chat_metadata, deserialize_rag_ids, read_chat, select_chat_messages,
select_message_content, select_message_where_match_reply_message_id, serialize_chat_metadata,
serialize_rag_ids, update_chat, upsert_chat, ChatMessageTable, ChatTable, ChatTableChangeset,
deserialize_chat_metadata, deserialize_rag_ids, read_chat,
select_answer_where_match_reply_message_id, select_chat_messages, select_message_content,
serialize_chat_metadata, serialize_rag_ids, update_chat, upsert_chat, upsert_chat_messages,
ChatMessageTable, ChatTable, ChatTableChangeset,
};
use flowy_error::{FlowyError, FlowyResult};
use futures_util::{stream, StreamExt, TryStreamExt};
@ -26,12 +27,12 @@ use std::sync::Arc;
use tracing::trace;
use uuid::Uuid;
pub struct LocalServerChatServiceImpl {
pub user: Arc<dyn ServerUser>,
pub struct LocalChatServiceImpl {
pub user: Arc<dyn LoginUserService>,
pub local_ai: Arc<LocalAIController>,
}
impl LocalServerChatServiceImpl {
impl LocalChatServiceImpl {
fn get_message_content(&self, message_id: i64) -> FlowyResult<String> {
let uid = self.user.user_id()?;
let db = self.user.get_sqlite_db(uid)?;
@ -40,35 +41,30 @@ impl LocalServerChatServiceImpl {
})?;
Ok(content)
}
async fn upsert_message(&self, chat_id: &Uuid, message: ChatMessage) -> Result<(), FlowyError> {
let uid = self.user.user_id()?;
let conn = self.user.get_sqlite_db(uid)?;
let row = ChatMessageTable::from_message(chat_id.to_string(), message, true);
upsert_chat_messages(conn, &[row])?;
Ok(())
}
}
#[async_trait]
impl ChatCloudService for LocalServerChatServiceImpl {
impl ChatCloudService for LocalChatServiceImpl {
async fn create_chat(
&self,
_uid: &i64,
_workspace_id: &Uuid,
chat_id: &Uuid,
rag_ids: Vec<Uuid>,
name: &str,
_name: &str,
metadata: Value,
) -> Result<(), FlowyError> {
let uid = self.user.user_id()?;
let db = self.user.get_sqlite_db(uid)?;
let rag_ids = rag_ids
.iter()
.map(|v| v.to_string())
.collect::<Vec<String>>();
let row = ChatTable {
chat_id: chat_id.to_string(),
created_at: timestamp(),
name: name.to_string(),
metadata: serialize_chat_metadata(&metadata),
rag_ids: Some(serialize_rag_ids(&rag_ids)),
};
let row = ChatTable::new(chat_id.to_string(), metadata, rag_ids, true);
upsert_chat(db, &row)?;
Ok(())
}
@ -76,25 +72,23 @@ impl ChatCloudService for LocalServerChatServiceImpl {
async fn create_question(
&self,
_workspace_id: &Uuid,
_chat_id: &Uuid,
chat_id: &Uuid,
message: &str,
message_type: ChatMessageType,
_metadata: &[ChatMessageMetadata],
) -> Result<ChatMessage, FlowyError> {
match message_type {
ChatMessageType::System => Ok(ChatMessage::new_system(timestamp(), message.to_string())),
ChatMessageType::User => Ok(ChatMessage::new_human(
timestamp(),
message.to_string(),
None,
)),
}
let message = match message_type {
ChatMessageType::System => ChatMessage::new_system(timestamp(), message.to_string()),
ChatMessageType::User => ChatMessage::new_human(timestamp(), message.to_string(), None),
};
self.upsert_message(chat_id, message.clone()).await?;
Ok(message)
}
async fn create_answer(
&self,
_workspace_id: &Uuid,
_chat_id: &Uuid,
chat_id: &Uuid,
message: &str,
question_id: i64,
metadata: Option<serde_json::Value>,
@ -103,6 +97,7 @@ impl ChatCloudService for LocalServerChatServiceImpl {
if let Some(metadata) = metadata {
message.metadata = metadata;
}
self.upsert_message(chat_id, message.clone()).await?;
Ok(message)
}
@ -141,10 +136,16 @@ impl ChatCloudService for LocalServerChatServiceImpl {
async fn get_answer(
&self,
_workspace_id: &Uuid,
_chat_id: &Uuid,
_question_message_id: i64,
chat_id: &Uuid,
question_id: i64,
) -> Result<ChatMessage, FlowyError> {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
let uid = self.user.user_id()?;
let db = self.user.get_sqlite_db(uid)?;
match select_answer_where_match_reply_message_id(db, &chat_id.to_string(), question_id)? {
None => Err(FlowyError::record_not_found()),
Some(message) => Ok(chat_message_from_row(message)),
}
}
async fn get_chat_messages(
@ -181,7 +182,7 @@ impl ChatCloudService for LocalServerChatServiceImpl {
let chat_id = chat_id.to_string();
let uid = self.user.user_id()?;
let db = self.user.get_sqlite_db(uid)?;
let row = select_message_where_match_reply_message_id(db, &chat_id, answer_message_id)?
let row = select_answer_where_match_reply_message_id(db, &chat_id, answer_message_id)?
.map(chat_message_from_row)
.ok_or_else(FlowyError::record_not_found)?;
Ok(row)
@ -281,7 +282,7 @@ impl ChatCloudService for LocalServerChatServiceImpl {
let db = self.user.get_sqlite_db(uid)?;
let row = read_chat(db, &chat_id)?;
let rag_ids = deserialize_rag_ids(&row.rag_ids);
let metadata = deserialize_chat_metadata::<serde_json::Value>(&row.metadata);
let metadata = deserialize_chat_metadata::<Value>(&row.metadata);
let setting = ChatSettings {
name: row.name,
rag_ids,
@ -304,6 +305,7 @@ impl ChatCloudService for LocalServerChatServiceImpl {
name: s.name,
metadata: s.metadata.map(|s| serialize_chat_metadata(&s)),
rag_ids: s.rag_ids.map(|s| serialize_rag_ids(&s)),
is_sync: None,
};
update_chat(&mut db, changeset)?;

View file

@ -1,11 +1,10 @@
use flowy_search_pub::cloud::SearchCloudService;
use std::sync::Arc;
use crate::af_cloud::define::ServerUser;
use crate::af_cloud::define::LoginUserService;
use crate::local_server::impls::{
LocalServerChatServiceImpl, LocalServerDatabaseCloudServiceImpl,
LocalServerDocumentCloudServiceImpl, LocalServerFolderCloudServiceImpl,
LocalServerUserServiceImpl,
LocalChatServiceImpl, LocalServerDatabaseCloudServiceImpl, LocalServerDocumentCloudServiceImpl,
LocalServerFolderCloudServiceImpl, LocalServerUserServiceImpl,
};
use crate::AppFlowyServer;
use flowy_ai::local_ai::controller::LocalAIController;
@ -18,13 +17,13 @@ use flowy_user_pub::cloud::UserCloudService;
use tokio::sync::mpsc;
pub struct LocalServer {
user: Arc<dyn ServerUser>,
user: Arc<dyn LoginUserService>,
local_ai: Arc<LocalAIController>,
stop_tx: Option<mpsc::Sender<()>>,
}
impl LocalServer {
pub fn new(user: Arc<dyn ServerUser>, local_ai: Arc<LocalAIController>) -> Self {
pub fn new(user: Arc<dyn LoginUserService>, local_ai: Arc<LocalAIController>) -> Self {
Self {
user,
local_ai,
@ -62,7 +61,7 @@ impl AppFlowyServer for LocalServer {
}
fn chat_service(&self) -> Arc<dyn ChatCloudService> {
Arc::new(LocalServerChatServiceImpl {
Arc::new(LocalChatServiceImpl {
user: self.user.clone(),
local_ai: self.local_ai.clone(),
})

View file

@ -8,7 +8,7 @@ use flowy_error::{FlowyError, FlowyResult};
use uuid::Uuid;
use crate::setup_log;
use flowy_server::af_cloud::define::ServerUser;
use flowy_server::af_cloud::define::LoginUserService;
use flowy_server::af_cloud::AppFlowyCloudServer;
use flowy_server_pub::af_cloud_config::AFCloudConfiguration;
use flowy_sqlite::DBConnection;
@ -42,7 +42,7 @@ pub fn af_cloud_server(config: AFCloudConfiguration) -> Arc<AppFlowyCloudServer>
struct FakeServerUserImpl;
#[async_trait]
impl ServerUser for FakeServerUserImpl {
impl LoginUserService for FakeServerUserImpl {
fn workspace_id(&self) -> FlowyResult<Uuid> {
todo!()
}

View file

@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
ALTER TABLE chat_table DROP COLUMN is_sync;
ALTER TABLE chat_message_table DROP COLUMN is_sync;

View file

@ -0,0 +1,5 @@
-- Your SQL goes here
ALTER TABLE chat_table
ADD COLUMN is_sync BOOLEAN DEFAULT TRUE NOT NULL;
ALTER TABLE chat_message_table
ADD COLUMN is_sync BOOLEAN DEFAULT TRUE NOT NULL;

View file

@ -27,6 +27,7 @@ diesel::table! {
author_id -> Text,
reply_message_id -> Nullable<BigInt>,
metadata -> Nullable<Text>,
is_sync -> Bool,
}
}
@ -37,6 +38,7 @@ diesel::table! {
name -> Text,
metadata -> Text,
rag_ids -> Nullable<Text>,
is_sync -> Bool,
}
}