mirror of
https://github.com/AppFlowy-IO/AppFlowy-Cloud.git
synced 2025-04-19 03:24:42 -04:00
chore: disable history service (#985)
* chore: disable history service * chore: fix build
This commit is contained in:
parent
85452ddfab
commit
9778843746
44 changed files with 28 additions and 356 deletions
8
.github/workflows/integration_test.yml
vendored
8
.github/workflows/integration_test.yml
vendored
|
@ -44,20 +44,17 @@ jobs:
|
|||
- name: Build Docker Images
|
||||
run: |
|
||||
export DOCKER_DEFAULT_PLATFORM=linux/amd64
|
||||
docker compose build appflowy_cloud appflowy_history appflowy_worker admin_frontend
|
||||
docker compose build appflowy_cloud appflowy_worker admin_frontend
|
||||
|
||||
- name: Push docker images to docker hub
|
||||
run: |
|
||||
docker tag appflowyinc/appflowy_cloud appflowyinc/appflowy_cloud:${GITHUB_SHA}
|
||||
docker tag appflowyinc/appflowy_history appflowyinc/appflowy_history:${GITHUB_SHA}
|
||||
docker tag appflowyinc/appflowy_worker appflowyinc/appflowy_worker:${GITHUB_SHA}
|
||||
docker tag appflowyinc/admin_frontend appflowyinc/admin_frontend:${GITHUB_SHA}
|
||||
echo ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} | docker login --username appflowyinc --password-stdin
|
||||
docker push appflowyinc/appflowy_cloud:${GITHUB_SHA}
|
||||
docker push appflowyinc/appflowy_history:${GITHUB_SHA}
|
||||
docker push appflowyinc/appflowy_worker:${GITHUB_SHA}
|
||||
docker push appflowyinc/admin_frontend:${GITHUB_SHA}
|
||||
APPFLOWY_HISTORY_VERSION=${GITHUB_SHA}
|
||||
APPFLOWY_WORKER_VERSION=${GITHUB_SHA}
|
||||
APPFLOWY_CLOUD_VERSION=${GITHUB_SHA}
|
||||
APPFLOWY_ADMIN_FRONTEND_VERSION=${GITHUB_SHA}
|
||||
|
@ -71,8 +68,6 @@ jobs:
|
|||
include:
|
||||
- test_service: "appflowy_cloud"
|
||||
test_cmd: "--workspace --exclude appflowy-history --exclude appflowy-ai-client --features ai-test-enabled"
|
||||
- test_service: "appflowy_history"
|
||||
test_cmd: "-p appflowy-history"
|
||||
- test_service: "appflowy_worker"
|
||||
test_cmd: "-p appflowy-worker"
|
||||
- test_service: "admin_frontend"
|
||||
|
@ -112,7 +107,6 @@ jobs:
|
|||
|
||||
- name: Run Docker-Compose
|
||||
run: |
|
||||
export APPFLOWY_HISTORY_VERSION=${GITHUB_SHA}
|
||||
export APPFLOWY_WORKER_VERSION=${GITHUB_SHA}
|
||||
export APPFLOWY_CLOUD_VERSION=${GITHUB_SHA}
|
||||
export APPFLOWY_ADMIN_FRONTEND_VERSION=${GITHUB_SHA}
|
||||
|
|
49
.github/workflows/push_latest_docker.yml
vendored
49
.github/workflows/push_latest_docker.yml
vendored
|
@ -238,16 +238,6 @@ jobs:
|
|||
if: always()
|
||||
run: docker logout
|
||||
|
||||
appflowy_history_image:
|
||||
runs-on: ubuntu-22.04
|
||||
env:
|
||||
IMAGE_NAME: ${{ secrets.DOCKER_HUB_USERNAME }}/appflowy_history
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
job:
|
||||
- { name: "amd64", docker_platform: "linux/amd64" }
|
||||
- { name: "arm64v8", docker_platform: "linux/arm64" }
|
||||
|
||||
steps:
|
||||
- name: Check out the repository
|
||||
|
@ -295,45 +285,6 @@ jobs:
|
|||
if: always()
|
||||
run: docker logout
|
||||
|
||||
appflowy_history_manifest:
|
||||
runs-on: ubuntu-22.04
|
||||
needs: [ appflowy_history_image ]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
job:
|
||||
- { image_name: "appflowy_history" }
|
||||
|
||||
steps:
|
||||
- name: Log in to Docker Hub
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_HUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}
|
||||
|
||||
- name: Get git tag
|
||||
id: vars
|
||||
run: |
|
||||
T=${GITHUB_REF#refs/*/} # Remove "refs/*/" prefix from GITHUB_REF
|
||||
echo "GIT_TAG=$T" >> $GITHUB_ENV
|
||||
|
||||
- name: Create and push manifest for ${{ matrix.job.image_name }}:version
|
||||
uses: Noelware/docker-manifest-action@master
|
||||
with:
|
||||
inputs: ${{ secrets.DOCKER_HUB_USERNAME }}/${{ matrix.job.image_name }}:${{ env.GIT_TAG }}
|
||||
images: ${{ secrets.DOCKER_HUB_USERNAME }}/${{ matrix.job.image_name }}:${{ env.GIT_TAG }}-amd64,${{ secrets.DOCKER_HUB_USERNAME }}/${{ matrix.job.image_name }}:${{ env.GIT_TAG }}-arm64v8
|
||||
push: true
|
||||
|
||||
- name: Create and push manifest for ${{ matrix.job.image_name }}:latest
|
||||
uses: Noelware/docker-manifest-action@master
|
||||
with:
|
||||
inputs: ${{ secrets.DOCKER_HUB_USERNAME }}/${{ matrix.job.image_name }}:${{ env.LATEST_TAG }}
|
||||
images: ${{ secrets.DOCKER_HUB_USERNAME }}/${{ matrix.job.image_name }}:${{ env.LATEST_TAG }}-amd64,${{ secrets.DOCKER_HUB_USERNAME }}/${{ matrix.job.image_name }}:${{ env.LATEST_TAG }}-arm64v8
|
||||
push: true
|
||||
|
||||
- name: Logout from Docker Hub
|
||||
if: always()
|
||||
run: docker logout
|
||||
|
||||
appflowy_worker_image:
|
||||
runs-on: ubuntu-22.04
|
||||
|
|
80
Cargo.lock
generated
80
Cargo.lock
generated
|
@ -758,46 +758,6 @@ dependencies = [
|
|||
"yrs",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "appflowy-history"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"assert-json-diff",
|
||||
"axum 0.7.5",
|
||||
"bincode",
|
||||
"chrono",
|
||||
"collab",
|
||||
"collab-entity",
|
||||
"collab-stream",
|
||||
"dashmap 5.5.3",
|
||||
"database",
|
||||
"dotenvy",
|
||||
"futures",
|
||||
"infra",
|
||||
"log",
|
||||
"prost",
|
||||
"rand 0.8.5",
|
||||
"redis 0.25.4",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_repr",
|
||||
"serial_test",
|
||||
"sqlx",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
"tonic-proto",
|
||||
"tower",
|
||||
"tower-http",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "appflowy-worker"
|
||||
version = "0.1.0"
|
||||
|
@ -6314,15 +6274,6 @@ dependencies = [
|
|||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scc"
|
||||
version = "2.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "05ccfb12511cdb770157ace92d7dda771e498445b78f9886e8cdbc5140a4eced"
|
||||
dependencies = [
|
||||
"sdd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schannel"
|
||||
version = "0.1.23"
|
||||
|
@ -6377,12 +6328,6 @@ dependencies = [
|
|||
"untrusted 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sdd"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "177258b64c0faaa9ffd3c65cd3262c2bc7e2588dbbd9c1641d0346145c1bbda8"
|
||||
|
||||
[[package]]
|
||||
name = "seahash"
|
||||
version = "4.1.0"
|
||||
|
@ -6560,31 +6505,6 @@ dependencies = [
|
|||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test"
|
||||
version = "3.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4b4b487fe2acf240a021cf57c6b2b4903b1e78ca0ecd862a71b71d2a51fed77d"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"log",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.3",
|
||||
"scc",
|
||||
"serial_test_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test_derive"
|
||||
version = "3.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.72",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "servo_arc"
|
||||
version = "0.3.0"
|
||||
|
|
|
@ -215,7 +215,7 @@ members = [
|
|||
"libs/appflowy-ai-client",
|
||||
"libs/client-api-entity",
|
||||
# services
|
||||
"services/appflowy-history",
|
||||
#"services/appflowy-history",
|
||||
"services/appflowy-collaborate",
|
||||
"services/appflowy-worker",
|
||||
# xtask
|
||||
|
|
|
@ -147,11 +147,6 @@ APPFLOWY_AI_SERVER_HOST=ai
|
|||
APPFLOWY_AI_DATABASE_URL=postgresql+psycopg://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
|
||||
APPFLOWY_LOCAL_AI_TEST_ENABLED=false
|
||||
|
||||
# AppFlowy History
|
||||
APPFLOWY_GRPC_HISTORY_ADDRS=http://localhost:50051
|
||||
APPFLOWY_HISTORY_REDIS_URL=redis://${REDIS_HOST}:${REDIS_PORT}
|
||||
APPFLOWY_HISTORY_DATABASE_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
|
||||
|
||||
# AppFlowy Indexer
|
||||
APPFLOWY_INDEXER_ENABLED=true
|
||||
APPFLOWY_INDEXER_DATABASE_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
|
||||
|
|
5
dev.env
5
dev.env
|
@ -113,11 +113,6 @@ APPFLOWY_AI_SERVER_HOST=localhost
|
|||
APPFLOWY_AI_DATABASE_URL=postgresql+psycopg://postgres:password@postgres:5432/postgres
|
||||
APPFLOWY_LOCAL_AI_TEST_ENABLED=false
|
||||
|
||||
# AppFlowy History
|
||||
APPFLOWY_GRPC_HISTORY_ADDRS=http://localhost:50051
|
||||
APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379
|
||||
APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
|
||||
|
||||
# AppFlowy Indexer
|
||||
APPFLOWY_INDEXER_ENABLED=true
|
||||
APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
|
||||
|
|
|
@ -148,20 +148,6 @@ services:
|
|||
- APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT}
|
||||
- APPFLOWY_AI_DATABASE_URL=${APPFLOWY_AI_DATABASE_URL}
|
||||
|
||||
appflowy_history:
|
||||
restart: on-failure
|
||||
build:
|
||||
context: .
|
||||
dockerfile: ./services/appflowy-history/Dockerfile
|
||||
image: appflowyinc/appflowy_history:${APPFLOWY_HISTORY_VERSION:-latest}
|
||||
ports:
|
||||
- "50051:50051"
|
||||
environment:
|
||||
- RUST_LOG=${RUST_LOG:-info}
|
||||
- APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379
|
||||
- APPFLOWY_HISTORY_ENVIRONMENT=production
|
||||
- APPFLOWY_HISTORY_DATABASE_URL=${APPFLOWY_HISTORY_DATABASE_URL}
|
||||
|
||||
appflowy_worker:
|
||||
restart: on-failure
|
||||
image: appflowyinc/appflowy_worker:${APPFLOWY_WORKER_VERSION:-latest}
|
||||
|
|
|
@ -141,18 +141,6 @@ services:
|
|||
- APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT}
|
||||
- APPFLOWY_AI_DATABASE_URL=${APPFLOWY_AI_DATABASE_URL}
|
||||
|
||||
appflowy_history:
|
||||
restart: on-failure
|
||||
image: appflowyinc/appflowy_history:${APPFLOWY_HISTORY_VERSION:-latest}
|
||||
build:
|
||||
context: .
|
||||
dockerfile: ./services/appflowy-history/Dockerfile
|
||||
environment:
|
||||
- RUST_LOG=${RUST_LOG:-info}
|
||||
- APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379
|
||||
- APPFLOWY_HISTORY_ENVIRONMENT=production
|
||||
- APPFLOWY_HISTORY_DATABASE_URL=${APPFLOWY_HISTORY_DATABASE_URL}
|
||||
|
||||
appflowy_worker:
|
||||
restart: on-failure
|
||||
image: appflowyinc/appflowy_worker:${APPFLOWY_WORKER_VERSION:-latest}
|
||||
|
|
|
@ -48,7 +48,6 @@ else
|
|||
export RUST_LOG=trace
|
||||
export APPFLOWY_CLOUD_VERSION=$IMAGE_VERSION
|
||||
export APPFLOWY_WORKER_VERSION=$IMAGE_VERSION
|
||||
export APPFLOWY_HISTORY_VERSION=$IMAGE_VERSION
|
||||
export APPFLOWY_ADMIN_FRONTEND_VERSION=$IMAGE_VERSION
|
||||
docker compose -f docker-compose-ci.yml pull
|
||||
|
||||
|
|
|
@ -74,7 +74,6 @@ pub async fn run_actix_server(
|
|||
)),
|
||||
state.metrics.realtime_metrics.clone(),
|
||||
rt_cmd_recv,
|
||||
state.redis_connection_manager.clone(),
|
||||
Duration::from_secs(config.collab.group_persistence_interval_secs),
|
||||
config.collab.edit_state_max_count,
|
||||
config.collab.edit_state_max_secs,
|
||||
|
|
|
@ -47,7 +47,6 @@ pub struct CollabBroadcast {
|
|||
edit_state: Arc<EditState>,
|
||||
/// The last modified time of the document.
|
||||
pub modified_at: Arc<parking_lot::Mutex<Instant>>,
|
||||
update_streaming: Arc<dyn CollabUpdateStreaming>,
|
||||
}
|
||||
|
||||
unsafe impl Send for CollabBroadcast {}
|
||||
|
@ -71,9 +70,7 @@ impl CollabBroadcast {
|
|||
buffer_capacity: usize,
|
||||
edit_state: Arc<EditState>,
|
||||
collab: &Collab,
|
||||
update_streaming: impl CollabUpdateStreaming,
|
||||
) -> Self {
|
||||
let update_streaming = Arc::new(update_streaming);
|
||||
let object_id = object_id.to_owned();
|
||||
// broadcast channel
|
||||
let (sender, _) = channel(buffer_capacity);
|
||||
|
@ -84,7 +81,6 @@ impl CollabBroadcast {
|
|||
doc_subscription: Default::default(),
|
||||
edit_state,
|
||||
modified_at: Arc::new(parking_lot::Mutex::new(Instant::now())),
|
||||
update_streaming,
|
||||
};
|
||||
this.observe_collab_changes(collab);
|
||||
this
|
||||
|
@ -97,7 +93,6 @@ impl CollabBroadcast {
|
|||
let broadcast_sink = self.broadcast_sender.clone();
|
||||
let modified_at = self.modified_at.clone();
|
||||
let edit_state = self.edit_state.clone();
|
||||
let update_streaming = self.update_streaming.clone();
|
||||
|
||||
// Observer the document's update and broadcast it to all subscribers. When one of the clients
|
||||
// sends an update to the document that alters its state, the document observer will trigger
|
||||
|
@ -115,10 +110,6 @@ impl CollabBroadcast {
|
|||
origin
|
||||
);
|
||||
|
||||
let stream_update = event.update.clone();
|
||||
if let Err(err) = update_streaming.send_update(stream_update) {
|
||||
warn!("fail to send updates to redis:{}", err)
|
||||
}
|
||||
let payload = gen_update_message(&event.update);
|
||||
let msg = BroadcastSync::new(origin, cloned_oid.clone(), payload, seq_num);
|
||||
if let Err(err) = broadcast_sink.send(msg.into()) {
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use std::collections::VecDeque;
|
||||
use std::fmt::Display;
|
||||
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
@ -12,22 +11,18 @@ use collab_entity::CollabType;
|
|||
use dashmap::DashMap;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{error, event, info, trace};
|
||||
use yrs::updates::decoder::Decode;
|
||||
use yrs::updates::encoder::Encode;
|
||||
use yrs::Update;
|
||||
use tracing::{event, info, trace};
|
||||
|
||||
use collab_rt_entity::user::RealtimeUser;
|
||||
use collab_rt_entity::CollabMessage;
|
||||
use collab_rt_entity::MessageByObjectId;
|
||||
use collab_stream::client::CollabRedisStream;
|
||||
|
||||
use collab_stream::error::StreamError;
|
||||
use collab_stream::model::{CollabUpdateEvent, StreamBinary};
|
||||
use collab_stream::stream_group::StreamGroup;
|
||||
|
||||
use database::collab::CollabStorage;
|
||||
|
||||
use crate::error::RealtimeError;
|
||||
use crate::group::broadcast::{CollabBroadcast, CollabUpdateStreaming, Subscription};
|
||||
use crate::group::broadcast::{CollabBroadcast, Subscription};
|
||||
use crate::group::persistence::GroupPersistence;
|
||||
use crate::indexer::Indexer;
|
||||
use crate::metrics::CollabRealtimeMetrics;
|
||||
|
@ -65,7 +60,6 @@ impl CollabGroup {
|
|||
metrics_calculate: Arc<CollabRealtimeMetrics>,
|
||||
storage: Arc<S>,
|
||||
is_new_collab: bool,
|
||||
collab_redis_stream: Arc<CollabRedisStream>,
|
||||
persistence_interval: Duration,
|
||||
edit_state_max_count: u32,
|
||||
edit_state_max_secs: i64,
|
||||
|
@ -81,13 +75,7 @@ impl CollabGroup {
|
|||
));
|
||||
let broadcast = {
|
||||
let lock = collab.read().await;
|
||||
CollabBroadcast::new(
|
||||
&object_id,
|
||||
10,
|
||||
edit_state.clone(),
|
||||
&lock,
|
||||
CollabUpdateStreamingImpl::new(&workspace_id, &object_id, &collab_redis_stream).await?,
|
||||
)
|
||||
CollabBroadcast::new(&object_id, 10, edit_state.clone(), &lock)
|
||||
};
|
||||
let (destroy_group_tx, rx) = mpsc::channel(1);
|
||||
|
||||
|
@ -382,82 +370,6 @@ impl EditState {
|
|||
}
|
||||
}
|
||||
|
||||
struct CollabUpdateStreamingImpl {
|
||||
sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||
stopped: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl CollabUpdateStreamingImpl {
|
||||
async fn new(
|
||||
workspace_id: &str,
|
||||
object_id: &str,
|
||||
collab_redis_stream: &CollabRedisStream,
|
||||
) -> Result<Self, StreamError> {
|
||||
let stream = collab_redis_stream
|
||||
.collab_update_stream(workspace_id, object_id, "collaborate_update_producer")
|
||||
.await?;
|
||||
let stopped = Arc::new(AtomicBool::new(false));
|
||||
let (sender, receiver) = mpsc::unbounded_channel();
|
||||
let cloned_stopped = stopped.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = Self::consume_messages(receiver, stream).await {
|
||||
error!("Failed to consume incoming updates: {}", err);
|
||||
}
|
||||
cloned_stopped.store(true, Ordering::SeqCst);
|
||||
});
|
||||
Ok(Self { sender, stopped })
|
||||
}
|
||||
|
||||
async fn consume_messages(
|
||||
mut receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||
mut stream: StreamGroup,
|
||||
) -> Result<(), RealtimeError> {
|
||||
while let Some(update) = receiver.recv().await {
|
||||
let mut update_count = 1;
|
||||
let update = {
|
||||
let mut updates = VecDeque::new();
|
||||
// there may be already more messages inside waiting, try to read them all right away
|
||||
while let Ok(update) = receiver.try_recv() {
|
||||
updates.push_back(Update::decode_v1(&update)?);
|
||||
}
|
||||
if updates.is_empty() {
|
||||
update // no following messages
|
||||
} else {
|
||||
update_count += updates.len();
|
||||
// prepend first update and merge them all together
|
||||
updates.push_front(Update::decode_v1(&update)?);
|
||||
Update::merge_updates(updates).encode_v1()
|
||||
}
|
||||
};
|
||||
|
||||
let msg = StreamBinary::try_from(CollabUpdateEvent::UpdateV1 {
|
||||
encode_update: update,
|
||||
})?;
|
||||
stream.insert_messages(vec![msg]).await?;
|
||||
trace!("Sent cumulative ({}) collab update to redis", update_count);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn is_stopped(&self) -> bool {
|
||||
self.stopped.load(Ordering::SeqCst)
|
||||
}
|
||||
}
|
||||
|
||||
impl CollabUpdateStreaming for CollabUpdateStreamingImpl {
|
||||
fn send_update(&self, update: Vec<u8>) -> Result<(), RealtimeError> {
|
||||
if self.is_stopped() {
|
||||
Err(RealtimeError::Internal(anyhow::anyhow!(
|
||||
"stream stopped processing incoming updates"
|
||||
)))
|
||||
} else if let Err(err) = self.sender.send(update) {
|
||||
Err(RealtimeError::Internal(err.into()))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::group::group_init::EditState;
|
||||
|
|
|
@ -4,18 +4,16 @@ use std::time::Duration;
|
|||
use collab::core::collab::DataSource;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::entity::EncodedCollab;
|
||||
use collab::lock::{Mutex, RwLock};
|
||||
use collab::lock::RwLock;
|
||||
use collab::preclude::Collab;
|
||||
use collab_entity::CollabType;
|
||||
use tracing::{error, instrument, trace};
|
||||
use tracing::{instrument, trace};
|
||||
|
||||
use access_control::collab::RealtimeAccessControl;
|
||||
use app_error::AppError;
|
||||
use collab_rt_entity::user::RealtimeUser;
|
||||
use collab_rt_entity::CollabMessage;
|
||||
use collab_stream::client::{CollabRedisStream, CONTROL_STREAM_KEY};
|
||||
use collab_stream::model::CollabControlEvent;
|
||||
use collab_stream::stream_group::StreamGroup;
|
||||
|
||||
use database::collab::{CollabStorage, GetCollabOrigin};
|
||||
use database_entity::dto::QueryCollabParams;
|
||||
|
||||
|
@ -31,8 +29,6 @@ pub struct GroupManager<S> {
|
|||
storage: Arc<S>,
|
||||
access_control: Arc<dyn RealtimeAccessControl>,
|
||||
metrics_calculate: Arc<CollabRealtimeMetrics>,
|
||||
collab_redis_stream: Arc<CollabRedisStream>,
|
||||
control_event_stream: Arc<Mutex<StreamGroup>>,
|
||||
persistence_interval: Duration,
|
||||
edit_state_max_count: u32,
|
||||
edit_state_max_secs: i64,
|
||||
|
@ -48,25 +44,16 @@ where
|
|||
storage: Arc<S>,
|
||||
access_control: Arc<dyn RealtimeAccessControl>,
|
||||
metrics_calculate: Arc<CollabRealtimeMetrics>,
|
||||
collab_stream: CollabRedisStream,
|
||||
persistence_interval: Duration,
|
||||
edit_state_max_count: u32,
|
||||
edit_state_max_secs: i64,
|
||||
indexer_provider: Arc<IndexerProvider>,
|
||||
) -> Result<Self, RealtimeError> {
|
||||
let collab_stream = Arc::new(collab_stream);
|
||||
let control_event_stream = collab_stream
|
||||
.collab_control_stream(CONTROL_STREAM_KEY, "collaboration")
|
||||
.await
|
||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||
let control_event_stream = Arc::new(Mutex::from(control_event_stream));
|
||||
Ok(Self {
|
||||
state: GroupManagementState::new(metrics_calculate.clone(), control_event_stream.clone()),
|
||||
state: GroupManagementState::new(metrics_calculate.clone()),
|
||||
storage,
|
||||
access_control,
|
||||
metrics_calculate,
|
||||
collab_redis_stream: collab_stream,
|
||||
control_event_stream,
|
||||
persistence_interval,
|
||||
edit_state_max_count,
|
||||
edit_state_max_secs,
|
||||
|
@ -162,7 +149,7 @@ where
|
|||
}
|
||||
|
||||
let result = load_collab(user.uid, object_id, params, self.storage.clone()).await;
|
||||
let (collab, encode_collab) = {
|
||||
let (collab, _encode_collab) = {
|
||||
let (mut collab, encode_collab) = match result {
|
||||
Ok(value) => value,
|
||||
Err(err) => {
|
||||
|
@ -184,26 +171,6 @@ where
|
|||
(collab, encode_collab)
|
||||
};
|
||||
|
||||
let cloned_control_event_stream = self.control_event_stream.clone();
|
||||
let open_event = CollabControlEvent::Open {
|
||||
workspace_id: workspace_id.to_string(),
|
||||
object_id: object_id.to_string(),
|
||||
collab_type: collab_type.clone(),
|
||||
doc_state: encode_collab.doc_state.to_vec(),
|
||||
};
|
||||
trace!("Send control event: {}", open_event);
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = cloned_control_event_stream
|
||||
.lock()
|
||||
.await
|
||||
.insert_message(open_event)
|
||||
.await
|
||||
{
|
||||
error!("Failed to insert open event to control stream: {}", err);
|
||||
}
|
||||
});
|
||||
|
||||
trace!(
|
||||
"[realtime]: create group: uid:{},workspace_id:{},object_id:{}:{}",
|
||||
user.uid,
|
||||
|
@ -233,7 +200,6 @@ where
|
|||
self.metrics_calculate.clone(),
|
||||
self.storage.clone(),
|
||||
is_new_collab,
|
||||
self.collab_redis_stream.clone(),
|
||||
self.persistence_interval,
|
||||
self.edit_state_max_count,
|
||||
self.edit_state_max_secs,
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use collab::lock::Mutex;
|
||||
use dashmap::mapref::one::RefMut;
|
||||
use dashmap::try_result::TryResult;
|
||||
use dashmap::DashMap;
|
||||
|
@ -13,8 +12,6 @@ use crate::error::RealtimeError;
|
|||
use crate::group::group_init::CollabGroup;
|
||||
use crate::metrics::CollabRealtimeMetrics;
|
||||
use collab_rt_entity::user::RealtimeUser;
|
||||
use collab_stream::model::CollabControlEvent;
|
||||
use collab_stream::stream_group::StreamGroup;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct GroupManagementState {
|
||||
|
@ -24,14 +21,10 @@ pub(crate) struct GroupManagementState {
|
|||
metrics_calculate: Arc<CollabRealtimeMetrics>,
|
||||
/// By default, the number of groups to remove in a single batch is 50.
|
||||
remove_batch_size: usize,
|
||||
control_event_stream: Arc<Mutex<StreamGroup>>,
|
||||
}
|
||||
|
||||
impl GroupManagementState {
|
||||
pub(crate) fn new(
|
||||
metrics_calculate: Arc<CollabRealtimeMetrics>,
|
||||
control_event_stream: Arc<Mutex<StreamGroup>>,
|
||||
) -> Self {
|
||||
pub(crate) fn new(metrics_calculate: Arc<CollabRealtimeMetrics>) -> Self {
|
||||
let remove_batch_size = get_env_var("APPFLOWY_COLLABORATE_REMOVE_BATCH_SIZE", "50")
|
||||
.parse::<usize>()
|
||||
.unwrap_or(50);
|
||||
|
@ -40,7 +33,6 @@ impl GroupManagementState {
|
|||
editing_by_user: Arc::new(DashMap::new()),
|
||||
metrics_calculate,
|
||||
remove_batch_size,
|
||||
control_event_stream,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,18 +119,6 @@ impl GroupManagementState {
|
|||
pub(crate) async fn remove_group(&self, object_id: &str) {
|
||||
let entry = self.group_by_object_id.remove(object_id);
|
||||
|
||||
if let Err(err) = self
|
||||
.control_event_stream
|
||||
.lock()
|
||||
.await
|
||||
.insert_message(CollabControlEvent::Close {
|
||||
object_id: object_id.to_string(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
error!("Failed to insert close event to control stream: {}", err);
|
||||
}
|
||||
|
||||
if let Some(entry) = entry {
|
||||
let group = entry.1;
|
||||
group.stop().await;
|
||||
|
|
|
@ -13,7 +13,7 @@ use tracing::{error, info, trace};
|
|||
use access_control::collab::RealtimeAccessControl;
|
||||
use collab_rt_entity::user::{RealtimeUser, UserDevice};
|
||||
use collab_rt_entity::MessageByObjectId;
|
||||
use collab_stream::client::CollabRedisStream;
|
||||
|
||||
use database::collab::CollabStorage;
|
||||
|
||||
use crate::client::client_msg_router::ClientMessageRouter;
|
||||
|
@ -26,7 +26,7 @@ use crate::group::manager::GroupManager;
|
|||
use crate::indexer::IndexerProvider;
|
||||
use crate::metrics::spawn_metrics;
|
||||
use crate::rt_server::collaboration_runtime::COLLAB_RUNTIME;
|
||||
use crate::state::RedisConnectionManager;
|
||||
|
||||
use crate::{CollabRealtimeMetrics, RealtimeClientWebsocketSink};
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -50,7 +50,6 @@ where
|
|||
access_control: Arc<dyn RealtimeAccessControl>,
|
||||
metrics: Arc<CollabRealtimeMetrics>,
|
||||
command_recv: CLCommandReceiver,
|
||||
redis_connection_manager: RedisConnectionManager,
|
||||
group_persistence_interval: Duration,
|
||||
edit_state_max_count: u32,
|
||||
edit_state_max_secs: i64,
|
||||
|
@ -67,13 +66,11 @@ where
|
|||
}
|
||||
|
||||
let connect_state = ConnectState::new();
|
||||
let collab_stream = CollabRedisStream::new_with_connection_manager(redis_connection_manager);
|
||||
let group_manager = Arc::new(
|
||||
GroupManager::new(
|
||||
storage.clone(),
|
||||
access_control.clone(),
|
||||
metrics.clone(),
|
||||
collab_stream,
|
||||
group_persistence_interval,
|
||||
edit_state_max_count,
|
||||
edit_state_max_secs,
|
||||
|
|
|
@ -132,7 +132,6 @@ pub async fn run_actix_server(
|
|||
state.realtime_access_control.clone(),
|
||||
state.metrics.realtime_metrics.clone(),
|
||||
rt_cmd_recv,
|
||||
state.redis_connection_manager.clone(),
|
||||
Duration::from_secs(config.collab.group_persistence_interval_secs),
|
||||
config.collab.edit_state_max_count,
|
||||
config.collab.edit_state_max_secs,
|
||||
|
|
|
@ -24,16 +24,16 @@ async fn main() -> Result<()> {
|
|||
.spawn()
|
||||
.context("Failed to start AppFlowy-Cloud process")?;
|
||||
|
||||
let mut appflowy_history_cmd = Command::new("cargo")
|
||||
.args([
|
||||
"run",
|
||||
// "--features",
|
||||
// "verbose_log",
|
||||
"--manifest-path",
|
||||
"./services/appflowy-history/Cargo.toml",
|
||||
])
|
||||
.spawn()
|
||||
.context("Failed to start AppFlowy-History process")?;
|
||||
// let mut appflowy_history_cmd = Command::new("cargo")
|
||||
// .args([
|
||||
// "run",
|
||||
// // "--features",
|
||||
// // "verbose_log",
|
||||
// "--manifest-path",
|
||||
// "./services/appflowy-history/Cargo.toml",
|
||||
// ])
|
||||
// .spawn()
|
||||
// .context("Failed to start AppFlowy-History process")?;
|
||||
|
||||
let mut appflowy_worker_cmd = Command::new("cargo")
|
||||
.args([
|
||||
|
@ -48,9 +48,9 @@ async fn main() -> Result<()> {
|
|||
status = appflowy_cloud_cmd.wait() => {
|
||||
handle_process_exit(status?, appflowy_cloud_bin_name)?;
|
||||
},
|
||||
status = appflowy_history_cmd.wait() => {
|
||||
handle_process_exit(status?, history)?;
|
||||
}
|
||||
// status = appflowy_history_cmd.wait() => {
|
||||
// handle_process_exit(status?, history)?;
|
||||
// }
|
||||
status = appflowy_worker_cmd.wait() => {
|
||||
handle_process_exit(status?, worker)?;
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue