mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
🌊 Streams: Make tests platform agnostic (#206979)
Fixes https://github.com/elastic/streams-program/issues/29 This PR makes the streams API tests platform agnostic. Some changes besides basic moving over were required, documented in code
This commit is contained in:
parent
4b5f46619e
commit
11d5c96b44
21 changed files with 228 additions and 257 deletions
|
@ -31,7 +31,6 @@ enabled:
|
|||
- x-pack/test/api_integration/apis/synthetics/config.ts
|
||||
- x-pack/test/api_integration/apis/uptime/config.ts
|
||||
- x-pack/test/api_integration/apis/entity_manager/config.ts
|
||||
- x-pack/test/api_integration/apis/streams/config.ts
|
||||
- x-pack/test/apm_api_integration/basic/config.ts
|
||||
- x-pack/test/apm_api_integration/cloud/config.ts
|
||||
- x-pack/test/apm_api_integration/rules/config.ts
|
||||
|
|
4
.github/CODEOWNERS
vendored
4
.github/CODEOWNERS
vendored
|
@ -1247,7 +1247,6 @@ packages/kbn-monaco/src/esql @elastic/kibana-esql
|
|||
|
||||
|
||||
# Observability UI
|
||||
/x-pack/test/api_integration/apis/streams @elastic/observability-ui # Assigned per https://github.com/elastic/kibana/pull/201293
|
||||
/x-pack/test_serverless/api_integration/test_suites/observability/config.ts @elastic/observability-ui @elastic/appex-qa
|
||||
/x-pack/test_serverless/api_integration/test_suites/observability/index.ts @elastic/observability-ui
|
||||
|
||||
|
@ -1450,6 +1449,9 @@ packages/kbn-monaco/src/esql @elastic/kibana-esql
|
|||
# Observability settings
|
||||
/x-pack/plugins/observability_solution/observability/server/ui_settings.ts @elastic/obs-docs
|
||||
|
||||
# Streams
|
||||
/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams @elastic/streams-program-team
|
||||
|
||||
### END Observability Plugins
|
||||
|
||||
# Presentation
|
||||
|
|
|
@ -81,6 +81,7 @@ export class StreamsClient {
|
|||
assetClient: AssetClient;
|
||||
storageClient: StreamsStorageClient;
|
||||
logger: Logger;
|
||||
isServerless: boolean;
|
||||
}
|
||||
) {}
|
||||
|
||||
|
@ -178,6 +179,7 @@ export class StreamsClient {
|
|||
definition,
|
||||
logger,
|
||||
scopedClusterClient,
|
||||
isServerless: this.dependencies.isServerless,
|
||||
});
|
||||
} else if (isIngestStream(definition)) {
|
||||
await syncIngestStreamDefinitionObjects({
|
||||
|
|
|
@ -44,8 +44,10 @@ export async function syncWiredStreamDefinitionObjects({
|
|||
definition,
|
||||
scopedClusterClient,
|
||||
logger,
|
||||
isServerless,
|
||||
}: SyncStreamParamsBase & {
|
||||
definition: WiredStreamDefinition;
|
||||
isServerless: boolean;
|
||||
}) {
|
||||
const componentTemplate = generateLayer(definition.name, definition);
|
||||
await upsertComponent({
|
||||
|
@ -72,7 +74,7 @@ export async function syncWiredStreamDefinitionObjects({
|
|||
await upsertTemplate({
|
||||
esClient: scopedClusterClient.asCurrentUser,
|
||||
logger,
|
||||
template: generateIndexTemplate(definition.name),
|
||||
template: generateIndexTemplate(definition.name, isServerless),
|
||||
});
|
||||
|
||||
await upsertDataStream({
|
||||
|
|
|
@ -9,7 +9,7 @@ import { ASSET_VERSION } from '../../../../common/constants';
|
|||
import { getProcessingPipelineName } from '../ingest_pipelines/name';
|
||||
import { getIndexTemplateName } from './name';
|
||||
|
||||
export function generateIndexTemplate(id: string) {
|
||||
export function generateIndexTemplate(id: string, isServerless: boolean) {
|
||||
const composedOf = id.split('.').reduce((acc, _, index, array) => {
|
||||
const parent = array.slice(0, index + 1).join('.');
|
||||
return [...acc, `${parent}@stream.layer`];
|
||||
|
@ -27,7 +27,7 @@ export function generateIndexTemplate(id: string) {
|
|||
},
|
||||
data_stream: {
|
||||
hidden: false,
|
||||
failure_store: true,
|
||||
failure_store: isServerless ? undefined : true, // TODO: Enable failure store for serverless once it is rolled out
|
||||
},
|
||||
template: {
|
||||
settings: {
|
||||
|
|
|
@ -52,6 +52,8 @@ export class StreamsService {
|
|||
|
||||
const scopedClusterClient = coreStart.elasticsearch.client.asScoped(request);
|
||||
|
||||
const isServerless = coreStart.elasticsearch.getCapabilities().serverless;
|
||||
|
||||
const storageAdapter = new StorageIndexAdapter(
|
||||
scopedClusterClient.asInternalUser,
|
||||
logger,
|
||||
|
@ -63,6 +65,7 @@ export class StreamsService {
|
|||
logger,
|
||||
scopedClusterClient,
|
||||
storageClient: storageAdapter.getClient(),
|
||||
isServerless,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,30 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { FtrConfigProviderContext, getKibanaCliLoggers } from '@kbn/test';
|
||||
|
||||
export default async function ({ readConfigFile }: FtrConfigProviderContext) {
|
||||
const baseIntegrationTestsConfig = await readConfigFile(require.resolve('../../config.ts'));
|
||||
return {
|
||||
...baseIntegrationTestsConfig.getAll(),
|
||||
kbnTestServer: {
|
||||
...baseIntegrationTestsConfig.get('kbnTestServer'),
|
||||
serverArgs: [
|
||||
...baseIntegrationTestsConfig.get('kbnTestServer.serverArgs'),
|
||||
`--logging.loggers=${JSON.stringify([
|
||||
...getKibanaCliLoggers(baseIntegrationTestsConfig.get('kbnTestServer.serverArgs')),
|
||||
{
|
||||
name: 'plugins.streams',
|
||||
level: 'debug',
|
||||
appenders: ['default'],
|
||||
},
|
||||
])}`,
|
||||
],
|
||||
},
|
||||
testFiles: [require.resolve('.')],
|
||||
};
|
||||
}
|
|
@ -1,95 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
import {
|
||||
disableStreams,
|
||||
enableStreams,
|
||||
forkStream,
|
||||
getUnmappedFieldsForStream,
|
||||
indexDocument,
|
||||
simulateFieldsForStream,
|
||||
} from './helpers/requests';
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
import { createStreamsRepositorySupertestClient } from './helpers/repository_client';
|
||||
|
||||
export default function ({ getService }: FtrProviderContext) {
|
||||
const supertest = getService('supertest');
|
||||
const esClient = getService('es');
|
||||
|
||||
const apiClient = createStreamsRepositorySupertestClient(supertest);
|
||||
|
||||
describe('Streams Schema', () => {
|
||||
before(async () => {
|
||||
await enableStreams(apiClient);
|
||||
|
||||
const doc = {
|
||||
'@timestamp': '2024-01-01T00:00:10.000Z',
|
||||
message: '2023-01-01T00:00:10.000Z error test',
|
||||
['some.field']: 'some value',
|
||||
['another.field']: 'another value',
|
||||
lastField: 'last value',
|
||||
['log.level']: 'warning',
|
||||
};
|
||||
|
||||
await indexDocument(esClient, 'logs', doc);
|
||||
});
|
||||
|
||||
after(async () => {
|
||||
await disableStreams(apiClient);
|
||||
});
|
||||
|
||||
describe('Unmapped fields API', () => {
|
||||
it('Returns unmapped fields', async () => {
|
||||
const response = await getUnmappedFieldsForStream(supertest, 'logs');
|
||||
expect(response.unmappedFields).to.eql(['another.field', 'lastField', 'some.field']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Fields simulation API', () => {
|
||||
it('Returns failure status when simulation would fail', async () => {
|
||||
const response = await simulateFieldsForStream(supertest, 'logs', {
|
||||
field_definitions: [{ name: 'message', type: 'boolean' }],
|
||||
});
|
||||
|
||||
expect(response.status).to.be('failure');
|
||||
expect(response.simulationError).to.be.a('string');
|
||||
expect(response.documentsWithRuntimeFieldsApplied).to.be(null);
|
||||
});
|
||||
it('Returns success status when simulation would succeed', async () => {
|
||||
const response = await simulateFieldsForStream(supertest, 'logs', {
|
||||
field_definitions: [{ name: 'message', type: 'keyword' }],
|
||||
});
|
||||
|
||||
expect(response.status).to.be('success');
|
||||
expect(response.simulationError).to.be(null);
|
||||
expect(response.documentsWithRuntimeFieldsApplied).length(1);
|
||||
});
|
||||
it('Returns unknown status when documents are missing and status cannot be determined', async () => {
|
||||
const forkBody = {
|
||||
stream: {
|
||||
name: 'logs.nginx',
|
||||
},
|
||||
condition: {
|
||||
field: 'log.logger',
|
||||
operator: 'eq' as const,
|
||||
value: 'nginx',
|
||||
},
|
||||
};
|
||||
|
||||
await forkStream(apiClient, 'logs', forkBody);
|
||||
const response = await simulateFieldsForStream(supertest, 'logs.nginx', {
|
||||
field_definitions: [{ name: 'message', type: 'keyword' }],
|
||||
});
|
||||
|
||||
expect(response.status).to.be('unknown');
|
||||
expect(response.simulationError).to.be(null);
|
||||
expect(response.documentsWithRuntimeFieldsApplied).to.be(null);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
|
@ -6,16 +6,19 @@
|
|||
*/
|
||||
import expect from '@kbn/expect';
|
||||
import { disableStreams, enableStreams, indexDocument } from '../helpers/requests';
|
||||
import { createStreamsRepositorySupertestClient } from '../helpers/repository_client';
|
||||
import { FtrProviderContext } from '../../../ftr_provider_context';
|
||||
import {
|
||||
StreamsSupertestRepositoryClient,
|
||||
createStreamsRepositoryAdminClient,
|
||||
} from '../helpers/repository_client';
|
||||
import { DeploymentAgnosticFtrProviderContext } from '../../../../ftr_provider_context';
|
||||
|
||||
export default function ({ getService }: FtrProviderContext) {
|
||||
const supertest = getService('supertest');
|
||||
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
|
||||
const roleScopedSupertest = getService('roleScopedSupertest');
|
||||
const esClient = getService('es');
|
||||
|
||||
const kibanaServer = getService('kibanaServer');
|
||||
let apiClient: StreamsSupertestRepositoryClient;
|
||||
|
||||
const apiClient = createStreamsRepositorySupertestClient(supertest);
|
||||
const kibanaServer = getService('kibanaServer');
|
||||
|
||||
const SPACE_ID = 'default';
|
||||
const ARCHIVES = [
|
||||
|
@ -107,6 +110,7 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
|
||||
describe('Asset links', () => {
|
||||
before(async () => {
|
||||
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
|
||||
await enableStreams(apiClient);
|
||||
|
||||
await indexDocument(esClient, 'logs', {
|
|
@ -6,20 +6,26 @@
|
|||
*/
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
import { createStreamsRepositorySupertestClient } from './helpers/repository_client';
|
||||
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
|
||||
import {
|
||||
StreamsSupertestRepositoryClient,
|
||||
createStreamsRepositoryAdminClient,
|
||||
} from './helpers/repository_client';
|
||||
import { disableStreams, enableStreams, fetchDocument, indexDocument } from './helpers/requests';
|
||||
|
||||
export default function ({ getService }: FtrProviderContext) {
|
||||
const supertest = getService('supertest');
|
||||
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
|
||||
const roleScopedSupertest = getService('roleScopedSupertest');
|
||||
const esClient = getService('es');
|
||||
const config = getService('config');
|
||||
const isServerless = !!config.get('serverless');
|
||||
|
||||
const TEST_STREAM_NAME = 'logs-test-default';
|
||||
|
||||
const apiClient = createStreamsRepositorySupertestClient(supertest);
|
||||
let apiClient: StreamsSupertestRepositoryClient;
|
||||
|
||||
describe('Classic streams', () => {
|
||||
before(async () => {
|
||||
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
|
||||
await enableStreams(apiClient);
|
||||
});
|
||||
|
||||
|
@ -94,10 +100,12 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
name: TEST_STREAM_NAME,
|
||||
dashboards: [],
|
||||
inherited_fields: {},
|
||||
lifecycle: {
|
||||
policy: 'logs',
|
||||
type: 'ilm',
|
||||
},
|
||||
lifecycle: isServerless
|
||||
? { type: 'dlm' }
|
||||
: {
|
||||
policy: 'logs',
|
||||
type: 'ilm',
|
||||
},
|
||||
stream: {
|
||||
ingest: {
|
||||
processing: [
|
|
@ -16,17 +16,20 @@ import {
|
|||
indexDocument,
|
||||
putStream,
|
||||
} from './helpers/requests';
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
import { createStreamsRepositorySupertestClient } from './helpers/repository_client';
|
||||
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
|
||||
import {
|
||||
StreamsSupertestRepositoryClient,
|
||||
createStreamsRepositoryAdminClient,
|
||||
} from './helpers/repository_client';
|
||||
|
||||
export default function ({ getService }: FtrProviderContext) {
|
||||
const supertest = getService('supertest');
|
||||
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
|
||||
const roleScopedSupertest = getService('roleScopedSupertest');
|
||||
const esClient = getService('es');
|
||||
|
||||
const apiClient = createStreamsRepositorySupertestClient(supertest);
|
||||
let apiClient: StreamsSupertestRepositoryClient;
|
||||
|
||||
describe('Enrichment', () => {
|
||||
before(async () => {
|
||||
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
|
||||
await enableStreams(apiClient);
|
||||
const body = {
|
||||
stream: {
|
|
@ -9,8 +9,11 @@ import expect from '@kbn/expect';
|
|||
import { ClientRequestParamsOf } from '@kbn/server-route-repository-utils';
|
||||
import type { StreamsRouteRepository } from '@kbn/streams-plugin/server';
|
||||
import { ReadStreamDefinition, WiredReadStreamDefinition } from '@kbn/streams-schema';
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
import { createStreamsRepositorySupertestClient } from './helpers/repository_client';
|
||||
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
|
||||
import {
|
||||
StreamsSupertestRepositoryClient,
|
||||
createStreamsRepositoryAdminClient,
|
||||
} from './helpers/repository_client';
|
||||
import { disableStreams, enableStreams, indexDocument } from './helpers/requests';
|
||||
|
||||
type StreamPutItem = ClientRequestParamsOf<
|
||||
|
@ -120,15 +123,16 @@ const streams: StreamPutItem[] = [
|
|||
},
|
||||
];
|
||||
|
||||
export default function ({ getService }: FtrProviderContext) {
|
||||
const supertest = getService('supertest');
|
||||
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
|
||||
const roleScopedSupertest = getService('roleScopedSupertest');
|
||||
const esClient = getService('es');
|
||||
|
||||
const apiClient = createStreamsRepositorySupertestClient(supertest);
|
||||
let apiClient: StreamsSupertestRepositoryClient;
|
||||
|
||||
// An anticipated use case is that a user will want to flush a tree of streams from a config file
|
||||
describe('Flush from config file', () => {
|
||||
before(async () => {
|
||||
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
|
||||
await enableStreams(apiClient);
|
||||
await createStreams();
|
||||
await indexDocuments();
|
|
@ -6,8 +6,11 @@
|
|||
*/
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
import { createStreamsRepositorySupertestClient } from './helpers/repository_client';
|
||||
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
|
||||
import {
|
||||
StreamsSupertestRepositoryClient,
|
||||
createStreamsRepositoryAdminClient,
|
||||
} from './helpers/repository_client';
|
||||
import {
|
||||
disableStreams,
|
||||
enableStreams,
|
||||
|
@ -16,12 +19,11 @@ import {
|
|||
indexDocument,
|
||||
} from './helpers/requests';
|
||||
|
||||
export default function ({ getService }: FtrProviderContext) {
|
||||
const supertest = getService('supertest');
|
||||
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
|
||||
const roleScopedSupertest = getService('roleScopedSupertest');
|
||||
let apiClient: StreamsSupertestRepositoryClient;
|
||||
const esClient = getService('es');
|
||||
|
||||
const apiClient = createStreamsRepositorySupertestClient(supertest);
|
||||
|
||||
interface Resources {
|
||||
indices: string[];
|
||||
componentTemplates: string[];
|
||||
|
@ -57,6 +59,10 @@ export default function ({ getService }: FtrProviderContext) {
|
|||
return response.body.enabled;
|
||||
}
|
||||
|
||||
before(async () => {
|
||||
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
|
||||
});
|
||||
|
||||
describe('initially', () => {
|
||||
let resources: Resources;
|
||||
|
|
@ -5,16 +5,16 @@
|
|||
* 2.0.
|
||||
*/
|
||||
import type { StreamsRouteRepository } from '@kbn/streams-plugin/server';
|
||||
import supertest from 'supertest';
|
||||
import { RoleScopedSupertestProvider } from '../../../../services/role_scoped_supertest';
|
||||
import {
|
||||
RepositorySupertestClient,
|
||||
getApiClientFromSupertest,
|
||||
} from '../../../../common/utils/server_route_repository/create_supertest_service_from_repository';
|
||||
getAdminApiClient,
|
||||
} from '../../../../../../common/utils/server_route_repository/create_admin_service_from_repository';
|
||||
|
||||
export type StreamsSupertestRepositoryClient = RepositorySupertestClient<StreamsRouteRepository>;
|
||||
|
||||
export function createStreamsRepositorySupertestClient(
|
||||
st: supertest.Agent
|
||||
): StreamsSupertestRepositoryClient {
|
||||
return getApiClientFromSupertest<StreamsRouteRepository>(st);
|
||||
export async function createStreamsRepositoryAdminClient(
|
||||
st: ReturnType<typeof RoleScopedSupertestProvider>
|
||||
): Promise<StreamsSupertestRepositoryClient> {
|
||||
return getAdminApiClient<StreamsRouteRepository>(st);
|
||||
}
|
|
@ -6,7 +6,6 @@
|
|||
*/
|
||||
import { Client } from '@elastic/elasticsearch';
|
||||
import { JsonObject } from '@kbn/utility-types';
|
||||
import { Agent } from 'supertest';
|
||||
import expect from '@kbn/expect';
|
||||
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
|
||||
import { StreamConfigDefinition } from '@kbn/streams-schema';
|
||||
|
@ -75,33 +74,3 @@ export async function putStream(
|
|||
.expect(expectStatusCode)
|
||||
.then((response) => response.body);
|
||||
}
|
||||
|
||||
export async function getStream(supertest: Agent, name: string) {
|
||||
const req = supertest.get(`/api/streams/${encodeURIComponent(name)}`).set('kbn-xsrf', 'xxx');
|
||||
const response = await req.send().expect(200);
|
||||
return response.body;
|
||||
}
|
||||
|
||||
export async function listStreams(supertest: Agent) {
|
||||
const req = supertest.get(`/api/streams`).set('kbn-xsrf', 'xxx');
|
||||
const response = await req.send().expect(200);
|
||||
return response.body;
|
||||
}
|
||||
|
||||
export async function deleteStream(supertest: Agent, id: string) {
|
||||
const req = supertest.delete(`/api/streams/${id}`).set('kbn-xsrf', 'xxx');
|
||||
const response = await req.send().expect(200);
|
||||
return response.body;
|
||||
}
|
||||
|
||||
export async function getUnmappedFieldsForStream(supertest: Agent, id: string) {
|
||||
const req = supertest.get(`/api/streams/${id}/schema/unmapped_fields`).set('kbn-xsrf', 'xxx');
|
||||
const response = await req.send().expect(200);
|
||||
return response.body;
|
||||
}
|
||||
|
||||
export async function simulateFieldsForStream(supertest: Agent, id: string, body: JsonObject) {
|
||||
const req = supertest.post(`/api/streams/${id}/schema/fields_simulation`).set('kbn-xsrf', 'xxx');
|
||||
const response = await req.send(body).expect(200);
|
||||
return response.body;
|
||||
}
|
|
@ -5,9 +5,9 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { FtrProviderContext } from '../../ftr_provider_context';
|
||||
import type { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
|
||||
|
||||
export default function ({ loadTestFile }: FtrProviderContext) {
|
||||
export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext) {
|
||||
describe('Streams Endpoints', () => {
|
||||
loadTestFile(require.resolve('./full_flow'));
|
||||
loadTestFile(require.resolve('./enrichment'));
|
|
@ -7,9 +7,12 @@
|
|||
|
||||
import expect from '@kbn/expect';
|
||||
import { WiredStreamConfigDefinition, WiredStreamDefinition } from '@kbn/streams-schema';
|
||||
import { FtrProviderContext } from '../../ftr_provider_context';
|
||||
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
|
||||
import { disableStreams, enableStreams, putStream } from './helpers/requests';
|
||||
import { createStreamsRepositorySupertestClient } from './helpers/repository_client';
|
||||
import {
|
||||
StreamsSupertestRepositoryClient,
|
||||
createStreamsRepositoryAdminClient,
|
||||
} from './helpers/repository_client';
|
||||
|
||||
const rootStreamDefinition: WiredStreamDefinition = {
|
||||
name: 'logs',
|
||||
|
@ -37,12 +40,13 @@ const rootStreamDefinition: WiredStreamDefinition = {
|
|||
},
|
||||
};
|
||||
|
||||
export default function ({ getService }: FtrProviderContext) {
|
||||
const supertest = getService('supertest');
|
||||
const apiClient = createStreamsRepositorySupertestClient(supertest);
|
||||
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
|
||||
const roleScopedSupertest = getService('roleScopedSupertest');
|
||||
let apiClient: StreamsSupertestRepositoryClient;
|
||||
|
||||
describe('Root stream', () => {
|
||||
before(async () => {
|
||||
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
|
||||
await enableStreams(apiClient);
|
||||
});
|
||||
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
import { disableStreams, enableStreams, forkStream, indexDocument } from './helpers/requests';
|
||||
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
|
||||
import {
|
||||
StreamsSupertestRepositoryClient,
|
||||
createStreamsRepositoryAdminClient,
|
||||
} from './helpers/repository_client';
|
||||
|
||||
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
|
||||
const roleScopedSupertest = getService('roleScopedSupertest');
|
||||
const esClient = getService('es');
|
||||
|
||||
let apiClient: StreamsSupertestRepositoryClient;
|
||||
|
||||
describe('Streams Schema', () => {
|
||||
before(async () => {
|
||||
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
|
||||
await enableStreams(apiClient);
|
||||
|
||||
const doc = {
|
||||
'@timestamp': '2024-01-01T00:00:10.000Z',
|
||||
message: '2023-01-01T00:00:10.000Z error test',
|
||||
['some.field']: 'some value',
|
||||
['another.field']: 'another value',
|
||||
lastField: 'last value',
|
||||
['log.level']: 'warning',
|
||||
};
|
||||
|
||||
await indexDocument(esClient, 'logs', doc);
|
||||
});
|
||||
|
||||
after(async () => {
|
||||
await disableStreams(apiClient);
|
||||
});
|
||||
|
||||
describe('Unmapped fields API', () => {
|
||||
it('Returns unmapped fields', async () => {
|
||||
const response = await apiClient
|
||||
.fetch('GET /api/streams/{id}/schema/unmapped_fields', {
|
||||
params: {
|
||||
path: {
|
||||
id: 'logs',
|
||||
},
|
||||
},
|
||||
})
|
||||
.expect(200);
|
||||
expect(response.body.unmappedFields).to.eql(['another.field', 'lastField', 'some.field']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Fields simulation API', () => {
|
||||
it('Returns failure status when simulation would fail', async () => {
|
||||
const response = await apiClient.fetch('POST /api/streams/{id}/schema/fields_simulation', {
|
||||
params: {
|
||||
path: {
|
||||
id: 'logs',
|
||||
},
|
||||
body: {
|
||||
field_definitions: [{ name: 'message', type: 'boolean' }],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(response.body.status).to.be('failure');
|
||||
expect(response.body.simulationError).to.be.a('string');
|
||||
expect(response.body.documentsWithRuntimeFieldsApplied).to.be(null);
|
||||
});
|
||||
it('Returns success status when simulation would succeed', async () => {
|
||||
const response = await apiClient.fetch('POST /api/streams/{id}/schema/fields_simulation', {
|
||||
params: {
|
||||
path: {
|
||||
id: 'logs',
|
||||
},
|
||||
body: {
|
||||
field_definitions: [{ name: 'message', type: 'keyword' }],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(response.body.status).to.be('success');
|
||||
expect(response.body.simulationError).to.be(null);
|
||||
expect(response.body.documentsWithRuntimeFieldsApplied).length(1);
|
||||
});
|
||||
it('Returns unknown status when documents are missing and status cannot be determined', async () => {
|
||||
const forkBody = {
|
||||
stream: {
|
||||
name: 'logs.nginx',
|
||||
},
|
||||
condition: {
|
||||
field: 'log.logger',
|
||||
operator: 'eq' as const,
|
||||
value: 'nginx',
|
||||
},
|
||||
};
|
||||
|
||||
await forkStream(apiClient, 'logs', forkBody);
|
||||
const response = await apiClient.fetch('POST /api/streams/{id}/schema/fields_simulation', {
|
||||
params: {
|
||||
path: {
|
||||
id: 'logs.nginx',
|
||||
},
|
||||
body: {
|
||||
field_definitions: [{ name: 'message', type: 'keyword' }],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(response.body.status).to.be('unknown');
|
||||
expect(response.body.simulationError).to.be(null);
|
||||
expect(response.body.documentsWithRuntimeFieldsApplied).to.be(null);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
|
@ -22,5 +22,6 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext)
|
|||
loadTestFile(require.resolve('../../apis/observability/slo'));
|
||||
loadTestFile(require.resolve('../../apis/observability/synthetics'));
|
||||
loadTestFile(require.resolve('../../apis/observability/ai_assistant'));
|
||||
loadTestFile(require.resolve('../../apis/observability/streams'));
|
||||
});
|
||||
}
|
||||
|
|
|
@ -16,5 +16,6 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext)
|
|||
loadTestFile(require.resolve('../../apis/observability/synthetics'));
|
||||
loadTestFile(require.resolve('../../apis/observability/infra'));
|
||||
loadTestFile(require.resolve('../../apis/observability/ai_assistant'));
|
||||
loadTestFile(require.resolve('../../apis/observability/streams'));
|
||||
});
|
||||
}
|
||||
|
|
|
@ -12,10 +12,10 @@ import {
|
|||
ReturnOf,
|
||||
ClientRequestParamsOf,
|
||||
} from '@kbn/server-route-repository';
|
||||
import supertest from 'supertest';
|
||||
import { Subtract, RequiredKeys } from 'utility-types';
|
||||
import { format, UrlObject } from 'url';
|
||||
import { kbnTestConfig } from '@kbn/test';
|
||||
import { format } from 'url';
|
||||
import supertest from 'supertest';
|
||||
import { RoleScopedSupertestProvider } from '../../../api_integration/deployment_agnostic/services/role_scoped_supertest';
|
||||
|
||||
type MaybeOptional<TArgs extends Record<string, any>> = RequiredKeys<TArgs> extends never
|
||||
? [TArgs] | []
|
||||
|
@ -44,49 +44,13 @@ type RepositorySupertestReturnOf<
|
|||
}>
|
||||
>;
|
||||
|
||||
type ScopedApiClientWithBasicAuthFactory<TServerRouteRepository extends ServerRouteRepository> = (
|
||||
kibanaServer: UrlObject,
|
||||
username: string
|
||||
) => RepositorySupertestClient<TServerRouteRepository>;
|
||||
|
||||
type ApiClientFromSupertestFactory<TServerRouteRepository extends ServerRouteRepository> = (
|
||||
st: supertest.Agent
|
||||
) => RepositorySupertestClient<TServerRouteRepository>;
|
||||
|
||||
interface RepositorySupertestClientFactory<TServerRouteRepository extends ServerRouteRepository> {
|
||||
getScopedApiClientWithBasicAuth: ScopedApiClientWithBasicAuthFactory<TServerRouteRepository>;
|
||||
getApiClientFromSupertest: ApiClientFromSupertestFactory<TServerRouteRepository>;
|
||||
}
|
||||
|
||||
export function createSupertestClientFactoryFromRepository<
|
||||
TServerRouteRepository extends ServerRouteRepository
|
||||
>(): RepositorySupertestClientFactory<TServerRouteRepository> {
|
||||
return {
|
||||
getScopedApiClientWithBasicAuth: (kibanaServer, username) => {
|
||||
return getScopedApiClientWithBasicAuth<TServerRouteRepository>(kibanaServer, username);
|
||||
},
|
||||
getApiClientFromSupertest: (st) => {
|
||||
return getApiClientFromSupertest<TServerRouteRepository>(st);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function getScopedApiClientWithBasicAuth<TServerRouteRepository extends ServerRouteRepository>(
|
||||
kibanaServer: UrlObject,
|
||||
username: string
|
||||
): RepositorySupertestClient<TServerRouteRepository> {
|
||||
const { password } = kbnTestConfig.getUrlParts();
|
||||
const baseUrlWithAuth = format({
|
||||
...kibanaServer,
|
||||
auth: `${username}:${password}`,
|
||||
export async function getAdminApiClient<TServerRouteRepository extends ServerRouteRepository>(
|
||||
st: ReturnType<typeof RoleScopedSupertestProvider>
|
||||
): Promise<RepositorySupertestClient<TServerRouteRepository>> {
|
||||
const supertestAdmin = await st.getSupertestWithRoleScope('admin', {
|
||||
useCookieHeader: true,
|
||||
withInternalHeaders: true,
|
||||
});
|
||||
|
||||
return getApiClientFromSupertest(supertest(baseUrlWithAuth));
|
||||
}
|
||||
|
||||
export function getApiClientFromSupertest<TServerRouteRepository extends ServerRouteRepository>(
|
||||
st: supertest.Agent
|
||||
): RepositorySupertestClient<TServerRouteRepository> {
|
||||
return {
|
||||
fetch: (endpoint, ...rest) => {
|
||||
const options = rest.length ? rest[0] : { type: undefined };
|
||||
|
@ -98,16 +62,19 @@ export function getApiClientFromSupertest<TServerRouteRepository extends ServerR
|
|||
const { method, pathname, version } = formatRequest(endpoint, params.path);
|
||||
const url = format({ pathname, query: params?.query });
|
||||
|
||||
const headers: Record<string, string> = { 'kbn-xsrf': 'foo' };
|
||||
const headers: Record<string, string> = {
|
||||
'kbn-xsrf': 'foo',
|
||||
'x-elastic-internal-origin': 'kibana',
|
||||
};
|
||||
|
||||
if (version) {
|
||||
headers['Elastic-Api-Version'] = version;
|
||||
}
|
||||
|
||||
let res: supertest.Test;
|
||||
let res: unknown;
|
||||
if (type === 'form-data') {
|
||||
const fields: Array<[string, any]> = Object.entries(params.body);
|
||||
const formDataRequest = st[method](url)
|
||||
const formDataRequest = supertestAdmin[method](url)
|
||||
.set(headers)
|
||||
.set('Content-type', 'multipart/form-data');
|
||||
|
||||
|
@ -117,9 +84,9 @@ export function getApiClientFromSupertest<TServerRouteRepository extends ServerR
|
|||
|
||||
res = formDataRequest;
|
||||
} else if (params.body) {
|
||||
res = st[method](url).send(params.body).set(headers);
|
||||
res = supertestAdmin[method](url).send(params.body).set(headers);
|
||||
} else {
|
||||
res = st[method](url).set(headers);
|
||||
res = supertestAdmin[method](url).set(headers);
|
||||
}
|
||||
|
||||
return res as RepositorySupertestReturnOf<
|
Loading…
Add table
Add a link
Reference in a new issue