chore: create method for receiving the most recent snapshot

This commit is contained in:
Bartosz Sypytkowski 2024-10-11 13:06:26 +02:00
parent 9c674d5777
commit 3e612e2db6
6 changed files with 172 additions and 18 deletions

View file

@ -0,0 +1,65 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT * FROM af_collab_snapshot\n WHERE workspace_id = $1 AND oid = $2 AND deleted_at IS NULL\n ORDER BY created_at DESC\n LIMIT 1;\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "sid",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "oid",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "blob",
"type_info": "Bytea"
},
{
"ordinal": 3,
"name": "len",
"type_info": "Int4"
},
{
"ordinal": 4,
"name": "encrypt",
"type_info": "Int4"
},
{
"ordinal": 5,
"name": "deleted_at",
"type_info": "Timestamptz"
},
{
"ordinal": 6,
"name": "workspace_id",
"type_info": "Uuid"
},
{
"ordinal": 7,
"name": "created_at",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": [
false,
false,
false,
false,
true,
true,
false,
false
]
},
"hash": "88516b9a2a424bc7697337d6f16b0d6e94b919597d709f930467423c5b4c0ec2"
}

View file

@ -551,6 +551,28 @@ pub async fn select_snapshot(
Ok(row)
}
#[inline]
pub async fn select_latest_snapshot(
pg_pool: &PgPool,
workspace_id: &Uuid,
object_id: &str,
) -> Result<Option<AFSnapshotRow>, Error> {
let row = sqlx::query_as!(
AFSnapshotRow,
r#"
SELECT * FROM af_collab_snapshot
WHERE workspace_id = $1 AND oid = $2 AND deleted_at IS NULL
ORDER BY created_at DESC
LIMIT 1;
"#,
workspace_id,
object_id
)
.fetch_optional(pg_pool)
.await?;
Ok(row)
}
/// Returns list of snapshots for given object_id in descending order of creation time.
pub async fn get_all_collab_snapshot_meta(
pg_pool: &PgPool,

View file

@ -165,6 +165,12 @@ pub trait CollabStorage: Send + Sync + 'static {
snapshot_id: &i64,
) -> AppResult<SnapshotData>;
async fn get_latest_snapshot(
&self,
workspace_id: &str,
object_id: &str,
) -> AppResult<Option<SnapshotData>>;
/// Returns list of snapshots for given object_id in descending order of creation time.
async fn get_collab_snapshot_list(&self, oid: &str) -> AppResult<AFSnapshotMetas>;
}
@ -300,6 +306,17 @@ where
.await
}
async fn get_latest_snapshot(
&self,
workspace_id: &str,
object_id: &str,
) -> AppResult<Option<SnapshotData>> {
self
.as_ref()
.get_latest_snapshot(workspace_id, object_id)
.await
}
async fn get_collab_snapshot_list(&self, oid: &str) -> AppResult<AFSnapshotMetas> {
self.as_ref().get_collab_snapshot_list(oid).await
}

View file

@ -480,6 +480,17 @@ where
.await
}
async fn get_latest_snapshot(
&self,
workspace_id: &str,
object_id: &str,
) -> AppResult<Option<SnapshotData>> {
self
.snapshot_control
.get_latest_snapshot(workspace_id, object_id)
.await
}
async fn get_collab_snapshot_list(&self, oid: &str) -> AppResult<AFSnapshotMetas> {
self.snapshot_control.get_collab_snapshot_list(oid).await
}

View file

@ -29,7 +29,7 @@ use collab_stream::stream_group::StreamGroup;
use dashmap::DashMap;
use database::collab::{CollabStorage, GetCollabOrigin};
use database_entity::dto::{
AFCollabEmbeddings, CollabParams, InsertSnapshotParams, QueryCollabParams,
AFCollabEmbeddings, CollabParams, InsertSnapshotParams, QueryCollabParams, SnapshotData,
};
use futures::{pin_mut, Sink, Stream};
use futures_util::{SinkExt, StreamExt};
@ -1077,25 +1077,36 @@ impl CollabPersister {
// if we want history-keeping variant, we need to get a snapshot
let snapshot = self
.storage
.get_collab_snapshot(&self.workspace_id, &self.object_id, &1)
.get_latest_snapshot(&self.workspace_id, &self.object_id)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
let encoded_collab = EncodedCollab::decode_from_bytes(&snapshot.encoded_collab_v1)
.map_err(|err| RealtimeError::Internal(err.into()))?;
encoded_collab.doc_state
match snapshot {
None => None,
Some(snapshot) => {
let encoded_collab = EncodedCollab::decode_from_bytes(&snapshot.encoded_collab_v1)
.map_err(|err| RealtimeError::Internal(err.into()))?;
Some(encoded_collab.doc_state)
},
}
} else {
// if we want a lightweight variant, we need to get a collab
let params = QueryCollabParams::new(
self.object_id.clone(),
self.collab_type.clone(),
self.workspace_id.clone(),
);
self
.storage
.get_encode_collab(GetCollabOrigin::Server, params, false)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?
.doc_state
None // if we want a lightweight variant, we'll fallback to default
};
let doc_state = match doc_state {
Some(doc_state) => doc_state,
None => {
// we didn't find a snapshot, or we want a lightweight collab version
let params = QueryCollabParams::new(
self.object_id.clone(),
self.collab_type.clone(),
self.workspace_id.clone(),
);
self
.storage
.get_encode_collab(GetCollabOrigin::Server, params, false)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?
.doc_state
},
};
let collab: Collab = Collab::new_with_source(

View file

@ -10,13 +10,14 @@ use futures_util::StreamExt;
use sqlx::PgPool;
use tokio::time::interval;
use tracing::{debug, error, trace, warn};
use uuid::Uuid;
use validator::Validate;
use app_error::AppError;
use collab_rt_protocol::spawn_blocking_validate_encode_collab;
use database::collab::{
create_snapshot_and_maintain_limit, get_all_collab_snapshot_meta, latest_snapshot_time,
select_snapshot, AppResult, COLLAB_SNAPSHOT_LIMIT, SNAPSHOT_PER_HOUR,
select_latest_snapshot, select_snapshot, AppResult, COLLAB_SNAPSHOT_LIMIT, SNAPSHOT_PER_HOUR,
};
use database_entity::dto::{AFSnapshotMeta, AFSnapshotMetas, InsertSnapshotParams, SnapshotData};
@ -181,6 +182,33 @@ impl SnapshotControl {
}
}
pub async fn get_latest_snapshot(
&self,
workspace_id: &str,
object_id: &str,
) -> Result<Option<SnapshotData>, AppError> {
let key = SnapshotKey::from_object_id(object_id);
match self.cache.try_get(&key.0).await.unwrap_or(None) {
None => {
let wid = Uuid::parse_str(workspace_id)?;
let snapshot = select_latest_snapshot(&self.pg_pool, &wid, object_id).await?;
match snapshot {
None => Ok(None),
Some(row) => Ok(Some(SnapshotData {
object_id: row.oid,
encoded_collab_v1: row.blob,
workspace_id: row.workspace_id.to_string(),
})),
}
},
Some(snapshot) => Ok(Some(SnapshotData {
encoded_collab_v1: snapshot,
workspace_id: workspace_id.to_string(),
object_id: object_id.to_string(),
})),
}
}
async fn latest_snapshot_time(&self, oid: &str) -> Result<Option<DateTime<Utc>>, AppError> {
let time = latest_snapshot_time(oid, &self.pg_pool).await?;
Ok(time)