add folder sync test

This commit is contained in:
appflowy 2022-01-22 18:48:43 +08:00
parent 1e66aae051
commit ccb7d0181f
66 changed files with 1190 additions and 786 deletions

View file

@ -16,7 +16,7 @@ use flowy_collaboration::{
use flowy_core::module::{FolderCouldServiceV1, FolderCouldServiceV2};
use flowy_error::{internal_error, FlowyError};
use futures_util::stream::StreamExt;
use lib_ws::{WSModule, WebSocketRawMessage};
use lib_ws::{WSChannel, WebSocketRawMessage};
use parking_lot::RwLock;
use std::{
convert::{TryFrom, TryInto},
@ -113,12 +113,12 @@ impl LocalWebSocketRunner {
async fn handle_message(&self, message: WebSocketRawMessage) -> Result<(), FlowyError> {
let bytes = Bytes::from(message.data);
let client_data = ClientRevisionWSData::try_from(bytes).map_err(internal_error)?;
match message.module {
WSModule::Doc => {
match message.channel {
WSChannel::Document => {
let _ = self.handle_document_client_data(client_data, "".to_owned()).await?;
Ok(())
},
WSModule::Folder => {
WSChannel::Folder => {
let _ = self.handle_folder_client_data(client_data, "".to_owned()).await?;
Ok(())
},
@ -140,6 +140,7 @@ impl LocalWebSocketRunner {
let user = Arc::new(LocalRevisionUser {
user_id,
client_ws_sender,
channel: WSChannel::Folder,
});
let ty = client_data.ty.clone();
let document_client_data: ClientRevisionWSDataPB = client_data.try_into().unwrap();
@ -175,6 +176,7 @@ impl LocalWebSocketRunner {
let user = Arc::new(LocalRevisionUser {
user_id,
client_ws_sender,
channel: WSChannel::Document,
});
let ty = client_data.ty.clone();
let document_client_data: ClientRevisionWSDataPB = client_data.try_into().unwrap();
@ -197,6 +199,7 @@ impl LocalWebSocketRunner {
struct LocalRevisionUser {
user_id: String,
client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
channel: WSChannel,
}
impl RevisionUser for LocalRevisionUser {
@ -210,13 +213,14 @@ impl RevisionUser for LocalRevisionUser {
tracing::error!("LocalDocumentUser send message failed: {}", e);
},
};
let channel = self.channel.clone();
tokio::spawn(async move {
match resp {
RevisionSyncResponse::Pull(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
channel,
data: bytes.to_vec(),
};
send_fn(sender, msg);
@ -224,7 +228,7 @@ impl RevisionUser for LocalRevisionUser {
RevisionSyncResponse::Push(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
channel,
data: bytes.to_vec(),
};
send_fn(sender, msg);
@ -232,7 +236,7 @@ impl RevisionUser for LocalRevisionUser {
RevisionSyncResponse::Ack(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
channel,
data: bytes.to_vec(),
};
send_fn(sender, msg);