mirror of
https://github.com/AppFlowy-IO/AppFlowy-Cloud.git
synced 2025-04-19 03:24:42 -04:00
feat: add workspace settings with disable indexing option (#606)
* feat: add workspace settings with disable indexing option * chore: added api workspace settings endpoint * chore: add workspace settings read and write to client api * chore: add tests for workspace settings client api * fix: enforce authorization rules on workspace settings api * chore: fix appflowy indexer test configuration * chore: add dotenvy to appflowy indexer tests * chore: override appflowy indexer env vars for tests * chore: add appflowy indexer preindex env var
This commit is contained in:
parent
928d1dfe77
commit
b52369de7f
22 changed files with 463 additions and 56 deletions
12
.github/workflows/integration_test.yml
vendored
12
.github/workflows/integration_test.yml
vendored
|
@ -42,15 +42,17 @@ jobs:
|
|||
|
||||
- name: Build Docker Images
|
||||
run: |
|
||||
docker compose build appflowy_cloud appflowy_history
|
||||
docker compose build appflowy_cloud appflowy_history appflowy_indexer
|
||||
|
||||
- name: Push docker images to docker hub
|
||||
run: |
|
||||
docker tag appflowyinc/appflowy_cloud appflowyinc/appflowy_cloud:${GITHUB_SHA}
|
||||
docker tag appflowyinc/appflowy_history appflowyinc/appflowy_history:${GITHUB_SHA}
|
||||
docker tag appflowyinc/appflowy_indexer appflowyinc/appflowy_indexer:${GITHUB_SHA}
|
||||
echo ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }} | docker login --username appflowyinc --password-stdin
|
||||
docker push appflowyinc/appflowy_cloud:${GITHUB_SHA}
|
||||
docker push appflowyinc/appflowy_history:${GITHUB_SHA}
|
||||
docker push appflowyinc/appflowy_indexer:${GITHUB_SHA}
|
||||
APPFLOWY_HISTORY_VERSION=${GITHUB_SHA}
|
||||
APPFLOWY_CLOUD_VERSION=0.1.1
|
||||
|
||||
|
@ -63,9 +65,11 @@ jobs:
|
|||
matrix:
|
||||
include:
|
||||
- test_service: "appflowy_cloud"
|
||||
test_cmd: "--workspace --exclude appflowy-history --exclude appflowy-ai-client"
|
||||
test_cmd: "--workspace --exclude appflowy-history --exclude appflowy-ai-client --exclude appflowy-indexer"
|
||||
- test_service: "appflowy_history"
|
||||
test_cmd: "-p appflowy-history"
|
||||
- test_service: "appflowy_indexer"
|
||||
test_cmd: "-p appflowy-indexer"
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: dtolnay/rust-toolchain@stable
|
||||
|
@ -90,8 +94,10 @@ jobs:
|
|||
sed -i 's/APPFLOWY_MAILER_SMTP_USERNAME=.*/APPFLOWY_MAILER_SMTP_USERNAME=${{ secrets.CI_GOTRUE_SMTP_USER }}/' .env
|
||||
sed -i 's/APPFLOWY_MAILER_SMTP_PASSWORD=.*/APPFLOWY_MAILER_SMTP_PASSWORD=${{ secrets.CI_GOTRUE_SMTP_PASS }}/' .env
|
||||
sed -i 's/APPFLOWY_AI_OPENAI_API_KEY=.*/APPFLOWY_AI_OPENAI_API_KEY=${{ secrets.CI_OPENAI_API_KEY }}/' .env
|
||||
sed -i 's/APPFLOWY_INDEXER_OPENAI_API_KEY=.*/APPFLOWY_INDEXER_OPENAI_API_KEY=${{ secrets.CI_OPENAI_API_KEY }}/' .env
|
||||
sed -i 's/APPFLOWY_OPENAI_API_KEY=.*/APPFLOWY_OPENAI_API_KEY=${{ secrets.CI_OPENAI_API_KEY }}/' .env
|
||||
sed -i 's/APPFLOWY_INDEXER_OPENAI_API_KEY=.*/APPFLOWY_INDEXER_OPENAI_API_KEY=${{ secrets.CI_OPENAI_API_KEY }}/' .env
|
||||
sed -i 's/APPFLOWY_INDEXER_REDIS_URL=.*/APPFLOWY_INDEXER_REDIS_URL=redis:\/\/localhost:6379/' .env
|
||||
sed -i 's/APPFLOWY_INDEXER_DATABASE_URL=.*/APPFLOWY_INDEXER_DATABASE_URL=postgres:\/\/postgres:password@localhost:5432\/postgres/' .env
|
||||
|
||||
- name: Update Nginx Configuration
|
||||
# the wasm-pack headless tests will run on random ports, so we need to allow all origins
|
||||
|
|
|
@ -1,22 +0,0 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT EXISTS(SELECT 1 FROM af_collab_embeddings WHERE oid = $1)",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "exists",
|
||||
"type_info": "Bool"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "16a29676fc0d55db87ab4c158f6c6511bb202d8bc16d69d36d84418203685a52"
|
||||
}
|
15
.sqlx/query-4c985215efcd09915403a7f76449fda9e0e9323806a6f42a5d8f73243f349b85.json
generated
Normal file
15
.sqlx/query-4c985215efcd09915403a7f76449fda9e0e9323806a6f42a5d8f73243f349b85.json
generated
Normal file
|
@ -0,0 +1,15 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "UPDATE af_workspace SET settings = $1 WHERE workspace_id = $2",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Jsonb",
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "4c985215efcd09915403a7f76449fda9e0e9323806a6f42a5d8f73243f349b85"
|
||||
}
|
22
.sqlx/query-7a4c7da16e99ff3875bdd7e0d189e26c3c1ab49672bace41992aecc446061850.json
generated
Normal file
22
.sqlx/query-7a4c7da16e99ff3875bdd7e0d189e26c3c1ab49672bace41992aecc446061850.json
generated
Normal file
|
@ -0,0 +1,22 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SElECT settings FROM af_workspace WHERE workspace_id = $1",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "settings",
|
||||
"type_info": "Jsonb"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Uuid"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
true
|
||||
]
|
||||
},
|
||||
"hash": "7a4c7da16e99ff3875bdd7e0d189e26c3c1ab49672bace41992aecc446061850"
|
||||
}
|
28
.sqlx/query-e6105ace4f5b9b71a7edc30f80c6f06877f5a8c49830a4126ee61f7c2e9db03a.json
generated
Normal file
28
.sqlx/query-e6105ace4f5b9b71a7edc30f80c6f06877f5a8c49830a4126ee61f7c2e9db03a.json
generated
Normal file
|
@ -0,0 +1,28 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "\nSELECT\n w.settings['disable_indexing']::boolean as disable_indexing,\n CASE\n WHEN w.settings['disable_indexing']::boolean THEN\n FALSE\n ELSE\n EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = c.partition_key AND m.oid = c.oid)\n END as has_index\nFROM af_collab c\nJOIN af_workspace w ON c.workspace_id = w.workspace_id\nWHERE c.oid = $1",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "disable_indexing",
|
||||
"type_info": "Bool"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "has_index",
|
||||
"type_info": "Bool"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null,
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "e6105ace4f5b9b71a7edc30f80c6f06877f5a8c49830a4126ee61f7c2e9db03a"
|
||||
}
|
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -785,6 +785,7 @@ dependencies = [
|
|||
"env_logger",
|
||||
"futures",
|
||||
"humantime",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"openai_dive",
|
||||
"pgvector",
|
||||
|
|
51
libs/client-api/src/http_settings.rs
Normal file
51
libs/client-api/src/http_settings.rs
Normal file
|
@ -0,0 +1,51 @@
|
|||
use reqwest::Method;
|
||||
use tracing::instrument;
|
||||
|
||||
use database_entity::dto::AFWorkspaceSettings;
|
||||
use shared_entity::response::{AppResponse, AppResponseError};
|
||||
|
||||
use crate::http::log_request_id;
|
||||
use crate::Client;
|
||||
|
||||
impl Client {
|
||||
#[instrument(level = "info", skip_all, err)]
|
||||
pub async fn get_workspace_settings<T: AsRef<str>>(
|
||||
&self,
|
||||
workspace_id: T,
|
||||
) -> Result<AFWorkspaceSettings, AppResponseError> {
|
||||
let url = format!(
|
||||
"{}/api/workspace/{}/settings",
|
||||
self.base_url,
|
||||
workspace_id.as_ref()
|
||||
);
|
||||
let resp = self
|
||||
.http_client_with_auth(Method::GET, &url)
|
||||
.await?
|
||||
.send()
|
||||
.await?;
|
||||
log_request_id(&resp);
|
||||
let resp = AppResponse::<AFWorkspaceSettings>::from_response(resp).await?;
|
||||
resp.into_data()
|
||||
}
|
||||
|
||||
#[instrument(level = "info", skip_all, err)]
|
||||
pub async fn update_workspace_settings<T: AsRef<str>>(
|
||||
&self,
|
||||
workspace_id: T,
|
||||
settings: &AFWorkspaceSettings,
|
||||
) -> Result<(), AppResponseError> {
|
||||
let url = format!(
|
||||
"{}/api/workspace/{}/settings",
|
||||
self.base_url,
|
||||
workspace_id.as_ref()
|
||||
);
|
||||
let resp = self
|
||||
.http_client_with_auth(Method::POST, &url)
|
||||
.await?
|
||||
.json(&settings)
|
||||
.send()
|
||||
.await?;
|
||||
log_request_id(&resp);
|
||||
AppResponse::<()>::from_response(resp).await?.into_error()
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ pub use wasm::*;
|
|||
#[cfg(not(target_arch = "wasm32"))]
|
||||
mod http_chat;
|
||||
mod http_search;
|
||||
mod http_settings;
|
||||
pub mod ws;
|
||||
|
||||
pub mod error {
|
||||
|
|
|
@ -498,6 +498,12 @@ pub struct AFWorkspace {
|
|||
#[derive(Serialize, Deserialize)]
|
||||
pub struct AFWorkspaces(pub Vec<AFWorkspace>);
|
||||
|
||||
#[derive(Default, Serialize, Deserialize)]
|
||||
pub struct AFWorkspaceSettings {
|
||||
#[serde(default)]
|
||||
pub disable_indexing: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct AFUserWorkspaceInfo {
|
||||
pub user_profile: AFUserProfile,
|
||||
|
|
|
@ -7,17 +7,31 @@ use uuid::Uuid;
|
|||
|
||||
use database_entity::dto::AFCollabEmbeddingParams;
|
||||
|
||||
pub async fn has_collab_embeddings(
|
||||
pub async fn get_index_status(
|
||||
tx: &mut Transaction<'_, sqlx::Postgres>,
|
||||
oid: &str,
|
||||
) -> Result<bool, sqlx::Error> {
|
||||
) -> Result<Option<bool>, sqlx::Error> {
|
||||
let result = sqlx::query!(
|
||||
"SELECT EXISTS(SELECT 1 FROM af_collab_embeddings WHERE oid = $1)",
|
||||
r#"
|
||||
SELECT
|
||||
w.settings['disable_indexing']::boolean as disable_indexing,
|
||||
CASE
|
||||
WHEN w.settings['disable_indexing']::boolean THEN
|
||||
FALSE
|
||||
ELSE
|
||||
EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = c.partition_key AND m.oid = c.oid)
|
||||
END as has_index
|
||||
FROM af_collab c
|
||||
JOIN af_workspace w ON c.workspace_id = w.workspace_id
|
||||
WHERE c.oid = $1"#,
|
||||
oid
|
||||
)
|
||||
.fetch_one(tx.deref_mut())
|
||||
.await?;
|
||||
Ok(result.exists.unwrap_or(false))
|
||||
if result.disable_indexing.unwrap_or(false) {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(result.has_index.unwrap_or(false)))
|
||||
}
|
||||
|
||||
pub async fn upsert_collab_embeddings(
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
use database_entity::dto::{AFRole, AFWorkspaceInvitation, AFWorkspaceInvitationStatus};
|
||||
use database_entity::dto::{
|
||||
AFRole, AFWorkspaceInvitation, AFWorkspaceInvitationStatus, AFWorkspaceSettings,
|
||||
};
|
||||
use futures_util::stream::BoxStream;
|
||||
use sqlx::{
|
||||
types::{uuid, Uuid},
|
||||
|
@ -752,3 +754,39 @@ pub async fn is_workspace_exist<'a, E: Executor<'a, Database = Postgres>>(
|
|||
|
||||
Ok(exists.unwrap_or(false))
|
||||
}
|
||||
|
||||
pub async fn select_workspace_settings<'a, E: Executor<'a, Database = Postgres>>(
|
||||
executor: E,
|
||||
workspace_id: &Uuid,
|
||||
) -> Result<Option<AFWorkspaceSettings>, AppError> {
|
||||
let json = sqlx::query_scalar!(
|
||||
r#"SElECT settings FROM af_workspace WHERE workspace_id = $1"#,
|
||||
workspace_id
|
||||
)
|
||||
.fetch_one(executor)
|
||||
.await?;
|
||||
|
||||
match json {
|
||||
None => Ok(None),
|
||||
Some(value) => {
|
||||
let settings: AFWorkspaceSettings = serde_json::from_value(value)?;
|
||||
Ok(Some(settings))
|
||||
},
|
||||
}
|
||||
}
|
||||
pub async fn upsert_workspace_settings(
|
||||
tx: &mut Transaction<'_, Postgres>,
|
||||
workspace_id: &Uuid,
|
||||
settings: &AFWorkspaceSettings,
|
||||
) -> Result<(), AppError> {
|
||||
let json = serde_json::to_value(settings)?;
|
||||
sqlx::query!(
|
||||
r#"UPDATE af_workspace SET settings = $1 WHERE workspace_id = $2"#,
|
||||
json,
|
||||
workspace_id
|
||||
)
|
||||
.execute(tx.deref_mut())
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
2
migrations/20240604090043_add_workspace_settings.sql
Normal file
2
migrations/20240604090043_add_workspace_settings.sql
Normal file
|
@ -0,0 +1,2 @@
|
|||
-- Add migration script here
|
||||
ALTER TABLE af_workspace ADD COLUMN settings JSONB;
|
|
@ -43,5 +43,6 @@ dotenvy = "0.15.0"
|
|||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.11"
|
||||
lazy_static.workspace = true
|
||||
workspace-template.workspace = true
|
||||
pgvector = { workspace = true, features = ["sqlx"] }
|
|
@ -23,7 +23,7 @@ use collab_stream::model::{CollabUpdateEvent, StreamMessage};
|
|||
use collab_stream::stream_group::{ReadOption, StreamGroup};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::indexer::{Fragment, FragmentID, Indexer};
|
||||
use crate::indexer::{Fragment, FragmentID, IndexStatus, Indexer};
|
||||
use crate::watchers::DocumentWatcher;
|
||||
|
||||
const CONSUMER_NAME: &str = "open_collab_handle";
|
||||
|
@ -51,7 +51,18 @@ impl CollabHandle {
|
|||
ingest_interval: Duration,
|
||||
) -> Result<Option<Self>> {
|
||||
let closing = CancellationToken::new();
|
||||
let was_indexed = indexer.was_indexed(&object_id).await?;
|
||||
let was_indexed = match indexer.index_status(&object_id).await? {
|
||||
IndexStatus::Indexed => true,
|
||||
IndexStatus::NotIndexed => false,
|
||||
IndexStatus::NotPermitted => {
|
||||
tracing::trace!(
|
||||
"document {}/{} is not permitted to be indexed",
|
||||
workspace_id,
|
||||
object_id
|
||||
);
|
||||
return Ok(None);
|
||||
},
|
||||
};
|
||||
let content: Arc<dyn Indexable> = match collab_type {
|
||||
CollabType::Document => {
|
||||
let content = Document::from_doc_state(
|
||||
|
|
|
@ -35,6 +35,7 @@ impl OpenCollabConsumer {
|
|||
indexer: Arc<dyn Indexer>,
|
||||
control_stream_key: &str,
|
||||
ingest_interval: Duration,
|
||||
preindex: bool,
|
||||
) -> Result<Self> {
|
||||
let handles = Arc::new(DashMap::new());
|
||||
let mut control_group = redis_stream
|
||||
|
@ -42,7 +43,9 @@ impl OpenCollabConsumer {
|
|||
.await?;
|
||||
|
||||
// Handle unindexed documents
|
||||
Self::handle_unindexed_collabs(indexer.clone()).await;
|
||||
if preindex {
|
||||
Self::handle_unindexed_collabs(indexer.clone()).await;
|
||||
}
|
||||
|
||||
// Handle stale messages
|
||||
let stale_messages = control_group.get_unacked_messages(CONSUMER_NAME).await?;
|
||||
|
@ -289,7 +292,7 @@ mod test {
|
|||
use collab_document::document_data::default_document_data;
|
||||
use collab_entity::CollabType;
|
||||
use collab_stream::model::CollabControlEvent;
|
||||
use database::index::has_collab_embeddings;
|
||||
use database::index::get_index_status;
|
||||
use serde_json::json;
|
||||
use sqlx::Row;
|
||||
use std::sync::Arc;
|
||||
|
@ -350,6 +353,7 @@ mod test {
|
|||
indexer.clone(),
|
||||
"af_collab_control",
|
||||
Duration::from_secs(1), // interval longer than test timeout
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -425,10 +429,14 @@ mod test {
|
|||
|
||||
{
|
||||
let mut tx = db.begin().await.unwrap();
|
||||
let has_embedding = has_collab_embeddings(&mut tx, &object_id.to_string())
|
||||
let status = get_index_status(&mut tx, &object_id.to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(!has_embedding, "collab should not have embeddings at start");
|
||||
assert_eq!(
|
||||
status,
|
||||
Some(false),
|
||||
"collab should not have embeddings at start"
|
||||
);
|
||||
}
|
||||
|
||||
let indexer = Arc::new(PostgresIndexer::new(openai, db.clone()));
|
||||
|
@ -438,16 +446,17 @@ mod test {
|
|||
indexer.clone(),
|
||||
"af_collab_control",
|
||||
Duration::from_secs(1), // interval longer than test timeout
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let mut tx = db.begin().await.unwrap();
|
||||
let has_embedding = has_collab_embeddings(&mut tx, &object_id.to_string())
|
||||
let status = get_index_status(&mut tx, &object_id.to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(has_embedding, "collab should be indexed after start");
|
||||
assert_eq!(status, Some(true), "collab should be indexed after start");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ use uuid::Uuid;
|
|||
|
||||
use database::collab::select_blob_from_af_collab;
|
||||
use database::index::{
|
||||
get_collabs_without_embeddings, has_collab_embeddings, remove_collab_embeddings,
|
||||
get_collabs_without_embeddings, get_index_status, remove_collab_embeddings,
|
||||
upsert_collab_embeddings,
|
||||
};
|
||||
use database_entity::dto::{AFCollabEmbeddingParams, EmbeddingContentType};
|
||||
|
@ -27,7 +27,7 @@ use crate::error::Result;
|
|||
#[async_trait]
|
||||
pub trait Indexer: Send + Sync {
|
||||
/// Check if document with given id has been already a corresponding index entry.
|
||||
async fn was_indexed(&self, object_id: &str) -> Result<bool>;
|
||||
async fn index_status(&self, object_id: &str) -> Result<IndexStatus>;
|
||||
async fn update_index(&self, workspace_id: &Uuid, documents: Vec<Fragment>) -> Result<()>;
|
||||
async fn remove(&self, ids: &[FragmentID]) -> Result<()>;
|
||||
/// Returns a list of object ids, that have not been indexed yet.
|
||||
|
@ -41,6 +41,16 @@ pub struct UnindexedCollab {
|
|||
pub collab: EncodedCollab,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub enum IndexStatus {
|
||||
/// Document is indexed and up-to-date.
|
||||
Indexed,
|
||||
/// Document is not indexed.
|
||||
NotIndexed,
|
||||
/// Document should never be indexed.
|
||||
NotPermitted,
|
||||
}
|
||||
|
||||
pub type FragmentID = String;
|
||||
|
||||
/// Fragment represents a single piece of indexable data.
|
||||
|
@ -192,9 +202,13 @@ struct Embeddings {
|
|||
|
||||
#[async_trait]
|
||||
impl Indexer for PostgresIndexer {
|
||||
async fn was_indexed(&self, object_id: &str) -> Result<bool> {
|
||||
let found = has_collab_embeddings(&mut self.db.begin().await?, object_id).await?;
|
||||
Ok(found)
|
||||
async fn index_status(&self, object_id: &str) -> Result<IndexStatus> {
|
||||
let found = get_index_status(&mut self.db.begin().await?, object_id).await?;
|
||||
match found {
|
||||
None => Ok(IndexStatus::NotPermitted),
|
||||
Some(true) => Ok(IndexStatus::Indexed),
|
||||
Some(false) => Ok(IndexStatus::NotIndexed),
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_index(&self, workspace_id: &Uuid, documents: Vec<Fragment>) -> Result<()> {
|
||||
|
|
|
@ -47,6 +47,9 @@ pub struct Config {
|
|||
value_enum
|
||||
)]
|
||||
pub app_env: Environment,
|
||||
|
||||
#[clap(long, env = "APPFLOWY_INDEXER_PREINDEX", default_value = "true")]
|
||||
pub preindex: bool,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
@ -68,6 +71,7 @@ async fn run_server(config: Config) -> Result<(), Box<dyn std::error::Error>> {
|
|||
Arc::new(indexer),
|
||||
&config.control_stream_key,
|
||||
config.ingest_interval.into(),
|
||||
config.preindex,
|
||||
)
|
||||
.await?;
|
||||
tracing::info!("AppFlowy Indexer started!");
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
use std::borrow::Cow;
|
||||
use std::env;
|
||||
use std::ops::DerefMut;
|
||||
use std::sync::Arc;
|
||||
|
||||
use collab::core::collab::MutexCollab;
|
||||
use collab::entity::EncodedCollab;
|
||||
use collab_entity::CollabType;
|
||||
use lazy_static::lazy_static;
|
||||
use sqlx::PgPool;
|
||||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
use yrs::Subscription;
|
||||
|
||||
|
@ -15,15 +19,35 @@ use database::collab::insert_into_af_collab;
|
|||
use database::user::create_user;
|
||||
use database_entity::dto::CollabParams;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref APPFLOWY_INDEXER_OPENAI_API_KEY: Cow<'static, str> =
|
||||
get_env_var("APPFLOWY_INDEXER_OPENAI_API_KEY", "");
|
||||
pub static ref APPFLOWY_INDEXER_DATABASE_URL: Cow<'static, str> = get_env_var(
|
||||
"APPFLOWY_INDEXER_DATABASE_URL",
|
||||
"postgres://postgres:password@localhost:5432/postgres"
|
||||
);
|
||||
pub static ref APPFLOWY_INDEXER_REDIS_URL: Cow<'static, str> =
|
||||
get_env_var("APPFLOWY_INDEXER_REDIS_URL", "redis://localhost:6379");
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn get_env_var<'default>(key: &str, default: &'default str) -> Cow<'default, str> {
|
||||
dotenvy::dotenv().ok();
|
||||
match env::var(key) {
|
||||
Ok(value) => Cow::Owned(value),
|
||||
Err(_) => {
|
||||
warn!("could not read env var {}: using default: {}", key, default);
|
||||
Cow::Borrowed(default)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn openai_client() -> openai_dive::v1::api::Client {
|
||||
let api_key = std::env::var("APPFLOWY_INDEXER_OPENAI_API_KEY").unwrap();
|
||||
openai_dive::v1::api::Client::new(api_key)
|
||||
openai_dive::v1::api::Client::new(APPFLOWY_INDEXER_OPENAI_API_KEY.to_string())
|
||||
}
|
||||
|
||||
pub async fn db_pool() -> PgPool {
|
||||
let database_url = std::env::var("APPFLOWY_INDEXER_DATABASE_URL")
|
||||
.unwrap_or("postgres://postgres:password@localhost:5432/postgres".to_string());
|
||||
PgPool::connect(&database_url)
|
||||
PgPool::connect(&APPFLOWY_INDEXER_DATABASE_URL)
|
||||
.await
|
||||
.expect("failed to connect to database")
|
||||
}
|
||||
|
@ -67,9 +91,7 @@ pub async fn setup_collab(
|
|||
}
|
||||
|
||||
pub async fn redis_client() -> redis::Client {
|
||||
let redis_uri =
|
||||
std::env::var("APPFLOWY_INDEXER_REDIS_URL").unwrap_or("redis://localhost:6379".to_string());
|
||||
redis::Client::open(redis_uri).expect("failed to connect to redis")
|
||||
redis::Client::open(APPFLOWY_INDEXER_REDIS_URL.to_string()).expect("failed to connect to redis")
|
||||
}
|
||||
|
||||
pub async fn redis_stream() -> CollabRedisStream {
|
||||
|
|
|
@ -69,6 +69,10 @@ pub fn workspace_scope() -> Scope {
|
|||
.service(web::resource("/{workspace_id}")
|
||||
.route(web::delete().to(delete_workspace_handler))
|
||||
)
|
||||
.service(web::resource("/{workspace_id}/settings")
|
||||
.route(web::get().to(get_workspace_settings_handler))
|
||||
.route(web::post().to(post_workspace_settings_handler))
|
||||
)
|
||||
.service(web::resource("/{workspace_id}/open").route(web::put().to(open_workspace_handler)))
|
||||
.service(web::resource("/{workspace_id}/leave").route(web::post().to(leave_workspace_handler)))
|
||||
.service(
|
||||
|
@ -297,6 +301,42 @@ async fn post_accept_workspace_invite_handler(
|
|||
Ok(AppResponse::Ok().into())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, err, fields(user_uuid))]
|
||||
async fn get_workspace_settings_handler(
|
||||
user_uuid: UserUuid,
|
||||
state: Data<AppState>,
|
||||
workspace_id: web::Path<Uuid>,
|
||||
) -> Result<JsonAppResponse<AFWorkspaceSettings>> {
|
||||
let uid = state.user_cache.get_user_uid(&user_uuid).await?;
|
||||
let settings = workspace::ops::get_workspace_settings(
|
||||
&state.pg_pool,
|
||||
&state.workspace_access_control,
|
||||
&workspace_id,
|
||||
&uid,
|
||||
)
|
||||
.await?;
|
||||
Ok(AppResponse::Ok().with_data(settings).into())
|
||||
}
|
||||
|
||||
#[instrument(level = "info", skip_all, err, fields(user_uuid))]
|
||||
async fn post_workspace_settings_handler(
|
||||
user_uuid: UserUuid,
|
||||
state: Data<AppState>,
|
||||
workspace_id: web::Path<Uuid>,
|
||||
data: Json<AFWorkspaceSettings>,
|
||||
) -> Result<JsonAppResponse<()>> {
|
||||
let uid = state.user_cache.get_user_uid(&user_uuid).await?;
|
||||
workspace::ops::update_workspace_settings(
|
||||
&state.pg_pool,
|
||||
&state.workspace_access_control,
|
||||
&workspace_id,
|
||||
&uid,
|
||||
&data.into_inner(),
|
||||
)
|
||||
.await?;
|
||||
Ok(AppResponse::Ok().into())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, err)]
|
||||
async fn get_workspace_members_handler(
|
||||
_user_uuid: UserUuid,
|
||||
|
|
|
@ -8,7 +8,7 @@ use tracing::instrument;
|
|||
use uuid::Uuid;
|
||||
|
||||
use access_control::workspace::WorkspaceAccessControl;
|
||||
use app_error::AppError;
|
||||
use app_error::{AppError, ErrorCode};
|
||||
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
|
||||
use database::collab::upsert_collab_member_with_txn;
|
||||
use database::file::bucket_s3_impl::BucketClientS3Impl;
|
||||
|
@ -20,13 +20,14 @@ use database::workspace::{
|
|||
change_workspace_icon, delete_from_workspace, delete_workspace_members, get_invitation_by_id,
|
||||
insert_user_workspace, insert_workspace_invitation, rename_workspace, select_all_user_workspaces,
|
||||
select_user_is_workspace_owner, select_workspace, select_workspace_invitations_for_user,
|
||||
select_workspace_member, select_workspace_member_list, select_workspace_total_collab_bytes,
|
||||
update_updated_at_of_workspace, update_workspace_invitation_set_status_accepted,
|
||||
upsert_workspace_member, upsert_workspace_member_with_txn,
|
||||
select_workspace_member, select_workspace_member_list, select_workspace_settings,
|
||||
select_workspace_total_collab_bytes, update_updated_at_of_workspace,
|
||||
update_workspace_invitation_set_status_accepted, upsert_workspace_member,
|
||||
upsert_workspace_member_with_txn, upsert_workspace_settings,
|
||||
};
|
||||
use database_entity::dto::{
|
||||
AFAccessLevel, AFRole, AFWorkspace, AFWorkspaceInvitation, AFWorkspaceInvitationStatus,
|
||||
WorkspaceUsage,
|
||||
AFWorkspaceSettings, WorkspaceUsage,
|
||||
};
|
||||
use gotrue::params::{GenerateLinkParams, GenerateLinkType};
|
||||
use shared_entity::dto::workspace_dto::{
|
||||
|
@ -464,3 +465,48 @@ pub async fn get_workspace_document_total_bytes(
|
|||
total_document_size: byte_count,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_workspace_settings(
|
||||
pg_pool: &PgPool,
|
||||
workspace_access_control: &impl WorkspaceAccessControl,
|
||||
workspace_id: &Uuid,
|
||||
owner_uid: &i64,
|
||||
) -> Result<AFWorkspaceSettings, AppResponseError> {
|
||||
let has_access = workspace_access_control
|
||||
.enforce_role(owner_uid, &workspace_id.to_string(), AFRole::Owner)
|
||||
.await?;
|
||||
|
||||
if !has_access {
|
||||
return Err(AppResponseError::new(
|
||||
ErrorCode::UserUnAuthorized,
|
||||
"Only workspace owner can access workspace settings",
|
||||
));
|
||||
}
|
||||
|
||||
let settings = select_workspace_settings(pg_pool, workspace_id).await?;
|
||||
Ok(settings.unwrap_or_default())
|
||||
}
|
||||
|
||||
pub async fn update_workspace_settings(
|
||||
pg_pool: &PgPool,
|
||||
workspace_access_control: &impl WorkspaceAccessControl,
|
||||
workspace_id: &Uuid,
|
||||
owner_uid: &i64,
|
||||
workspace_settings: &AFWorkspaceSettings,
|
||||
) -> Result<(), AppResponseError> {
|
||||
let has_access = workspace_access_control
|
||||
.enforce_role(owner_uid, &workspace_id.to_string(), AFRole::Owner)
|
||||
.await?;
|
||||
|
||||
if !has_access {
|
||||
return Err(AppResponseError::new(
|
||||
ErrorCode::UserUnAuthorized,
|
||||
"Only workspace owner can edit workspace settings",
|
||||
));
|
||||
}
|
||||
|
||||
let mut tx = pg_pool.begin().await?;
|
||||
upsert_workspace_settings(&mut tx, workspace_id, workspace_settings).await?;
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -4,3 +4,4 @@ mod invitation_crud;
|
|||
mod member_crud;
|
||||
mod template_test;
|
||||
mod workspace_crud;
|
||||
mod workspace_settings;
|
||||
|
|
97
tests/workspace/workspace_settings.rs
Normal file
97
tests/workspace/workspace_settings.rs
Normal file
|
@ -0,0 +1,97 @@
|
|||
use app_error::ErrorCode;
|
||||
use client_api::Client;
|
||||
use client_api_test::generate_unique_registered_user_client;
|
||||
use database_entity::dto::{AFRole, AFWorkspaceInvitationStatus, AFWorkspaceSettings};
|
||||
use shared_entity::dto::workspace_dto::WorkspaceMemberInvitation;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_and_set_workspace_by_owner() {
|
||||
let (c, _user) = generate_unique_registered_user_client().await;
|
||||
let workspaces = c.get_workspaces().await.unwrap().0;
|
||||
let workspace_id = workspaces.first().unwrap().workspace_id.to_string();
|
||||
|
||||
let mut settings = c.get_workspace_settings(&workspace_id).await.unwrap();
|
||||
assert!(
|
||||
!settings.disable_indexing,
|
||||
"indexing should be enabled by default"
|
||||
);
|
||||
|
||||
settings.disable_indexing = true;
|
||||
c.update_workspace_settings(&workspace_id, &settings)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let settings = c.get_workspace_settings(&workspace_id).await.unwrap();
|
||||
assert!(settings.disable_indexing);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_and_set_workspace_by_non_owner() {
|
||||
let (alice_client, _alice) = generate_unique_registered_user_client().await;
|
||||
let workspaces = alice_client.get_workspaces().await.unwrap().0;
|
||||
let alice_workspace_id = workspaces.first().unwrap().workspace_id;
|
||||
|
||||
let (bob_client, bob) = generate_unique_registered_user_client().await;
|
||||
|
||||
invite_user_to_workspace(&alice_workspace_id, &alice_client, &bob_client, &bob.email).await;
|
||||
|
||||
let resp = bob_client
|
||||
.get_workspace_settings(&alice_workspace_id.to_string())
|
||||
.await;
|
||||
assert!(
|
||||
resp.is_err(),
|
||||
"non-owner should not have access to workspace settings"
|
||||
);
|
||||
assert_eq!(resp.err().unwrap().code, ErrorCode::UserUnAuthorized);
|
||||
|
||||
let resp = bob_client
|
||||
.update_workspace_settings(
|
||||
&alice_workspace_id.to_string(),
|
||||
&AFWorkspaceSettings {
|
||||
disable_indexing: true,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
resp.is_err(),
|
||||
"non-owner should not be able to edit workspace settings"
|
||||
);
|
||||
assert_eq!(resp.err().unwrap().code, ErrorCode::UserUnAuthorized);
|
||||
}
|
||||
|
||||
async fn invite_user_to_workspace(
|
||||
workspace_id: &Uuid,
|
||||
owner: &Client,
|
||||
member: &Client,
|
||||
member_email: &str,
|
||||
) {
|
||||
owner
|
||||
.invite_workspace_members(
|
||||
workspace_id.to_string().as_str(),
|
||||
vec![WorkspaceMemberInvitation {
|
||||
email: member_email.to_string(),
|
||||
role: AFRole::Member,
|
||||
}],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// list invitation with pending filter
|
||||
let pending_invs = member
|
||||
.list_workspace_invitations(Some(AFWorkspaceInvitationStatus::Pending))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(pending_invs.len(), 1);
|
||||
|
||||
// accept invitation
|
||||
let target_invite = pending_invs
|
||||
.iter()
|
||||
.find(|i| i.workspace_id == *workspace_id)
|
||||
.unwrap();
|
||||
|
||||
member
|
||||
.accept_workspace_invitation(target_invite.invite_id.to_string().as_str())
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
Loading…
Add table
Reference in a new issue