AppFlowy-Cloud/tests/collab/single_device_edit.rs
Nathan.fooo 2ab1e94b99
chore: create embed in background (#1071)
* refactor: create indexer scheduler

* refactor: replace current calls

* chore: use rayon thread pool

* chore: index unindexed collab using thread pool

* chore: index collab when write to disk

* chore: add test

* chore: add test

* chore: add test

* chore: fix client api deps

* chore: chat with document test

* chore: fix test

* chore: clippy

* chore: update test
2024-12-15 12:29:51 +08:00

771 lines
20 KiB
Rust

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use assert_json_diff::assert_json_eq;
use client_api::entity::AFRole;
use collab::core::origin::CollabOrigin;
use collab_entity::CollabType;
use serde_json::json;
use tokio::time::sleep;
use uuid::Uuid;
use crate::collab::util::{
generate_random_bytes, generate_random_string, make_big_collab_doc_state,
};
use client_api_test::*;
use collab_rt_entity::{CollabMessage, RealtimeMessage, UpdateSync, MAXIMUM_REALTIME_MESSAGE_SIZE};
/// Inserts the keys "0" through "5" into a single collab, waits for the sync to
/// complete, and asserts that the server copy contains every inserted entry.
#[tokio::test]
async fn realtime_write_single_collab_test() {
  let collab_type = CollabType::Unknown;
  let mut test_client = TestClient::new_user().await;
  let workspace_id = test_client.workspace_id().await;
  let object_id = test_client
    .create_and_edit_collab(&workspace_id, collab_type.clone())
    .await;
  test_client
    .open_collab(&workspace_id, &object_id, collab_type.clone())
    .await;

  // Edit the collab: each key is the decimal form of the index and maps to the same string.
  for i in 0..=5 {
    test_client
      .insert_into(&object_id, &i.to_string(), i.to_string())
      .await;
  }

  // The expectation lists every key written by the loop above (0..=5), so a lost
  // trailing update cannot go unnoticed.
  let expected_json = json!({
    "0": "0",
    "1": "1",
    "2": "2",
    "3": "3",
    "4": "4",
    "5": "5",
  });

  test_client
    .wait_object_sync_complete(&object_id)
    .await
    .unwrap();

  assert_server_collab(
    &workspace_id,
    &mut test_client.api_client,
    &object_id,
    &collab_type,
    10,
    expected_json,
  )
  .await
  .unwrap();
}
/// Opens a collab under a freshly generated id, writes 21 small entries with a
/// 300 ms pause after each one, disconnects, and asserts the server holds them all.
#[tokio::test]
async fn collab_write_small_chunk_of_data_test() {
  let collab_type = CollabType::Unknown;
  let mut test_client = TestClient::new_user().await;
  let workspace_id = test_client.workspace_id().await;
  let object_id = Uuid::new_v4().to_string();

  // Nothing has been created for this id beforehand. Opening it directly lets the
  // [CollabStoragePlugin] look the object up in the database, find nothing, and
  // create the collab object itself.
  test_client
    .open_collab(&workspace_id, &object_id, collab_type.clone())
    .await;

  // Mirror of every write, used afterwards as the expected server state.
  let mut written = HashMap::new();
  for n in 0..=20 {
    let entry = n.to_string();
    test_client
      .insert_into(&object_id, &entry, entry.clone())
      .await;
    written.insert(entry.clone(), entry);
    sleep(Duration::from_millis(300)).await;
  }

  test_client
    .wait_object_sync_complete(&object_id)
    .await
    .unwrap();
  test_client.disconnect().await;

  let expected = json!(written);
  assert_server_collab(
    &workspace_id,
    &mut test_client.api_client,
    &object_id,
    &collab_type,
    10,
    expected,
  )
  .await
  .unwrap();
}
/// Writes one 10000-character random string into a newly opened collab and asserts
/// that the server copy stores it under the key "big_text".
#[tokio::test]
async fn collab_write_big_chunk_of_data_test() {
  let collab_type = CollabType::Unknown;
  let mut test_client = TestClient::new_user().await;
  let workspace_id = test_client.workspace_id().await;
  let object_id = Uuid::new_v4().to_string();
  test_client
    .open_collab(&workspace_id, &object_id, collab_type.clone())
    .await;

  let big_text = generate_random_string(10000);
  // Capture the expected document before handing a copy of the text to the collab.
  let expected = json!({
    "big_text": big_text
  });
  test_client
    .insert_into(&object_id, "big_text", big_text.clone())
    .await;

  test_client
    .wait_object_sync_complete(&object_id)
    .await
    .unwrap();

  assert_server_collab(
    &workspace_id,
    &mut test_client.api_client,
    &object_id,
    &collab_type,
    10,
    expected,
  )
  .await
  .unwrap();
}
/// Opens a collab with a pre-built doc state whose text is half of
/// `MAXIMUM_REALTIME_MESSAGE_SIZE` long and asserts the server ends up with that text.
#[tokio::test]
async fn write_big_chunk_data_init_sync_test() {
  let collab_type = CollabType::Unknown;
  let mut test_client = TestClient::new_user().await;
  let workspace_id = test_client.workspace_id().await;
  let object_id = Uuid::new_v4().to_string();

  let text_len = (MAXIMUM_REALTIME_MESSAGE_SIZE / 2) as usize;
  let big_text = generate_random_string(text_len);
  let initial_state = make_big_collab_doc_state(&object_id, "big_text", big_text.clone());

  // A doc state this large forces the init sync to go through the HTTP request,
  // which triggers the POST_REALTIME_MESSAGE_STREAM_HANDLER on the server.
  test_client
    .open_collab_with_doc_state(&workspace_id, &object_id, collab_type.clone(), initial_state)
    .await;
  test_client
    .wait_object_sync_complete(&object_id)
    .await
    .unwrap();

  let expected = json!({
    "big_text": big_text
  });
  assert_server_collab(
    &workspace_id,
    &mut test_client.api_client,
    &object_id,
    &collab_type,
    10,
    expected,
  )
  .await
  .unwrap();
}
/// Creates five collabs in one workspace, writes the keys "0" through "5" into each,
/// and asserts that the server copy of every collab contains all six entries.
#[tokio::test]
async fn realtime_write_multiple_collab_test() {
  // One collab type is shared by creation, opening and the final server query, so the
  // assertion asks the server for the same kind of object that was written.
  let collab_type = CollabType::Unknown;
  let mut test_client = TestClient::new_user().await;
  let workspace_id = test_client.workspace_id().await;

  let mut object_ids = vec![];
  for _ in 0..5 {
    let object_id = test_client
      .create_and_edit_collab(&workspace_id, collab_type.clone())
      .await;
    test_client
      .open_collab(&workspace_id, &object_id, collab_type.clone())
      .await;
    for i in 0..=5 {
      test_client
        .insert_into(&object_id, &i.to_string(), i.to_string())
        .await;
    }
    test_client
      .wait_object_sync_complete(&object_id)
      .await
      .unwrap();
    object_ids.push(object_id);
  }

  // Every collab received the same six writes, so each is checked against the same document.
  for object_id in object_ids {
    assert_server_collab(
      &workspace_id,
      &mut test_client.api_client,
      &object_id,
      &collab_type,
      10,
      json!( {
        "0": "0",
        "1": "1",
        "2": "2",
        "3": "3",
        "4": "4",
        "5": "5",
      }),
    )
    .await
    .unwrap();
  }
}
/// Two `TestClient`s connect with the same device id; the later connection takes
/// over. Both the second client and the server must end up with the edits made
/// through either client.
#[tokio::test]
async fn second_connect_override_first_connect_test() {
  let collab_type = CollabType::Unknown;
  let mut first_client = TestClient::new_user().await;
  let workspace_id = first_client.workspace_id().await;
  let object_id = first_client
    .create_and_edit_collab(&workspace_id, collab_type.clone())
    .await;
  first_client.insert_into(&object_id, "1", "a").await;

  // Give the doc observer one second to pick up the update; otherwise the sync
  // might be reported complete before the update has even been scheduled.
  sleep(Duration::from_secs(1)).await;
  first_client
    .wait_object_sync_complete(&object_id)
    .await
    .unwrap();

  // The second client reuses the first client's device id, so on the server it
  // replaces the existing client and the old one stops receiving updates.
  let mut second_client =
    TestClient::new_with_device_id(&first_client.device_id, first_client.user.clone(), true).await;
  second_client
    .open_collab(&workspace_id, &object_id, collab_type.clone())
    .await;
  second_client.insert_into(&object_id, "2", "b").await;
  second_client
    .wait_object_sync_complete(&object_id)
    .await
    .unwrap();

  let expected = json!({
    "1": "a",
    "2": "b"
  });

  assert_client_collab_include_value(&mut second_client, &object_id, expected.clone())
    .await
    .unwrap();

  assert_server_collab(
    &workspace_id,
    &mut second_client.api_client,
    &object_id,
    &collab_type,
    60,
    expected,
  )
  .await
  .unwrap();
}
/// Connects three times in a row with the same device id, altering the document on
/// every connection, and asserts the server keeps the edits from all three.
#[tokio::test]
async fn same_device_multiple_connect_in_order_test() {
  let collab_type = CollabType::Unknown;
  let mut old_client = TestClient::new_user().await;
  let workspace_id = old_client.workspace_id().await;
  let object_id = old_client
    .create_and_edit_collab(&workspace_id, collab_type.clone())
    .await;

  // Each round simulates the client connecting to the websocket server again and
  // then writing its round number under a key of the same name.
  for round in 0..3 {
    let device_id = &old_client.device_id;
    let user = old_client.user.clone();
    let mut reconnected = TestClient::new_with_device_id(device_id, user, true).await;
    reconnected
      .open_collab(&workspace_id, &object_id, collab_type.clone())
      .await;

    let key = round.to_string();
    reconnected.insert_into(&object_id, &key, round).await;
    sleep(Duration::from_millis(500)).await;
    reconnected
      .wait_object_sync_complete(&object_id)
      .await
      .unwrap();
  }

  let expected = json!({"0":0,"1":1,"2":2});
  assert_server_collab(
    &workspace_id,
    &mut old_client.api_client,
    &object_id,
    &collab_type,
    10,
    expected,
  )
  .await
  .unwrap();
}
/// Two users edit the same collab; after both syncs complete, each client must
/// contain the other's edit as well as its own.
#[tokio::test]
async fn two_direction_peer_sync_test() {
  let collab_type = CollabType::Unknown;
  let mut client_1 = TestClient::new_user().await;
  let workspace_id = client_1.workspace_id().await;
  let object_id = client_1
    .create_and_edit_collab(&workspace_id, collab_type.clone())
    .await;

  let mut client_2 = TestClient::new_user().await;
  // client_2 has to become a member before it may edit the collab object;
  // otherwise the server will reject its edit request.
  client_1
    .invite_and_accepted_workspace_member(&workspace_id, &client_2, AFRole::Member)
    .await
    .unwrap();
  client_2
    .open_collab(&workspace_id, &object_id, collab_type.clone())
    .await;

  // First direction: client_1 writes.
  client_1.insert_into(&object_id, "name", "AppFlowy").await;
  client_1
    .wait_object_sync_complete(&object_id)
    .await
    .unwrap();

  // Second direction: client_2 writes.
  let platforms = "macOS, Windows, Linux, iOS, Android";
  client_2
    .insert_into(&object_id, "support platform", platforms)
    .await;
  client_2
    .wait_object_sync_complete(&object_id)
    .await
    .unwrap();

  let expected = json!({
    "name": "AppFlowy",
    "support platform": "macOS, Windows, Linux, iOS, Android"
  });
  assert_client_collab_include_value(&mut client_1, &object_id, expected.clone())
    .await
    .unwrap();
  assert_client_collab_include_value(&mut client_2, &object_id, expected)
    .await
    .unwrap();
}
/// Two independent users each edit a collab in their own workspace; the server must
/// keep the two documents separate, each holding only its owner's title.
#[tokio::test]
async fn multiple_collab_edit_test() {
  let collab_type = CollabType::Unknown;

  // First user: own workspace, own collab.
  let mut client_1 = TestClient::new_user().await;
  let workspace_id_1 = client_1.workspace_id().await;
  let object_id_1 = client_1
    .create_and_edit_collab(&workspace_id_1, collab_type.clone())
    .await;
  client_1
    .open_collab(&workspace_id_1, &object_id_1, collab_type.clone())
    .await;

  // Second user: a different workspace and a different collab.
  let mut client_2 = TestClient::new_user().await;
  let workspace_id_2 = client_2.workspace_id().await;
  let object_id_2 = client_2
    .create_and_edit_collab(&workspace_id_2, collab_type.clone())
    .await;
  client_2
    .open_collab(&workspace_id_2, &object_id_2, collab_type.clone())
    .await;

  let title_1 = "I am client 1";
  client_1.insert_into(&object_id_1, "title", title_1).await;
  client_1
    .wait_object_sync_complete(&object_id_1)
    .await
    .unwrap();

  let title_2 = "I am client 2";
  client_2.insert_into(&object_id_2, "title", title_2).await;
  client_2
    .wait_object_sync_complete(&object_id_2)
    .await
    .unwrap();

  let expected_1 = json!( {
    "title": "I am client 1"
  });
  assert_server_collab(
    &workspace_id_1,
    &mut client_1.api_client,
    &object_id_1,
    &collab_type,
    10,
    expected_1,
  )
  .await
  .unwrap();

  let expected_2 = json!( {
    "title": "I am client 2"
  });
  assert_server_collab(
    &workspace_id_2,
    &mut client_2.api_client,
    &object_id_2,
    &collab_type,
    10,
    expected_2,
  )
  .await
  .unwrap();
}
/// Spawns five tasks, each with its own user editing its own collab, and checks
/// that every client's local document matches what that client wrote.
#[tokio::test]
async fn simulate_multiple_user_edit_collab_test() {
  let tasks: Vec<_> = (0..5)
    .map(|_| {
      tokio::spawn(async move {
        let mut new_user = TestClient::new_user().await;
        let collab_type = CollabType::Unknown;
        let workspace_id = new_user.workspace_id().await;
        let object_id = Uuid::new_v4().to_string();
        new_user
          .open_collab(&workspace_id, &object_id, collab_type.clone())
          .await;

        let random_str = generate_random_string(200);
        new_user
          .insert_into(&object_id, "string", random_str.clone())
          .await;
        let expected = json!({
          "string": random_str
        });

        new_user
          .wait_object_sync_complete(&object_id)
          .await
          .unwrap();

        // Snapshot the client's local copy of the collab as JSON.
        let actual = (*new_user
          .collabs
          .get(&object_id)
          .unwrap()
          .collab
          .read()
          .await)
          .to_json_value();

        (expected, actual)
      })
    })
    .collect();

  for outcome in futures::future::join_all(tasks).await {
    let (expected, actual) = outcome.unwrap();
    assert_json_eq!(expected, actual);
  }
}
/// Five users concurrently open a collab whose initial doc state carries a 64 KiB
/// text, then the server copy of each collab is checked for that text.
#[tokio::test]
async fn post_realtime_message_test() {
  let mut tasks = Vec::new();
  let big_text = generate_random_string(64 * 1024);

  for _i in 0..5 {
    let cloned_text = big_text.clone();
    let task = tokio::spawn(async move {
      let mut new_user = TestClient::new_user().await;
      // sleep 2 secs to make sure it do not trigger register user too fast in gotrue
      sleep(Duration::from_secs(2)).await;

      let object_id = Uuid::new_v4().to_string();
      let workspace_id = new_user.workspace_id().await;
      let doc_state = make_big_collab_doc_state(&object_id, "text", cloned_text);

      // the big doc_state will force the init_sync using the http request.
      // It will trigger the POST_REALTIME_MESSAGE_STREAM_HANDLER to handle the request.
      new_user
        .open_collab_with_doc_state(&workspace_id, &object_id, CollabType::Unknown, doc_state)
        .await;
      new_user
        .wait_object_sync_complete(&object_id)
        .await
        .unwrap();

      (new_user, object_id, workspace_id)
    });
    tasks.push(task);
  }

  let results = futures::future::join_all(tasks).await;
  for result in results.into_iter() {
    let (mut client, object_id, workspace_id) = result.unwrap();
    // Query with the collab type the object was opened with above
    // (CollabType::Unknown), not a different one.
    assert_server_collab(
      &workspace_id,
      &mut client.api_client,
      &object_id,
      &CollabType::Unknown,
      10,
      json!({
        "text": big_text
      }),
    )
    .await
    .unwrap();

    drop(client);
  }
}
/// Posts ten realtime messages concurrently from a client that was created without
/// a websocket connection; every post must succeed.
#[tokio::test]
async fn post_realtime_message_without_ws_connect_test() {
  let client = Arc::new(TestClient::new_user_without_ws_conn().await);

  // try to post 10 realtime message without connect to the websocket server.
  let handles: Vec<_> = (0..10)
    .map(|_| {
      let shared_client = client.clone();
      tokio::spawn(async move {
        let update = UpdateSync::new(
          CollabOrigin::Empty,
          uuid::Uuid::new_v4().to_string(),
          generate_random_bytes(1024),
          1,
        );
        let encoded = RealtimeMessage::Collab(CollabMessage::ClientUpdateSync(update))
          .encode()
          .unwrap();
        shared_client.post_realtime_binary(encoded).await.unwrap();
      })
    })
    .collect();

  // Surface a panic from any of the spawned tasks.
  for joined in futures::future::join_all(handles).await {
    joined.unwrap();
  }
}
/// Posts a single encoded realtime update message from a client created with
/// `TestClient::new_user` and expects the post to succeed.
#[tokio::test]
async fn post_realtime_message_with_ws_connect_test() {
  let client = Arc::new(TestClient::new_user().await);

  // A client update with a random object id and a 1 KiB random payload.
  let update = UpdateSync::new(
    CollabOrigin::Empty,
    uuid::Uuid::new_v4().to_string(),
    generate_random_bytes(1024),
    1,
  );
  let encoded = RealtimeMessage::Collab(CollabMessage::ClientUpdateSync(update))
    .encode()
    .unwrap();

  client.post_realtime_binary(encoded).await.unwrap();
}
/// Ten users each open a collab with a 3 MiB initial text while not connected to
/// the websocket server, then all reconnect concurrently, wait for the sync, and
/// make 100 further edits each.
#[tokio::test]
async fn simulate_10_offline_user_connect_and_then_sync_document_test() {
  let text = generate_random_string(1024 * 1024 * 3);

  // Phase 1: prepare every user's collab without a websocket connection.
  let mut offline_tasks = Vec::new();
  for i in 0..10 {
    let cloned_text = text.clone();
    offline_tasks.push(tokio::spawn(async move {
      let mut new_user = TestClient::new_user_without_ws_conn().await;
      // sleep to make sure it do not trigger register user too fast in gotrue
      sleep(Duration::from_secs(i % 5)).await;

      let object_id = Uuid::new_v4().to_string();
      let workspace_id = new_user.workspace_id().await;
      let doc_state = make_big_collab_doc_state(&object_id, "text", cloned_text);
      new_user
        .open_collab_with_doc_state(&workspace_id, &object_id, CollabType::Unknown, doc_state)
        .await;
      (new_user, object_id)
    }));
  }
  let offline_results = futures::future::join_all(offline_tasks).await;

  // Phase 2: reconnect every user and keep editing after the initial sync.
  let mut online_tasks = Vec::new();
  for result in offline_results.into_iter() {
    online_tasks.push(tokio::spawn(async move {
      let (client, object_id) = result.unwrap();
      client.reconnect().await;
      client.wait_object_sync_complete(&object_id).await.unwrap();

      for i in 0..100 {
        client
          .insert_into(&object_id, &i.to_string(), i.to_string())
          .await;
        sleep(Duration::from_millis(60)).await;
      }
      client.wait_object_sync_complete(&object_id).await.unwrap();
    }));
  }

  // Surface a panic from any of the spawned tasks.
  for joined in futures::future::join_all(online_tasks).await {
    joined.unwrap()
  }
}
/// After disconnecting, edits are made locally and pushed to the server with
/// `collab_full_sync`; the server state is asserted after each push.
#[tokio::test]
async fn offline_and_then_sync_through_http_request() {
  let mut test_client = TestClient::new_user().await;
  let object_id = Uuid::new_v4().to_string();
  let workspace_id = test_client.workspace_id().await;

  let initial_state = make_big_collab_doc_state(&object_id, "1", "".to_string());
  test_client
    .open_collab_with_doc_state(&workspace_id, &object_id, CollabType::Unknown, initial_state)
    .await;
  test_client
    .wait_object_sync_complete(&object_id)
    .await
    .unwrap();
  test_client.disconnect().await;

  // Baseline: before any offline edit, the server holds the empty value for key "1".
  assert_server_collab(
    &workspace_id,
    &mut test_client.api_client,
    &object_id,
    &CollabType::Unknown,
    10,
    json!({"1":""}),
  )
  .await
  .unwrap();

  // First offline edit: overwrite key "1" with a 100-character text.
  let small_text = generate_random_string(100);
  test_client
    .insert_into(&object_id, "1", small_text.clone())
    .await;

  // Push the local collab state to the server.
  let first_snapshot = test_client
    .collabs
    .get(&object_id)
    .unwrap()
    .encode_collab()
    .await;
  test_client
    .api_client
    .collab_full_sync(
      &workspace_id,
      &object_id,
      CollabType::Unknown,
      first_snapshot.doc_state.to_vec(),
      first_snapshot.state_vector.to_vec(),
    )
    .await
    .unwrap();

  // The server must now contain the small text under key "1".
  assert_server_collab(
    &workspace_id,
    &mut test_client.api_client,
    &object_id,
    &CollabType::Unknown,
    10,
    json!({"1": small_text.clone()}),
  )
  .await
  .unwrap();

  // Second offline edit: add a 512-character text under key "2".
  let medium_text = generate_random_string(512);
  test_client
    .insert_into(&object_id, "2", medium_text.clone())
    .await;

  // Push the local collab state to the server again.
  let second_snapshot = test_client
    .collabs
    .get(&object_id)
    .unwrap()
    .encode_collab()
    .await;
  test_client
    .api_client
    .collab_full_sync(
      &workspace_id,
      &object_id,
      CollabType::Unknown,
      second_snapshot.doc_state.to_vec(),
      second_snapshot.state_vector.to_vec(),
    )
    .await
    .unwrap();

  // Both texts must now be present on the server.
  assert_server_collab(
    &workspace_id,
    &mut test_client.api_client,
    &object_id,
    &CollabType::Unknown,
    10,
    json!({"1": small_text, "2": medium_text}),
  )
  .await
  .unwrap();
}
#[tokio::test]
async fn insert_text_through_http_post_request() {
let mut test_client = TestClient::new_user().await;
let object_id = Uuid::new_v4().to_string();
let workspace_id = test_client.workspace_id().await;
let doc_state = make_big_collab_doc_state(&object_id, "1", "".to_string());
test_client
.open_collab_with_doc_state(&workspace_id, &object_id, CollabType::Unknown, doc_state)
.await;
test_client
.wait_object_sync_complete(&object_id)
.await
.unwrap();
test_client.disconnect().await;
let mut final_text = HashMap::new();
for i in 0..1000 {
let key = i.to_string();
let text = generate_random_string(10);
test_client
.insert_into(&object_id, &key, text.clone())
.await;
final_text.insert(key, text);
if i % 100 == 0 {
tokio::time::sleep(tokio::time::Duration::from_millis(300)).await;
}
}
let encode_collab = test_client
.collabs
.get(&object_id)
.unwrap()
.encode_collab()
.await;
test_client
.api_client
.collab_full_sync(
&workspace_id,
&object_id,
CollabType::Unknown,
encode_collab.doc_state.to_vec(),
encode_collab.state_vector.to_vec(),
)
.await
.unwrap();
assert_server_collab(
&workspace_id,
&mut test_client.api_client,
&object_id,
&CollabType::Unknown,
10,
json!(final_text),
)
.await
.unwrap();
}