reset document data using transaction

This commit is contained in:
appflowy 2021-12-28 16:56:59 +08:00
parent 049b8828fe
commit e4e40ebe20
4 changed files with 168 additions and 177 deletions

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
context::FlowyPersistence, context::FlowyPersistence,
services::kv::{KVStore, KeyValue}, services::kv::{KVStore, KVTransaction, KeyValue},
util::serde_ext::parse_from_bytes, util::serde_ext::parse_from_bytes,
}; };
use anyhow::Context; use anyhow::Context;
@ -45,11 +45,20 @@ pub(crate) async fn read_document(
#[tracing::instrument(level = "debug", skip(kv_store, params), fields(delta), err)] #[tracing::instrument(level = "debug", skip(kv_store, params), fields(delta), err)]
pub async fn reset_document( pub async fn reset_document(
kv_store: &Arc<DocumentKVPersistence>, kv_store: &Arc<DocumentKVPersistence>,
params: ResetDocumentParams, mut params: ResetDocumentParams,
) -> Result<(), ServerError> { ) -> Result<(), ServerError> {
// TODO: Reset document requires atomic operation let revisions = params.take_revisions().take_items();
// let _ = kv_store.batch_delete_revisions(&doc_id.to_string(), None).await?; let doc_id = params.take_doc_id();
todo!() kv_store
.transaction(|mut transaction| {
Box::pin(async move {
let _ = transaction.batch_delete_key_start_with(&doc_id).await?;
let items = revisions_to_key_value_items(revisions.into());
let _ = transaction.batch_set(items).await?;
Ok(())
})
})
.await
} }
#[tracing::instrument(level = "debug", skip(kv_store), err)] #[tracing::instrument(level = "debug", skip(kv_store), err)]
@ -59,11 +68,11 @@ pub(crate) async fn delete_document(kv_store: &Arc<DocumentKVPersistence>, doc_i
} }
pub struct DocumentKVPersistence { pub struct DocumentKVPersistence {
inner: Arc<dyn KVStore>, inner: Arc<KVStore>,
} }
impl std::ops::Deref for DocumentKVPersistence { impl std::ops::Deref for DocumentKVPersistence {
type Target = Arc<dyn KVStore>; type Target = Arc<KVStore>;
fn deref(&self) -> &Self::Target { &self.inner } fn deref(&self) -> &Self::Target { &self.inner }
} }
@ -73,34 +82,21 @@ impl std::ops::DerefMut for DocumentKVPersistence {
} }
impl DocumentKVPersistence { impl DocumentKVPersistence {
pub(crate) fn new(kv_store: Arc<dyn KVStore>) -> Self { DocumentKVPersistence { inner: kv_store } } pub(crate) fn new(kv_store: Arc<KVStore>) -> Self { DocumentKVPersistence { inner: kv_store } }
pub(crate) async fn batch_set_revision(&self, revisions: Vec<Revision>) -> Result<(), ServerError> { pub(crate) async fn batch_set_revision(&self, revisions: Vec<Revision>) -> Result<(), ServerError> {
let kv_store = self.inner.clone(); let items = revisions_to_key_value_items(revisions);
let items = revisions self.inner
.into_iter() .transaction(|mut t| Box::pin(async move { t.batch_set(items).await }))
.map(|revision| { .await
let key = make_revision_key(&revision.doc_id, revision.rev_id);
let value = Bytes::from(revision.write_to_bytes().unwrap());
KeyValue { key, value }
})
.collect::<Vec<KeyValue>>();
let _ = kv_store.batch_set(items).await?;
// use futures::stream::{self, StreamExt};
// let f = |revision: Revision, kv_store: Arc<dyn KVStore>| async move {
// let key = make_revision_key(&revision.doc_id, revision.rev_id);
// let bytes = revision.write_to_bytes().unwrap();
// let _ = kv_store.set(&key, Bytes::from(bytes)).await.unwrap();
// };
//
// stream::iter(revisions)
// .for_each_concurrent(None, |revision| f(revision, kv_store.clone()))
// .await;
Ok(())
} }
pub(crate) async fn get_doc_revisions(&self, doc_id: &str) -> Result<RepeatedRevision, ServerError> { pub(crate) async fn get_doc_revisions(&self, doc_id: &str) -> Result<RepeatedRevision, ServerError> {
let items = self.inner.batch_get_start_with(doc_id).await?; let doc_id = doc_id.to_owned();
let items = self
.inner
.transaction(|mut t| Box::pin(async move { t.batch_get_start_with(&doc_id).await }))
.await?;
Ok(key_value_items_to_revisions(items)) Ok(key_value_items_to_revisions(items))
} }
@ -111,13 +107,21 @@ impl DocumentKVPersistence {
) -> Result<RepeatedRevision, ServerError> { ) -> Result<RepeatedRevision, ServerError> {
let rev_ids = rev_ids.into(); let rev_ids = rev_ids.into();
let items = match rev_ids { let items = match rev_ids {
None => self.inner.batch_get_start_with(doc_id).await?, None => {
let doc_id = doc_id.to_owned();
self.inner
.transaction(|mut t| Box::pin(async move { t.batch_get_start_with(&doc_id).await }))
.await?
},
Some(rev_ids) => { Some(rev_ids) => {
let keys = rev_ids let keys = rev_ids
.into_iter() .into_iter()
.map(|rev_id| make_revision_key(doc_id, rev_id)) .map(|rev_id| make_revision_key(doc_id, rev_id))
.collect::<Vec<String>>(); .collect::<Vec<String>>();
self.inner.batch_get(keys).await?
self.inner
.transaction(|mut t| Box::pin(async move { t.batch_get(keys).await }))
.await?
}, },
}; };
@ -131,21 +135,37 @@ impl DocumentKVPersistence {
) -> Result<(), ServerError> { ) -> Result<(), ServerError> {
match rev_ids.into() { match rev_ids.into() {
None => { None => {
let _ = self.inner.batch_delete_key_start_with(doc_id).await?; let doc_id = doc_id.to_owned();
Ok(()) self.inner
.transaction(|mut t| Box::pin(async move { t.batch_delete_key_start_with(&doc_id).await }))
.await
}, },
Some(rev_ids) => { Some(rev_ids) => {
let keys = rev_ids let keys = rev_ids
.into_iter() .into_iter()
.map(|rev_id| make_revision_key(doc_id, rev_id)) .map(|rev_id| make_revision_key(doc_id, rev_id))
.collect::<Vec<String>>(); .collect::<Vec<String>>();
let _ = self.inner.batch_delete(keys).await?;
Ok(()) self.inner
.transaction(|mut t| Box::pin(async move { t.batch_delete(keys).await }))
.await
}, },
} }
} }
} }
#[inline]
fn revisions_to_key_value_items(revisions: Vec<Revision>) -> Vec<KeyValue> {
revisions
.into_iter()
.map(|revision| {
let key = make_revision_key(&revision.doc_id, revision.rev_id);
let value = Bytes::from(revision.write_to_bytes().unwrap());
KeyValue { key, value }
})
.collect::<Vec<KeyValue>>()
}
#[inline] #[inline]
fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevision { fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevision {
let mut revisions = items let mut revisions = items

View file

@ -1,5 +1,5 @@
use crate::{ use crate::{
services::kv::{KVAction, KVStore, KeyValue}, services::kv::{KVStore, KVTransaction, KeyValue},
util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
}; };
use anyhow::Context; use anyhow::Context;
@ -17,27 +17,26 @@ use sqlx::{
Postgres, Postgres,
Row, Row,
}; };
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin, sync::Arc};
const KV_TABLE: &str = "kv_table"; const KV_TABLE: &str = "kv_table";
pub(crate) struct PostgresKV { pub struct PostgresKV {
pub(crate) pg_pool: PgPool, pub(crate) pg_pool: PgPool,
} }
impl PostgresKV { impl PostgresKV {
async fn transaction<F, O>(&self, f: F) -> Result<O, ServerError> pub async fn transaction<F, O>(&self, f: F) -> Result<O, ServerError>
where where
F: for<'a> FnOnce(&'a mut DBTransaction<'_>) -> BoxFuture<'a, Result<O, ServerError>>, F: for<'a> FnOnce(Box<dyn KVTransaction + 'a>) -> BoxResultFuture<O, ServerError>,
{ {
let mut transaction = self let mut transaction = self
.pg_pool .pg_pool
.begin() .begin()
.await .await
.context("[KV]:Failed to acquire a Postgres connection")?; .context("[KV]:Failed to acquire a Postgres connection")?;
let postgres_transaction = PostgresTransaction(&mut transaction);
let result = f(&mut transaction).await; let result = f(Box::new(postgres_transaction)).await;
transaction transaction
.commit() .commit()
.await .await
@ -47,43 +46,32 @@ impl PostgresKV {
} }
} }
impl KVStore for PostgresKV {} pub(crate) struct PostgresTransaction<'a, 'b>(&'a mut DBTransaction<'b>);
pub(crate) struct PostgresTransaction<'a> {
pub(crate) transaction: DBTransaction<'a>,
}
impl<'a> PostgresTransaction<'a> {}
#[async_trait] #[async_trait]
impl KVAction for PostgresKV { impl<'a, 'b> KVTransaction for PostgresTransaction<'a, 'b> {
async fn get(&self, key: &str) -> Result<Option<Bytes>, ServerError> { async fn get(&mut self, key: &str) -> Result<Option<Bytes>, ServerError> {
let id = key.to_string(); let id = key.to_string();
self.transaction(|transaction| { let (sql, args) = SqlBuilder::select(KV_TABLE)
Box::pin(async move { .add_field("*")
let (sql, args) = SqlBuilder::select(KV_TABLE) .and_where_eq("id", &id)
.add_field("*") .build()?;
.and_where_eq("id", &id)
.build()?;
let result = sqlx::query_as_with::<Postgres, KVTable, PgArguments>(&sql, args) let result = sqlx::query_as_with::<Postgres, KVTable, PgArguments>(&sql, args)
.fetch_one(transaction) .fetch_one(self.0 as &mut DBTransaction<'b>)
.await; .await;
let result = match result { let result = match result {
Ok(val) => Ok(Some(Bytes::from(val.blob))), Ok(val) => Ok(Some(Bytes::from(val.blob))),
Err(error) => match error { Err(error) => match error {
Error::RowNotFound => Ok(None), Error::RowNotFound => Ok(None),
_ => Err(map_sqlx_error(error)), _ => Err(map_sqlx_error(error)),
}, },
}; };
result result
})
})
.await
} }
async fn set(&self, key: &str, bytes: Bytes) -> Result<(), ServerError> { async fn set(&mut self, key: &str, bytes: Bytes) -> Result<(), ServerError> {
self.batch_set(vec![KeyValue { self.batch_set(vec![KeyValue {
key: key.to_string(), key: key.to_string(),
value: bytes, value: bytes,
@ -91,115 +79,96 @@ impl KVAction for PostgresKV {
.await .await
} }
async fn remove(&self, key: &str) -> Result<(), ServerError> { async fn remove(&mut self, key: &str) -> Result<(), ServerError> {
let id = key.to_string(); let id = key.to_string();
self.transaction(|transaction| { let (sql, args) = SqlBuilder::delete(KV_TABLE).and_where_eq("id", &id).build()?;
Box::pin(async move { let _ = sqlx::query_with(&sql, args)
let (sql, args) = SqlBuilder::delete(KV_TABLE).and_where_eq("id", &id).build()?; .execute(self.0 as &mut DBTransaction<'_>)
let _ = sqlx::query_with(&sql, args) .await
.execute(transaction) .map_err(map_sqlx_error)?;
.await Ok(())
.map_err(map_sqlx_error)?;
Ok(())
})
})
.await
} }
async fn batch_set(&self, kvs: Vec<KeyValue>) -> Result<(), ServerError> { async fn batch_set(&mut self, kvs: Vec<KeyValue>) -> Result<(), ServerError> {
self.transaction(|transaction| { let mut builder = RawSqlBuilder::insert_into(KV_TABLE);
Box::pin(async move { let m_builder = builder.field("id").field("blob");
let mut builder = RawSqlBuilder::insert_into(KV_TABLE);
let m_builder = builder.field("id").field("blob");
let mut args = PgArguments::default(); let mut args = PgArguments::default();
kvs.iter().enumerate().for_each(|(index, _)| { kvs.iter().enumerate().for_each(|(index, _)| {
let index = index * 2 + 1; let index = index * 2 + 1;
m_builder.values(&[format!("${}", index), format!("${}", index + 1)]); m_builder.values(&[format!("${}", index), format!("${}", index + 1)]);
}); });
for kv in kvs { for kv in kvs {
args.add(kv.key); args.add(kv.key);
args.add(kv.value.to_vec()); args.add(kv.value.to_vec());
} }
let sql = m_builder.sql()?; let sql = m_builder.sql()?;
let _ = sqlx::query_with(&sql, args) let _ = sqlx::query_with(&sql, args)
.execute(transaction) .execute(self.0 as &mut DBTransaction<'_>)
.await .await
.map_err(map_sqlx_error)?; .map_err(map_sqlx_error)?;
Ok::<(), ServerError>(()) Ok::<(), ServerError>(())
})
})
.await
} }
async fn batch_get(&self, keys: Vec<String>) -> Result<Vec<KeyValue>, ServerError> { async fn batch_get(&mut self, keys: Vec<String>) -> Result<Vec<KeyValue>, ServerError> {
self.transaction(|transaction| { let sql = RawSqlBuilder::select_from(KV_TABLE)
Box::pin(async move { .field("id")
let sql = RawSqlBuilder::select_from(KV_TABLE) .field("blob")
.field("id") .and_where_in_quoted("id", &keys)
.field("blob") .sql()?;
.and_where_in_quoted("id", &keys)
.sql()?;
let rows = sqlx::query(&sql).fetch_all(transaction).await.map_err(map_sqlx_error)?; let rows = sqlx::query(&sql)
let kvs = rows_to_key_values(rows); .fetch_all(self.0 as &mut DBTransaction<'_>)
Ok::<Vec<KeyValue>, ServerError>(kvs) .await
}) .map_err(map_sqlx_error)?;
}) let kvs = rows_to_key_values(rows);
.await Ok::<Vec<KeyValue>, ServerError>(kvs)
} }
async fn batch_delete(&self, keys: Vec<String>) -> Result<(), ServerError> { async fn batch_delete(&mut self, keys: Vec<String>) -> Result<(), ServerError> {
self.transaction(|transaction| { let sql = RawSqlBuilder::delete_from(KV_TABLE).and_where_in("id", &keys).sql()?;
Box::pin(async move { let _ = sqlx::query(&sql)
let sql = RawSqlBuilder::delete_from(KV_TABLE).and_where_in("id", &keys).sql()?; .execute(self.0 as &mut DBTransaction<'_>)
let _ = sqlx::query(&sql).execute(transaction).await.map_err(map_sqlx_error)?; .await
.map_err(map_sqlx_error)?;
Ok::<(), ServerError>(()) Ok::<(), ServerError>(())
})
})
.await
} }
async fn batch_get_start_with(&self, key: &str) -> Result<Vec<KeyValue>, ServerError> { async fn batch_get_start_with(&mut self, key: &str) -> Result<Vec<KeyValue>, ServerError> {
let prefix = key.to_owned(); let prefix = key.to_owned();
self.transaction(|transaction| { let sql = RawSqlBuilder::select_from(KV_TABLE)
Box::pin(async move { .field("id")
let sql = RawSqlBuilder::select_from(KV_TABLE) .field("blob")
.field("id") .and_where_like_left("id", &prefix)
.field("blob") .sql()?;
.and_where_like_left("id", &prefix)
.sql()?;
let rows = sqlx::query(&sql).fetch_all(transaction).await.map_err(map_sqlx_error)?; let rows = sqlx::query(&sql)
.fetch_all(self.0 as &mut DBTransaction<'_>)
.await
.map_err(map_sqlx_error)?;
let kvs = rows_to_key_values(rows); let kvs = rows_to_key_values(rows);
Ok::<Vec<KeyValue>, ServerError>(kvs) Ok::<Vec<KeyValue>, ServerError>(kvs)
})
})
.await
} }
async fn batch_delete_key_start_with(&self, keyword: &str) -> Result<(), ServerError> { async fn batch_delete_key_start_with(&mut self, keyword: &str) -> Result<(), ServerError> {
let keyword = keyword.to_owned(); let keyword = keyword.to_owned();
self.transaction(|transaction| { let sql = RawSqlBuilder::delete_from(KV_TABLE)
Box::pin(async move { .and_where_like_left("id", &keyword)
let sql = RawSqlBuilder::delete_from(KV_TABLE) .sql()?;
.and_where_like_left("id", &keyword)
.sql()?;
let _ = sqlx::query(&sql).execute(transaction).await.map_err(map_sqlx_error)?; let _ = sqlx::query(&sql)
Ok::<(), ServerError>(()) .execute(self.0 as &mut DBTransaction<'_>)
}) .await
}) .map_err(map_sqlx_error)?;
.await Ok::<(), ServerError>(())
} }
} }
fn rows_to_key_values(rows: Vec<PgRow>) -> Vec<KeyValue> { fn rows_to_key_values(rows: Vec<PgRow>) -> Vec<KeyValue> {
rows.into_iter() rows.into_iter()
.map(|row| { .map(|row| {

View file

@ -5,16 +5,16 @@ use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use futures_core::future::BoxFuture; use futures_core::future::BoxFuture;
pub(crate) use kv::*; pub(crate) use kv::*;
use std::sync::Arc;
use backend_service::errors::ServerError; use backend_service::errors::ServerError;
use lib_infra::future::{BoxResultFuture, FutureResultSend}; use lib_infra::future::{BoxResultFuture, FutureResultSend};
#[derive(Clone, Debug, PartialEq, Eq)] // TODO: Generic the KVStore that enable switching KVStore to another
pub struct KeyValue { // implementation
pub key: String, pub type KVStore = PostgresKV;
pub value: Bytes,
}
#[rustfmt::skip]
// https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html // https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html
// Note that using these trait methods will result in a heap allocation // Note that using these trait methods will result in a heap allocation
// per-function-call. This is not a significant cost for the vast majority of // per-function-call. This is not a significant cost for the vast majority of
@ -22,20 +22,21 @@ pub struct KeyValue {
// functionality in the public API of a low-level function that is expected to // functionality in the public API of a low-level function that is expected to
// be called millions of times a second. // be called millions of times a second.
#[async_trait] #[async_trait]
pub trait KVStore: KVAction + Send + Sync {} pub trait KVTransaction: Send + Sync {
async fn get(&mut self, key: &str) -> Result<Option<Bytes>, ServerError>;
async fn set(&mut self, key: &str, value: Bytes) -> Result<(), ServerError>;
async fn remove(&mut self, key: &str) -> Result<(), ServerError>;
// pub trait KVTransaction async fn batch_set(&mut self, kvs: Vec<KeyValue>) -> Result<(), ServerError>;
async fn batch_get(&mut self, keys: Vec<String>) -> Result<Vec<KeyValue>, ServerError>;
async fn batch_delete(&mut self, keys: Vec<String>) -> Result<(), ServerError>;
#[async_trait] async fn batch_get_start_with(&mut self, key: &str) -> Result<Vec<KeyValue>, ServerError>;
pub trait KVAction: Send + Sync { async fn batch_delete_key_start_with(&mut self, keyword: &str) -> Result<(), ServerError>;
async fn get(&self, key: &str) -> Result<Option<Bytes>, ServerError>; }
async fn set(&self, key: &str, value: Bytes) -> Result<(), ServerError>;
async fn remove(&self, key: &str) -> Result<(), ServerError>; #[derive(Clone, Debug, PartialEq, Eq)]
pub struct KeyValue {
async fn batch_set(&self, kvs: Vec<KeyValue>) -> Result<(), ServerError>; pub key: String,
async fn batch_get(&self, keys: Vec<String>) -> Result<Vec<KeyValue>, ServerError>; pub value: Bytes,
async fn batch_delete(&self, keys: Vec<String>) -> Result<(), ServerError>;
async fn batch_get_start_with(&self, key: &str) -> Result<Vec<KeyValue>, ServerError>;
async fn batch_delete_key_start_with(&self, keyword: &str) -> Result<(), ServerError>;
} }

View file

@ -41,6 +41,7 @@ async fn kv_batch_set_test() {
value: "b".to_string().into(), value: "b".to_string().into(),
}, },
]; ];
kv.batch_set(kvs.clone()).await.unwrap(); kv.batch_set(kvs.clone()).await.unwrap();
let kvs_from_db = kv let kvs_from_db = kv
.batch_get(kvs.clone().into_iter().map(|value| value.key).collect::<Vec<String>>()) .batch_get(kvs.clone().into_iter().map(|value| value.key).collect::<Vec<String>>())