Merge pull request #853 from AppFlowy-IO/stateless

Stateless collab group
This commit is contained in:
Bartosz Sypytkowski 2024-12-27 13:00:44 +01:00, committed by GitHub
commit 48e038b9ed
GPG key ID: B5690EEEBB952194 (no known key found for this signature in database)
75 changed files with 3057 additions and 837 deletions


@ -69,7 +69,7 @@ jobs:
matrix:
include:
- test_service: "appflowy_cloud"
test_cmd: "--workspace --exclude appflowy-history --exclude appflowy-ai-client --features ai-test-enabled"
test_cmd: "--workspace --exclude appflowy-ai-client --features ai-test-enabled"
- test_service: "appflowy_worker"
test_cmd: "-p appflowy-worker"
- test_service: "admin_frontend"
@ -130,7 +130,7 @@ jobs:
- name: Run Tests
run: |
echo "Running tests for ${{ matrix.test_service }} with flags: ${{ matrix.test_cmd }}"
RUST_LOG="info" DISABLE_CI_TEST_LOG="true" cargo test ${{ matrix.test_cmd }}
RUST_LOG="info" DISABLE_CI_TEST_LOG="true" cargo test ${{ matrix.test_cmd }} -- --skip stress_test
- name: Server Logs
if: failure()


@ -239,7 +239,6 @@ jobs:
if: always()
run: docker logout
appflowy_worker_image:
runs-on: ubuntu-22.04
env:

.github/workflows/stress_test.yml (vendored, new file, 49 lines)

@ -0,0 +1,49 @@
name: AppFlowy-Cloud Stress Test
on: [ pull_request ]
concurrency:
group: stress-test-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: false
env:
POSTGRES_HOST: localhost
REDIS_HOST: localhost
MINIO_HOST: localhost
SQLX_OFFLINE: true
RUST_TOOLCHAIN: "1.78"
jobs:
test:
name: Collab Stress Tests
runs-on: self-hosted-appflowy3
steps:
- name: Checkout Repository
uses: actions/checkout@v3
- name: Install Rust Toolchain
uses: dtolnay/rust-toolchain@stable
- name: Copy and Rename deploy.env to .env
run: cp deploy.env .env
- name: Replace Values in .env
run: |
sed -i '' 's|RUST_LOG=.*|RUST_LOG=debug|' .env
sed -i '' 's|API_EXTERNAL_URL=.*|API_EXTERNAL_URL=http://localhost:9999|' .env
sed -i '' 's|APPFLOWY_GOTRUE_BASE_URL=.*|APPFLOWY_GOTRUE_BASE_URL=http://localhost:9999|' .env
shell: bash
- name: Start Docker Compose Services
run: |
docker compose -f docker-compose-stress-test.yml up -d
docker ps -a
- name: Install Prerequisites
run: |
brew install protobuf
- name: Run Server and Test
run: |
cargo run --package xtask -- --stress-test
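The regular test workflow above skips these cases with `--skip stress_test`; this workflow drives them through the xtask runner instead. As a hedged illustration only, an xtask entry point for `--stress-test` might shell out to cargo roughly like this (the package names and test filter are assumptions, not the repository's actual xtask code):

```rust
use anyhow::Result;
use tokio::process::Command;

#[tokio::main]
async fn main() -> Result<()> {
    if std::env::args().any(|a| a == "--stress-test") {
        // Launch the server in the background (assumed package name).
        let mut server = Command::new("cargo")
            .args(["run", "--package", "appflowy-cloud"])
            .spawn()?;
        // Run only the tests whose names match `stress_test` -- the ones CI skips.
        let status = Command::new("cargo")
            .args(["test", "stress_test"])
            .status()
            .await?;
        server.kill().await.ok();
        anyhow::ensure!(status.success(), "stress tests failed");
    }
    Ok(())
}
```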


@ -0,0 +1,65 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT * FROM af_collab_snapshot\n WHERE workspace_id = $1 AND oid = $2 AND deleted_at IS NULL\n ORDER BY created_at DESC\n LIMIT 1;\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "sid",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "oid",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "blob",
"type_info": "Bytea"
},
{
"ordinal": 3,
"name": "len",
"type_info": "Int4"
},
{
"ordinal": 4,
"name": "encrypt",
"type_info": "Int4"
},
{
"ordinal": 5,
"name": "deleted_at",
"type_info": "Timestamptz"
},
{
"ordinal": 6,
"name": "workspace_id",
"type_info": "Uuid"
},
{
"ordinal": 7,
"name": "created_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": [
false,
false,
false,
false,
true,
true,
false,
false
]
},
"hash": "88516b9a2a424bc7697337d6f16b0d6e94b919597d709f930467423c5b4c0ec2"
}

Cargo.lock (generated, 26 changes)

@ -638,6 +638,7 @@ dependencies = [
"derive_more",
"dotenvy",
"fancy-regex 0.11.0",
"flate2",
"futures",
"futures-lite",
"futures-util",
@ -716,6 +717,7 @@ dependencies = [
"anyhow",
"app-error",
"appflowy-ai-client",
"arc-swap",
"async-stream",
"async-trait",
"authentication",
@ -2354,11 +2356,15 @@ name = "collab-stream"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"async-trait",
"bincode",
"bytes",
"chrono",
"collab",
"collab-entity",
"futures",
"loole",
"prost 0.13.3",
"rand 0.8.5",
"redis 0.25.4",
@ -2369,6 +2375,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"zstd 0.13.2",
]
[[package]]
@ -3323,9 +3330,9 @@ dependencies = [
[[package]]
name = "futures"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
@ -3354,9 +3361,9 @@ checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
@ -4514,6 +4521,16 @@ version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "loole"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2998397c725c822c6b2ba605fd9eb4c6a7a0810f1629ba3cc232ef4f0308d96"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "lru"
version = "0.12.4"
@ -8666,6 +8683,7 @@ name = "xtask"
version = "0.1.0"
dependencies = [
"anyhow",
"futures",
"tokio",
]


@ -159,6 +159,7 @@ http.workspace = true
indexer.workspace = true
[dev-dependencies]
flate2 = "1.0"
once_cell = "1.19.0"
tempfile = "3.9.0"
assert-json-diff = "2.0.2"
@ -214,7 +215,6 @@ members = [
"libs/appflowy-ai-client",
"libs/client-api-entity",
# services
#"services/appflowy-history",
"services/appflowy-collaborate",
"services/appflowy-worker",
# xtask
@ -283,6 +283,7 @@ sanitize-filename = "0.5.0"
base64 = "0.22"
md5 = "0.7.0"
pin-project = "1.1.5"
arc-swap = { version = "1.7" }
validator = "0.19"
zstd = { version = "0.13.2", features = [] }
chrono = { version = "0.4.39", features = [


@ -4,7 +4,7 @@
# PostgreSQL Settings
POSTGRES_HOST=postgres
POSTGRES_USER=postgres
POSTGRES_PASSWORD=changepassword
POSTGRES_PASSWORD=password
POSTGRES_PORT=5432
POSTGRES_DB=postgres
@ -15,6 +15,10 @@ SUPABASE_PASSWORD=root
REDIS_HOST=redis
REDIS_PORT=6379
# Minio Host
MINIO_HOST=minio
MINIO_PORT=9000
# AppFlowy Cloud
## URL that connects to the gotrue docker container
APPFLOWY_GOTRUE_BASE_URL=http://gotrue:9999
@ -69,11 +73,12 @@ GOTRUE_DISABLE_SIGNUP=false
# If you are using a different domain, you need to change the redirect_uri in the OAuth2 configuration
# Make sure that this domain is accessible to the user
# Make sure it does not end with /
# Replace localhost with your host name
API_EXTERNAL_URL=http://your-host
# In docker environment, `postgres` is the hostname of the postgres service
# GoTrue connect to postgres using this url
GOTRUE_DATABASE_URL=postgres://supabase_auth_admin:${SUPABASE_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
GOTRUE_DATABASE_URL=postgres://supabase_auth_admin:${SUPABASE_PASSWORD}@postgres:${POSTGRES_PORT}/${POSTGRES_DB}
# Refer to this for details: https://github.com/AppFlowy-IO/AppFlowy-Cloud/blob/main/doc/AUTHENTICATION.md
# Google OAuth2
@ -105,7 +110,7 @@ APPFLOWY_S3_CREATE_BUCKET=true
# By default, Minio is used as the file storage backend, backed by the host's file system.
# Keep this as true if you are using an S3-compatible storage provider other than AWS.
APPFLOWY_S3_USE_MINIO=true
APPFLOWY_S3_MINIO_URL=http://minio:9000 # change this if you are using a different address for minio
APPFLOWY_S3_MINIO_URL=http://${MINIO_HOST}:${MINIO_PORT} # change this if you are using a different address for minio
APPFLOWY_S3_ACCESS_KEY=minioadmin
APPFLOWY_S3_SECRET_KEY=minioadmin
APPFLOWY_S3_BUCKET=appflowy


@ -0,0 +1,97 @@
services:
nginx:
restart: on-failure
image: nginx
ports:
- 80:80 # Disable this if you are using TLS
- 443:443
volumes:
- ./nginx/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/ssl/certificate.crt:/etc/nginx/ssl/certificate.crt
- ./nginx/ssl/private_key.key:/etc/nginx/ssl/private_key.key
minio:
restart: on-failure
image: minio/minio
ports:
- 9000:9000
- 9001:9001
environment:
- MINIO_BROWSER_REDIRECT_URL=http://localhost:9001
command: server /data --console-address ":9001"
postgres:
restart: on-failure
image: pgvector/pgvector:pg16
environment:
- POSTGRES_USER=${POSTGRES_USER:-postgres}
- POSTGRES_DB=${POSTGRES_DB:-postgres}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password}
- POSTGRES_HOST=${POSTGRES_HOST:-postgres}
- SUPABASE_USER=${SUPABASE_USER:-supabase_auth_admin}
- SUPABASE_PASSWORD=${SUPABASE_PASSWORD:-root}
ports:
- 5432:5432
volumes:
- ./migrations/before:/docker-entrypoint-initdb.d
# uncomment the following line if you want to persist data when restarting docker
#- postgres_data:/var/lib/postgresql/data
redis:
restart: on-failure
image: redis
ports:
- 6379:6379
gotrue:
restart: on-failure
image: supabase/gotrue:v2.159.1
depends_on:
- postgres
environment:
# Gotrue config: https://github.com/supabase/gotrue/blob/master/example.env
- GOTRUE_SITE_URL=appflowy-flutter:// # redirected to AppFlowy application
- URI_ALLOW_LIST=* # adjust or restrict if necessary
- GOTRUE_JWT_SECRET=${GOTRUE_JWT_SECRET} # authentication secret
- GOTRUE_JWT_EXP=${GOTRUE_JWT_EXP}
- GOTRUE_DB_DRIVER=postgres
- API_EXTERNAL_URL=${API_EXTERNAL_URL}
- DATABASE_URL=${GOTRUE_DATABASE_URL}
- PORT=9999
- GOTRUE_MAILER_URLPATHS_CONFIRMATION=/verify
- GOTRUE_SMTP_HOST=${GOTRUE_SMTP_HOST} # e.g. smtp.gmail.com
- GOTRUE_SMTP_PORT=${GOTRUE_SMTP_PORT} # e.g. 465
- GOTRUE_SMTP_USER=${GOTRUE_SMTP_USER} # email sender, e.g. noreply@appflowy.io
- GOTRUE_SMTP_PASS=${GOTRUE_SMTP_PASS} # email password
- GOTRUE_SMTP_ADMIN_EMAIL=${GOTRUE_SMTP_ADMIN_EMAIL} # email with admin privileges e.g. internal@appflowy.io
- GOTRUE_SMTP_MAX_FREQUENCY=${GOTRUE_SMTP_MAX_FREQUENCY:-1ns} # set to 1ns for running tests
- GOTRUE_RATE_LIMIT_EMAIL_SENT=${GOTRUE_RATE_LIMIT_EMAIL_SENT:-100} # number of emails that can be sent per minute
- GOTRUE_MAILER_AUTOCONFIRM=${GOTRUE_MAILER_AUTOCONFIRM:-false} # change this to true to skip email confirmation
# Google OAuth config
- GOTRUE_EXTERNAL_GOOGLE_ENABLED=${GOTRUE_EXTERNAL_GOOGLE_ENABLED}
- GOTRUE_EXTERNAL_GOOGLE_CLIENT_ID=${GOTRUE_EXTERNAL_GOOGLE_CLIENT_ID}
- GOTRUE_EXTERNAL_GOOGLE_SECRET=${GOTRUE_EXTERNAL_GOOGLE_SECRET}
- GOTRUE_EXTERNAL_GOOGLE_REDIRECT_URI=${GOTRUE_EXTERNAL_GOOGLE_REDIRECT_URI}
# Apple OAuth config
- GOTRUE_EXTERNAL_APPLE_ENABLED=${GOTRUE_EXTERNAL_APPLE_ENABLED}
- GOTRUE_EXTERNAL_APPLE_CLIENT_ID=${GOTRUE_EXTERNAL_APPLE_CLIENT_ID}
- GOTRUE_EXTERNAL_APPLE_SECRET=${GOTRUE_EXTERNAL_APPLE_SECRET}
- GOTRUE_EXTERNAL_APPLE_REDIRECT_URI=${GOTRUE_EXTERNAL_APPLE_REDIRECT_URI}
# GITHUB OAuth config
- GOTRUE_EXTERNAL_GITHUB_ENABLED=${GOTRUE_EXTERNAL_GITHUB_ENABLED}
- GOTRUE_EXTERNAL_GITHUB_CLIENT_ID=${GOTRUE_EXTERNAL_GITHUB_CLIENT_ID}
- GOTRUE_EXTERNAL_GITHUB_SECRET=${GOTRUE_EXTERNAL_GITHUB_SECRET}
- GOTRUE_EXTERNAL_GITHUB_REDIRECT_URI=${GOTRUE_EXTERNAL_GITHUB_REDIRECT_URI}
# Discord OAuth config
- GOTRUE_EXTERNAL_DISCORD_ENABLED=${GOTRUE_EXTERNAL_DISCORD_ENABLED}
- GOTRUE_EXTERNAL_DISCORD_CLIENT_ID=${GOTRUE_EXTERNAL_DISCORD_CLIENT_ID}
- GOTRUE_EXTERNAL_DISCORD_SECRET=${GOTRUE_EXTERNAL_DISCORD_SECRET}
- GOTRUE_EXTERNAL_DISCORD_REDIRECT_URI=${GOTRUE_EXTERNAL_DISCORD_REDIRECT_URI}
# Prometheus Metrics
- GOTRUE_METRICS_ENABLED=true
- GOTRUE_METRICS_EXPORTER=prometheus
- GOTRUE_MAILER_TEMPLATES_CONFIRMATION=${GOTRUE_MAILER_TEMPLATES_CONFIRMATION}
ports:
- 9999:9999
volumes:
postgres_data:


@ -40,7 +40,7 @@ serde_json.workspace = true
serde.workspace = true
app-error = { workspace = true, features = ["tokio_error", "bincode_error"] }
scraper = { version = "0.17.1", optional = true }
arc-swap = "1.7"
arc-swap.workspace = true
shared-entity = { workspace = true }
collab-rt-entity = { workspace = true }


@ -185,7 +185,7 @@ where
_event: &Event,
update: &AwarenessUpdate,
) {
let payload = Message::Awareness(update.clone()).encode_v1();
let payload = Message::Awareness(update.encode_v1()).encode_v1();
self.sync_queue.queue_msg(|msg_id| {
let update_sync = UpdateSync::new(origin.clone(), object_id.to_string(), payload, msg_id);
if cfg!(feature = "sync_verbose_log") {


@ -199,7 +199,11 @@ where
return Ok(false);
}
trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason);
tracing::debug!(
"🔥{} restart sync due to missing update, reason:{}",
&sync_object.object_id,
reason
);
let awareness = collab.get_awareness();
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
sink.queue_init_sync(|msg_id| {
@ -236,8 +240,8 @@ where
SyncReason::CollabInitialize
| SyncReason::ServerCannotApplyUpdate
| SyncReason::NetworkResume => {
trace!(
"🔥{} start sync, reason: {}",
tracing::debug!(
"🔥{} resume network, reason: {}",
&sync_object.object_id,
reason
);


@ -245,12 +245,13 @@ impl AckMeta {
impl Display for CollabAck {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"ack: [uid:{}|oid:{}|msg_id:{:?}|len:{}|code:{}]",
"ack: [uid:{}|oid:{}|msg_id:{:?}|len:{}|code:{}|seq_nr:{}]",
self.origin.client_user_id().unwrap_or(0),
self.object_id,
self.msg_id,
self.payload.len(),
self.code,
self.seq_num
))
}
}


@ -1,6 +1,5 @@
use std::fmt::{Debug, Display, Formatter};
use collab::core::awareness::AwarenessUpdate;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use yrs::updates::decoder::{Decode, Decoder};
@ -22,7 +21,7 @@ pub const PERMISSION_GRANTED: u8 = 1;
pub enum Message {
Sync(SyncMessage),
Auth(Option<String>),
Awareness(AwarenessUpdate),
Awareness(Vec<u8>),
Custom(CustomMessage),
}
@ -44,7 +43,7 @@ impl Encode for Message {
},
Message::Awareness(update) => {
encoder.write_var(MSG_AWARENESS);
encoder.write_buf(update.encode_v1())
encoder.write_buf(update)
},
Message::Custom(msg) => {
encoder.write_var(MSG_CUSTOM);
@ -64,8 +63,7 @@ impl Decode for Message {
},
MSG_AWARENESS => {
let data = decoder.read_buf()?;
let update = AwarenessUpdate::decode_v1(data)?;
Ok(Message::Awareness(update))
Ok(Message::Awareness(data.into()))
},
MSG_AUTH => {
let reason = if decoder.read_var::<u8>()? == PERMISSION_DENIED {

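`Message::Awareness` now carries raw v1-encoded bytes rather than a decoded `AwarenessUpdate`, deferring the decode to the message handler (see the protocol change below). A minimal round-trip sketch, assuming the `Message` type above and the y-sync style `Encode`/`Decode` traits are in scope:

```rust
use collab::core::awareness::AwarenessUpdate;
use yrs::updates::decoder::Decode;
use yrs::updates::encoder::Encode;

// Sender side: the awareness update is pre-encoded, so the enum only moves bytes.
fn wrap_awareness(update: &AwarenessUpdate) -> Vec<u8> {
    Message::Awareness(update.encode_v1()).encode_v1()
}

// Receiver side: bytes are materialized into an AwarenessUpdate only when handled.
fn unwrap_awareness(wire: &[u8]) -> Result<AwarenessUpdate, yrs::encoding::read::Error> {
    match Message::decode_v1(wire)? {
        Message::Awareness(data) => AwarenessUpdate::decode_v1(&data),
        _ => Err(yrs::encoding::read::Error::Custom(
            "expected awareness message".into(),
        )),
    }
}
```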

@ -93,6 +93,7 @@ pub trait CollabSyncProtocol {
Message::Auth(reason) => self.handle_auth(collab, reason).await,
//FIXME: where is the QueryAwareness protocol?
Message::Awareness(update) => {
let update = AwarenessUpdate::decode_v1(&update)?;
self
.handle_awareness_update(message_origin, collab, update)
.await
@ -117,7 +118,7 @@ pub trait CollabSyncProtocol {
.map_err(|e| RTProtocolError::YrsTransaction(e.to_string()))?
.state_vector();
let awareness_update = awareness.update()?;
(state_vector, awareness_update)
(state_vector, awareness_update.encode_v1())
};
// 1. encode doc state vector
@ -220,14 +221,14 @@ pub trait CollabSyncProtocol {
}
}
const LARGE_UPDATE_THRESHOLD: usize = 1024 * 1024; // 1MB
pub const LARGE_UPDATE_THRESHOLD: usize = 1024 * 1024; // 1MB
#[inline]
pub async fn decode_update(update: Vec<u8>) -> Result<Update, RTProtocolError> {
pub async fn decode_update(update: Vec<u8>) -> Result<Update, yrs::encoding::read::Error> {
let update = if update.len() > LARGE_UPDATE_THRESHOLD {
spawn_blocking(move || Update::decode_v1(&update))
.await
.map_err(|err| RTProtocolError::Internal(err.into()))?
.map_err(|err| yrs::encoding::read::Error::Custom(err.to_string()))?
} else {
Update::decode_v1(&update)
}?;


@ -16,12 +16,16 @@ tracing = "0.1"
serde = { version = "1", features = ["derive"] }
bincode = "1.3.3"
bytes.workspace = true
collab.workspace = true
collab-entity.workspace = true
serde_json.workspace = true
chrono = "0.4"
tokio-util = { version = "0.7" }
prost.workspace = true
async-stream.workspace = true
async-trait.workspace = true
zstd = "0.13"
loole = "0.4.0"
[dev-dependencies]
futures = "0.3.30"


@ -1,29 +1,61 @@
use crate::error::StreamError;
use crate::pubsub::{CollabStreamPub, CollabStreamSub};
use crate::stream::CollabStream;
use crate::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink};
use crate::error::{internal, StreamError};
use crate::lease::{Lease, LeaseAcquisition};
use crate::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId};
use crate::stream_group::{StreamConfig, StreamGroup};
use crate::stream_router::{StreamRouter, StreamRouterOptions};
use futures::Stream;
use redis::aio::ConnectionManager;
use redis::streams::StreamReadReply;
use redis::{AsyncCommands, FromRedisValue};
use std::sync::Arc;
use std::time::Duration;
use tracing::error;
pub const CONTROL_STREAM_KEY: &str = "af_collab_control";
#[derive(Clone)]
pub struct CollabRedisStream {
connection_manager: ConnectionManager,
stream_router: Arc<StreamRouter>,
}
impl CollabRedisStream {
pub const LEASE_TTL: Duration = Duration::from_secs(60);
pub async fn new(redis_client: redis::Client) -> Result<Self, redis::RedisError> {
let router_options = StreamRouterOptions {
worker_count: 60,
xread_streams: 100,
xread_block_millis: Some(5000),
xread_count: None,
};
let stream_router = Arc::new(StreamRouter::with_options(&redis_client, router_options)?);
let connection_manager = redis_client.get_connection_manager().await?;
Ok(Self::new_with_connection_manager(connection_manager))
Ok(Self::new_with_connection_manager(
connection_manager,
stream_router,
))
}
pub fn new_with_connection_manager(connection_manager: ConnectionManager) -> Self {
Self { connection_manager }
pub fn new_with_connection_manager(
connection_manager: ConnectionManager,
stream_router: Arc<StreamRouter>,
) -> Self {
Self {
connection_manager,
stream_router,
}
}
pub async fn stream(&self, workspace_id: &str, oid: &str) -> CollabStream {
CollabStream::new(workspace_id, oid, self.connection_manager.clone())
pub async fn lease(
&self,
workspace_id: &str,
object_id: &str,
) -> Result<Option<LeaseAcquisition>, StreamError> {
let lease_key = format!("af:{}:{}:snapshot_lease", workspace_id, object_id);
self
.connection_manager
.lease(lease_key, Self::LEASE_TTL)
.await
}
pub async fn collab_control_stream(
@ -46,7 +78,7 @@ impl CollabRedisStream {
Ok(group)
}
pub async fn collab_update_stream(
pub async fn collab_update_stream_group(
&self,
workspace_id: &str,
oid: &str,
@ -66,29 +98,106 @@ impl CollabRedisStream {
group.ensure_consumer_group().await?;
Ok(group)
}
}
pub struct PubSubClient {
redis_client: redis::Client,
connection_manager: ConnectionManager,
}
impl PubSubClient {
pub async fn new(redis_client: redis::Client) -> Result<Self, redis::RedisError> {
let connection_manager = redis_client.get_connection_manager().await?;
Ok(Self {
redis_client,
connection_manager,
})
pub fn collab_update_sink(&self, workspace_id: &str, object_id: &str) -> CollabUpdateSink {
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
CollabUpdateSink::new(self.connection_manager.clone(), stream_key)
}
pub async fn collab_pub(&self) -> CollabStreamPub {
CollabStreamPub::new(self.connection_manager.clone())
pub fn awareness_update_sink(&self, workspace_id: &str, object_id: &str) -> AwarenessUpdateSink {
let stream_key = AwarenessStreamUpdate::stream_key(workspace_id, object_id);
AwarenessUpdateSink::new(self.connection_manager.clone(), stream_key)
}
#[allow(deprecated)]
pub async fn collab_sub(&self) -> Result<CollabStreamSub, StreamError> {
let conn = self.redis_client.get_async_connection().await?;
Ok(CollabStreamSub::new(conn))
/// Reads all collab updates for a given `workspace_id`:`object_id` entry, starting
/// from a given message id. Once the Redis stream returns no more results, the stream is closed.
pub async fn current_collab_updates(
&self,
workspace_id: &str,
object_id: &str,
since: Option<MessageId>,
) -> Result<Vec<(MessageId, CollabStreamUpdate)>, StreamError> {
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
let since = since.unwrap_or_default().to_string();
let mut conn = self.connection_manager.clone();
let mut result = Vec::new();
let mut reply: StreamReadReply = conn.xread(&[&stream_key], &[&since]).await?;
if let Some(key) = reply.keys.pop() {
if key.key == stream_key {
for stream_id in key.ids {
let message_id = MessageId::try_from(stream_id.id)?;
let stream_update = CollabStreamUpdate::try_from(stream_id.map)?;
result.push((message_id, stream_update));
}
}
}
Ok(result)
}
/// Reads all collab updates for a given `workspace_id`:`object_id` entry, starting
/// from a given message id. The returned stream is kept alive and passes along all future
/// messages coming from the corresponding Redis stream until it is explicitly closed.
pub fn live_collab_updates(
&self,
workspace_id: &str,
object_id: &str,
since: Option<MessageId>,
) -> impl Stream<Item = Result<(MessageId, CollabStreamUpdate), StreamError>> {
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
let since = since.map(|id| id.to_string());
let mut reader = self.stream_router.observe(stream_key, since);
async_stream::try_stream! {
while let Some((message_id, fields)) = reader.recv().await {
tracing::trace!("incoming collab update `{}`", message_id);
let message_id = MessageId::try_from(message_id).map_err(|e| internal(e.to_string()))?;
let collab_update = CollabStreamUpdate::try_from(fields)?;
yield (message_id, collab_update);
}
}
}
pub fn awareness_updates(
&self,
workspace_id: &str,
object_id: &str,
since: Option<MessageId>,
) -> impl Stream<Item = Result<AwarenessStreamUpdate, StreamError>> {
let stream_key = AwarenessStreamUpdate::stream_key(workspace_id, object_id);
let since = since.map(|id| id.to_string());
let mut reader = self.stream_router.observe(stream_key, since);
async_stream::try_stream! {
while let Some((message_id, fields)) = reader.recv().await {
tracing::trace!("incoming awareness update `{}`", message_id);
let awareness_update = AwarenessStreamUpdate::try_from(fields)?;
yield awareness_update;
}
}
}
pub async fn prune_stream(
&self,
stream_key: &str,
mut message_id: MessageId,
) -> Result<usize, StreamError> {
let mut conn = self.connection_manager.clone();
// we want to delete everything <= message_id
message_id.sequence_number += 1;
let value = conn
.send_packed_command(
redis::cmd("XTRIM")
.arg(stream_key)
.arg("MINID")
.arg(format!("{}", message_id)),
)
.await?;
let count = usize::from_redis_value(&value)?;
drop(conn);
tracing::debug!(
"pruned redis stream `{}` <= `{}` ({} objects)",
stream_key,
message_id,
count
);
Ok(count)
}
}
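A hedged usage sketch of the new stream API, assuming a `CollabRedisStream` built via `CollabRedisStream::new(client).await?` and the crate's model types in scope; the workspace and object ids are placeholders:

```rust
use collab::core::origin::CollabOrigin;
use futures::{pin_mut, StreamExt};

async fn publish_and_tail(stream: &CollabRedisStream) -> Result<(), StreamError> {
    // Push one (v1-encoded, uncompressed) update into the collab stream.
    let sink = stream.collab_update_sink("w1", "o1");
    let update = CollabStreamUpdate::new(vec![1, 2, 3], CollabOrigin::Empty, 0u8);
    let sent_id = sink.send(&update).await?;

    // Replay from the beginning, then keep listening live until our own
    // message comes back around.
    let live = stream.live_collab_updates("w1", "o1", None);
    pin_mut!(live);
    while let Some(item) = live.next().await {
        let (id, update) = item?;
        tracing::trace!("update {} from {:?}", id, update.sender);
        if id >= sent_id {
            break;
        }
    }
    Ok(())
}
```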


@ -0,0 +1,66 @@
use crate::error::StreamError;
use crate::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId};
use redis::aio::ConnectionManager;
use redis::cmd;
use tokio::sync::Mutex;
pub struct CollabUpdateSink {
conn: Mutex<ConnectionManager>,
stream_key: String,
}
impl CollabUpdateSink {
pub fn new(conn: ConnectionManager, stream_key: String) -> Self {
CollabUpdateSink {
conn: conn.into(),
stream_key,
}
}
pub async fn send(&self, msg: &CollabStreamUpdate) -> Result<MessageId, StreamError> {
let mut lock = self.conn.lock().await;
let msg_id: MessageId = cmd("XADD")
.arg(&self.stream_key)
.arg("*")
.arg("flags")
.arg(msg.flags)
.arg("sender")
.arg(msg.sender.to_string())
.arg("data")
.arg(&*msg.data)
.query_async(&mut *lock)
.await?;
Ok(msg_id)
}
}
pub struct AwarenessUpdateSink {
conn: Mutex<ConnectionManager>,
stream_key: String,
}
impl AwarenessUpdateSink {
pub fn new(conn: ConnectionManager, stream_key: String) -> Self {
AwarenessUpdateSink {
conn: conn.into(),
stream_key,
}
}
pub async fn send(&self, msg: &AwarenessStreamUpdate) -> Result<MessageId, StreamError> {
let mut lock = self.conn.lock().await;
let msg_id: MessageId = cmd("XADD")
.arg(&self.stream_key)
.arg("MAXLEN")
.arg("~")
.arg(100) // cap the awareness stream to roughly the 100 most recent updates
.arg("*")
.arg("sender")
.arg(msg.sender.to_string())
.arg("data")
.arg(&*msg.data)
.query_async(&mut *lock)
.await?;
Ok(msg_id)
}
}


@ -32,6 +32,12 @@ pub enum StreamError {
#[error(transparent)]
BinCodeSerde(#[from] bincode::Error),
#[error("failed to decode update: {0}")]
UpdateError(#[from] collab::preclude::encoding::read::Error),
#[error("I/O error: {0}")]
IO(#[from] std::io::Error),
#[error("Internal error: {0}")]
Internal(anyhow::Error),
}


@ -0,0 +1,145 @@
use crate::error::StreamError;
use async_trait::async_trait;
use redis::aio::ConnectionManager;
use redis::Value;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
const RELEASE_SCRIPT: &str = r#"
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
"#;
pub struct LeaseAcquisition {
conn: Option<ConnectionManager>,
stream_key: String,
token: u128,
}
impl LeaseAcquisition {
pub async fn release(&mut self) -> Result<bool, StreamError> {
if let Some(conn) = self.conn.take() {
Self::release_internal(conn, &self.stream_key, self.token).await
} else {
Ok(false)
}
}
async fn release_internal<S: AsRef<str>>(
mut conn: ConnectionManager,
stream_key: S,
token: u128,
) -> Result<bool, StreamError> {
let script = redis::Script::new(RELEASE_SCRIPT);
let result: i32 = script
.key(stream_key.as_ref())
.arg(token.to_le_bytes().as_slice())
.invoke_async(&mut conn)
.await?;
Ok(result == 1)
}
}
impl Drop for LeaseAcquisition {
fn drop(&mut self) {
if let Some(conn) = self.conn.take() {
let stream_key = self.stream_key.clone();
let token = self.token;
tokio::spawn(async move {
if let Err(err) = Self::release_internal(conn, stream_key, token).await {
tracing::error!("error while releasing lease (drop): {}", err);
}
});
}
}
}
/// An implementation of the Redlock algorithm.
/// See: https://redis.io/docs/latest/commands/set#patterns
#[async_trait]
pub trait Lease {
/// Attempts to acquire a lease on a stream for a given time-to-live.
/// Returns `None` if the lease could not be acquired.
async fn lease(
&self,
stream_key: String,
ttl: Duration,
) -> Result<Option<LeaseAcquisition>, StreamError>;
}
#[async_trait]
impl Lease for ConnectionManager {
async fn lease(
&self,
stream_key: String,
ttl: Duration,
) -> Result<Option<LeaseAcquisition>, StreamError> {
let mut conn = self.clone();
let ttl = ttl.as_millis() as u64;
let token = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
tracing::trace!("acquiring lease `{}` for {}ms", stream_key, ttl);
let result: Value = redis::cmd("SET")
.arg(&stream_key)
.arg(token.to_le_bytes().as_slice())
.arg("NX")
.arg("PX")
.arg(ttl)
.query_async(&mut conn)
.await?;
match result {
Value::Okay => Ok(Some(LeaseAcquisition {
conn: Some(conn),
stream_key,
token,
})),
o => {
tracing::trace!("lease locked: {:?}", o);
Ok(None)
},
}
}
}
#[cfg(test)]
mod test {
use crate::lease::Lease;
use redis::Client;
#[tokio::test]
async fn lease_acquisition() {
let redis_client = Client::open("redis://localhost:6379").unwrap();
let conn = redis_client.get_connection_manager().await.unwrap();
let l1 = conn
.lease("stream1".into(), std::time::Duration::from_secs(1))
.await
.unwrap();
assert!(l1.is_some(), "should successfully acquire lease");
let l2 = conn
.lease("stream1".into(), std::time::Duration::from_secs(1))
.await
.unwrap();
assert!(l2.is_none(), "should fail to acquire lease");
l1.unwrap().release().await.unwrap();
let l3 = conn
.lease("stream1".into(), std::time::Duration::from_secs(1))
.await
.unwrap();
assert!(
l3.is_some(),
"should successfully acquire lease after it was released"
);
}
}


@ -1,6 +1,8 @@
pub mod client;
pub mod collab_update_sink;
pub mod error;
pub mod lease;
pub mod model;
pub mod pubsub;
pub mod stream;
pub mod stream_group;
pub mod stream_router;


@ -1,12 +1,14 @@
use crate::error::{internal, StreamError};
use bytes::Bytes;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::preclude::updates::decoder::Decode;
use collab_entity::proto::collab::collab_update_event::Update;
use collab_entity::{proto, CollabType};
use prost::Message;
use redis::streams::StreamId;
use redis::{FromRedisValue, RedisError, RedisResult, Value};
use redis::{FromRedisValue, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::str::FromStr;
@ -20,12 +22,21 @@ use std::str::FromStr;
///
/// An example message ID might look like this: 1631020452097-0. In this example, 1631020452097 is
/// the timestamp in milliseconds, and 0 is the sequence number.
#[derive(Debug, Clone)]
#[derive(Debug, Copy, Clone, Default, Ord, PartialOrd, Eq, PartialEq)]
pub struct MessageId {
pub timestamp_ms: u64,
pub sequence_number: u16,
}
impl MessageId {
pub fn new(timestamp_ms: u64, sequence_number: u16) -> Self {
MessageId {
timestamp_ms,
sequence_number,
}
}
}
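The added derives make stream positions directly comparable and give a natural "read from the beginning" default; a quick illustration (assuming `MessageId` is in scope):

```rust
#[test]
fn message_id_ordering() {
    let a = MessageId::new(1631020452097, 0);
    let b = MessageId::new(1631020452097, 1);
    // Derived Ord compares by timestamp first, then sequence number.
    assert!(a < b);
    // Default is the `0-0` position, i.e. the start of the stream.
    assert_eq!(MessageId::default().to_string(), "0-0");
}
```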
impl Display for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-{}", self.timestamp_ms, self.sequence_number)
@ -355,8 +366,218 @@ impl TryFrom<CollabUpdateEvent> for StreamBinary {
}
}
pub struct CollabStreamUpdate {
pub data: Vec<u8>, // yrs::Update::encode_v1
pub sender: CollabOrigin,
pub flags: UpdateFlags,
}
impl CollabStreamUpdate {
pub fn new<B, F>(data: B, sender: CollabOrigin, flags: F) -> Self
where
B: Into<Vec<u8>>,
F: Into<UpdateFlags>,
{
CollabStreamUpdate {
data: data.into(),
sender,
flags: flags.into(),
}
}
/// Returns the Redis stream key that stores entries mapped to/from [CollabStreamUpdate].
pub fn stream_key(workspace_id: &str, object_id: &str) -> String {
// use `:` separator as it adheres to Redis naming conventions
format!("af:{}:{}:updates", workspace_id, object_id)
}
pub fn into_update(self) -> Result<collab::preclude::Update, StreamError> {
let bytes = if self.flags.is_compressed() {
zstd::decode_all(std::io::Cursor::new(self.data))?
} else {
self.data
};
let update = if self.flags.is_v1_encoded() {
collab::preclude::Update::decode_v1(&bytes)?
} else {
collab::preclude::Update::decode_v2(&bytes)?
};
Ok(update)
}
}
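The flags tell readers how to invert whatever the writer did to `data`; `into_update` above handles the decode side. A sketch of the write side producing a compressed v1 entry (the zstd level is an arbitrary choice):

```rust
use collab::core::origin::CollabOrigin;

fn compressed_entry(
    update_v1: Vec<u8>,
    sender: CollabOrigin,
) -> Result<CollabStreamUpdate, StreamError> {
    // Compress the raw v1 update; the IS_COMPRESSED bit tells readers to
    // zstd-decode before yrs-decoding (v1 is implied by a clear IS_V2_ENCODED bit).
    let data = zstd::encode_all(std::io::Cursor::new(update_v1), 3)?;
    Ok(CollabStreamUpdate::new(
        data,
        sender,
        UpdateFlags::IS_COMPRESSED,
    ))
}
```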
impl TryFrom<HashMap<String, redis::Value>> for CollabStreamUpdate {
type Error = StreamError;
fn try_from(fields: HashMap<String, Value>) -> Result<Self, Self::Error> {
let sender = match fields.get("sender") {
None => CollabOrigin::Empty,
Some(sender) => {
let raw_origin = String::from_redis_value(sender)?;
collab_origin_from_str(&raw_origin)?
},
};
let flags = match fields.get("flags") {
None => UpdateFlags::default(),
Some(flags) => u8::from_redis_value(flags).unwrap_or(0).into(),
};
let data_raw = fields
.get("data")
.ok_or_else(|| internal("expecting field `data`"))?;
let data: Vec<u8> = FromRedisValue::from_redis_value(data_raw)?;
Ok(CollabStreamUpdate {
data,
sender,
flags,
})
}
}
pub struct AwarenessStreamUpdate {
pub data: Vec<u8>, // AwarenessUpdate::encode_v1
pub sender: CollabOrigin,
}
impl AwarenessStreamUpdate {
/// Returns the Redis stream key that stores entries mapped to/from [AwarenessStreamUpdate].
pub fn stream_key(workspace_id: &str, object_id: &str) -> String {
format!("af:{}:{}:awareness", workspace_id, object_id)
}
}
impl TryFrom<HashMap<String, redis::Value>> for AwarenessStreamUpdate {
type Error = StreamError;
fn try_from(fields: HashMap<String, Value>) -> Result<Self, Self::Error> {
let sender = match fields.get("sender") {
None => CollabOrigin::Empty,
Some(sender) => {
let raw_origin = String::from_redis_value(sender)?;
collab_origin_from_str(&raw_origin)?
},
};
let data_raw = fields
.get("data")
.ok_or_else(|| internal("expecting field `data`"))?;
let data: Vec<u8> = FromRedisValue::from_redis_value(data_raw)?;
Ok(AwarenessStreamUpdate { data, sender })
}
}
//FIXME: this should be `impl FromStr for CollabOrigin`
fn collab_origin_from_str(value: &str) -> RedisResult<CollabOrigin> {
match value {
"" => Ok(CollabOrigin::Empty),
"server" => Ok(CollabOrigin::Server),
other => {
let mut split = other.split('|');
match (split.next(), split.next()) {
(Some(uid), Some(device_id)) | (Some(device_id), Some(uid))
if uid.starts_with("uid:") && device_id.starts_with("device_id:") =>
{
let uid = uid.trim_start_matches("uid:");
let device_id = device_id.trim_start_matches("device_id:").to_string();
let uid: i64 = uid
.parse()
.map_err(|err| internal(format!("failed to parse uid: {}", err)))?;
Ok(CollabOrigin::Client(CollabClient { uid, device_id }))
},
_ => Err(internal(format!(
"couldn't parse collab origin from `{}`",
other
))),
}
},
}
}
#[repr(transparent)]
#[derive(Copy, Clone, Eq, PartialEq, Default)]
pub struct UpdateFlags(u8);
impl UpdateFlags {
/// Flag bit to mark if update is encoded using [EncoderV2] (if set) or [EncoderV1] (if clear).
pub const IS_V2_ENCODED: u8 = 0b0000_0001;
/// Flag bit to mark if update is compressed.
pub const IS_COMPRESSED: u8 = 0b0000_0010;
#[inline]
pub fn is_v2_encoded(&self) -> bool {
self.0 & Self::IS_V2_ENCODED != 0
}
#[inline]
pub fn is_v1_encoded(&self) -> bool {
!self.is_v2_encoded()
}
#[inline]
pub fn is_compressed(&self) -> bool {
self.0 & Self::IS_COMPRESSED != 0
}
}
impl ToRedisArgs for UpdateFlags {
#[inline]
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
self.0.write_redis_args(out)
}
}
impl From<u8> for UpdateFlags {
#[inline]
fn from(value: u8) -> Self {
UpdateFlags(value)
}
}
impl Display for UpdateFlags {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if !self.is_v2_encoded() {
write!(f, ".v1")?;
} else {
write!(f, ".v2")?;
}
if self.is_compressed() {
write!(f, ".zstd")?;
}
Ok(())
}
}
#[cfg(test)]
mod test {
use crate::model::collab_origin_from_str;
use collab::core::origin::{CollabClient, CollabOrigin};
#[test]
fn parse_collab_origin_empty() {
let expected = CollabOrigin::Empty;
let actual = collab_origin_from_str(&expected.to_string()).unwrap();
assert_eq!(actual, expected);
}
#[test]
fn parse_collab_origin_server() {
let expected = CollabOrigin::Server;
let actual = collab_origin_from_str(&expected.to_string()).unwrap();
assert_eq!(actual, expected);
}
#[test]
fn parse_collab_origin_client() {
let expected = CollabOrigin::Client(CollabClient {
uid: 123,
device_id: "test-device".to_string(),
});
let actual = collab_origin_from_str(&expected.to_string()).unwrap();
assert_eq!(actual, expected);
}
#[test]
fn test_collab_update_event_decoding() {


@ -1,99 +0,0 @@
use crate::error::StreamError;
use crate::model::{MessageId, StreamBinary, StreamMessage, StreamMessageByStreamKey};
use redis::aio::ConnectionManager;
use redis::streams::{StreamMaxlen, StreamReadOptions};
use redis::{pipe, AsyncCommands, Pipeline, RedisError};
pub struct CollabStream {
connection_manager: ConnectionManager,
stream_key: String,
}
impl CollabStream {
pub fn new(workspace_id: &str, oid: &str, connection_manager: ConnectionManager) -> Self {
let stream_key = format!("af_collab-{}-{}", workspace_id, oid);
Self {
connection_manager,
stream_key,
}
}
/// Inserts a single message into the Redis stream.
pub async fn insert_message(&mut self, message: StreamBinary) -> Result<MessageId, StreamError> {
let tuple = message.into_tuple_array();
let message_id = self
.connection_manager
.xadd(&self.stream_key, "*", tuple.as_slice())
.await?;
Ok(message_id)
}
/// Inserts multiple messages into the Redis stream using a pipeline.
///
pub async fn insert_messages(&mut self, messages: Vec<StreamBinary>) -> Result<(), StreamError> {
let mut pipe = pipe();
for message in messages {
let tuple = message.into_tuple_array();
let _: &mut Pipeline = pipe.xadd(&self.stream_key, "*", tuple.as_slice());
}
let () = pipe.query_async(&mut self.connection_manager).await?;
Ok(())
}
/// Fetches the next message from a Redis stream after a specified entry.
///
pub async fn next(&mut self) -> Result<Option<StreamMessage>, StreamError> {
let options = StreamReadOptions::default().count(1).block(100);
let map: StreamMessageByStreamKey = self
.connection_manager
.xread_options(&[&self.stream_key], &["$"], &options)
.await?;
let (_, mut messages) = map
.0
.into_iter()
.next()
.ok_or_else(|| StreamError::UnexpectedValue("Empty stream".into()))?;
debug_assert_eq!(messages.len(), 1);
Ok(messages.pop())
}
pub async fn next_after(
&mut self,
after: Option<MessageId>,
) -> Result<Option<StreamMessage>, StreamError> {
let message_id = after
.map(|ct| ct.to_string())
.unwrap_or_else(|| "$".to_string());
let options = StreamReadOptions::default().group("1", "2").block(100);
let map: StreamMessageByStreamKey = self
.connection_manager
.xread_options(&[&self.stream_key], &[&message_id], &options)
.await?;
let (_, mut messages) = map
.0
.into_iter()
.next()
.ok_or_else(|| StreamError::UnexpectedValue("Empty stream".into()))?;
debug_assert_eq!(messages.len(), 1);
Ok(messages.pop())
}
pub async fn read_all_message(&mut self) -> Result<Vec<StreamBinary>, StreamError> {
let read_messages: Vec<StreamMessage> =
self.connection_manager.xrange_all(&self.stream_key).await?;
Ok(read_messages.into_iter().map(Into::into).collect())
}
pub async fn clear(&mut self) -> Result<(), RedisError> {
let () = self
.connection_manager
.xtrim(&self.stream_key, StreamMaxlen::Equals(0))
.await?;
Ok(())
}
}


@ -418,12 +418,12 @@ impl StreamGroup {
_ = interval.tick() => {
if let Ok(len) = get_stream_length(&mut connection_manager, &stream_key).await {
if len + 100 > max_len {
warn!("stream len is going to exceed the max len: {}, current: {}", max_len, len);
warn!("stream `{}` len is going to exceed the max len: {}, current: {}", stream_key, max_len, len);
}
}
}
_ = cancel_token.cancelled() => {
trace!("Stream length check task cancelled.");
trace!("Stream `{}` length check task cancelled.", stream_key);
break;
}
}


@ -0,0 +1,373 @@
use loole::{Receiver, Sender};
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::Client;
use redis::Commands;
use redis::Connection;
use redis::RedisError;
use redis::RedisResult;
use redis::Value;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::thread::{sleep, JoinHandle};
use std::time::Duration;
/// Redis stream key.
pub type StreamKey = String;
/// Channel returned by [StreamRouter::observe] that allows receiving messages retrieved by
/// the router.
pub type StreamReader = tokio::sync::mpsc::UnboundedReceiver<(String, RedisMap)>;
/// Redis stream router used to multiplex a large number of Redis stream read requests over a
/// fixed number of Redis connections.
pub struct StreamRouter {
buf: Sender<StreamHandle>,
alive: Arc<AtomicBool>,
#[allow(dead_code)]
workers: Vec<Worker>,
}
impl StreamRouter {
pub fn new(client: &Client) -> Result<Self, RedisError> {
Self::with_options(client, Default::default())
}
pub fn with_options(client: &Client, options: StreamRouterOptions) -> Result<Self, RedisError> {
let alive = Arc::new(AtomicBool::new(true));
let (tx, rx) = loole::unbounded();
let mut workers = Vec::with_capacity(options.worker_count);
for worker_id in 0..options.worker_count {
let conn = client.get_connection()?;
let worker = Worker::new(
worker_id,
conn,
tx.clone(),
rx.clone(),
alive.clone(),
&options,
);
workers.push(worker);
}
tracing::info!("started Redis stream router with {} workers", workers.len());
Ok(Self {
buf: tx,
workers,
alive,
})
}
pub fn observe(&self, stream_key: StreamKey, last_id: Option<String>) -> StreamReader {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let last_id = last_id.unwrap_or_else(|| "0".to_string());
let h = StreamHandle::new(stream_key, last_id, tx);
self.buf.send(h).unwrap();
rx
}
}
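A hedged consumption sketch: a single `StreamRouter` per process multiplexes any number of observers over its worker pool. The stream key below follows the `af:{workspace}:{object}:updates` convention used elsewhere in this PR:

```rust
async fn tail_stream(router: &StreamRouter) {
    // `observe` registers the key with the worker pool and hands back a channel.
    let mut rx = router.observe("af:w1:o1:updates".to_string(), None);
    while let Some((msg_id, fields)) = rx.recv().await {
        // `fields` is the raw Redis field map for a single stream entry.
        tracing::trace!("entry {} with {} fields", msg_id, fields.len());
    }
}
```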
impl Drop for StreamRouter {
fn drop(&mut self) {
self.alive.store(false, SeqCst);
}
}
/// Options used to configure [StreamRouter].
#[derive(Debug, Clone)]
pub struct StreamRouterOptions {
/// Number of worker threads. Each worker thread has its own Redis connection.
/// Default: number of CPU threads but can vary under specific circumstances.
pub worker_count: usize,
/// How many Redis streams a single Redis poll worker can read at a time.
/// Default: 100
pub xread_streams: usize,
/// How long a poll worker will be blocked while waiting for a Redis `XREAD` request to respond.
/// This blocks a worker thread and doesn't affect other threads.
///
/// If set to `None` it won't block and will return immediately, which gives the best
/// responsiveness but can lead to unnecessary busy loops causing CPU spikes even when idle.
///
/// Default: `Some(0)`, meaning block for an indefinite amount of time.
pub xread_block_millis: Option<usize>,
/// How many messages a single worker's `XREAD` request is allowed to return.
/// Default: `None` (unbounded).
pub xread_count: Option<usize>,
}
impl Default for StreamRouterOptions {
fn default() -> Self {
StreamRouterOptions {
worker_count: std::thread::available_parallelism().unwrap().get(),
xread_streams: 100,
xread_block_millis: Some(0),
xread_count: None,
}
}
}
struct Worker {
_handle: JoinHandle<()>,
}
impl Worker {
fn new(
worker_id: usize,
conn: Connection,
tx: Sender<StreamHandle>,
rx: Receiver<StreamHandle>,
alive: Arc<AtomicBool>,
options: &StreamRouterOptions,
) -> Self {
let mut xread_options = StreamReadOptions::default();
if let Some(block_millis) = options.xread_block_millis {
xread_options = xread_options.block(block_millis);
}
if let Some(count) = options.xread_count {
xread_options = xread_options.count(count);
}
let count = options.xread_streams;
let handle = std::thread::spawn(move || {
if let Err(err) = Self::process_streams(conn, tx, rx, alive, xread_options, count) {
tracing::error!("worker {} failed: {}", worker_id, err);
}
});
Self { _handle: handle }
}
fn process_streams(
mut conn: Connection,
tx: Sender<StreamHandle>,
rx: Receiver<StreamHandle>,
alive: Arc<AtomicBool>,
options: StreamReadOptions,
count: usize,
) -> RedisResult<()> {
let mut stream_keys = Vec::with_capacity(count);
let mut message_ids = Vec::with_capacity(count);
let mut senders = HashMap::with_capacity(count);
while alive.load(SeqCst) {
if !Self::read_buf(&rx, &mut stream_keys, &mut message_ids, &mut senders) {
break; // rx channel has closed
}
let key_count = stream_keys.len();
if key_count == 0 {
tracing::warn!("Bug: read empty buf");
sleep(Duration::from_millis(100));
continue;
}
let result: StreamReadReply = conn.xread_options(&stream_keys, &message_ids, &options)?;
let mut msgs = 0;
for stream in result.keys {
let mut remove_sender = false;
if let Some((sender, idx)) = senders.get(stream.key.as_str()) {
for id in stream.ids {
let message_id = id.id;
let value = id.map;
message_ids[*idx].clone_from(&message_id); //TODO: optimize
msgs += 1;
if let Err(err) = sender.send((message_id, value)) {
tracing::warn!("failed to send: {}", err);
remove_sender = true;
}
}
}
if remove_sender {
senders.remove(stream.key.as_str());
}
}
if msgs > 0 {
tracing::trace!(
"XREAD: read total of {} messages for {} streams",
msgs,
key_count
);
}
Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders);
}
Ok(())
}
fn schedule_back(
tx: &Sender<StreamHandle>,
keys: &mut Vec<StreamKey>,
ids: &mut Vec<String>,
senders: &mut HashMap<&str, (StreamSender, usize)>,
) {
let keys = keys.drain(..);
let mut ids = ids.drain(..);
for key in keys {
if let Some(last_id) = ids.next() {
if let Some((sender, _)) = senders.remove(key.as_str()) {
let h = StreamHandle::new(key, last_id, sender);
if let Err(err) = tx.send(h) {
tracing::warn!("failed to reschedule: {}", err);
break;
}
}
}
}
senders.clear();
}
fn read_buf(
rx: &Receiver<StreamHandle>,
stream_keys: &mut Vec<StreamKey>,
message_ids: &mut Vec<String>,
senders: &mut HashMap<&'static str, (StreamSender, usize)>,
) -> bool {
// try to receive first element - block thread if there's none
let mut count = stream_keys.capacity();
if let Ok(h) = rx.recv() {
// `senders` keys borrow from the Strings owned by `stream_keys`; a String's heap buffer
// doesn't move when pushed into the Vec, so the transmuted &str stays valid and no clone is needed
let key_ref: &'static str = unsafe { std::mem::transmute(h.key.as_str()) };
senders.insert(key_ref, (h.sender, stream_keys.len()));
stream_keys.push(h.key);
message_ids.push(h.last_id.to_string());
count -= 1;
if count == 0 {
return true;
}
// try to fill more without blocking if there's anything on the receiver
while let Ok(h) = rx.try_recv() {
let key_ref: &'static str = unsafe { std::mem::transmute(h.key.as_str()) };
senders.insert(key_ref, (h.sender, stream_keys.len()));
stream_keys.push(h.key);
message_ids.push(h.last_id.to_string());
count -= 1;
if count == 0 {
return true;
}
}
true
} else {
false
}
}
}
type RedisMap = HashMap<String, Value>;
type StreamSender = tokio::sync::mpsc::UnboundedSender<(String, RedisMap)>;
struct StreamHandle {
key: StreamKey,
last_id: String,
sender: StreamSender,
}
impl StreamHandle {
fn new(key: StreamKey, last_id: String, sender: StreamSender) -> Self {
StreamHandle {
key,
last_id,
sender,
}
}
}
#[cfg(test)]
mod test {
use crate::stream_router::StreamRouter;
use rand::random;
use redis::{Client, Commands, FromRedisValue};
use tokio::task::JoinSet;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn multi_worker_preexisting_messages() {
const ROUTES_COUNT: usize = 200;
const MSG_PER_ROUTE: usize = 10;
let mut client = Client::open("redis://127.0.0.1/").unwrap();
let keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE);
let router = StreamRouter::new(&client).unwrap();
let mut join_set = JoinSet::new();
for key in keys {
let mut observer = router.observe(key.clone(), None);
join_set.spawn(async move {
for i in 0..MSG_PER_ROUTE {
let (_msg_id, map) = observer.recv().await.unwrap();
let value = String::from_redis_value(&map["data"]).unwrap();
assert_eq!(value, format!("{}-{}", key, i));
}
});
}
while let Some(t) = join_set.join_next().await {
t.unwrap();
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn multi_worker_live_messages() {
const ROUTES_COUNT: usize = 200;
const MSG_PER_ROUTE: usize = 10;
let mut client = Client::open("redis://127.0.0.1/").unwrap();
let keys = init_streams(&mut client, ROUTES_COUNT, 0);
let router = StreamRouter::new(&client).unwrap();
let mut join_set = JoinSet::new();
for key in keys.iter() {
let mut observer = router.observe(key.clone(), None);
let key = key.clone();
join_set.spawn(async move {
for i in 0..MSG_PER_ROUTE {
let (_msg_id, map) = observer.recv().await.unwrap();
let value = String::from_redis_value(&map["data"]).unwrap();
assert_eq!(value, format!("{}-{}", key, i));
}
});
}
for msg_idx in 0..MSG_PER_ROUTE {
for key in keys.iter() {
let data = format!("{}-{}", key, msg_idx);
let _: String = client.xadd(key, "*", &[("data", data)]).unwrap();
}
}
while let Some(t) = join_set.join_next().await {
t.unwrap();
}
}
#[tokio::test]
async fn stream_reader_continue_from() {
let mut client = Client::open("redis://127.0.0.1/").unwrap();
let key = format!("test:{}:{}", random::<u32>(), 0);
let _: String = client.xadd(&key, "*", &[("data", 1)]).unwrap();
let m2: String = client.xadd(&key, "*", &[("data", 2)]).unwrap();
let m3: String = client.xadd(&key, "*", &[("data", 3)]).unwrap();
let router = StreamRouter::new(&client).unwrap();
let mut observer = router.observe(key, Some(m2));
let (msg_id, m) = observer.recv().await.unwrap();
assert_eq!(msg_id, m3);
assert_eq!(u32::from_redis_value(&m["data"]).unwrap(), 3);
}
fn init_streams(client: &mut Client, stream_count: usize, msgs_per_stream: usize) -> Vec<String> {
let test_prefix: u32 = random();
let mut keys = Vec::with_capacity(stream_count);
for worker_idx in 0..stream_count {
let key = format!("test:{}:{}", test_prefix, worker_idx);
for msg_idx in 0..msgs_per_stream {
let data = format!("{}-{}", key, msg_idx);
let _: String = client.xadd(&key, "*", &[("data", data)]).unwrap();
}
keys.push(key);
}
keys
}
}


@ -1,4 +1,3 @@
mod pubsub_test;
mod stream_group_test;
mod stream_test;
mod test_util;


@ -1,37 +0,0 @@
use crate::collab_stream_test::test_util::{pubsub_client, random_i64};
use collab_stream::pubsub::PubSubMessage;
use futures::StreamExt;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn pubsub_test() {
let oid = format!("o{}", random_i64());
let client_1 = pubsub_client().await;
let client_2 = pubsub_client().await;
let mut publish = client_1.collab_pub().await;
let send_msg = PubSubMessage {
workspace_id: "1".to_string(),
oid: oid.clone(),
};
let cloned_msg = send_msg.clone();
tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
match publish.publish(cloned_msg).await {
Ok(_) => {},
Err(err) => {
panic!("failed to publish message: {:?}", err);
},
}
});
let subscriber = client_2.collab_sub().await.unwrap();
let mut pubsub = subscriber.subscribe().await.unwrap();
let receive_msg = pubsub.next().await.unwrap().unwrap();
assert_eq!(send_msg.workspace_id, receive_msg.workspace_id);
}


@ -10,7 +10,7 @@ async fn single_group_read_message_test() {
let oid = format!("o{}", random_i64());
let client = stream_client().await;
let mut group = client
.collab_update_stream(workspace_id, &oid, "g1")
.collab_update_stream_group(workspace_id, &oid, "g1")
.await
.unwrap();
let msg = StreamBinary(vec![1, 2, 3, 4, 5]);
@ -18,7 +18,7 @@ async fn single_group_read_message_test() {
{
let client = stream_client().await;
let mut group = client
.collab_update_stream(workspace_id, &oid, "g2")
.collab_update_stream_group(workspace_id, &oid, "g2")
.await
.unwrap();
group.insert_binary(msg).await.unwrap();
@ -45,7 +45,7 @@ async fn single_group_async_read_message_test() {
let oid = format!("o{}", random_i64());
let client = stream_client().await;
let mut group = client
.collab_update_stream(workspace_id, &oid, "g1")
.collab_update_stream_group(workspace_id, &oid, "g1")
.await
.unwrap();
@ -54,7 +54,7 @@ async fn single_group_async_read_message_test() {
{
let client = stream_client().await;
let mut group = client
.collab_update_stream(workspace_id, &oid, "g2")
.collab_update_stream_group(workspace_id, &oid, "g2")
.await
.unwrap();
group.insert_binary(msg).await.unwrap();
@ -79,14 +79,23 @@ async fn single_group_async_read_message_test() {
async fn different_group_read_undelivered_message_test() {
let oid = format!("o{}", random_i64());
let client = stream_client().await;
let mut group_1 = client.collab_update_stream("w1", &oid, "g1").await.unwrap();
let mut group_2 = client.collab_update_stream("w1", &oid, "g2").await.unwrap();
let mut group_1 = client
.collab_update_stream_group("w1", &oid, "g1")
.await
.unwrap();
let mut group_2 = client
.collab_update_stream_group("w1", &oid, "g2")
.await
.unwrap();
let msg = StreamBinary(vec![1, 2, 3, 4, 5]);
{
let client = stream_client().await;
let mut group = client.collab_update_stream("w1", &oid, "g2").await.unwrap();
let mut group = client
.collab_update_stream_group("w1", &oid, "g2")
.await
.unwrap();
group.insert_binary(msg).await.unwrap();
}
@ -105,14 +114,23 @@ async fn different_group_read_undelivered_message_test() {
async fn different_group_read_message_test() {
let oid = format!("o{}", random_i64());
let client = stream_client().await;
let mut group_1 = client.collab_update_stream("w1", &oid, "g1").await.unwrap();
let mut group_2 = client.collab_update_stream("w1", &oid, "g2").await.unwrap();
let mut group_1 = client
.collab_update_stream_group("w1", &oid, "g1")
.await
.unwrap();
let mut group_2 = client
.collab_update_stream_group("w1", &oid, "g2")
.await
.unwrap();
let msg = StreamBinary(vec![1, 2, 3, 4, 5]);
{
let client = stream_client().await;
let mut group = client.collab_update_stream("w1", &oid, "g2").await.unwrap();
let mut group = client
.collab_update_stream_group("w1", &oid, "g2")
.await
.unwrap();
group.insert_binary(msg).await.unwrap();
}
let msg = group_1
@ -140,13 +158,13 @@ async fn read_specific_num_of_message_test() {
let object_id = format!("o{}", random_i64());
let client = stream_client().await;
let mut group_1 = client
.collab_update_stream("w1", &object_id, "g1")
.collab_update_stream_group("w1", &object_id, "g1")
.await
.unwrap();
{
let client = stream_client().await;
let mut group = client
.collab_update_stream("w1", &object_id, "g2")
.collab_update_stream_group("w1", &object_id, "g2")
.await
.unwrap();
let mut messages = vec![];
@ -177,13 +195,13 @@ async fn read_all_message_test() {
let object_id = format!("o{}", random_i64());
let client = stream_client().await;
let mut group = client
.collab_update_stream("w1", &object_id, "g1")
.collab_update_stream_group("w1", &object_id, "g1")
.await
.unwrap();
{
let client = stream_client().await;
let mut group_2 = client
.collab_update_stream("w1", &object_id, "g2")
.collab_update_stream_group("w1", &object_id, "g2")
.await
.unwrap();
let mut messages = vec![];
@ -211,10 +229,16 @@ async fn group_already_exist_test() {
let client = stream_client().await;
// create group
client.collab_update_stream("w1", &oid, "g2").await.unwrap();
client
.collab_update_stream_group("w1", &oid, "g2")
.await
.unwrap();
// create same group
client.collab_update_stream("w1", &oid, "g2").await.unwrap();
client
.collab_update_stream_group("w1", &oid, "g2")
.await
.unwrap();
}
#[tokio::test]
@ -223,7 +247,10 @@ async fn group_not_exist_test() {
let client = stream_client().await;
// create group
let mut group = client.collab_update_stream("w1", &oid, "g2").await.unwrap();
let mut group = client
.collab_update_stream_group("w1", &oid, "g2")
.await
.unwrap();
group.destroy_group().await;
let err = group


@ -1,50 +1 @@
use crate::collab_stream_test::test_util::{random_i64, stream_client};
use collab_stream::model::StreamBinary;
#[tokio::test]
async fn read_single_message_test() {
let oid = format!("o{}", random_i64());
let client_2 = stream_client().await;
let mut stream_2 = client_2.stream("w1", &oid).await;
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
let msg = stream_2.next().await.unwrap();
tx.send(msg).await.unwrap();
});
let msg = StreamBinary(vec![1, 2, 3]);
{
let client_1 = stream_client().await;
let mut stream_1 = client_1.stream("w1", &oid).await;
stream_1.insert_message(msg).await.unwrap();
}
let msg = rx.recv().await.unwrap().unwrap();
assert_eq!(msg.data, vec![1, 2, 3]);
}
#[tokio::test]
async fn read_multiple_messages_test() {
let oid = format!("o{}", random_i64());
let client_2 = stream_client().await;
let mut stream_2 = client_2.stream("w1", &oid).await;
stream_2.clear().await.unwrap();
{
let client_1 = stream_client().await;
let mut stream_1 = client_1.stream("w1", &oid).await;
let messages = vec![
StreamBinary(vec![1, 2, 3]),
StreamBinary(vec![4, 5, 6]),
StreamBinary(vec![7, 8, 9]),
];
stream_1.insert_messages(messages).await.unwrap();
}
let msg = stream_2.read_all_message().await.unwrap();
assert_eq!(msg.len(), 3);
assert_eq!(*msg[0], vec![1, 2, 3]);
assert_eq!(*msg[1], vec![4, 5, 6]);
assert_eq!(*msg[2], vec![7, 8, 9]);
}


@ -1,5 +1,5 @@
use anyhow::Context;
use collab_stream::client::{CollabRedisStream, PubSubClient};
use collab_stream::client::CollabRedisStream;
use rand::{thread_rng, Rng};
pub async fn redis_client() -> redis::Client {
@ -17,14 +17,6 @@ pub async fn stream_client() -> CollabRedisStream {
.unwrap()
}
pub async fn pubsub_client() -> PubSubClient {
let redis_client = redis_client().await;
PubSubClient::new(redis_client)
.await
.context("failed to create pubsub client")
.unwrap()
}
pub fn random_i64() -> i64 {
let mut rng = thread_rng();
let num: i64 = rng.gen();


@ -107,16 +107,16 @@ impl Display for CollabParams {
}
impl CollabParams {
pub fn new<T: ToString>(
pub fn new<T: ToString, B: Into<Bytes>>(
object_id: T,
collab_type: CollabType,
encoded_collab_v1: Vec<u8>,
encoded_collab_v1: B,
) -> Self {
let object_id = object_id.to_string();
Self {
object_id,
collab_type,
encoded_collab_v1: Bytes::from(encoded_collab_v1),
encoded_collab_v1: encoded_collab_v1.into(),
}
}
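With the relaxed bound, callers already holding `Bytes` avoid a payload copy while `Vec<u8>` callers keep compiling; a sketch with placeholder ids:

```rust
use bytes::Bytes;
use collab_entity::CollabType;

fn build_params(shared: Bytes) -> (CollabParams, CollabParams) {
    // Vec<u8> still works through Into<Bytes>...
    let from_vec = CollabParams::new("o1", CollabType::Document, vec![1, 2, 3]);
    // ...and Bytes is moved in without re-allocating the payload.
    let from_bytes = CollabParams::new("o1", CollabType::Document, shared);
    (from_vec, from_bytes)
}
```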
@ -217,7 +217,7 @@ pub struct InsertSnapshotParams {
#[validate(custom(function = "validate_not_empty_str"))]
pub object_id: String,
#[validate(custom(function = "validate_not_empty_payload"))]
pub data: Bytes,
pub doc_state: Bytes,
#[validate(custom(function = "validate_not_empty_str"))]
pub workspace_id: String,
pub collab_type: CollabType,
@ -254,13 +254,13 @@ impl Display for QueryCollabParams {
}
impl QueryCollabParams {
pub fn new<T1: ToString, T2: ToString>(
pub fn new<T1: Into<String>, T2: Into<String>>(
object_id: T1,
collab_type: CollabType,
workspace_id: T2,
) -> Self {
let workspace_id = workspace_id.to_string();
let object_id = object_id.to_string();
let workspace_id = workspace_id.into();
let object_id = object_id.into();
let inner = QueryCollab {
object_id,
collab_type,


@ -445,6 +445,28 @@ pub async fn select_snapshot(
Ok(row)
}
#[inline]
pub async fn select_latest_snapshot(
pg_pool: &PgPool,
workspace_id: &Uuid,
object_id: &str,
) -> Result<Option<AFSnapshotRow>, Error> {
let row = sqlx::query_as!(
AFSnapshotRow,
r#"
SELECT * FROM af_collab_snapshot
WHERE workspace_id = $1 AND oid = $2 AND deleted_at IS NULL
ORDER BY created_at DESC
LIMIT 1;
"#,
workspace_id,
object_id
)
.fetch_optional(pg_pool)
.await?;
Ok(row)
}
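A hedged usage sketch, assuming a `PgPool` at hand and that `AFSnapshotRow` exposes the columns listed in the cached query metadata earlier in this diff (`sid`, `created_at`, ...):

```rust
async fn print_latest(
    pg_pool: &sqlx::PgPool,
    workspace_id: &uuid::Uuid,
    object_id: &str,
) -> Result<(), anyhow::Error> {
    if let Some(row) = select_latest_snapshot(pg_pool, workspace_id, object_id).await? {
        println!("latest snapshot {} created at {}", row.sid, row.created_at);
    } else {
        println!("no snapshot for {}", object_id);
    }
    Ok(())
}
```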
/// Returns list of snapshots for given object_id in descending order of creation time.
pub async fn get_all_collab_snapshot_meta(
pg_pool: &PgPool,

View file

@ -6,6 +6,7 @@ use database_entity::dto::{
QueryCollabParams, QueryCollabResult, SnapshotData,
};
use crate::collab::CollabType;
use collab::entity::EncodedCollab;
use serde::{Deserialize, Serialize};
use sqlx::Transaction;
@ -147,6 +148,13 @@ pub trait CollabStorage: Send + Sync + 'static {
snapshot_id: &i64,
) -> AppResult<SnapshotData>;
async fn get_latest_snapshot(
&self,
workspace_id: &str,
object_id: &str,
collab_type: CollabType,
) -> AppResult<Option<SnapshotData>>;
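A hedged sketch of a restore path using this new trait method; the `storage` handle and ids are illustrative:

// prefer the latest snapshot when one exists
if let Some(snapshot) = storage
    .get_latest_snapshot(&workspace_id, &object_id, CollabType::Document)
    .await?
{
    // SnapshotData carries the full encoded collab, ready to decode
    let encoded = EncodedCollab::decode_from_bytes(&snapshot.encoded_collab_v1)?;
    // rebuild the document from encoded.doc_state ...
}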
/// Returns list of snapshots for given object_id in descending order of creation time.
async fn get_collab_snapshot_list(
&self,

View file

@ -8,7 +8,6 @@ use crate::vector::embedder::Embedder;
use crate::vector::open_ai;
use app_error::AppError;
use appflowy_ai_client::dto::{EmbeddingRequest, OpenAIEmbeddingResponse};
use collab::lock::RwLock;
use collab::preclude::Collab;
use collab_document::document::DocumentBody;
use collab_entity::CollabType;
@ -243,7 +242,7 @@ impl IndexerScheduler {
&self,
workspace_id: &str,
object_id: &str,
collab: &Arc<RwLock<Collab>>,
collab: &Collab,
collab_type: &CollabType,
) -> Result<(), AppError> {
if !self.index_enabled() {
@ -256,11 +255,9 @@ impl IndexerScheduler {
match collab_type {
CollabType::Document => {
let lock = collab.read().await;
let txn = lock.transact();
let text = DocumentBody::from_collab(&lock)
let txn = collab.transact();
let text = DocumentBody::from_collab(collab)
.and_then(|body| body.to_plain_text(txn, false, true).ok());
drop(lock); // release the read lock ASAP
if let Some(text) = text {
if !text.is_empty() {
@ -268,7 +265,7 @@ impl IndexerScheduler {
Uuid::parse_str(workspace_id)?,
object_id.to_string(),
collab_type.clone(),
UnindexedData::UnindexedText(text),
UnindexedData::Text(text),
);
self.embed_immediately(pending)?;
}
@ -491,7 +488,7 @@ fn process_collab(
) -> Result<Option<(u32, Vec<AFCollabEmbeddedChunk>)>, AppError> {
if let Some(indexer) = indexer {
let chunks = match data {
UnindexedData::UnindexedText(text) => {
UnindexedData::Text(text) => {
indexer.create_embedded_chunks_from_text(object_id.to_string(), text, embedder.model())?
},
};
@ -543,13 +540,13 @@ impl UnindexedCollabTask {
#[derive(Debug, Serialize, Deserialize)]
pub enum UnindexedData {
UnindexedText(String),
Text(String),
}
impl UnindexedData {
pub fn is_empty(&self) -> bool {
match self {
UnindexedData::UnindexedText(text) => text.is_empty(),
UnindexedData::Text(text) => text.is_empty(),
}
}
}

View file

@ -61,6 +61,7 @@ thiserror = "1.0.56"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
anyhow.workspace = true
bytes.workspace = true
arc-swap.workspace = true
ureq = { version = "2.1.0", features = ["json"] }
collab = { workspace = true }

View file

@ -107,7 +107,7 @@ where
user: self.user.clone(),
message,
})
.map_err(|err| RealtimeError::Internal(err.into()))
.map_err(|err| RealtimeError::SendWSMessageFailed(err.to_string()))
}
}

View file

@ -113,7 +113,6 @@ async fn post_realtime_message_stream_handler(
bytes.extend_from_slice(&item?);
}
event!(tracing::Level::INFO, "message len: {}", bytes.len());
let device_id = device_id.to_string();
let message = parser_realtime_msg(bytes.freeze(), req.clone()).await?;

View file

@ -23,6 +23,10 @@ use tracing::info;
use crate::actix_ws::server::RealtimeServerActor;
use crate::api::{collab_scope, ws_scope};
use crate::collab::access_control::CollabStorageAccessControlImpl;
use access_control::casbin::access::AccessControl;
use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
use database::file::s3_client_impl::AwsS3BucketClientImpl;
use crate::collab::cache::CollabCache;
use crate::collab::storage::CollabStorageImpl;
use crate::command::{CLCommandReceiver, CLCommandSender};
@ -31,8 +35,6 @@ use crate::pg_listener::PgListeners;
use crate::snapshot::SnapshotControl;
use crate::state::{AppMetrics, AppState, UserCache};
use crate::CollaborationServer;
use access_control::casbin::access::AccessControl;
use database::file::s3_client_impl::AwsS3BucketClientImpl;
use indexer::collab_indexer::IndexerProvider;
use indexer::scheduler::{IndexerConfiguration, IndexerScheduler};
@ -78,9 +80,10 @@ pub async fn run_actix_server(
)),
state.metrics.realtime_metrics.clone(),
rt_cmd_recv,
state.redis_stream_router.clone(),
state.redis_connection_manager.clone(),
Duration::from_secs(config.collab.group_persistence_interval_secs),
config.collab.edit_state_max_count,
config.collab.edit_state_max_secs,
Duration::from_secs(config.collab.group_prune_grace_period_secs),
state.indexer_scheduler.clone(),
)
.await
@ -107,7 +110,8 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
let user_cache = UserCache::new(pg_pool.clone()).await;
info!("Connecting to Redis...");
let redis_conn_manager = get_redis_client(config.redis_uri.expose_secret()).await?;
let (redis_conn_manager, redis_stream_router) =
get_redis_client(config.redis_uri.expose_secret(), config.redis_worker_count).await?;
// Pg listeners
info!("Setting up Pg listeners...");
@ -172,6 +176,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
config: Arc::new(config.clone()),
pg_listeners,
user_cache,
redis_stream_router,
redis_connection_manager: redis_conn_manager,
access_control,
collab_access_control_storage: collab_storage,
@ -181,14 +186,28 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
Ok(app_state)
}
async fn get_redis_client(redis_uri: &str) -> Result<redis::aio::ConnectionManager, Error> {
async fn get_redis_client(
redis_uri: &str,
worker_count: usize,
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
info!("Connecting to redis with uri: {}", redis_uri);
let manager = redis::Client::open(redis_uri)
.context("failed to connect to redis")?
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;
let router = StreamRouter::with_options(
&client,
StreamRouterOptions {
worker_count,
xread_streams: 100,
xread_block_millis: Some(5000),
xread_count: None,
},
)?;
let manager = client
.get_connection_manager()
.await
.context("failed to get the connection manager")?;
Ok(manager)
Ok((manager, router.into()))
}
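For orientation, a sketch of how the returned pair is consumed elsewhere in this change (names are illustrative):

let (conn, router) = get_redis_client(redis_uri, worker_count).await?;
// the router presumably fans blocking XREAD polls across `worker_count` workers
// (see StreamRouterOptions above), while the manager serves regular commands:
let collab_stream = CollabRedisStream::new_with_connection_manager(conn, router);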
async fn get_connection_pool(setting: &DatabaseSetting) -> Result<PgPool, Error> {

View file

@ -1,4 +1,6 @@
use bytes::Bytes;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use futures_util::{stream, StreamExt};
use itertools::{Either, Itertools};
use sqlx::{PgPool, Transaction};
@ -186,6 +188,11 @@ impl CollabCache {
// when the data is written to the disk cache but fails to be written to the memory cache,
// we log the error and continue.
self.cache_collab(object_id, collab_type, encode_collab_data);
Ok(())
}
fn cache_collab(&self, object_id: String, collab_type: CollabType, encode_collab_data: Bytes) {
let mem_cache = self.mem_cache.clone();
tokio::spawn(async move {
if let Err(err) = mem_cache
@ -203,20 +210,6 @@ impl CollabCache {
);
}
});
Ok(())
}
pub async fn get_encode_collab_from_disk(
&self,
workspace_id: &str,
query: QueryCollab,
) -> Result<EncodedCollab, AppError> {
let encode_collab = self
.disk_cache
.get_collab_encoded_from_disk(workspace_id, query)
.await?;
Ok(encode_collab)
}
pub async fn insert_encode_collab_to_disk(
@ -225,10 +218,12 @@ impl CollabCache {
uid: &i64,
params: CollabParams,
) -> Result<(), AppError> {
let p = params.clone();
self
.disk_cache
.upsert_collab(workspace_id, uid, params)
.await?;
self.cache_collab(p.object_id, p.collab_type, p.encoded_collab_v1);
Ok(())
}
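A short hedged note on the write path above (the `cache` handle and `params` are assumed to be in scope):

// the disk write is awaited and authoritative; the memory cache is refreshed
// in a detached tokio::spawn inside cache_collab, so a Redis failure only logs:
cache.insert_encode_collab_to_disk(&workspace_id, &uid, params).await?;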

View file

@ -357,7 +357,7 @@ impl CollabDiskCache {
while let Err(err) = s3.put_blob(&key, doc_state.clone().into(), None).await {
match err {
AppError::ServiceTemporaryUnavailable(err) if retries > 0 => {
tracing::info!(
tracing::debug!(
"S3 service is temporarily unavailable: {}. Remaining retries: {}",
err,
retries
@ -371,6 +371,7 @@ impl CollabDiskCache {
},
}
}
tracing::trace!("saved collab to S3: {}", key);
Ok(())
}

View file

@ -154,6 +154,7 @@ impl CollabMemCache {
timestamp: i64,
expiration_seconds: Option<u64>,
) -> redis::RedisResult<()> {
tracing::trace!("insert collab {} to memory cache", object_id);
self
.insert_data_with_timestamp(object_id, data, timestamp, expiration_seconds)
.await

View file

@ -545,6 +545,18 @@ where
.await
}
async fn get_latest_snapshot(
&self,
workspace_id: &str,
object_id: &str,
collab_type: CollabType,
) -> AppResult<Option<SnapshotData>> {
self
.snapshot_control
.get_latest_snapshot(workspace_id, object_id, collab_type)
.await
}
async fn get_collab_snapshot_list(
&self,
workspace_id: &str,

View file

@ -16,6 +16,7 @@ pub struct Config {
pub gotrue: GoTrueSetting,
pub collab: CollabSetting,
pub redis_uri: Secret<String>,
pub redis_worker_count: usize,
pub ai: AISettings,
pub s3: S3Setting,
}
@ -127,6 +128,7 @@ pub struct GoTrueSetting {
#[derive(Clone, Debug)]
pub struct CollabSetting {
pub group_persistence_interval_secs: u64,
pub group_prune_grace_period_secs: u64,
pub edit_state_max_count: u32,
pub edit_state_max_secs: i64,
pub s3_collab_threshold: u64,
@ -198,11 +200,14 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
"60",
)
.parse()?,
group_prune_grace_period_secs: get_env_var("APPFLOWY_COLLAB_GROUP_GRACE_PERIOD_SECS", "60")
.parse()?,
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?,
s3_collab_threshold: get_env_var("APPFLOWY_COLLAB_S3_THRESHOLD", "8000").parse()?,
},
redis_uri: get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379").into(),
redis_worker_count: get_env_var("APPFLOWY_REDIS_WORKERS", "60").parse()?,
ai: AISettings {
port: get_env_var("APPFLOWY_AI_SERVER_PORT", "5001").parse()?,
host: get_env_var("APPFLOWY_AI_SERVER_HOST", "localhost"),
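Putting the new knobs together, a hypothetical override of the added settings before get_configuration() runs:

// hypothetical test harness; field names match the Config structs above
std::env::set_var("APPFLOWY_COLLAB_GROUP_GRACE_PERIOD_SECS", "120");
std::env::set_var("APPFLOWY_REDIS_WORKERS", "30");
let config = get_configuration()?;
assert_eq!(config.collab.group_prune_grace_period_secs, 120);
assert_eq!(config.redis_worker_count, 30);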

View file

@ -60,6 +60,27 @@ pub enum RealtimeError {
#[error("Collab redis stream error: {0}")]
StreamError(#[from] StreamError),
#[error("Cannot create group: {0}")]
CannotCreateGroup(String),
#[error("BinCodeCollab error: {0}")]
BincodeEncode(String),
#[error("Failed to create snapshot: {0}")]
CreateSnapshotFailed(String),
#[error("Failed to get latest snapshot: {0}")]
GetLatestSnapshotFailed(String),
#[error("Collab Schema Error: {0}")]
CollabSchemaError(String),
#[error("failed to obtain lease: {0}")]
Lease(Box<dyn std::error::Error + Send + Sync>),
#[error("failed to send ws message: {0}")]
SendWSMessageFailed(String),
}
#[derive(Debug)]

View file

@ -6,6 +6,11 @@ use async_stream::stream;
use bytes::Bytes;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::entity::EncodedCollab;
use dashmap::DashMap;
use futures_util::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;
use collab_entity::CollabType;
use collab_rt_entity::user::RealtimeUser;
use collab_rt_entity::CollabAck;
@ -13,10 +18,7 @@ use collab_rt_entity::{
AckCode, ClientCollabMessage, MessageByObjectId, ServerCollabMessage, SinkMessage, UpdateSync,
};
use collab_rt_protocol::{Message, SyncMessage};
use dashmap::DashMap;
use database::collab::CollabStorage;
use futures_util::StreamExt;
use std::sync::Arc;
use tracing::{error, instrument, trace, warn};
use yrs::updates::encoder::Encode;
use yrs::StateVector;
@ -147,7 +149,10 @@ where
},
GroupCommand::GenerateCollabEmbedding { object_id } => {
if let Some(group) = self.group_manager.get_group(&object_id).await {
group.generate_embeddings().await;
match group.generate_embeddings().await {
Ok(_) => trace!("successfully created embeddings for {}", object_id),
Err(err) => trace!("failed to create embeddings for {}: {}", object_id, err),
}
}
},
GroupCommand::CalculateMissingUpdate {
@ -334,16 +339,15 @@ where
};
if let Some(group) = self.group_manager.get_group(&object_id).await {
let (collab_message_sender, _collab_message_receiver) = futures::channel::mpsc::channel(1);
let (mut message_by_oid_sender, message_by_oid_receiver) = futures::channel::mpsc::channel(1);
group.subscribe(
&server_rt_user,
CollabOrigin::Server,
collab_message_sender,
NullSender::default(),
message_by_oid_receiver,
);
let message = MessageByObjectId::new_with_message(object_id.clone(), messages);
if let Err(err) = message_by_oid_sender.try_send(message) {
let message = HashMap::from([(object_id.clone(), messages)]);
if let Err(err) = message_by_oid_sender.try_send(MessageByObjectId(message)) {
error!(
"failed to send message to group: {}, object_id: {}",
err, object_id

File diff suppressed because it is too large

View file

@ -1,25 +1,26 @@
use std::sync::Arc;
use std::time::Duration;
use access_control::collab::RealtimeAccessControl;
use app_error::AppError;
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
use collab::entity::EncodedCollab;
use collab::preclude::Collab;
use collab_entity::CollabType;
use tracing::{instrument, trace};
use access_control::collab::RealtimeAccessControl;
use app_error::AppError;
use collab_rt_entity::user::RealtimeUser;
use collab_rt_entity::CollabMessage;
use collab_stream::client::CollabRedisStream;
use database::collab::{CollabStorage, GetCollabOrigin};
use database_entity::dto::QueryCollabParams;
use tracing::{instrument, trace};
use yrs::{ReadTxn, StateVector};
use crate::client::client_msg_router::ClientMessageRouter;
use crate::error::{CreateGroupFailedReason, RealtimeError};
use crate::error::RealtimeError;
use crate::group::group_init::CollabGroup;
use crate::group::state::GroupManagementState;
use crate::metrics::CollabRealtimeMetrics;
use database::collab::{CollabStorage, GetCollabOrigin};
use database_entity::dto::QueryCollabParams;
use indexer::scheduler::IndexerScheduler;
pub struct GroupManager<S> {
@ -27,9 +28,9 @@ pub struct GroupManager<S> {
storage: Arc<S>,
access_control: Arc<dyn RealtimeAccessControl>,
metrics_calculate: Arc<CollabRealtimeMetrics>,
collab_redis_stream: Arc<CollabRedisStream>,
persistence_interval: Duration,
edit_state_max_count: u32,
edit_state_max_secs: i64,
prune_grace_period: Duration,
indexer_scheduler: Arc<IndexerScheduler>,
}
@ -42,19 +43,20 @@ where
storage: Arc<S>,
access_control: Arc<dyn RealtimeAccessControl>,
metrics_calculate: Arc<CollabRealtimeMetrics>,
collab_stream: CollabRedisStream,
persistence_interval: Duration,
edit_state_max_count: u32,
edit_state_max_secs: i64,
prune_grace_period: Duration,
indexer_scheduler: Arc<IndexerScheduler>,
) -> Result<Self, RealtimeError> {
let collab_stream = Arc::new(collab_stream);
Ok(Self {
state: GroupManagementState::new(metrics_calculate.clone()),
storage,
access_control,
metrics_calculate,
collab_redis_stream: collab_stream,
persistence_interval,
edit_state_max_count,
edit_state_max_secs,
prune_grace_period,
indexer_scheduler,
})
}
@ -87,17 +89,18 @@ where
client_msg_router: &mut ClientMessageRouter,
) -> Result<(), RealtimeError> {
// Lock the group and subscribe the user to the group.
if let Some(group) = self.state.get_mut_group(object_id).await {
if let Some(mut e) = self.state.get_mut_group(object_id).await {
let group = e.value_mut();
trace!("[realtime]: {} subscribe group:{}", user, object_id,);
let (sink, stream) = client_msg_router.init_client_communication::<CollabMessage>(
&group.workspace_id,
group.workspace_id(),
user,
object_id,
self.access_control.clone(),
);
group.subscribe(user, message_origin.clone(), sink, stream);
// explicitly drop the group to release the lock.
drop(group);
drop(e);
self.state.insert_user(user, object_id)?;
} else {
@ -115,29 +118,23 @@ where
object_id: &str,
collab_type: CollabType,
) -> Result<(), RealtimeError> {
let mut is_new_collab = false;
let params = QueryCollabParams::new(object_id, collab_type.clone(), workspace_id);
let result = load_collab(user.uid, object_id, params, self.storage.clone()).await;
let (collab, _encode_collab) = {
let (mut collab, encode_collab) = match result {
Ok(value) => value,
Err(err) => {
if err.is_record_not_found() {
is_new_collab = true;
let collab = Collab::new_with_origin(CollabOrigin::Server, object_id, vec![], false);
let encode_collab = collab.encode_collab_v1(|_| Ok::<_, RealtimeError>(()))?;
(collab, encode_collab)
} else {
return Err(RealtimeError::CreateGroupFailed(
CreateGroupFailedReason::CannotGetCollabData,
));
}
},
};
collab.initialize();
(collab, encode_collab)
let res = self
.storage
.get_encode_collab(GetCollabOrigin::Server, params, false)
.await;
let state_vector = match res {
Ok(collab) => Collab::new_with_source(
CollabOrigin::Server,
object_id,
DataSource::DocStateV1(collab.doc_state.into()),
vec![],
false,
)?
.transact()
.state_vector(),
Err(err) if err.is_record_not_found() => StateVector::default(),
Err(err) => return Err(RealtimeError::CannotCreateGroup(err.to_string())),
};
trace!(
@ -148,25 +145,25 @@ where
collab_type
);
let group = Arc::new(CollabGroup::new(
let group = CollabGroup::new(
user.uid,
workspace_id.to_string(),
object_id.to_string(),
collab_type,
collab,
self.metrics_calculate.clone(),
self.storage.clone(),
is_new_collab,
self.collab_redis_stream.clone(),
self.persistence_interval,
self.edit_state_max_count,
self.edit_state_max_secs,
self.prune_grace_period,
state_vector,
self.indexer_scheduler.clone(),
)?);
)?;
self.state.insert_group(object_id, group);
Ok(())
}
}
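Conceptually, a group now tracks only a StateVector rather than a live shared Collab; a hedged sketch of the delta computation this enables (plain yrs API, names illustrative):

// the server-side state vector is enough to know what a client is missing:
let txn = collab.transact();
let server_sv = txn.state_vector();
// given a client's state vector, encode only the delta it lacks:
let delta_for_client = txn.encode_state_as_update_v1(&client_sv);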
#[allow(dead_code)]
#[instrument(level = "trace", skip_all)]
async fn load_collab<S>(
uid: i64,

View file

@ -1,9 +1,6 @@
pub(crate) mod broadcast;
pub(crate) mod cmd;
pub(crate) mod group_init;
pub(crate) mod manager;
mod null_sender;
mod persistence;
mod plugin;
pub(crate) mod protocol;
mod state;

View file

@ -64,7 +64,7 @@ where
let data = encode_collab.doc_state;
let params = InsertSnapshotParams {
object_id,
data,
doc_state: data,
workspace_id,
collab_type,
};

View file

@ -1,127 +0,0 @@
use std::sync::Arc;
use async_trait::async_trait;
use collab::core::collab::{TransactionExt, TransactionMutExt};
use collab::core::origin::CollabOrigin;
use tokio::time::Instant;
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
use yrs::{ReadTxn, StateVector, Transact};
use collab_rt_protocol::CollabSyncProtocol;
use collab_rt_protocol::{
decode_update, CollabRef, CustomMessage, Message, RTProtocolError, SyncMessage,
};
use crate::CollabRealtimeMetrics;
#[derive(Clone)]
pub struct ServerSyncProtocol {
metrics: Arc<CollabRealtimeMetrics>,
}
impl ServerSyncProtocol {
pub fn new(metrics: Arc<CollabRealtimeMetrics>) -> Self {
Self { metrics }
}
}
#[async_trait]
impl CollabSyncProtocol for ServerSyncProtocol {
async fn handle_sync_step1(
&self,
collab: &CollabRef,
sv: StateVector,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
let (doc_state, state_vector) = {
let lock = collab.read().await;
let collab = (*lock).borrow();
let txn = collab.get_awareness().doc().try_transact().map_err(|err| {
RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err))
})?;
let doc_state = txn.try_encode_state_as_update_v1(&sv).map_err(|err| {
RTProtocolError::YrsEncodeState(format!(
"fail to encode state as update. error: {}\ninit state vector: {:?}\ndocument state: {:#?}",
err,
sv,
txn.store()
))
})?;
(doc_state, txn.state_vector())
};
// Retrieve the latest document state from the client after they return online from offline editing.
let mut encoder = EncoderV1::new();
Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder);
//FIXME: this should never happen as a response to sync step 1 from the client, but should
// rather be sent when a connection is established
Message::Sync(SyncMessage::SyncStep1(state_vector)).encode(&mut encoder);
Ok(Some(encoder.to_vec()))
}
async fn handle_sync_step2(
&self,
origin: &CollabOrigin,
collab: &CollabRef,
update: Vec<u8>,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
self.metrics.apply_update_size.observe(update.len() as f64);
let start = Instant::now();
let result = {
let update = decode_update(update).await?;
let mut lock = collab.write().await;
let collab = (*lock).borrow_mut();
let mut txn = collab
.get_awareness()
.doc()
.try_transact_mut_with(origin.clone())
.map_err(|err| {
RTProtocolError::YrsTransaction(format!("sync step2 transaction acquire: {}", err))
})?;
txn.try_apply_update(update).map_err(|err| {
RTProtocolError::YrsApplyUpdate(format!(
"sync step2 apply update: {}\ndocument state: {:#?}",
err,
txn.store()
))
})?;
// If the server can't apply updates sent by a client, then the server is missing some updates
// from that client, or the client is missing some updates from the server.
// If a client can't apply a broadcast from the server, then that client is missing some
// updates.
match txn.store().pending_update() {
Some(_update) => {
// let state_vector_v1 = update.missing.encode_v1();
// for the moment, we don't need to send missing updates to the client. passing None
// instead, which will trigger a sync step 0 on client
let state_vector_v1 = txn.state_vector().encode_v1();
Err(RTProtocolError::MissUpdates {
state_vector_v1: Some(state_vector_v1),
reason: "server miss updates".to_string(),
})
},
None => Ok(None),
}
};
let elapsed = start.elapsed();
self
.metrics
.apply_update_time
.observe(elapsed.as_millis() as f64);
result
}
async fn handle_custom_message(
&self,
_collab: &CollabRef,
_msg: CustomMessage,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
Ok(None)
}
}

View file

@ -64,7 +64,9 @@ impl GroupManagementState {
loop {
match self.group_by_object_id.try_get(object_id) {
TryResult::Present(group) => return Some(group.clone()),
TryResult::Present(group) => {
return Some(group.clone());
},
TryResult::Absent => return None,
TryResult::Locked => {
attempts += 1;
@ -106,22 +108,29 @@ impl GroupManagementState {
}
}
pub(crate) fn insert_group(&self, object_id: &str, group: Arc<CollabGroup>) {
self.group_by_object_id.insert(object_id.to_string(), group);
pub(crate) fn insert_group(&self, object_id: &str, group: CollabGroup) {
self
.group_by_object_id
.insert(object_id.to_string(), group.into());
self.metrics_calculate.opening_collab_count.inc();
}
pub(crate) fn contains_group(&self, object_id: &str) -> bool {
self.group_by_object_id.contains_key(object_id)
if let Some(group) = self.group_by_object_id.get(object_id) {
let cancelled = group.is_cancelled();
!cancelled
} else {
false
}
}
pub(crate) fn remove_group(&self, object_id: &str) {
let entry = self.group_by_object_id.remove(object_id);
if entry.is_none() {
let group_not_found = self.group_by_object_id.remove(object_id).is_none();
if group_not_found {
// Log error if the group doesn't exist
error!("Group for object_id:{} not found", object_id);
}
self
.metrics_calculate
.opening_collab_count

View file

@ -11,7 +11,6 @@ use bytes::Bytes;
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
use collab::entity::EncodedCollab;
use collab::lock::RwLock;
use collab::preclude::Collab;
use collab_entity::CollabType;
use dashmap::DashMap;
@ -281,7 +280,7 @@ impl IndexerScheduler {
&self,
workspace_id: &str,
object_id: &str,
collab: &Arc<RwLock<Collab>>,
collab: &Collab,
collab_type: &CollabType,
) -> Result<(), AppError> {
if !self.index_enabled() {
@ -304,9 +303,7 @@ impl IndexerScheduler {
let workspace_id = Uuid::parse_str(workspace_id)?;
let embedder = self.create_embedder()?;
let lock = collab.read().await;
let chunks = indexer.create_embedded_chunks(&lock, embedder.model())?;
drop(lock); // release the read lock ASAP
let chunks = indexer.create_embedded_chunks(collab, embedder.model())?;
let threads = self.threads.clone();
let tx = self.schedule_tx.clone();

View file

@ -8,7 +8,7 @@ pub mod compression;
pub mod config;
pub mod connect_state;
pub mod error;
mod group;
pub mod group;
pub mod metrics;
mod permission;
mod pg_listener;

View file

@ -8,16 +8,20 @@ pub struct CollabRealtimeMetrics {
pub(crate) connected_users: Gauge,
pub(crate) opening_collab_count: Gauge,
pub(crate) num_of_editing_users: Gauge,
/// Number of times a compact state collab load has been done.
pub(crate) load_collab_count: Gauge,
/// Number of times a full state collab (with history) load has been done.
pub(crate) load_full_collab_count: Gauge,
/// The number of apply update
pub(crate) apply_update_count: Gauge,
/// The number of apply update failed
pub(crate) apply_update_failed_count: Gauge,
pub(crate) acquire_collab_lock_count: Gauge,
pub(crate) acquire_collab_lock_fail_count: Gauge,
/// How long it takes to apply update in milliseconds.
pub(crate) apply_update_time: Histogram,
/// How big the update is in bytes.
pub(crate) apply_update_size: Histogram,
/// How long it takes to load a collab (from snapshot and updates combined).
pub(crate) load_collab_time: Histogram,
/// How big is the collab (no history, after applying all updates).
pub(crate) collab_size: Histogram,
/// How big is the collab (with history, after applying all updates).
pub(crate) full_collab_size: Histogram,
}
impl CollabRealtimeMetrics {
@ -28,23 +32,30 @@ impl CollabRealtimeMetrics {
num_of_editing_users: Gauge::default(),
apply_update_count: Default::default(),
apply_update_failed_count: Default::default(),
acquire_collab_lock_count: Default::default(),
acquire_collab_lock_fail_count: Default::default(),
// when it comes to histograms we organize them into buckets of specific sizes - since our
// prometheus client doesn't support the Summary type, we use Histogram instead
// time spent on apply_update in milliseconds: 1ms, 5ms, 15ms, 30ms, 100ms, 200ms, 500ms, 1s
apply_update_time: Histogram::new(
// time spent on loading collab in milliseconds: 1ms, 5ms, 15ms, 30ms, 100ms, 200ms, 500ms, 1s
load_collab_time: Histogram::new(
[1.0, 5.0, 15.0, 30.0, 100.0, 200.0, 500.0, 1000.0].into_iter(),
),
// update size in bytes: 128B, 512B, 1KB, 64KB, 512KB, 1MB, 5MB, 10MB
apply_update_size: Histogram::new(
// collab size in bytes: 128B, 512B, 1KB, 64KB, 512KB, 1MB, 5MB, 10MB
collab_size: Histogram::new(
[
128.0, 512.0, 1024.0, 65536.0, 524288.0, 1048576.0, 5242880.0, 10485760.0,
]
.into_iter(),
),
// collab size in bytes: 128B, 512B, 1KB, 64KB, 512KB, 1MB, 5MB, 10MB
full_collab_size: Histogram::new(
[
128.0, 512.0, 1024.0, 65536.0, 524288.0, 1048576.0, 5242880.0, 10485760.0,
]
.into_iter(),
),
load_collab_count: Default::default(),
load_full_collab_count: Default::default(),
}
}
@ -76,28 +87,31 @@ impl CollabRealtimeMetrics {
"number of apply update failed",
metrics.apply_update_failed_count.clone(),
);
realtime_registry.register(
"acquire_collab_lock_count",
"number of acquire collab lock",
metrics.acquire_collab_lock_count.clone(),
"load_collab_time",
"time spent on loading collab in milliseconds",
metrics.load_collab_time.clone(),
);
realtime_registry.register(
"acquire_collab_lock_fail_count",
"number of acquire collab lock failed",
metrics.acquire_collab_lock_fail_count.clone(),
"collab_size",
"size of compact collab in bytes",
metrics.collab_size.clone(),
);
realtime_registry.register(
"apply_update_time",
"time spent on applying collab updates in milliseconds",
metrics.apply_update_time.clone(),
"full_collab_size",
"size of full collab in bytes",
metrics.full_collab_size.clone(),
);
realtime_registry.register(
"apply_update_size",
"size of updates applied to collab in bytes",
metrics.apply_update_size.clone(),
"load_collab_count",
"number of collab loads (no history)",
metrics.load_collab_count.clone(),
);
realtime_registry.register(
"load_full_collab_count",
"number of collab loads (with history)",
metrics.load_full_collab_count.clone(),
);
metrics
}
}
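A hedged sketch of feeding the renamed metrics at a load site (the loader itself is hypothetical):

let start = std::time::Instant::now();
let encoded = load_compact_collab().await?; // hypothetical loader
metrics.load_collab_count.inc();
metrics.load_collab_time.observe(start.elapsed().as_millis() as f64);
metrics.collab_size.observe(encoded.doc_state.len() as f64);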

View file

@ -6,8 +6,11 @@ use anyhow::{anyhow, Result};
use app_error::AppError;
use collab_rt_entity::user::{RealtimeUser, UserDevice};
use collab_rt_entity::MessageByObjectId;
use collab_stream::client::CollabRedisStream;
use collab_stream::stream_router::StreamRouter;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use redis::aio::ConnectionManager;
use tokio::sync::mpsc::Sender;
use tokio::task::yield_now;
use tokio::time::interval;
@ -50,9 +53,10 @@ where
access_control: Arc<dyn RealtimeAccessControl>,
metrics: Arc<CollabRealtimeMetrics>,
command_recv: CLCommandReceiver,
redis_stream_router: Arc<StreamRouter>,
redis_connection_manager: ConnectionManager,
group_persistence_interval: Duration,
edit_state_max_count: u32,
edit_state_max_secs: i64,
prune_grace_period: Duration,
indexer_scheduler: Arc<IndexerScheduler>,
) -> Result<Self, RealtimeError> {
let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false")
@ -66,14 +70,16 @@ where
}
let connect_state = ConnectState::new();
let collab_stream =
CollabRedisStream::new_with_connection_manager(redis_connection_manager, redis_stream_router);
let group_manager = Arc::new(
GroupManager::new(
storage.clone(),
access_control.clone(),
metrics.clone(),
collab_stream,
group_persistence_interval,
edit_state_max_count,
edit_state_max_secs,
prune_grace_period,
indexer_scheduler.clone(),
)
.await?,

View file

@ -3,6 +3,7 @@ use std::time::Duration;
use chrono::{DateTime, Utc};
use collab::entity::{EncodedCollab, EncoderVersion};
use collab_entity::CollabType;
use sqlx::PgPool;
use tracing::{debug, error, trace, warn};
use validator::Validate;
@ -14,6 +15,7 @@ use database::collab::{
};
use database::file::s3_client_impl::AwsS3BucketClientImpl;
use database::file::{BucketClient, ResponseBlob};
use database::history::ops::get_latest_snapshot;
use database_entity::dto::{
AFSnapshotMeta, AFSnapshotMetas, InsertSnapshotParams, SnapshotData, ZSTD_COMPRESSION_LEVEL,
};
@ -60,7 +62,6 @@ fn get_meta(objct_key: String) -> Option<AFSnapshotMeta> {
}
#[derive(Clone)]
// #[deprecated(note = "snapshot is implemented in the appflowy-history")]
pub struct SnapshotControl {
pg_pool: PgPool,
s3: AwsS3BucketClientImpl,
@ -119,7 +120,7 @@ impl SnapshotControl {
let timestamp = Utc::now();
let snapshot_id = timestamp.timestamp_millis();
let key = collab_snapshot_key(&params.workspace_id, &params.object_id, snapshot_id);
let compressed = zstd::encode_all(params.data.as_ref(), ZSTD_COMPRESSION_LEVEL)?;
let compressed = zstd::encode_all(params.doc_state.as_ref(), ZSTD_COMPRESSION_LEVEL)?;
if let Err(err) = self.s3.put_blob(&key, compressed.into(), None).await {
self.collab_metrics.write_snapshot_failures.inc();
return Err(err);
@ -241,6 +242,41 @@ impl SnapshotControl {
.await
}
pub async fn get_latest_snapshot(
&self,
workspace_id: &str,
oid: &str,
collab_type: CollabType,
) -> Result<Option<SnapshotData>, AppError> {
let snapshot_prefix = collab_snapshot_prefix(workspace_id, oid);
let mut resp = self.s3.list_dir(&snapshot_prefix, 1).await?;
if let Some(key) = resp.pop() {
let resp = self.s3.get_blob(&key).await?;
let decompressed = zstd::decode_all(&*resp.to_blob())?;
let encoded_collab = EncodedCollab {
state_vector: Default::default(),
doc_state: decompressed.into(),
version: EncoderVersion::V1,
};
Ok(Some(SnapshotData {
object_id: oid.to_string(),
encoded_collab_v1: encoded_collab.encode_to_bytes()?,
workspace_id: workspace_id.to_string(),
}))
} else {
let snapshot = get_latest_snapshot(oid, &collab_type, &self.pg_pool).await?;
Ok(
snapshot
.and_then(|row| row.snapshot_meta)
.map(|meta| SnapshotData {
object_id: oid.to_string(),
encoded_collab_v1: meta.snapshot,
workspace_id: workspace_id.to_string(),
}),
)
}
}
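Snapshot blobs are zstd-compressed on the way to S3 (see the create path above); a minimal round-trip sketch under that assumption:

let doc_state: Vec<u8> = vec![1, 2, 3];
let compressed = zstd::encode_all(doc_state.as_slice(), ZSTD_COMPRESSION_LEVEL)?;
let restored = zstd::decode_all(compressed.as_slice())?;
assert_eq!(restored, doc_state);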
async fn latest_snapshot_time(
&self,
workspace_id: &str,

View file

@ -13,6 +13,7 @@ use crate::pg_listener::PgListeners;
use crate::CollabRealtimeMetrics;
use access_control::metrics::AccessControlMetrics;
use app_error::AppError;
use collab_stream::stream_router::StreamRouter;
use database::user::{select_all_uid_uuid, select_uid_from_uuid};
use indexer::metrics::EmbeddingMetrics;
use indexer::scheduler::IndexerScheduler;
@ -24,6 +25,7 @@ pub struct AppState {
pub config: Arc<Config>,
pub pg_listeners: Arc<PgListeners>,
pub user_cache: UserCache,
pub redis_stream_router: Arc<StreamRouter>,
pub redis_connection_manager: RedisConnectionManager,
pub access_control: AccessControl,
pub collab_access_control_storage: Arc<CollabAccessControlStorage>,

View file

@ -239,7 +239,7 @@ async fn init_collab_handle(
) -> Result<OpenCollabHandle, HistoryError> {
let group_name = format!("history_{}:{}", workspace_id, object_id);
let update_stream = redis_stream
.collab_update_stream(workspace_id, object_id, &group_name)
.collab_update_stream_group(workspace_id, object_id, &group_name)
.await
.unwrap();

View file

@ -27,7 +27,7 @@ async fn apply_update_stream_updates_test() {
.unwrap();
let mut update_group = redis_stream
.collab_update_stream(&workspace_id, &object_id, "appflowy_cloud")
.collab_update_stream_group(&workspace_id, &object_id, "appflowy_cloud")
.await
.unwrap();
@ -81,7 +81,7 @@ async fn apply_update_stream_updates_test() {
// .unwrap();
//
// let mut update_group = redis_stream
// .collab_update_stream(&workspace_id, &object_id, "appflowy_cloud")
// .collab_update_stream_group(&workspace_id, &object_id, "appflowy_cloud")
// .await
// .unwrap();
//

View file

@ -10,7 +10,7 @@ async fn single_reader_single_sender_update_stream_test() {
let object_id = uuid::Uuid::new_v4().to_string();
let mut send_group = redis_stream
.collab_update_stream(&workspace, &object_id, "write")
.collab_update_stream_group(&workspace, &object_id, "write")
.await
.unwrap();
for i in 0..5 {
@ -18,7 +18,7 @@ async fn single_reader_single_sender_update_stream_test() {
}
let mut recv_group = redis_stream
.collab_update_stream(&workspace, &object_id, "read1")
.collab_update_stream_group(&workspace, &object_id, "read1")
.await
.unwrap();
@ -55,19 +55,19 @@ async fn multiple_reader_single_sender_update_stream_test() {
let object_id = uuid::Uuid::new_v4().to_string();
let mut send_group = redis_stream
.collab_update_stream(&workspace, &object_id, "write")
.collab_update_stream_group(&workspace, &object_id, "write")
.await
.unwrap();
send_group.insert_message(vec![1, 2, 3]).await.unwrap();
send_group.insert_message(vec![4, 5, 6]).await.unwrap();
let recv_group_1 = redis_stream
.collab_update_stream(&workspace, &object_id, "read1")
.collab_update_stream_group(&workspace, &object_id, "read1")
.await
.unwrap();
let recv_group_2 = redis_stream
.collab_update_stream(&workspace, &object_id, "read2")
.collab_update_stream_group(&workspace, &object_id, "read2")
.await
.unwrap();
// Both groups should have the same messages

View file

@ -228,7 +228,7 @@ fn handle_task(
task.collab_type
);
let chunks = match task.data {
UnindexedData::UnindexedText(text) => indexer
UnindexedData::Text(text) => indexer
.create_embedded_chunks_from_text(task.object_id.clone(), text, embedder.model())
.ok()?,
};

View file

@ -743,7 +743,7 @@ async fn create_collab_handler(
workspace_id_uuid,
params.object_id.clone(),
params.collab_type.clone(),
UnindexedData::UnindexedText(text),
UnindexedData::Text(text),
);
state
.indexer_scheduler
@ -875,8 +875,7 @@ async fn batch_create_collab_handler(
let total_size = collab_params_list
.iter()
.fold(0, |acc, x| acc + x.1.encoded_collab_v1.len());
event!(
tracing::Level::INFO,
tracing::info!(
"decompressed {} collab objects in {:?}",
collab_params_list.len(),
start.elapsed()
@ -903,7 +902,7 @@ async fn batch_create_collab_handler(
workspace_id_uuid,
value.1.object_id.clone(),
value.1.collab_type.clone(),
UnindexedData::UnindexedText(text),
UnindexedData::Text(text),
)
})
.ok(),
@ -922,8 +921,7 @@ async fn batch_create_collab_handler(
.batch_insert_new_collab(&workspace_id, &uid, collab_params_list)
.await?;
event!(
tracing::Level::INFO,
tracing::info!(
"inserted collab objects to disk in {:?}, total size:{}",
start.elapsed(),
total_size
@ -1373,7 +1371,7 @@ async fn create_collab_snapshot_handler(
.create_snapshot(InsertSnapshotParams {
object_id,
workspace_id,
data,
doc_state: data,
collab_type,
})
.await?;
@ -1462,7 +1460,7 @@ async fn update_collab_handler(
workspace_id_uuid,
params.object_id.clone(),
params.collab_type.clone(),
UnindexedData::UnindexedText(text),
UnindexedData::Text(text),
);
state
.indexer_scheduler
@ -1984,7 +1982,6 @@ async fn post_realtime_message_stream_handler(
bytes.extend_from_slice(&item?);
}
event!(tracing::Level::INFO, "message len: {}", bytes.len());
let device_id = device_id.to_string();
let message = parser_realtime_msg(bytes.freeze(), req.clone()).await?;

View file

@ -42,6 +42,7 @@ use appflowy_collaborate::collab::storage::CollabStorageImpl;
use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender};
use appflowy_collaborate::snapshot::SnapshotControl;
use appflowy_collaborate::CollaborationServer;
use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
use indexer::collab_indexer::IndexerProvider;
use indexer::scheduler::{IndexerConfiguration, IndexerScheduler};
@ -134,9 +135,10 @@ pub async fn run_actix_server(
state.realtime_access_control.clone(),
state.metrics.realtime_metrics.clone(),
rt_cmd_recv,
state.redis_stream_router.clone(),
state.redis_connection_manager.clone(),
Duration::from_secs(config.collab.group_persistence_interval_secs),
config.collab.edit_state_max_count,
config.collab.edit_state_max_secs,
Duration::from_secs(config.collab.group_prune_grace_period_secs),
state.indexer_scheduler.clone(),
)
.await
@ -246,7 +248,8 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
// Redis
info!("Connecting to Redis...");
let redis_conn_manager = get_redis_client(config.redis_uri.expose_secret()).await?;
let (redis_conn_manager, redis_stream_router) =
get_redis_client(config.redis_uri.expose_secret(), config.redis_worker_count).await?;
info!("Setup AppFlowy AI: {}", config.appflowy_ai.url());
let appflowy_ai_client = AppFlowyAIClient::new(&config.appflowy_ai.url());
@ -348,6 +351,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
user_cache,
id_gen: Arc::new(RwLock::new(Snowflake::new(1))),
gotrue_client,
redis_stream_router,
redis_connection_manager: redis_conn_manager,
collab_cache,
collab_access_control_storage,
@ -380,14 +384,28 @@ fn get_admin_client(
)
}
async fn get_redis_client(redis_uri: &str) -> Result<redis::aio::ConnectionManager, Error> {
async fn get_redis_client(
redis_uri: &str,
worker_count: usize,
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
info!("Connecting to redis with uri: {}", redis_uri);
let manager = redis::Client::open(redis_uri)
.context("failed to connect to redis")?
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;
let router = StreamRouter::with_options(
&client,
StreamRouterOptions {
worker_count,
xread_streams: 100,
xread_block_millis: Some(5000),
xread_count: None,
},
)?;
let manager = client
.get_connection_manager()
.await
.context("failed to get the connection manager")?;
Ok(manager)
Ok((manager, router.into()))
}
pub async fn get_aws_s3_client(s3_setting: &S3Setting) -> Result<aws_sdk_s3::Client, Error> {

View file

@ -741,7 +741,7 @@ pub async fn broadcast_update(
oid: &str,
encoded_update: Vec<u8>,
) -> Result<(), AppError> {
tracing::info!("broadcasting update to group: {}", oid);
tracing::trace!("broadcasting update to group: {}", oid);
let payload = Message::Sync(SyncMessage::Update(encoded_update)).encode_v1();
let msg = ClientCollabMessage::ClientUpdateSync {
data: UpdateSync {

View file

@ -19,6 +19,7 @@ pub struct Config {
pub application: ApplicationSetting,
pub websocket: WebsocketSetting,
pub redis_uri: Secret<String>,
pub redis_worker_count: usize,
pub s3: S3Setting,
pub appflowy_ai: AppFlowyAISetting,
pub grpc_history: GrpcHistorySetting,
@ -143,6 +144,7 @@ pub struct GrpcHistorySetting {
#[derive(Clone, Debug)]
pub struct CollabSetting {
pub group_persistence_interval_secs: u64,
pub group_prune_grace_period_secs: u64,
pub edit_state_max_count: u32,
pub edit_state_max_secs: i64,
pub s3_collab_threshold: u64,
@ -224,6 +226,7 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
min_client_version: get_env_var("APPFLOWY_WEBSOCKET_CLIENT_MIN_VERSION", "0.5.0").parse()?,
},
redis_uri: get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379").into(),
redis_worker_count: get_env_var("APPFLOWY_REDIS_WORKERS", "60").parse()?,
s3: S3Setting {
create_bucket: get_env_var("APPFLOWY_S3_CREATE_BUCKET", "true")
.parse()
@ -250,6 +253,8 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
"60",
)
.parse()?,
group_prune_grace_period_secs: get_env_var("APPFLOWY_COLLAB_GROUP_GRACE_PERIOD_SECS", "60")
.parse()?,
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?,
s3_collab_threshold: get_env_var("APPFLOWY_COLLAB_S3_THRESHOLD", "8000").parse()?,

View file

@ -17,6 +17,7 @@ use appflowy_collaborate::collab::cache::CollabCache;
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
use appflowy_collaborate::metrics::CollabMetrics;
use appflowy_collaborate::CollabRealtimeMetrics;
use collab_stream::stream_router::StreamRouter;
use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
use database::user::{select_all_uid_uuid, select_uid_from_uuid};
use gotrue::grant::{Grant, PasswordGrant};
@ -39,6 +40,7 @@ pub struct AppState {
pub user_cache: UserCache,
pub id_gen: Arc<RwLock<Snowflake>>,
pub gotrue_client: gotrue::api::Client,
pub redis_stream_router: Arc<StreamRouter>,
pub redis_connection_manager: RedisConnectionManager,
pub collab_cache: CollabCache,
pub collab_access_control_storage: Arc<CollabAccessControlStorage>,

Binary file not shown.

View file

@ -24,13 +24,14 @@ async fn viewing_document_editing_users_test() {
let owner_uid = owner.uid().await;
let clients = owner.get_connect_users(&object_id).await;
assert_eq!(clients.len(), 1);
assert_eq!(clients.len(), 1, "guest shouldn't be connected yet");
assert_eq!(clients[0], owner_uid);
guest
.open_collab(&workspace_id, &object_id, collab_type)
.await;
guest.wait_object_sync_complete(&object_id).await.unwrap();
sleep(Duration::from_secs(1)).await;
// after the guest opens the collab, it emits an awareness update that contains the guest's user id.
// The update is sent to the server, and the server broadcasts it to all connected clients
@ -42,7 +43,7 @@ async fn viewing_document_editing_users_test() {
let mut expected_clients = [owner_uid, guest_uid];
expected_clients.sort();
assert_eq!(clients.len(), 2);
assert_eq!(clients.len(), 2, "expected owner and member connected");
assert_eq!(clients, expected_clients);
// simulate the guest close the collab
guest.clean_awareness_state(&object_id).await;
@ -50,7 +51,7 @@ async fn viewing_document_editing_users_test() {
sleep(Duration::from_secs(5)).await;
guest.wait_object_sync_complete(&object_id).await.unwrap();
let clients = owner.get_connect_users(&object_id).await;
assert_eq!(clients.len(), 1);
assert_eq!(clients.len(), 1, "expected only owner connected");
assert_eq!(clients[0], owner_uid);
// simulate the guest open the collab again

View file

@ -7,6 +7,8 @@ mod missing_update_test;
mod multi_devices_edit;
mod permission_test;
mod single_device_edit;
mod snapshot_test;
mod storage_test;
mod stress_test;
pub mod util;
mod web_edit;

View file

@ -0,0 +1,85 @@
use client_api_test::{assert_server_collab, TestClient};
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
use collab::entity::EncodedCollab;
use collab::preclude::{Collab, JsonValue};
use collab_entity::CollabType;
use serde_json::json;
#[tokio::test]
async fn read_write_snapshot() {
let mut c = TestClient::new_user().await;
// prepare initial document
let wid = c.workspace_id().await;
let oid = c.create_and_edit_collab(&wid, CollabType::Unknown).await;
c.open_collab(&wid, &oid, CollabType::Unknown).await;
c.insert_into(&oid, "title", "t1").await;
c.wait_object_sync_complete(&oid).await.unwrap();
assert_server_collab(
&wid,
&mut c.api_client,
&oid,
&CollabType::Unknown,
10,
json!({"title": "t1"}),
)
.await
.unwrap();
// create the 1st snapshot
let m1 = c
.create_snapshot(&wid, &oid, CollabType::Unknown)
.await
.unwrap();
c.insert_into(&oid, "title", "t2").await;
c.wait_object_sync_complete(&oid).await.unwrap();
assert_server_collab(
&wid,
&mut c.api_client,
&oid,
&CollabType::Unknown,
10,
json!({"title": "t2"}),
)
.await
.unwrap();
// create the 2nd snapshot
let m2 = c
.create_snapshot(&wid, &oid, CollabType::Unknown)
.await
.unwrap();
let snapshots = c.get_snapshot_list(&wid, &oid).await.unwrap();
assert_eq!(snapshots.0.len(), 2, "expecting 2 snapshots");
// retrieve state
verify_snapshot_state(&c, &wid, &oid, &m1.snapshot_id, json!({"title": "t1"})).await;
verify_snapshot_state(&c, &wid, &oid, &m2.snapshot_id, json!({"title": "t2"})).await;
}
async fn verify_snapshot_state(
c: &TestClient,
workspace_id: &str,
oid: &str,
snapshot_id: &i64,
expected: JsonValue,
) {
let snapshot = c
.get_snapshot(workspace_id, oid, snapshot_id)
.await
.unwrap();
// retrieve state
let encoded_collab = EncodedCollab::decode_from_bytes(&snapshot.encoded_collab_v1).unwrap();
let collab = Collab::new_with_source(
CollabOrigin::Empty,
oid,
DataSource::DocStateV1(encoded_collab.doc_state.into()),
vec![],
true,
)
.unwrap();
let actual = collab.to_json_value();
assert_eq!(actual, expected);
}

View file

@ -0,0 +1,81 @@
use std::sync::Arc;
use std::time::Duration;
use collab_entity::CollabType;
use serde_json::json;
use tokio::time::sleep;
use uuid::Uuid;
use super::util::TestScenario;
use client_api_test::{assert_server_collab, TestClient};
use database_entity::dto::AFRole;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn stress_test_run_multiple_text_edits() {
const READER_COUNT: usize = 1;
let test_scenario = Arc::new(TestScenario::open(
"./tests/collab/asset/automerge-paper.json.gz",
));
// create writer
let mut writer = TestClient::new_user().await;
sleep(Duration::from_secs(2)).await; // sleep 2 secs so we don't register users too quickly and trip GoTrue's rate limit
let object_id = Uuid::new_v4().to_string();
let workspace_id = writer.workspace_id().await;
writer
.open_collab(&workspace_id, &object_id, CollabType::Unknown)
.await;
// create readers and invite them into the same workspace
let mut readers = Vec::with_capacity(READER_COUNT);
for _ in 0..READER_COUNT {
let mut reader = TestClient::new_user().await;
sleep(Duration::from_secs(2)).await; // sleep 2 secs so we don't register users too quickly and trip GoTrue's rate limit
writer
.invite_and_accepted_workspace_member(&workspace_id, &reader, AFRole::Member)
.await
.unwrap();
reader
.open_collab(&workspace_id, &object_id, CollabType::Unknown)
.await;
readers.push(reader);
}
// run test scenario
let collab = writer.collabs.get(&object_id).unwrap().collab.clone();
let expected = test_scenario.execute(collab, 20_000).await;
// wait for the writer to complete sync
writer.wait_object_sync_complete(&object_id).await.unwrap();
// wait for the readers to complete sync
let mut tasks = Vec::with_capacity(READER_COUNT);
for reader in readers.iter() {
let fut = reader.wait_object_sync_complete(&object_id);
tasks.push(fut);
}
let results = futures::future::join_all(tasks).await;
// make sure that the readers are in correct state
for res in results {
res.unwrap();
}
for mut reader in readers.drain(..) {
assert_server_collab(
&workspace_id,
&mut reader.api_client,
&object_id,
&CollabType::Unknown,
10,
json!({
"text-id": &expected,
}),
)
.await
.unwrap();
}
}

View file

@ -204,3 +204,97 @@ pub async fn redis_connection_manager() -> ConnectionManager {
}
}
}
use std::io::{BufReader, Read};
use collab::preclude::MapExt;
use flate2::bufread::GzDecoder;
use serde::Deserialize;
use yrs::{GetString, Text, TextRef};
use client_api_test::CollabRef;
/// (position, delete length, insert content).
#[derive(Debug, Clone, Deserialize, Eq, PartialEq)]
pub struct TestPatch(pub usize, pub usize, pub String);
#[derive(Debug, Clone, Deserialize, Eq, PartialEq)]
pub struct TestTxn {
// time: String, // ISO String. Unused.
pub patches: Vec<TestPatch>,
}
#[derive(Debug, Clone, Deserialize, Eq, PartialEq)]
pub struct TestScenario {
#[serde(default)]
pub using_byte_positions: bool,
#[serde(rename = "startContent")]
pub start_content: String,
#[serde(rename = "endContent")]
pub end_content: String,
pub txns: Vec<TestTxn>,
}
impl TestScenario {
/// Load the testing data from the specified file. If the filename ends in .gz, it will be
/// transparently decompressed.
///
/// This method panics if the file does not exist, or is corrupt. It'd be better to have a try_
/// variant of this method, but given this is mostly for benchmarking and testing, I haven't felt
/// the need to write that code.
pub fn open(fpath: &str) -> TestScenario {
// let start = SystemTime::now();
// let mut file = File::open("benchmark_data/automerge-paper.json.gz").unwrap();
let file = std::fs::File::open(fpath).unwrap();
let mut reader = BufReader::new(file);
// We could pass the GzDecoder straight to serde, but it makes it way slower to parse for
// some reason.
let mut raw_json = vec![];
if fpath.ends_with(".gz") {
let mut reader = GzDecoder::new(reader);
reader.read_to_end(&mut raw_json).unwrap();
} else {
reader.read_to_end(&mut raw_json).unwrap();
}
let data: TestScenario = serde_json::from_reader(raw_json.as_slice()).unwrap();
data
}
pub async fn execute(&self, collab: CollabRef, step_count: usize) -> String {
let mut i = 0;
for t in self.txns.iter().take(step_count) {
i += 1;
if i % 10_000 == 0 {
tracing::trace!("Executed {}/{} steps", i, step_count);
}
let mut lock = collab.write().await;
let collab = lock.borrow_mut();
let mut txn = collab.context.transact_mut();
let txt = collab.data.get_or_init_text(&mut txn, "text-id");
for patch in t.patches.iter() {
let at = patch.0;
let delete = patch.1;
let content = patch.2.as_str();
if delete != 0 {
txt.remove_range(&mut txn, at as u32, delete as u32);
}
if !content.is_empty() {
txt.insert(&mut txn, at as u32, content);
}
}
}
// validate after applying all patches
let lock = collab.read().await;
let collab = lock.borrow();
let txn = collab.context.transact();
let txt: TextRef = collab.data.get_with_txn(&txn, "text-id").unwrap();
txt.get_string(&txn)
}
}
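As exercised by the stress test above, a condensed usage sketch:

// load the recorded editing trace (panics if the asset is missing or corrupt)
let scenario = TestScenario::open("./tests/collab/asset/automerge-paper.json.gz");
// replay the first 20k transactions against a shared collab, yielding the final text
let final_text = scenario.execute(collab, 20_000).await;
// scenario.end_content would only match after replaying the *entire* trace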

View file

@ -21,8 +21,8 @@ async fn quick_note_crud_test() {
// To ensure that the creation time is different
time::sleep(Duration::from_millis(1)).await;
}
let quick_note_id_1 = quick_note_ids[0];
let quick_note_id_2 = quick_note_ids[1];
let _quick_note_id_1 = quick_note_ids[0];
let _quick_note_id_2 = quick_note_ids[1];
let quick_notes = client
.api_client
.list_quick_notes(workspace_uuid, None, None, None)
@ -30,9 +30,11 @@ async fn quick_note_crud_test() {
.expect("list quick notes");
assert_eq!(quick_notes.quick_notes.len(), 2);
assert!(!quick_notes.has_more);
assert_eq!(quick_notes.quick_notes[0].id, quick_note_id_2);
assert_eq!(quick_notes.quick_notes[1].id, quick_note_id_1);
let mut notes_sorted_by_created_at_asc = quick_notes.quick_notes.clone();
notes_sorted_by_created_at_asc.sort_by(|a, b| a.created_at.cmp(&b.created_at));
let quick_note_id_1 = notes_sorted_by_created_at_asc[0].id;
let quick_note_id_2 = notes_sorted_by_created_at_asc[1].id;
let data_1 = json!([
{
"type": "paragraph",

View file

@ -7,4 +7,5 @@ edition = "2021"
[dependencies]
anyhow = "1.0"
tokio = { version = "1", features = ["full"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3.31"

View file

@ -1,61 +1,112 @@
use anyhow::{anyhow, Context, Result};
use std::process::Stdio;
use tokio::process::Command;
use tokio::select;
use tokio::time::{sleep, Duration};
/// Using 'cargo run --package xtask' to run servers in parallel.
/// 1. AppFlowy Cloud
/// 2. AppFlowy History
/// 3. AppFlowy Indexer
/// Run servers:
/// cargo run --package xtask
///
/// Before running this command, make sure the dependency services are running, for example
/// Redis, Postgres, etc.
/// Run servers and stress tests:
/// cargo run --package xtask -- --stress-test
///
/// Note: tests whose names start with 'stress_test' will be run as stress tests
#[tokio::main]
async fn main() -> Result<()> {
let appflowy = "appflowy_cloud";
let worker = "appflowy_worker";
let is_stress_test = std::env::args().any(|arg| arg == "--stress-test");
let target_dir = "./target";
std::env::set_var("CARGO_TARGET_DIR", target_dir);
kill_existing_process(appflowy).await?;
kill_existing_process(worker).await?;
let appflowy_cloud_bin_name = "appflowy_cloud";
let worker_bin_name = "appflowy_worker";
let enable_runtime_profile = false;
let mut appflowy_cloud_cmd = Command::new("cargo");
// Step 1: Kill existing processes
kill_existing_process(appflowy_cloud_bin_name).await?;
kill_existing_process(worker_bin_name).await?;
appflowy_cloud_cmd
.env("RUSTFLAGS", "--cfg tokio_unstable")
.args(["run", "--features"]);
if enable_runtime_profile {
appflowy_cloud_cmd.args(["history,tokio-runtime-profile"]);
} else {
appflowy_cloud_cmd.args(["history"]);
}
// Step 2: Start servers sequentially
println!("Starting {} server...", appflowy_cloud_bin_name);
let mut appflowy_cloud_cmd = spawn_server(
"cargo",
&["run", "--features", "history"],
appflowy_cloud_bin_name,
is_stress_test,
)?;
wait_for_readiness(appflowy_cloud_bin_name).await?;
let mut appflowy_cloud_handle = appflowy_cloud_cmd
.spawn()
.context("Failed to start AppFlowy-Cloud process")?;
let mut appflowy_worker_handle = Command::new("cargo")
.args([
println!("Starting {} server...", worker_bin_name);
let mut appflowy_worker_cmd = spawn_server(
"cargo",
&[
"run",
"--manifest-path",
"./services/appflowy-worker/Cargo.toml",
])
.spawn()
.context("Failed to start AppFlowy-Worker process")?;
],
worker_bin_name,
is_stress_test,
)?;
wait_for_readiness(worker_bin_name).await?;
println!("All servers are up and running.");
// Step 3: Run stress tests if flag is set
let stress_test_cmd = if is_stress_test {
println!("Running stress tests (tests starting with 'stress_test')...");
Some(
Command::new("cargo")
.args(["test", "stress_test", "--", "--nocapture"])
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.context("Failed to start stress test process")?,
)
} else {
None
};
// Step 4: Monitor all processes
select! {
status = appflowy_cloud_handle.wait() => {
handle_process_exit(status?, appflowy)?;
status = appflowy_cloud_cmd.wait() => {
handle_process_exit(status?, appflowy_cloud_bin_name)?;
},
status = appflowy_worker_cmd.wait() => {
handle_process_exit(status?, worker_bin_name)?;
},
status = async {
if let Some(mut stress_cmd) = stress_test_cmd {
stress_cmd.wait().await
} else {
futures::future::pending().await
}
} => {
if is_stress_test {
handle_process_exit(status?, "cargo test stress_test")?;
}
},
status = appflowy_worker_handle.wait() => {
handle_process_exit(status?, worker)?;
}
}
Ok(())
}
fn spawn_server(
command: &str,
args: &[&str],
name: &str,
suppress_output: bool,
) -> Result<tokio::process::Child> {
println!("Spawning {} process...", name);
let mut cmd = Command::new(command);
cmd.args(args);
if suppress_output {
cmd.stdout(Stdio::null()).stderr(Stdio::null());
}
cmd
.spawn()
.context(format!("Failed to start {} process", name))
}
async fn kill_existing_process(process_identifier: &str) -> Result<()> {
let _ = Command::new("pkill")
.arg("-f")
@ -79,3 +130,10 @@ fn handle_process_exit(status: std::process::ExitStatus, process_name: &str) ->
))
}
}
async fn wait_for_readiness(process_name: &str) -> Result<()> {
println!("Waiting for {} to be ready...", process_name);
sleep(Duration::from_secs(3)).await;
println!("{} is ready.", process_name);
Ok(())
}