mirror of
https://github.com/AppFlowy-IO/AppFlowy-Cloud.git
synced 2025-04-19 03:24:42 -04:00
Merge pull request #1273 from AppFlowy-IO/awareness-pubsub
chore: change awareness broadcasting from stream to pubsub
This commit is contained in:
commit
b2ceb30037
12 changed files with 216 additions and 148 deletions
107
libs/collab-stream/src/awareness_gossip.rs
Normal file
107
libs/collab-stream/src/awareness_gossip.rs
Normal file
|
@ -0,0 +1,107 @@
|
|||
use crate::error::StreamError;
|
||||
use crate::model::AwarenessStreamUpdate;
|
||||
use async_stream::try_stream;
|
||||
use futures::Stream;
|
||||
use redis::aio::MultiplexedConnection;
|
||||
use redis::{AsyncCommands, Client};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
pub struct AwarenessGossip {
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl AwarenessGossip {
|
||||
/// Returns Redis stream key, that's storing entries mapped to/from [AwarenessStreamUpdate].
|
||||
pub fn publish_key(workspace_id: &str, object_id: &str) -> String {
|
||||
format!("af:awareness:{}:{}", workspace_id, object_id)
|
||||
}
|
||||
|
||||
pub fn state_key(workspace_id: &str, object_id: &str) -> String {
|
||||
format!("af:awareness_state:{}:{}", workspace_id, object_id)
|
||||
}
|
||||
|
||||
pub fn new(client: Client) -> Self {
|
||||
Self { client }
|
||||
}
|
||||
|
||||
pub async fn sink(
|
||||
&self,
|
||||
workspace_id: &str,
|
||||
object_id: &str,
|
||||
) -> Result<AwarenessUpdateSink, StreamError> {
|
||||
tracing::trace!("publishing awareness state for object {}", object_id);
|
||||
let mut conn = self.client.get_multiplexed_async_connection().await?;
|
||||
// delete existing redis stream from previous versions
|
||||
let _: redis::Value = conn
|
||||
.del(format!("af:{}:{}:awareness", workspace_id, object_id))
|
||||
.await?;
|
||||
let sink = AwarenessUpdateSink::new(conn, workspace_id, object_id);
|
||||
Ok(sink)
|
||||
}
|
||||
|
||||
pub fn awareness_stream(
|
||||
&self,
|
||||
workspace_id: &str,
|
||||
object_id: &str,
|
||||
) -> impl Stream<Item = Result<AwarenessStreamUpdate, StreamError>> + 'static {
|
||||
let client = self.client.clone();
|
||||
let pubsub_key = Self::publish_key(workspace_id, object_id);
|
||||
try_stream! {
|
||||
let mut pubsub = client.get_async_pubsub().await?;
|
||||
pubsub.psubscribe(pubsub_key.clone()).await?; // try to open subscription
|
||||
{
|
||||
// from now on, we shouldn't throw any errors here, otherwise punsubscribe won't be called
|
||||
let mut stream = pubsub.on_message();
|
||||
while let Some(msg) = stream.next().await {
|
||||
let update = Self::parse_update(msg)?;
|
||||
yield update;
|
||||
}
|
||||
}
|
||||
tracing::trace!("unsubscribing from awareness stream {}", pubsub_key);
|
||||
pubsub.punsubscribe(pubsub_key).await?; // close subscription gracefully
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_update(msg: redis::Msg) -> Result<AwarenessStreamUpdate, StreamError> {
|
||||
let channel_name = msg.get_channel_name();
|
||||
let payload = msg.get_payload_bytes();
|
||||
let update = serde_json::from_slice::<AwarenessStreamUpdate>(payload)
|
||||
.map_err(StreamError::SerdeJsonError)?;
|
||||
tracing::trace!("received awareness stream update for {}", channel_name);
|
||||
Ok(update)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AwarenessUpdateSink {
|
||||
conn: Mutex<MultiplexedConnection>,
|
||||
publish_key: String,
|
||||
}
|
||||
|
||||
impl AwarenessUpdateSink {
|
||||
pub fn new(conn: MultiplexedConnection, workspace_id: &str, object_id: &str) -> Self {
|
||||
let publish_key = AwarenessGossip::publish_key(workspace_id, object_id);
|
||||
AwarenessUpdateSink {
|
||||
conn: conn.into(),
|
||||
publish_key,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send(&self, msg: &AwarenessStreamUpdate) -> Result<(), StreamError> {
|
||||
let mut conn = self.conn.lock().await;
|
||||
Self::notify_awareness_change(&mut conn, &self.publish_key, msg).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a Redis pub-sub message to notify other clients about the awareness change.
|
||||
async fn notify_awareness_change(
|
||||
conn: &mut MultiplexedConnection,
|
||||
pubsub_key: &str,
|
||||
update: &AwarenessStreamUpdate,
|
||||
) -> Result<(), StreamError> {
|
||||
tracing::trace!("notify awareness change for {}: {:?}", pubsub_key, update);
|
||||
let json = serde_json::to_string(update)?;
|
||||
let _: redis::Value = conn.publish(pubsub_key, json).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
use crate::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink};
|
||||
use crate::awareness_gossip::{AwarenessGossip, AwarenessUpdateSink};
|
||||
use crate::collab_update_sink::CollabUpdateSink;
|
||||
use crate::error::{internal, StreamError};
|
||||
use crate::lease::{Lease, LeaseAcquisition};
|
||||
use crate::metrics::CollabStreamMetrics;
|
||||
|
@ -17,6 +18,7 @@ use tracing::error;
|
|||
pub struct CollabRedisStream {
|
||||
connection_manager: ConnectionManager,
|
||||
stream_router: Arc<StreamRouter>,
|
||||
awareness_gossip: Arc<AwarenessGossip>,
|
||||
}
|
||||
|
||||
impl CollabRedisStream {
|
||||
|
@ -37,20 +39,24 @@ impl CollabRedisStream {
|
|||
metrics,
|
||||
router_options,
|
||||
)?);
|
||||
let awareness_gossip = Arc::new(AwarenessGossip::new(redis_client.clone()));
|
||||
let connection_manager = redis_client.get_connection_manager().await?;
|
||||
Ok(Self::new_with_connection_manager(
|
||||
connection_manager,
|
||||
stream_router,
|
||||
awareness_gossip,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn new_with_connection_manager(
|
||||
connection_manager: ConnectionManager,
|
||||
stream_router: Arc<StreamRouter>,
|
||||
awareness_gossip: Arc<AwarenessGossip>,
|
||||
) -> Self {
|
||||
Self {
|
||||
connection_manager,
|
||||
stream_router,
|
||||
awareness_gossip,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -112,9 +118,12 @@ impl CollabRedisStream {
|
|||
CollabUpdateSink::new(self.connection_manager.clone(), stream_key)
|
||||
}
|
||||
|
||||
pub fn awareness_update_sink(&self, workspace_id: &str, object_id: &str) -> AwarenessUpdateSink {
|
||||
let stream_key = AwarenessStreamUpdate::stream_key(workspace_id, object_id);
|
||||
AwarenessUpdateSink::new(self.connection_manager.clone(), stream_key)
|
||||
pub async fn awareness_update_sink(
|
||||
&self,
|
||||
workspace_id: &str,
|
||||
object_id: &str,
|
||||
) -> Result<AwarenessUpdateSink, StreamError> {
|
||||
self.awareness_gossip.sink(workspace_id, object_id).await
|
||||
}
|
||||
|
||||
/// Reads all collab updates for a given `workspace_id`:`object_id` entry, starting
|
||||
|
@ -168,18 +177,10 @@ impl CollabRedisStream {
|
|||
&self,
|
||||
workspace_id: &str,
|
||||
object_id: &str,
|
||||
since: Option<MessageId>,
|
||||
) -> impl Stream<Item = Result<AwarenessStreamUpdate, StreamError>> {
|
||||
let stream_key = AwarenessStreamUpdate::stream_key(workspace_id, object_id);
|
||||
let since = since.map(|id| id.to_string());
|
||||
let mut reader = self.stream_router.observe(stream_key, since);
|
||||
async_stream::try_stream! {
|
||||
while let Some((message_id, fields)) = reader.recv().await {
|
||||
tracing::trace!("incoming awareness update `{}`", message_id);
|
||||
let awareness_update = AwarenessStreamUpdate::try_from(fields)?;
|
||||
yield awareness_update;
|
||||
}
|
||||
}
|
||||
self
|
||||
.awareness_gossip
|
||||
.awareness_stream(workspace_id, object_id)
|
||||
}
|
||||
|
||||
pub async fn prune_update_stream(
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use crate::error::StreamError;
|
||||
use crate::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId};
|
||||
use crate::model::{CollabStreamUpdate, MessageId};
|
||||
use redis::aio::ConnectionManager;
|
||||
use redis::cmd;
|
||||
use tokio::sync::Mutex;
|
||||
|
@ -33,34 +33,3 @@ impl CollabUpdateSink {
|
|||
Ok(msg_id)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AwarenessUpdateSink {
|
||||
conn: Mutex<ConnectionManager>,
|
||||
stream_key: String,
|
||||
}
|
||||
|
||||
impl AwarenessUpdateSink {
|
||||
pub fn new(conn: ConnectionManager, stream_key: String) -> Self {
|
||||
AwarenessUpdateSink {
|
||||
conn: conn.into(),
|
||||
stream_key,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send(&self, msg: &AwarenessStreamUpdate) -> Result<MessageId, StreamError> {
|
||||
let mut lock = self.conn.lock().await;
|
||||
let msg_id: MessageId = cmd("XADD")
|
||||
.arg(&self.stream_key)
|
||||
.arg("MAXLEN")
|
||||
.arg("~")
|
||||
.arg(100) // we cap awareness stream to at most 20 awareness updates
|
||||
.arg("*")
|
||||
.arg("sender")
|
||||
.arg(msg.sender.to_string())
|
||||
.arg("data")
|
||||
.arg(&*msg.data)
|
||||
.query_async(&mut *lock)
|
||||
.await?;
|
||||
Ok(msg_id)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
pub mod awareness_gossip;
|
||||
pub mod client;
|
||||
pub mod collab_update_sink;
|
||||
pub mod error;
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use crate::error::{internal, StreamError};
|
||||
use bytes::Bytes;
|
||||
use collab::core::awareness::AwarenessUpdate;
|
||||
use collab::core::origin::{CollabClient, CollabOrigin};
|
||||
use collab::preclude::updates::decoder::Decode;
|
||||
use collab_entity::proto::collab::collab_update_event::Update;
|
||||
|
@ -433,39 +434,14 @@ impl TryFrom<HashMap<String, redis::Value>> for CollabStreamUpdate {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AwarenessStreamUpdate {
|
||||
pub data: Vec<u8>, // AwarenessUpdate::encode_v1
|
||||
pub data: AwarenessUpdate,
|
||||
pub sender: CollabOrigin,
|
||||
}
|
||||
|
||||
impl AwarenessStreamUpdate {
|
||||
/// Returns Redis stream key, that's storing entries mapped to/from [AwarenessStreamUpdate].
|
||||
pub fn stream_key(workspace_id: &str, object_id: &str) -> String {
|
||||
format!("af:{}:{}:awareness", workspace_id, object_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<HashMap<String, redis::Value>> for AwarenessStreamUpdate {
|
||||
type Error = StreamError;
|
||||
|
||||
fn try_from(fields: HashMap<String, Value>) -> Result<Self, Self::Error> {
|
||||
let sender = match fields.get("sender") {
|
||||
None => CollabOrigin::Empty,
|
||||
Some(sender) => {
|
||||
let raw_origin = String::from_redis_value(sender)?;
|
||||
collab_origin_from_str(&raw_origin)?
|
||||
},
|
||||
};
|
||||
let data_raw = fields
|
||||
.get("data")
|
||||
.ok_or_else(|| internal("expecting field `data`"))?;
|
||||
let data: Vec<u8> = FromRedisValue::from_redis_value(data_raw)?;
|
||||
Ok(AwarenessStreamUpdate { data, sender })
|
||||
}
|
||||
}
|
||||
|
||||
//FIXME: this should be `impl FromStr for CollabOrigin`
|
||||
fn collab_origin_from_str(value: &str) -> RedisResult<CollabOrigin> {
|
||||
pub fn collab_origin_from_str(value: &str) -> RedisResult<CollabOrigin> {
|
||||
match value {
|
||||
"" => Ok(CollabOrigin::Empty),
|
||||
"server" => Ok(CollabOrigin::Server),
|
||||
|
|
|
@ -24,6 +24,7 @@ use crate::actix_ws::server::RealtimeServerActor;
|
|||
use crate::api::{collab_scope, ws_scope};
|
||||
use crate::collab::access_control::CollabStorageAccessControlImpl;
|
||||
use access_control::casbin::access::AccessControl;
|
||||
use collab_stream::awareness_gossip::AwarenessGossip;
|
||||
use collab_stream::metrics::CollabStreamMetrics;
|
||||
use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
|
||||
use database::file::s3_client_impl::AwsS3BucketClientImpl;
|
||||
|
@ -82,6 +83,7 @@ pub async fn run_actix_server(
|
|||
state.metrics.realtime_metrics.clone(),
|
||||
rt_cmd_recv,
|
||||
state.redis_stream_router.clone(),
|
||||
state.awareness_gossip.clone(),
|
||||
state.redis_connection_manager.clone(),
|
||||
Duration::from_secs(config.collab.group_persistence_interval_secs),
|
||||
Duration::from_secs(config.collab.group_prune_grace_period_secs),
|
||||
|
@ -111,7 +113,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
|
|||
let user_cache = UserCache::new(pg_pool.clone()).await;
|
||||
|
||||
info!("Connecting to Redis...");
|
||||
let (redis_conn_manager, redis_stream_router) = get_redis_client(
|
||||
let (redis_conn_manager, redis_stream_router, awareness_gossip) = get_redis_client(
|
||||
config.redis_uri.expose_secret(),
|
||||
config.redis_worker_count,
|
||||
metrics.collab_stream_metrics.clone(),
|
||||
|
@ -184,6 +186,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
|
|||
pg_listeners,
|
||||
user_cache,
|
||||
redis_stream_router,
|
||||
awareness_gossip,
|
||||
redis_connection_manager: redis_conn_manager,
|
||||
access_control,
|
||||
collab_access_control_storage: collab_storage,
|
||||
|
@ -197,10 +200,19 @@ async fn get_redis_client(
|
|||
redis_uri: &str,
|
||||
worker_count: usize,
|
||||
metrics: Arc<CollabStreamMetrics>,
|
||||
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
|
||||
) -> Result<
|
||||
(
|
||||
redis::aio::ConnectionManager,
|
||||
Arc<StreamRouter>,
|
||||
Arc<AwarenessGossip>,
|
||||
),
|
||||
Error,
|
||||
> {
|
||||
info!("Connecting to redis with uri: {}", redis_uri);
|
||||
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;
|
||||
|
||||
let awareness_gossip = Arc::new(AwarenessGossip::new(client.clone()));
|
||||
|
||||
let router = StreamRouter::with_options(
|
||||
&client,
|
||||
metrics,
|
||||
|
@ -216,7 +228,7 @@ async fn get_redis_client(
|
|||
.get_connection_manager()
|
||||
.await
|
||||
.context("failed to get the connection manager")?;
|
||||
Ok((manager, router.into()))
|
||||
Ok((manager, router.into(), awareness_gossip))
|
||||
}
|
||||
|
||||
async fn get_connection_pool(setting: &DatabaseSetting) -> Result<PgPool, Error> {
|
||||
|
|
|
@ -15,11 +15,12 @@ use collab_rt_entity::{
|
|||
use collab_rt_entity::{ClientCollabMessage, CollabMessage};
|
||||
use collab_rt_protocol::{Message, MessageReader, RTProtocolError, SyncMessage};
|
||||
use collab_stream::client::CollabRedisStream;
|
||||
use collab_stream::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink};
|
||||
use collab_stream::collab_update_sink::CollabUpdateSink;
|
||||
|
||||
use crate::metrics::CollabRealtimeMetrics;
|
||||
use bytes::Bytes;
|
||||
use collab_document::document::DocumentBody;
|
||||
use collab_stream::awareness_gossip::AwarenessUpdateSink;
|
||||
use collab_stream::error::StreamError;
|
||||
use collab_stream::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId, UpdateFlags};
|
||||
use dashmap::DashMap;
|
||||
|
@ -35,6 +36,7 @@ use tokio::time::MissedTickBehavior;
|
|||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, trace, warn};
|
||||
use uuid::Uuid;
|
||||
use yrs::sync::AwarenessUpdate;
|
||||
use yrs::updates::decoder::{Decode, DecoderV1};
|
||||
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
|
||||
use yrs::{ReadTxn, StateVector, Update};
|
||||
|
@ -73,7 +75,7 @@ impl Drop for CollabGroup {
|
|||
|
||||
impl CollabGroup {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new<S>(
|
||||
pub async fn new<S>(
|
||||
uid: i64,
|
||||
workspace_id: String,
|
||||
object_id: String,
|
||||
|
@ -100,7 +102,8 @@ impl CollabGroup {
|
|||
indexer_scheduler,
|
||||
metrics.clone(),
|
||||
prune_grace_period,
|
||||
);
|
||||
)
|
||||
.await?;
|
||||
|
||||
let state = Arc::new(CollabGroupState {
|
||||
workspace_id,
|
||||
|
@ -182,10 +185,6 @@ impl CollabGroup {
|
|||
loop {
|
||||
tokio::select! {
|
||||
_ = state.shutdown.cancelled() => {
|
||||
match state.persister.trim_awareness().await {
|
||||
Ok(_) => (),
|
||||
Err(err) => warn!("unable to trim awareness due to {}", err),
|
||||
};
|
||||
break;
|
||||
}
|
||||
res = updates.next() => {
|
||||
|
@ -256,11 +255,10 @@ impl CollabGroup {
|
|||
|
||||
/// Task used to receive awareness updates from Redis.
|
||||
async fn inbound_awareness_task(state: Arc<CollabGroupState>) -> Result<(), RealtimeError> {
|
||||
let updates = state.persister.collab_redis_stream.awareness_updates(
|
||||
&state.workspace_id,
|
||||
&state.object_id,
|
||||
None,
|
||||
);
|
||||
let updates = state
|
||||
.persister
|
||||
.collab_redis_stream
|
||||
.awareness_updates(&state.workspace_id, &state.object_id);
|
||||
pin_mut!(updates);
|
||||
loop {
|
||||
tokio::select! {
|
||||
|
@ -288,14 +286,14 @@ impl CollabGroup {
|
|||
|
||||
async fn handle_inbound_awareness(state: &CollabGroupState, update: AwarenessStreamUpdate) {
|
||||
tracing::trace!(
|
||||
"broadcasting awareness update from {} ({} bytes)",
|
||||
"broadcasting awareness update from {} (contains {} clients)",
|
||||
update.sender,
|
||||
update.data.len()
|
||||
update.data.clients.len()
|
||||
);
|
||||
let sender = update.sender;
|
||||
let message = AwarenessSync::new(
|
||||
state.object_id.clone(),
|
||||
Message::Awareness(update.data).encode_v1(),
|
||||
Message::Awareness(update.data.encode_v1()).encode_v1(),
|
||||
CollabOrigin::Empty,
|
||||
);
|
||||
for mut e in state.subscribers.iter_mut() {
|
||||
|
@ -788,9 +786,10 @@ impl CollabGroup {
|
|||
origin: &CollabOrigin,
|
||||
update: Vec<u8>,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
let awareness_update = AwarenessUpdate::decode_v1(&update)?;
|
||||
state
|
||||
.persister
|
||||
.send_awareness(origin, update)
|
||||
.send_awareness(origin, awareness_update)
|
||||
.await
|
||||
.map_err(|err| RTProtocolError::Internal(err.into()))?;
|
||||
Ok(None)
|
||||
|
@ -888,7 +887,7 @@ struct CollabPersister {
|
|||
|
||||
impl CollabPersister {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
pub async fn new(
|
||||
uid: i64,
|
||||
workspace_id: String,
|
||||
object_id: String,
|
||||
|
@ -898,10 +897,12 @@ impl CollabPersister {
|
|||
indexer_scheduler: Arc<IndexerScheduler>,
|
||||
metrics: Arc<CollabRealtimeMetrics>,
|
||||
prune_grace_period: Duration,
|
||||
) -> Self {
|
||||
) -> Result<Self, StreamError> {
|
||||
let update_sink = collab_redis_stream.collab_update_sink(&workspace_id, &object_id);
|
||||
let awareness_sink = collab_redis_stream.awareness_update_sink(&workspace_id, &object_id);
|
||||
Self {
|
||||
let awareness_sink = collab_redis_stream
|
||||
.awareness_update_sink(&workspace_id, &object_id)
|
||||
.await?;
|
||||
Ok(Self {
|
||||
uid,
|
||||
workspace_id,
|
||||
object_id,
|
||||
|
@ -913,7 +914,7 @@ impl CollabPersister {
|
|||
update_sink,
|
||||
awareness_sink,
|
||||
prune_grace_period,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn send_update(
|
||||
|
@ -937,23 +938,15 @@ impl CollabPersister {
|
|||
async fn send_awareness(
|
||||
&self,
|
||||
sender_session: &CollabOrigin,
|
||||
awareness_update: Vec<u8>,
|
||||
) -> Result<MessageId, StreamError> {
|
||||
// send awareness updates to redis queue:
|
||||
// QUESTION: is it needed? Maybe we could reuse update_sink?
|
||||
let len = awareness_update.len();
|
||||
awareness_update: AwarenessUpdate,
|
||||
) -> Result<(), StreamError> {
|
||||
let update = AwarenessStreamUpdate {
|
||||
data: awareness_update,
|
||||
sender: sender_session.clone(),
|
||||
};
|
||||
let msg_id = self.awareness_sink.send(&update).await?;
|
||||
tracing::trace!(
|
||||
"persisted awareness from {} ({} bytes) - msg id: {}",
|
||||
update.sender,
|
||||
len,
|
||||
msg_id
|
||||
);
|
||||
Ok(msg_id)
|
||||
self.awareness_sink.send(&update).await?;
|
||||
tracing::trace!("broadcasted awareness from {}", update.sender);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Loads collab without its history. Used for handling y-sync protocol messages.
|
||||
|
@ -1149,15 +1142,6 @@ impl CollabPersister {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn trim_awareness(&self) -> Result<(), RealtimeError> {
|
||||
let stream_key = AwarenessStreamUpdate::stream_key(&self.workspace_id, &self.object_id);
|
||||
self
|
||||
.collab_redis_stream
|
||||
.prune_awareness_stream(&stream_key)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_collab(&self, doc_state_v1: Vec<u8>) -> Result<(), RealtimeError> {
|
||||
let encoded_collab = EncodedCollab::new_v1(Default::default(), doc_state_v1)
|
||||
.encode_to_bytes()
|
||||
|
@ -1188,11 +1172,9 @@ impl CollabPersister {
|
|||
.indexer_scheduler
|
||||
.index_pending_collab_one(indexed_collab, false)
|
||||
{
|
||||
tracing::warn!(
|
||||
warn!(
|
||||
"failed to index collab `{}/{}`: {}",
|
||||
self.workspace_id,
|
||||
self.object_id,
|
||||
err
|
||||
self.workspace_id, self.object_id, err
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -157,7 +157,8 @@ where
|
|||
self.prune_grace_period,
|
||||
state_vector,
|
||||
self.indexer_scheduler.clone(),
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
self.state.insert_group(object_id, group);
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,23 +1,6 @@
|
|||
use std::sync::{Arc, Weak};
|
||||
use std::time::Duration;
|
||||
|
||||
use access_control::collab::RealtimeAccessControl;
|
||||
use anyhow::{anyhow, Result};
|
||||
use app_error::AppError;
|
||||
use collab_rt_entity::user::{RealtimeUser, UserDevice};
|
||||
use collab_rt_entity::MessageByObjectId;
|
||||
use collab_stream::client::CollabRedisStream;
|
||||
use collab_stream::stream_router::StreamRouter;
|
||||
use dashmap::mapref::entry::Entry;
|
||||
use dashmap::DashMap;
|
||||
use redis::aio::ConnectionManager;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::task::yield_now;
|
||||
use tokio::time::interval;
|
||||
use tracing::{error, info, trace, warn};
|
||||
use yrs::updates::decoder::Decode;
|
||||
use yrs::StateVector;
|
||||
|
||||
use crate::client::client_msg_router::ClientMessageRouter;
|
||||
use crate::command::{spawn_collaboration_command, CLCommandReceiver};
|
||||
use crate::config::get_env_var;
|
||||
|
@ -26,8 +9,25 @@ use crate::error::{CreateGroupFailedReason, RealtimeError};
|
|||
use crate::group::cmd::{GroupCommand, GroupCommandRunner, GroupCommandSender};
|
||||
use crate::group::manager::GroupManager;
|
||||
use crate::rt_server::collaboration_runtime::COLLAB_RUNTIME;
|
||||
use access_control::collab::RealtimeAccessControl;
|
||||
use anyhow::{anyhow, Result};
|
||||
use app_error::AppError;
|
||||
use collab_rt_entity::user::{RealtimeUser, UserDevice};
|
||||
use collab_rt_entity::MessageByObjectId;
|
||||
use collab_stream::awareness_gossip::AwarenessGossip;
|
||||
use collab_stream::client::CollabRedisStream;
|
||||
use collab_stream::stream_router::StreamRouter;
|
||||
use dashmap::mapref::entry::Entry;
|
||||
use dashmap::DashMap;
|
||||
use database::collab::CollabStorage;
|
||||
use indexer::scheduler::IndexerScheduler;
|
||||
use redis::aio::ConnectionManager;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::task::yield_now;
|
||||
use tokio::time::interval;
|
||||
use tracing::{error, info, trace, warn};
|
||||
use yrs::updates::decoder::Decode;
|
||||
use yrs::StateVector;
|
||||
|
||||
use crate::actix_ws::entities::{ClientGenerateEmbeddingMessage, ClientHttpUpdateMessage};
|
||||
use crate::{CollabRealtimeMetrics, RealtimeClientWebsocketSink};
|
||||
|
@ -54,6 +54,7 @@ where
|
|||
metrics: Arc<CollabRealtimeMetrics>,
|
||||
command_recv: CLCommandReceiver,
|
||||
redis_stream_router: Arc<StreamRouter>,
|
||||
awareness_gossip: Arc<AwarenessGossip>,
|
||||
redis_connection_manager: ConnectionManager,
|
||||
group_persistence_interval: Duration,
|
||||
prune_grace_period: Duration,
|
||||
|
@ -70,8 +71,11 @@ where
|
|||
}
|
||||
|
||||
let connect_state = ConnectState::new();
|
||||
let collab_stream =
|
||||
CollabRedisStream::new_with_connection_manager(redis_connection_manager, redis_stream_router);
|
||||
let collab_stream = CollabRedisStream::new_with_connection_manager(
|
||||
redis_connection_manager,
|
||||
redis_stream_router,
|
||||
awareness_gossip,
|
||||
);
|
||||
let group_manager = Arc::new(
|
||||
GroupManager::new(
|
||||
storage.clone(),
|
||||
|
|
|
@ -13,6 +13,7 @@ use crate::pg_listener::PgListeners;
|
|||
use crate::CollabRealtimeMetrics;
|
||||
use access_control::metrics::AccessControlMetrics;
|
||||
use app_error::AppError;
|
||||
use collab_stream::awareness_gossip::AwarenessGossip;
|
||||
use collab_stream::metrics::CollabStreamMetrics;
|
||||
use collab_stream::stream_router::StreamRouter;
|
||||
use database::user::{select_all_uid_uuid, select_uid_from_uuid};
|
||||
|
@ -27,6 +28,7 @@ pub struct AppState {
|
|||
pub pg_listeners: Arc<PgListeners>,
|
||||
pub user_cache: UserCache,
|
||||
pub redis_stream_router: Arc<StreamRouter>,
|
||||
pub awareness_gossip: Arc<AwarenessGossip>,
|
||||
pub redis_connection_manager: RedisConnectionManager,
|
||||
pub access_control: AccessControl,
|
||||
pub collab_access_control_storage: Arc<CollabAccessControlStorage>,
|
||||
|
|
|
@ -39,6 +39,7 @@ use appflowy_collaborate::collab::storage::CollabStorageImpl;
|
|||
use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender};
|
||||
use appflowy_collaborate::snapshot::SnapshotControl;
|
||||
use appflowy_collaborate::CollaborationServer;
|
||||
use collab_stream::awareness_gossip::AwarenessGossip;
|
||||
use collab_stream::metrics::CollabStreamMetrics;
|
||||
use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
|
||||
use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
|
||||
|
@ -126,6 +127,7 @@ pub async fn run_actix_server(
|
|||
state.metrics.realtime_metrics.clone(),
|
||||
rt_cmd_recv,
|
||||
state.redis_stream_router.clone(),
|
||||
state.awareness_gossip.clone(),
|
||||
state.redis_connection_manager.clone(),
|
||||
Duration::from_secs(config.collab.group_persistence_interval_secs),
|
||||
Duration::from_secs(config.collab.group_prune_grace_period_secs),
|
||||
|
@ -227,7 +229,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
|
|||
|
||||
// Redis
|
||||
info!("Connecting to Redis...");
|
||||
let (redis_conn_manager, redis_stream_router) = get_redis_client(
|
||||
let (redis_conn_manager, redis_stream_router, awareness_gossip) = get_redis_client(
|
||||
config.redis_uri.expose_secret(),
|
||||
config.redis_worker_count,
|
||||
metrics.collab_stream_metrics.clone(),
|
||||
|
@ -325,6 +327,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
|
|||
id_gen: Arc::new(RwLock::new(Snowflake::new(1))),
|
||||
gotrue_client,
|
||||
redis_stream_router,
|
||||
awareness_gossip,
|
||||
redis_connection_manager: redis_conn_manager,
|
||||
collab_cache,
|
||||
collab_access_control_storage,
|
||||
|
@ -360,10 +363,18 @@ async fn get_redis_client(
|
|||
redis_uri: &str,
|
||||
worker_count: usize,
|
||||
metrics: Arc<CollabStreamMetrics>,
|
||||
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
|
||||
) -> Result<
|
||||
(
|
||||
redis::aio::ConnectionManager,
|
||||
Arc<StreamRouter>,
|
||||
Arc<AwarenessGossip>,
|
||||
),
|
||||
Error,
|
||||
> {
|
||||
info!("Connecting to redis with uri: {}", redis_uri);
|
||||
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;
|
||||
|
||||
let awareness_gossip = AwarenessGossip::new(client.clone());
|
||||
let router = StreamRouter::with_options(
|
||||
&client,
|
||||
metrics,
|
||||
|
@ -379,7 +390,7 @@ async fn get_redis_client(
|
|||
.get_connection_manager()
|
||||
.await
|
||||
.context("failed to get the connection manager")?;
|
||||
Ok((manager, router.into()))
|
||||
Ok((manager, router.into(), awareness_gossip.into()))
|
||||
}
|
||||
|
||||
pub async fn get_aws_s3_client(s3_setting: &S3Setting) -> Result<aws_sdk_s3::Client, Error> {
|
||||
|
|
|
@ -16,6 +16,7 @@ use appflowy_collaborate::collab::cache::CollabCache;
|
|||
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
|
||||
use appflowy_collaborate::metrics::CollabMetrics;
|
||||
use appflowy_collaborate::CollabRealtimeMetrics;
|
||||
use collab_stream::awareness_gossip::AwarenessGossip;
|
||||
use collab_stream::metrics::CollabStreamMetrics;
|
||||
use collab_stream::stream_router::StreamRouter;
|
||||
use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
|
||||
|
@ -41,6 +42,7 @@ pub struct AppState {
|
|||
pub id_gen: Arc<RwLock<Snowflake>>,
|
||||
pub gotrue_client: gotrue::api::Client,
|
||||
pub redis_stream_router: Arc<StreamRouter>,
|
||||
pub awareness_gossip: Arc<AwarenessGossip>,
|
||||
pub redis_connection_manager: RedisConnectionManager,
|
||||
pub collab_cache: CollabCache,
|
||||
pub collab_access_control_storage: Arc<CollabAccessControlStorage>,
|
||||
|
|
Loading…
Add table
Reference in a new issue