mirror of
https://github.com/AppFlowy-IO/AppFlowy-Cloud.git
synced 2025-04-19 03:24:42 -04:00
feat: notify user profile change (#163)
* feat: send user profile changes via ws * test: add tests
This commit is contained in:
parent
29b39f9ba3
commit
54ef875f5f
18 changed files with 267 additions and 98 deletions
|
@ -15,6 +15,7 @@ use tokio::sync::broadcast::{channel, Receiver, Sender};
|
|||
|
||||
use realtime_entity::collab_msg::CollabMessage;
|
||||
use realtime_entity::message::RealtimeMessage;
|
||||
use realtime_entity::user::UserMessage;
|
||||
use tokio::sync::{oneshot, Mutex};
|
||||
use tokio_retry::strategy::FixedInterval;
|
||||
use tokio_retry::{Condition, RetryIf};
|
||||
|
@ -59,7 +60,8 @@ pub struct WSClient {
|
|||
/// Sender used to send messages to the websocket.
|
||||
sender: Sender<Message>,
|
||||
http_sender: Arc<dyn WSClientHttpSender>,
|
||||
channels: Arc<RwLock<ChannelByObjectId>>,
|
||||
user_channel: Arc<Sender<UserMessage>>,
|
||||
collab_channels: Arc<RwLock<ChannelByObjectId>>,
|
||||
ping: Arc<Mutex<Option<ServerFixIntervalPing>>>,
|
||||
stop_tx: Mutex<Option<oneshot::Sender<()>>>,
|
||||
}
|
||||
|
@ -71,16 +73,18 @@ impl WSClient {
|
|||
{
|
||||
let (sender, _) = channel(config.buffer_capacity);
|
||||
let state_notify = Arc::new(parking_lot::Mutex::new(ConnectStateNotify::new()));
|
||||
let channels = Arc::new(RwLock::new(HashMap::new()));
|
||||
let collab_channels = Arc::new(RwLock::new(HashMap::new()));
|
||||
let ping = Arc::new(Mutex::new(None));
|
||||
let http_sender = Arc::new(http_sender);
|
||||
let (user_channel, _) = channel(1);
|
||||
WSClient {
|
||||
addr: Arc::new(parking_lot::Mutex::new(None)),
|
||||
config,
|
||||
state_notify,
|
||||
sender,
|
||||
http_sender,
|
||||
channels,
|
||||
user_channel: Arc::new(user_channel),
|
||||
collab_channels,
|
||||
ping,
|
||||
stop_tx: Mutex::new(None),
|
||||
}
|
||||
|
@ -136,7 +140,7 @@ impl WSClient {
|
|||
|
||||
self.set_state(ConnectState::Connected).await;
|
||||
let (mut sink, mut stream) = ws_stream.split();
|
||||
let weak_channels = Arc::downgrade(&self.channels);
|
||||
let weak_collab_channels = Arc::downgrade(&self.collab_channels);
|
||||
let sender = self.sender.clone();
|
||||
|
||||
let ping_sender = sender.clone();
|
||||
|
@ -151,50 +155,57 @@ impl WSClient {
|
|||
ping.run();
|
||||
*self.ping.lock().await = Some(ping);
|
||||
|
||||
let user_message_tx = self.user_channel.as_ref().clone();
|
||||
// Receive messages from the websocket, and send them to the channels.
|
||||
tokio::spawn(async move {
|
||||
while let Some(Ok(msg)) = stream.next().await {
|
||||
match msg {
|
||||
while let Some(Ok(ws_msg)) = stream.next().await {
|
||||
match ws_msg {
|
||||
Message::Binary(_) => {
|
||||
if let Ok(msg) = RealtimeMessage::try_from(&msg) {
|
||||
match msg {
|
||||
RealtimeMessage::Collab(collab_msg) => {
|
||||
if let Some(channels) = weak_channels.upgrade() {
|
||||
let object_id = collab_msg.object_id().to_owned();
|
||||
let is_channel_dropped = if let Some(channel) = channels.read().get(&object_id)
|
||||
{
|
||||
match channel.upgrade() {
|
||||
None => {
|
||||
// when calling [WSClient::subscribe], the caller is responsible for keeping
|
||||
// the channel alive as long as it wants to receive messages from the websocket.
|
||||
warn!("channel is dropped");
|
||||
true
|
||||
},
|
||||
Some(channel) => {
|
||||
trace!("receive remote message: {}", collab_msg);
|
||||
channel.forward_to_stream(collab_msg);
|
||||
match RealtimeMessage::try_from(&ws_msg) {
|
||||
Ok(msg) => {
|
||||
match msg {
|
||||
RealtimeMessage::Collab(collab_msg) => {
|
||||
if let Some(collab_channels) = weak_collab_channels.upgrade() {
|
||||
let object_id = collab_msg.object_id().to_owned();
|
||||
let is_channel_dropped =
|
||||
if let Some(channel) = collab_channels.read().get(&object_id) {
|
||||
match channel.upgrade() {
|
||||
None => {
|
||||
// when calling [WSClient::subscribe], the caller is responsible for keeping
|
||||
// the channel alive as long as it wants to receive messages from the websocket.
|
||||
warn!("channel is dropped");
|
||||
true
|
||||
},
|
||||
Some(channel) => {
|
||||
trace!("receive remote message: {}", collab_msg);
|
||||
channel.forward_to_stream(collab_msg);
|
||||
false
|
||||
},
|
||||
}
|
||||
} else {
|
||||
false
|
||||
},
|
||||
};
|
||||
|
||||
// Try to remove the channel if it is dropped. If failed, will try again next time.
|
||||
if is_channel_dropped {
|
||||
if let Some(mut w) = collab_channels.try_write() {
|
||||
trace!("remove channel: {}", object_id);
|
||||
let _ = w.remove(&object_id);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
// Try to remove the channel if it is dropped. If failed, will try again next time.
|
||||
if is_channel_dropped {
|
||||
if let Some(mut w) = channels.try_write() {
|
||||
trace!("remove channel: {}", object_id);
|
||||
let _ = w.remove(&object_id);
|
||||
}
|
||||
warn!("channels are closed");
|
||||
}
|
||||
} else {
|
||||
warn!("channels are closed");
|
||||
}
|
||||
},
|
||||
RealtimeMessage::ServerKickedOff => {},
|
||||
}
|
||||
} else {
|
||||
error!("parser RealtimeMessage failed");
|
||||
},
|
||||
RealtimeMessage::User(user_message) => {
|
||||
let _ = user_message_tx.send(user_message);
|
||||
},
|
||||
RealtimeMessage::ServerKickedOff => {},
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
error!("parser RealtimeMessage failed: {:?}", err);
|
||||
},
|
||||
}
|
||||
},
|
||||
// ping from server
|
||||
|
@ -210,7 +221,7 @@ impl WSClient {
|
|||
Message::Pong(_) => {
|
||||
let _ = pong_tx.send(()).await;
|
||||
},
|
||||
_ => warn!("received unexpected message from websocket: {:?}", msg),
|
||||
_ => warn!("received unexpected message from websocket: {:?}", ws_msg),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -251,18 +262,22 @@ impl WSClient {
|
|||
|
||||
/// Return a [WebSocketChannel] that can be used to send messages to the websocket. Caller should
|
||||
/// keep the channel alive as long as it wants to receive messages from the websocket.
|
||||
pub fn subscribe(
|
||||
pub fn subscribe_collab(
|
||||
&self,
|
||||
object_id: String,
|
||||
) -> Result<Arc<WebSocketChannel<CollabMessage>>, WSError> {
|
||||
let channel = Arc::new(WebSocketChannel::new(&object_id, self.sender.clone()));
|
||||
self
|
||||
.channels
|
||||
.collab_channels
|
||||
.write()
|
||||
.insert(object_id, Arc::downgrade(&channel));
|
||||
Ok(channel)
|
||||
}
|
||||
|
||||
pub fn subscribe_user_changed(&self) -> Receiver<UserMessage> {
|
||||
self.user_channel.subscribe()
|
||||
}
|
||||
|
||||
pub fn subscribe_connect_state(&self) -> WSConnectStateReceiver {
|
||||
self.state_notify.lock().subscribe()
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
|
|||
use sqlx::FromRow;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Represent the row of the af_workspace table
|
||||
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
|
||||
pub struct AFWorkspaceRow {
|
||||
pub workspace_id: Uuid,
|
||||
|
@ -15,6 +16,22 @@ pub struct AFWorkspaceRow {
|
|||
pub workspace_name: Option<String>,
|
||||
}
|
||||
|
||||
/// Represent the row of the af_user table
|
||||
#[derive(Debug, FromRow, Deserialize, Serialize, Clone)]
|
||||
pub struct AFUserRow {
|
||||
pub uid: Option<i64>,
|
||||
pub uuid: Option<Uuid>,
|
||||
pub email: Option<String>,
|
||||
pub password: Option<String>,
|
||||
pub name: Option<String>,
|
||||
pub metadata: Option<serde_json::Value>,
|
||||
pub encryption_sign: Option<String>,
|
||||
pub deleted_at: Option<DateTime<Utc>>,
|
||||
pub updated_at: Option<DateTime<Utc>>,
|
||||
pub created_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
/// Represent the row of the af_user_profile_view
|
||||
#[derive(Debug, FromRow, Deserialize, Serialize)]
|
||||
pub struct AFUserProfileRow {
|
||||
pub uid: Option<i64>,
|
||||
|
@ -52,3 +69,8 @@ pub struct AFBlobMetadataRow {
|
|||
pub file_size: i64,
|
||||
pub modified_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize, Clone)]
|
||||
pub struct AFUserNotification {
|
||||
pub payload: Option<AFUserRow>,
|
||||
}
|
||||
|
|
|
@ -11,14 +11,16 @@ use std::fmt::Display;
|
|||
)]
|
||||
pub enum RealtimeMessage {
|
||||
Collab(CollabMessage),
|
||||
User(UserMessage),
|
||||
ServerKickedOff,
|
||||
}
|
||||
|
||||
impl Display for RealtimeMessage {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
RealtimeMessage::Collab(_msg) => f.write_fmt(format_args!("CollabMessage")),
|
||||
RealtimeMessage::Collab(_msg) => f.write_fmt(format_args!("Collab")),
|
||||
RealtimeMessage::ServerKickedOff => f.write_fmt(format_args!("ServerKickedOff")),
|
||||
RealtimeMessage::User(_) => f.write_fmt(format_args!("User")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -68,6 +70,7 @@ impl TryFrom<Vec<u8>> for RealtimeMessage {
|
|||
}
|
||||
}
|
||||
|
||||
use crate::user::UserMessage;
|
||||
#[cfg(feature = "tungstenite")]
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
|
|
|
@ -1,22 +1,13 @@
|
|||
use std::fmt::{Display, Formatter};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
pub struct RealtimeUserImpl {
|
||||
pub uid: i64,
|
||||
pub device_id: String,
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum UserMessage {
|
||||
ProfileChange(AFUserChange),
|
||||
}
|
||||
|
||||
impl RealtimeUserImpl {
|
||||
pub fn new(uid: i64, device_id: String) -> Self {
|
||||
Self { uid, device_id }
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for RealtimeUserImpl {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_fmt(format_args!(
|
||||
"uid:{}|device_id:{}",
|
||||
self.uid, self.device_id,
|
||||
))
|
||||
}
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AFUserChange {
|
||||
pub name: Option<String>,
|
||||
pub email: Option<String>,
|
||||
pub metadata: Option<String>,
|
||||
}
|
||||
|
|
|
@ -9,10 +9,14 @@ use actix_web_actors::ws;
|
|||
use actix_web_actors::ws::ProtocolError;
|
||||
use bytes::Bytes;
|
||||
use database::collab::CollabStorage;
|
||||
pub use realtime_entity::user::RealtimeUserImpl;
|
||||
|
||||
use std::ops::Deref;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::error;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
|
||||
use database_entity::pg_row::AFUserNotification;
|
||||
use realtime_entity::user::{AFUserChange, UserMessage};
|
||||
use tracing::{error, trace};
|
||||
|
||||
pub struct ClientSession<
|
||||
U: Unpin + RealtimeUser,
|
||||
|
@ -24,6 +28,7 @@ pub struct ClientSession<
|
|||
pub server: Addr<CollabServer<S, U, AC>>,
|
||||
heartbeat_interval: Duration,
|
||||
client_timeout: Duration,
|
||||
user_change_recv: Option<Receiver<AFUserNotification>>,
|
||||
}
|
||||
|
||||
impl<U, S, AC> ClientSession<U, S, AC>
|
||||
|
@ -34,6 +39,7 @@ where
|
|||
{
|
||||
pub fn new(
|
||||
user: U,
|
||||
user_change_recv: Receiver<AFUserNotification>,
|
||||
server: Addr<CollabServer<S, U, AC>>,
|
||||
heartbeat_interval: Duration,
|
||||
client_timeout: Duration,
|
||||
|
@ -44,6 +50,7 @@ where
|
|||
server,
|
||||
heartbeat_interval,
|
||||
client_timeout,
|
||||
user_change_recv: Some(user_change_recv),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -86,8 +93,29 @@ where
|
|||
type Context = ws::WebsocketContext<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
// start heartbeats otherwise server disconnects in 10 seconds
|
||||
self.hb(ctx);
|
||||
let recipient = ctx.address().recipient();
|
||||
if let Some(mut recv) = self.user_change_recv.take() {
|
||||
actix::spawn(async move {
|
||||
while let Ok(notification) = recv.recv().await {
|
||||
if let Some(user) = notification.payload {
|
||||
trace!("Receive user change: {:?}", user);
|
||||
|
||||
// The RealtimeMessage uses bincode to do serde. But bincode doesn't support the Serde
|
||||
// deserialize_any method. So it needs to serialize the metadata to json string.
|
||||
let metadata = serde_json::to_string(&user.metadata).ok();
|
||||
let msg = UserMessage::ProfileChange(AFUserChange {
|
||||
name: user.name,
|
||||
email: user.email,
|
||||
metadata,
|
||||
});
|
||||
if let Err(err) = recipient.send(RealtimeMessage::User(msg)).await {
|
||||
error!("Send user change message error: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(user) = self.user.clone() {
|
||||
self
|
||||
|
@ -133,7 +161,8 @@ where
|
|||
|
||||
fn handle(&mut self, msg: RealtimeMessage, ctx: &mut Self::Context) {
|
||||
match &msg {
|
||||
RealtimeMessage::Collab(_collab_msg) => ctx.binary(msg),
|
||||
RealtimeMessage::Collab(_) => ctx.binary(msg),
|
||||
RealtimeMessage::User(_) => ctx.binary(msg),
|
||||
RealtimeMessage::ServerKickedOff => {
|
||||
// The server will send this message to the client when the client is kicked out. So
|
||||
// set the current user to None and stop the session.
|
||||
|
@ -144,7 +173,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// WebSocket message handler
|
||||
/// Handle the messages sent from the client
|
||||
impl<U, S, AC> StreamHandler<Result<ws::Message, ws::ProtocolError>> for ClientSession<U, S, AC>
|
||||
where
|
||||
U: Unpin + RealtimeUser + Clone,
|
||||
|
@ -191,9 +220,3 @@ impl Deref for ClientWSSink {
|
|||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl RealtimeUser for RealtimeUserImpl {
|
||||
fn uid(&self) -> i64 {
|
||||
self.uid
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use crate::error::RealtimeError;
|
||||
use actix::{Message, Recipient};
|
||||
use collab::core::origin::CollabOrigin;
|
||||
|
||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::hash::Hash;
|
||||
|
|
25
migrations/20231113074418_user_change.sql
Normal file
25
migrations/20231113074418_user_change.sql
Normal file
|
@ -0,0 +1,25 @@
|
|||
-- Add migration script here
|
||||
-- Drop the existing trigger if it exists
|
||||
DROP TRIGGER IF EXISTS af_user_change_trigger ON af_user;
|
||||
|
||||
-- Create or replace the function
|
||||
CREATE OR REPLACE FUNCTION notify_af_user_change() RETURNS TRIGGER AS $$
|
||||
DECLARE
|
||||
payload TEXT;
|
||||
BEGIN
|
||||
payload := json_build_object(
|
||||
'payload', row_to_json(NEW),
|
||||
'action_type', TG_OP
|
||||
)::text;
|
||||
|
||||
PERFORM pg_notify('af_user_channel', payload);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Create the trigger
|
||||
CREATE TRIGGER af_user_change_trigger
|
||||
AFTER UPDATE ON af_user
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION notify_af_user_change();
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
use crate::api::ws::CollabServerImpl;
|
||||
use crate::biz;
|
||||
use crate::biz::user::RealtimeUserImpl;
|
||||
use crate::biz::workspace;
|
||||
use crate::component::auth::jwt::UserUuid;
|
||||
use crate::state::AppState;
|
||||
|
@ -13,7 +14,6 @@ use database::collab::CollabStorage;
|
|||
use database::user::{select_uid_from_email, select_uid_from_uuid};
|
||||
use database_entity::dto::*;
|
||||
use prost::Message as ProstMessage;
|
||||
use realtime::client::RealtimeUserImpl;
|
||||
use realtime::collaborate::CollabAccessControl;
|
||||
use realtime::entities::{ClientMessage, RealtimeMessage};
|
||||
use realtime_entity::realtime_proto::HttpRealtimeMessage;
|
||||
|
@ -25,6 +25,7 @@ use std::sync::Arc;
|
|||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::{event, instrument};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub const WORKSPACE_ID_PATH: &str = "workspace_id";
|
||||
pub const COLLAB_OBJECT_ID_PATH: &str = "object_id";
|
||||
|
||||
|
|
|
@ -5,11 +5,12 @@ use actix_web::{get, web, HttpRequest, HttpResponse, Result, Scope};
|
|||
use actix_web_actors::ws;
|
||||
use std::sync::Arc;
|
||||
|
||||
use realtime::client::{ClientSession, RealtimeUserImpl};
|
||||
use realtime::client::ClientSession;
|
||||
use realtime::collaborate::CollabServer;
|
||||
|
||||
use crate::biz::collab::access_control::CollabAccessControlImpl;
|
||||
use crate::biz::collab::storage::CollabPostgresDBStorage;
|
||||
use crate::biz::user::RealtimeUserImpl;
|
||||
use crate::component::auth::jwt::{authorization_from_token, UserUuid};
|
||||
use database::user::select_uid_from_uuid;
|
||||
use shared_entity::response::AppResponseError;
|
||||
|
@ -41,9 +42,11 @@ pub async fn establish_ws_connection(
|
|||
|
||||
match result {
|
||||
Ok(uid) => {
|
||||
let user_change_recv = state.pg_listeners.subscribe_user_change();
|
||||
let realtime_user = Arc::new(RealtimeUserImpl::new(uid, device_id));
|
||||
let client = ClientSession::new(
|
||||
realtime_user,
|
||||
user_change_recv,
|
||||
server.get_ref().clone(),
|
||||
Duration::from_secs(state.config.websocket.heartbeat_interval as u64),
|
||||
Duration::from_secs(state.config.websocket.client_timeout as u64),
|
||||
|
|
|
@ -32,15 +32,13 @@ use crate::api::ws::ws_scope;
|
|||
use crate::biz::collab::access_control::{CollabAccessControlImpl, CollabHttpAccessControl};
|
||||
use crate::biz::collab::storage::init_collab_storage;
|
||||
use crate::biz::pg_listener::PgListeners;
|
||||
use crate::biz::user::RealtimeUserImpl;
|
||||
use crate::biz::workspace::access_control::{
|
||||
WorkspaceAccessControlImpl, WorkspaceHttpAccessControl,
|
||||
};
|
||||
|
||||
use crate::middleware::access_control_mw::WorkspaceAccessControl;
|
||||
use crate::middleware::metrics_mw::MetricsMiddleware;
|
||||
|
||||
use database::file::bucket_s3_impl::S3BucketStorage;
|
||||
use realtime::client::RealtimeUserImpl;
|
||||
use realtime::collaborate::CollabServer;
|
||||
|
||||
pub struct Application {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::biz::collab::member_listener::{CollabMemberAction, CollabMemberChange};
|
||||
use crate::biz::collab::member_listener::{CollabMemberAction, CollabMemberNotification};
|
||||
use crate::biz::workspace::access_control::WorkspaceAccessControl;
|
||||
use crate::middleware::access_control_mw::{AccessResource, HttpAccessControlService};
|
||||
use actix_router::{Path, Url};
|
||||
|
@ -53,7 +53,7 @@ enum MemberStatus {
|
|||
}
|
||||
|
||||
impl CollabAccessControlImpl {
|
||||
pub fn new(pg_pool: PgPool, listener: broadcast::Receiver<CollabMemberChange>) -> Self {
|
||||
pub fn new(pg_pool: PgPool, listener: broadcast::Receiver<CollabMemberNotification>) -> Self {
|
||||
let member_status_by_uid = Arc::new(RwLock::new(HashMap::new()));
|
||||
|
||||
// Listen to the changes of the collab member and update the memory cache
|
||||
|
@ -111,7 +111,7 @@ impl CollabAccessControlImpl {
|
|||
}
|
||||
|
||||
fn spawn_listen_on_collab_member_change(
|
||||
mut listener: broadcast::Receiver<CollabMemberChange>,
|
||||
mut listener: broadcast::Receiver<CollabMemberNotification>,
|
||||
pg_pool: PgPool,
|
||||
member_status_by_uid: Arc<RwLock<MemberStatusByUid>>,
|
||||
) {
|
||||
|
|
|
@ -11,7 +11,7 @@ pub enum CollabMemberAction {
|
|||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct CollabMemberChange {
|
||||
pub struct CollabMemberNotification {
|
||||
/// The old will be None if the row does not exist before
|
||||
pub old: Option<AFCollabMemberRow>,
|
||||
/// The new will be None if the row is deleted
|
||||
|
@ -20,7 +20,7 @@ pub struct CollabMemberChange {
|
|||
pub action_type: CollabMemberAction,
|
||||
}
|
||||
|
||||
impl CollabMemberChange {
|
||||
impl CollabMemberNotification {
|
||||
pub fn old_uid(&self) -> Option<&i64> {
|
||||
self.old.as_ref().map(|o| &o.uid)
|
||||
}
|
||||
|
@ -36,4 +36,4 @@ impl CollabMemberChange {
|
|||
}
|
||||
}
|
||||
|
||||
pub type CollabMemberListener = PostgresDBListener<CollabMemberChange>;
|
||||
pub type CollabMemberListener = PostgresDBListener<CollabMemberNotification>;
|
||||
|
|
|
@ -1,19 +1,26 @@
|
|||
use crate::biz::collab::member_listener::{CollabMemberChange, CollabMemberListener};
|
||||
use crate::biz::workspace::member_listener::{WorkspaceMemberChange, WorkspaceMemberListener};
|
||||
use crate::biz::collab::member_listener::{CollabMemberListener, CollabMemberNotification};
|
||||
use crate::biz::user::UserListener;
|
||||
use crate::biz::workspace::member_listener::{
|
||||
WorkspaceMemberListener, WorkspaceMemberNotification,
|
||||
};
|
||||
use anyhow::Error;
|
||||
use database_entity::pg_row::AFUserNotification;
|
||||
use serde::de::DeserializeOwned;
|
||||
use sqlx::postgres::PgListener;
|
||||
use sqlx::PgPool;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::error;
|
||||
use tracing::{error, trace};
|
||||
|
||||
pub struct PgListeners {
|
||||
user_listener: UserListener,
|
||||
workspace_member_listener: WorkspaceMemberListener,
|
||||
collab_member_listener: CollabMemberListener,
|
||||
}
|
||||
|
||||
impl PgListeners {
|
||||
pub async fn new(pg_pool: &PgPool) -> Result<Self, Error> {
|
||||
let user_listener = UserListener::new(pg_pool, "af_user_channel").await?;
|
||||
|
||||
let workspace_member_listener =
|
||||
WorkspaceMemberListener::new(pg_pool, "af_workspace_member_channel").await?;
|
||||
|
||||
|
@ -21,18 +28,25 @@ impl PgListeners {
|
|||
CollabMemberListener::new(pg_pool, "af_collab_member_channel").await?;
|
||||
|
||||
Ok(Self {
|
||||
user_listener,
|
||||
workspace_member_listener,
|
||||
collab_member_listener,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn subscribe_workspace_member_change(&self) -> broadcast::Receiver<WorkspaceMemberChange> {
|
||||
pub fn subscribe_workspace_member_change(
|
||||
&self,
|
||||
) -> broadcast::Receiver<WorkspaceMemberNotification> {
|
||||
self.workspace_member_listener.notify.subscribe()
|
||||
}
|
||||
|
||||
pub fn subscribe_collab_member_change(&self) -> broadcast::Receiver<CollabMemberChange> {
|
||||
pub fn subscribe_collab_member_change(&self) -> broadcast::Receiver<CollabMemberNotification> {
|
||||
self.collab_member_listener.notify.subscribe()
|
||||
}
|
||||
|
||||
pub fn subscribe_user_change(&self) -> broadcast::Receiver<AFUserNotification> {
|
||||
self.user_listener.notify.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresDBListener<T: Clone> {
|
||||
|
@ -45,12 +59,14 @@ where
|
|||
{
|
||||
pub async fn new(pg_pool: &PgPool, channel: &str) -> Result<Self, Error> {
|
||||
let mut listener = PgListener::connect_with(pg_pool).await?;
|
||||
// TODO(nathan): using listen_all
|
||||
listener.listen(channel).await?;
|
||||
|
||||
let (tx, _) = broadcast::channel(1000);
|
||||
let notify = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Ok(notification) = listener.recv().await {
|
||||
trace!("Received notification: {}", notification.payload());
|
||||
match serde_json::from_str::<T>(notification.payload()) {
|
||||
Ok(change) => {
|
||||
let _ = tx.send(change);
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
use anyhow::{Context, Result};
|
||||
use gotrue::api::Client;
|
||||
|
||||
use serde_json::json;
|
||||
use shared_entity::response::AppResponseError;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::ops::DerefMut;
|
||||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
|
@ -11,6 +13,8 @@ use database_entity::dto::{AFUserProfile, AFUserWorkspaceInfo, AFWorkspace};
|
|||
|
||||
use app_error::AppError;
|
||||
use database::user::{create_user, is_user_exist};
|
||||
use database_entity::pg_row::AFUserNotification;
|
||||
use realtime::entities::RealtimeUser;
|
||||
use shared_entity::dto::auth_dto::UpdateUserParams;
|
||||
use snowflake::Snowflake;
|
||||
use sqlx::{types::uuid, PgPool};
|
||||
|
@ -122,3 +126,31 @@ fn name_from_user_metadata(value: &serde_json::Value) -> String {
|
|||
.map(str::to_string)
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub type UserListener = crate::biz::pg_listener::PostgresDBListener<AFUserNotification>;
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
pub struct RealtimeUserImpl {
|
||||
pub uid: i64,
|
||||
pub device_id: String,
|
||||
}
|
||||
|
||||
impl RealtimeUserImpl {
|
||||
pub fn new(uid: i64, device_id: String) -> Self {
|
||||
Self { uid, device_id }
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for RealtimeUserImpl {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_fmt(format_args!(
|
||||
"uid:{}|device_id:{}",
|
||||
self.uid, self.device_id,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl RealtimeUser for RealtimeUserImpl {
|
||||
fn uid(&self) -> i64 {
|
||||
self.uid
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::biz::workspace::member_listener::{WorkspaceMemberAction, WorkspaceMemberChange};
|
||||
use crate::biz::workspace::member_listener::{WorkspaceMemberAction, WorkspaceMemberNotification};
|
||||
use crate::component::auth::jwt::UserUuid;
|
||||
use crate::middleware::access_control_mw::{AccessResource, HttpAccessControlService};
|
||||
use actix_http::Method;
|
||||
|
@ -52,7 +52,7 @@ pub struct WorkspaceAccessControlImpl {
|
|||
}
|
||||
|
||||
impl WorkspaceAccessControlImpl {
|
||||
pub fn new(pg_pool: PgPool, listener: broadcast::Receiver<WorkspaceMemberChange>) -> Self {
|
||||
pub fn new(pg_pool: PgPool, listener: broadcast::Receiver<WorkspaceMemberNotification>) -> Self {
|
||||
let member_status_by_uid = Arc::new(RwLock::new(HashMap::new()));
|
||||
spawn_listen_on_workspace_member_change(
|
||||
listener,
|
||||
|
@ -140,7 +140,7 @@ async fn reload_workspace_member_status_from_db(
|
|||
}
|
||||
|
||||
fn spawn_listen_on_workspace_member_change(
|
||||
mut listener: broadcast::Receiver<WorkspaceMemberChange>,
|
||||
mut listener: broadcast::Receiver<WorkspaceMemberNotification>,
|
||||
pg_pool: PgPool,
|
||||
member_status_by_uid: Arc<RwLock<MemberStatusByUid>>,
|
||||
) {
|
||||
|
|
|
@ -11,7 +11,7 @@ pub enum WorkspaceMemberAction {
|
|||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct WorkspaceMemberChange {
|
||||
pub struct WorkspaceMemberNotification {
|
||||
pub old: Option<WorkspaceMemberRow>,
|
||||
pub new: Option<WorkspaceMemberRow>,
|
||||
pub action_type: WorkspaceMemberAction,
|
||||
|
@ -24,4 +24,4 @@ pub struct WorkspaceMemberRow {
|
|||
pub workspace_id: Uuid,
|
||||
}
|
||||
|
||||
pub type WorkspaceMemberListener = PostgresDBListener<WorkspaceMemberChange>;
|
||||
pub type WorkspaceMemberListener = PostgresDBListener<WorkspaceMemberNotification>;
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
use crate::localhost_client;
|
||||
use crate::user::utils::generate_unique_registered_user_client;
|
||||
use app_error::ErrorCode;
|
||||
use client_api::ws::{WSClient, WSClientConfig};
|
||||
use serde_json::json;
|
||||
use shared_entity::dto::auth_dto::{UpdateUserParams, UserMetaData};
|
||||
use std::time::Duration;
|
||||
|
||||
#[tokio::test]
|
||||
async fn update_but_not_logged_in() {
|
||||
|
@ -139,3 +141,37 @@ async fn user_empty_metadata_override() {
|
|||
let profile = c.get_profile().await.unwrap();
|
||||
assert_eq!(profile.metadata.unwrap(), json!(metadata_1));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn user_change_notify_test() {
|
||||
let (c, _user) = generate_unique_registered_user_client().await;
|
||||
let ws_client = WSClient::new(WSClientConfig::default(), c.clone());
|
||||
let mut user_change_recv = ws_client.subscribe_user_changed();
|
||||
|
||||
let device_id = "fake_device_id";
|
||||
let _ = ws_client
|
||||
.connect(c.ws_url(device_id).unwrap(), device_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// After update user, the user_change_recv should receive a user change message via the websocket
|
||||
let fut = Box::pin(async move {
|
||||
c.update_user(UpdateUserParams::new().with_name("lucas"))
|
||||
.await
|
||||
.unwrap();
|
||||
let profile = c.get_profile().await.unwrap();
|
||||
assert_eq!(profile.name.unwrap().as_str(), "lucas");
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
});
|
||||
|
||||
tokio::select! {
|
||||
result = tokio::time::timeout(Duration::from_secs(5), async {
|
||||
println!("user_change: {:?}", user_change_recv.recv().await.unwrap());
|
||||
}) => {
|
||||
result.unwrap();
|
||||
},
|
||||
_ = fut => {
|
||||
panic!("update user timeout");
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -294,7 +294,7 @@ impl TestClient {
|
|||
let object_id = Uuid::new_v4().to_string();
|
||||
|
||||
// Subscribe to object
|
||||
let handler = self.ws_client.subscribe(object_id.clone()).unwrap();
|
||||
let handler = self.ws_client.subscribe_collab(object_id.clone()).unwrap();
|
||||
|
||||
let (sink, stream) = (handler.sink(), handler.stream());
|
||||
let origin = CollabOrigin::Client(CollabClient::new(self.uid().await, self.device_id.clone()));
|
||||
|
@ -351,7 +351,10 @@ impl TestClient {
|
|||
collab_type: CollabType,
|
||||
) {
|
||||
// Subscribe to object
|
||||
let handler = self.ws_client.subscribe(object_id.to_string()).unwrap();
|
||||
let handler = self
|
||||
.ws_client
|
||||
.subscribe_collab(object_id.to_string())
|
||||
.unwrap();
|
||||
let (sink, stream) = (handler.sink(), handler.stream());
|
||||
let origin = CollabOrigin::Client(CollabClient::new(self.uid().await, self.device_id.clone()));
|
||||
let collab = Arc::new(MutexCollab::new(origin.clone(), object_id, vec![]));
|
||||
|
@ -483,7 +486,7 @@ pub(crate) async fn assert_client_collab(
|
|||
|
||||
#[allow(dead_code)]
|
||||
pub async fn get_collab_json_from_server(
|
||||
client: &mut client_api::Client,
|
||||
client: &client_api::Client,
|
||||
workspace_id: &str,
|
||||
object_id: &str,
|
||||
collab_type: CollabType,
|
||||
|
|
Loading…
Add table
Reference in a new issue