[8.x] [Streams] Introducing the new Streams plugin (#198713) (#201723)

# Backport

This will backport the following commits from `main` to `8.x`:
- [[Streams] Introducing the new Streams plugin
(#198713)](https://github.com/elastic/kibana/pull/198713)

<!--- Backport version: 7.3.2 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT {commits} BACKPORT-->

Co-authored-by: Chris Cowan <chris@elastic.co>
This commit is contained in:
Dario Gieselaar 2024-11-26 11:12:27 +01:00 committed by GitHub
parent 4a48ec8efa
commit 570afd9967
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
61 changed files with 2418 additions and 0 deletions

View file

@ -909,6 +909,10 @@ routes, etc.
|The stack_connectors plugin provides connector types shipped with Kibana, built on top of the framework provided in the actions plugin.
|{kib-repo}blob/{branch}/x-pack/plugins/streams/README.md[streams]
|This plugin provides an interface to manage streams
|{kib-repo}blob/{branch}/x-pack/plugins/observability_solution/synthetics/README.md[synthetics]
|The purpose of this plugin is to provide users of Heartbeat more visibility of what's happening
in their infrastructure.

View file

@ -934,6 +934,7 @@
"@kbn/status-plugin-a-plugin": "link:test/server_integration/plugins/status_plugin_a",
"@kbn/status-plugin-b-plugin": "link:test/server_integration/plugins/status_plugin_b",
"@kbn/std": "link:packages/kbn-std",
"@kbn/streams-plugin": "link:x-pack/plugins/streams",
"@kbn/synthetics-plugin": "link:x-pack/plugins/observability_solution/synthetics",
"@kbn/synthetics-private-location": "link:x-pack/packages/kbn-synthetics-private-location",
"@kbn/task-manager-fixture-plugin": "link:x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture",

View file

@ -161,6 +161,7 @@ pageLoadAssetSize:
spaces: 57868
stackAlerts: 58316
stackConnectors: 67227
streams: 16742
synthetics: 55971
telemetry: 51957
telemetryManagementSection: 38586

View file

@ -1844,6 +1844,8 @@
"@kbn/stdio-dev-helpers/*": ["packages/kbn-stdio-dev-helpers/*"],
"@kbn/storybook": ["packages/kbn-storybook"],
"@kbn/storybook/*": ["packages/kbn-storybook/*"],
"@kbn/streams-plugin": ["x-pack/plugins/streams"],
"@kbn/streams-plugin/*": ["x-pack/plugins/streams/*"],
"@kbn/synthetics-e2e": ["x-pack/plugins/observability_solution/synthetics/e2e"],
"@kbn/synthetics-e2e/*": ["x-pack/plugins/observability_solution/synthetics/e2e/*"],
"@kbn/synthetics-plugin": ["x-pack/plugins/observability_solution/synthetics"],

View file

@ -0,0 +1,3 @@
# Streams Plugin
This plugin provides an interface to manage streams

View file

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { schema, TypeOf } from '@kbn/config-schema';
export const configSchema = schema.object({});
export type StreamsConfig = TypeOf<typeof configSchema>;
/**
* The following map is passed to the server plugin setup under the
* exposeToBrowser: option, and controls which of the above config
* keys are allow-listed to be available in the browser config.
*
* NOTE: anything exposed here will be visible in the UI dev tools,
* and therefore MUST NOT be anything that is sensitive information!
*/
export const exposeToBrowserConfig = {} as const;
type ValidKeys = keyof {
[K in keyof typeof exposeToBrowserConfig as (typeof exposeToBrowserConfig)[K] extends true
? K
: never]: true;
};
export type StreamsPublicConfig = Pick<StreamsConfig, ValidKeys>;

View file

@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const ASSET_VERSION = 1;
export const STREAMS_INDEX = '.kibana_streams';

View file

@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
const stringOrNumberOrBoolean = z.union([z.string(), z.number(), z.boolean()]);
export const filterConditionSchema = z.object({
field: z.string(),
operator: z.enum(['eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'contains', 'startsWith', 'endsWith']),
value: stringOrNumberOrBoolean,
});
export type FilterCondition = z.infer<typeof filterConditionSchema>;
export interface AndCondition {
and: Condition[];
}
export interface RerouteOrCondition {
or: Condition[];
}
export type Condition = FilterCondition | AndCondition | RerouteOrCondition | undefined;
export const conditionSchema: z.ZodType<Condition> = z.lazy(() =>
z.union([
filterConditionSchema,
z.object({ and: z.array(conditionSchema) }),
z.object({ or: z.array(conditionSchema) }),
])
);
export const grokProcessingDefinitionSchema = z.object({
type: z.literal('grok'),
field: z.string(),
patterns: z.array(z.string()),
pattern_definitions: z.optional(z.record(z.string())),
});
export const dissectProcessingDefinitionSchema = z.object({
type: z.literal('dissect'),
field: z.string(),
pattern: z.string(),
});
export const processingDefinitionSchema = z.object({
condition: z.optional(conditionSchema),
config: z.discriminatedUnion('type', [
grokProcessingDefinitionSchema,
dissectProcessingDefinitionSchema,
]),
});
export type ProcessingDefinition = z.infer<typeof processingDefinitionSchema>;
export const fieldDefinitionSchema = z.object({
name: z.string(),
type: z.enum(['keyword', 'match_only_text', 'long', 'double', 'date', 'boolean', 'ip']),
});
export type FieldDefinition = z.infer<typeof fieldDefinitionSchema>;
export const streamWithoutIdDefinitonSchema = z.object({
processing: z.array(processingDefinitionSchema).default([]),
fields: z.array(fieldDefinitionSchema).default([]),
children: z
.array(
z.object({
id: z.string(),
condition: conditionSchema,
})
)
.default([]),
});
export type StreamWithoutIdDefinition = z.infer<typeof streamDefinitonSchema>;
export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({
id: z.string(),
});
export type StreamDefinition = z.infer<typeof streamDefinitonSchema>;
export const streamDefinitonWithoutChildrenSchema = streamDefinitonSchema.omit({ children: true });
export type StreamWithoutChildrenDefinition = z.infer<typeof streamDefinitonWithoutChildrenSchema>;

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
module.exports = {
preset: '@kbn/test',
rootDir: '../../..',
roots: ['<rootDir>/x-pack/plugins/streams'],
coverageDirectory: '<rootDir>/target/kibana-coverage/jest/x-pack/plugins/streams',
coverageReporters: ['text', 'html'],
collectCoverageFrom: ['<rootDir>/x-pack/plugins/streams/{common,public,server}/**/*.{js,ts,tsx}'],
};

View file

@ -0,0 +1,28 @@
{
"type": "plugin",
"id": "@kbn/streams-plugin",
"owner": "@simianhacker @flash1293 @dgieselaar",
"description": "A manager for Streams",
"group": "observability",
"visibility": "private",
"plugin": {
"id": "streams",
"configPath": ["xpack", "streams"],
"browser": true,
"server": true,
"requiredPlugins": [
"data",
"security",
"encryptedSavedObjects",
"usageCollection",
"licensing",
"taskManager"
],
"optionalPlugins": [
"cloud",
"serverless"
],
"requiredBundles": [
]
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { PluginInitializer, PluginInitializerContext } from '@kbn/core/public';
import { Plugin } from './plugin';
export const plugin: PluginInitializer<{}, {}> = (context: PluginInitializerContext) => {
return new Plugin(context);
};

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CoreSetup, CoreStart, PluginInitializerContext } from '@kbn/core/public';
import { Logger } from '@kbn/logging';
import type { StreamsPublicConfig } from '../common/config';
import { StreamsPluginClass, StreamsPluginSetup, StreamsPluginStart } from './types';
export class Plugin implements StreamsPluginClass {
public config: StreamsPublicConfig;
public logger: Logger;
constructor(context: PluginInitializerContext<{}>) {
this.config = context.config.get();
this.logger = context.logger.get();
}
setup(core: CoreSetup<StreamsPluginStart>, pluginSetup: StreamsPluginSetup) {
return {};
}
start(core: CoreStart) {
return {};
}
stop() {}
}

View file

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Plugin as PluginClass } from '@kbn/core/public';
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface StreamsPluginSetup {}
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface StreamsPluginStart {}
export type StreamsPluginClass = PluginClass<{}, {}, StreamsPluginSetup, StreamsPluginStart>;

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { PluginInitializerContext } from '@kbn/core-plugins-server';
import { StreamsConfig } from '../common/config';
import { StreamsPluginSetup, StreamsPluginStart, config } from './plugin';
import { StreamsRouteRepository } from './routes';
export type { StreamsConfig, StreamsPluginSetup, StreamsPluginStart, StreamsRouteRepository };
export { config };
export const plugin = async (context: PluginInitializerContext<StreamsConfig>) => {
const { StreamsPlugin } = await import('./plugin');
return new StreamsPlugin(context);
};

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
ClusterPutComponentTemplateRequest,
MappingProperty,
} from '@elastic/elasticsearch/lib/api/types';
import { StreamDefinition } from '../../../../common/types';
import { ASSET_VERSION } from '../../../../common/constants';
import { logsSettings } from './logs_layer';
import { isRoot } from '../helpers/hierarchy';
import { getComponentTemplateName } from './name';
export function generateLayer(
id: string,
definition: StreamDefinition
): ClusterPutComponentTemplateRequest {
const properties: Record<string, MappingProperty> = {};
definition.fields.forEach((field) => {
properties[field.name] = {
type: field.type,
};
});
return {
name: getComponentTemplateName(id),
template: {
settings: isRoot(definition.id) ? logsSettings : {},
mappings: {
subobjects: false,
properties,
},
},
version: ASSET_VERSION,
_meta: {
managed: true,
description: `Default settings for the ${id} stream`,
},
};
}

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/types';
export const logsSettings: IndicesIndexSettings = {
index: {
lifecycle: {
name: 'logs',
},
codec: 'best_compression',
mapping: {
total_fields: {
ignore_dynamic_beyond_limit: true,
},
ignore_malformed: true,
},
},
};

View file

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { ClusterPutComponentTemplateRequest } from '@elastic/elasticsearch/lib/api/types';
import { retryTransientEsErrors } from '../helpers/retry';
interface DeleteComponentOptions {
esClient: ElasticsearchClient;
name: string;
logger: Logger;
}
interface ComponentManagementOptions {
esClient: ElasticsearchClient;
component: ClusterPutComponentTemplateRequest;
logger: Logger;
}
export async function deleteComponent({ esClient, name, logger }: DeleteComponentOptions) {
try {
await retryTransientEsErrors(
() => esClient.cluster.deleteComponentTemplate({ name }, { ignore: [404] }),
{ logger }
);
} catch (error: any) {
logger.error(`Error deleting component template: ${error.message}`);
throw error;
}
}
export async function upsertComponent({ esClient, component, logger }: ComponentManagementOptions) {
try {
await retryTransientEsErrors(() => esClient.cluster.putComponentTemplate(component), {
logger,
});
logger.debug(() => `Installed component template: ${JSON.stringify(component)}`);
} catch (error: any) {
logger.error(`Error updating component template: ${error.message}`);
throw error;
}
}

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export function getComponentTemplateName(id: string) {
return `${id}@stream.layer`;
}

View file

@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { retryTransientEsErrors } from '../helpers/retry';
interface DataStreamManagementOptions {
esClient: ElasticsearchClient;
name: string;
logger: Logger;
}
interface DeleteDataStreamOptions {
esClient: ElasticsearchClient;
name: string;
logger: Logger;
}
interface RolloverDataStreamOptions {
esClient: ElasticsearchClient;
name: string;
logger: Logger;
}
export async function upsertDataStream({ esClient, name, logger }: DataStreamManagementOptions) {
const dataStreamExists = await esClient.indices.exists({ index: name });
if (dataStreamExists) {
return;
}
try {
await retryTransientEsErrors(() => esClient.indices.createDataStream({ name }), { logger });
logger.debug(() => `Installed data stream: ${name}`);
} catch (error: any) {
logger.error(`Error creating data stream: ${error.message}`);
throw error;
}
}
export async function deleteDataStream({ esClient, name, logger }: DeleteDataStreamOptions) {
try {
await retryTransientEsErrors(
() => esClient.indices.deleteDataStream({ name }, { ignore: [404] }),
{ logger }
);
} catch (error: any) {
logger.error(`Error deleting data stream: ${error.message}`);
throw error;
}
}
export async function rolloverDataStreamIfNecessary({
esClient,
name,
logger,
}: RolloverDataStreamOptions) {
const dataStreams = await esClient.indices.getDataStream({ name: `${name},${name}.*` });
for (const dataStream of dataStreams.data_streams) {
const currentMappings =
Object.values(
await esClient.indices.getMapping({
index: dataStream.indices.at(-1)?.index_name,
})
)[0].mappings.properties || {};
const simulatedIndex = await esClient.indices.simulateIndexTemplate({ name: dataStream.name });
const simulatedMappings = simulatedIndex.template.mappings.properties || {};
// check whether the same fields and same types are listed (don't check for other mapping attributes)
const isDifferent =
Object.values(simulatedMappings).length !== Object.values(currentMappings).length ||
Object.entries(simulatedMappings || {}).some(([fieldName, { type }]) => {
const currentType = currentMappings[fieldName]?.type;
return currentType !== type;
});
if (!isDifferent) {
continue;
}
try {
await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), {
logger,
});
logger.debug(() => `Rolled over data stream: ${dataStream.name}`);
} catch (error: any) {
logger.error(`Error rolling over data stream: ${error.message}`);
throw error;
}
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class ComponentTemplateNotFound extends Error {
constructor(message: string) {
super(message);
this.name = 'ComponentTemplateNotFound';
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class DefinitionIdInvalid extends Error {
constructor(message: string) {
super(message);
this.name = 'DefinitionIdInvalid';
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class DefinitionNotFound extends Error {
constructor(message: string) {
super(message);
this.name = 'DefinitionNotFound';
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class ForkConditionMissing extends Error {
constructor(message: string) {
super(message);
this.name = 'ForkConditionMissing';
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class IdConflict extends Error {
constructor(message: string) {
super(message);
this.name = 'IdConflict';
}
}

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export * from './definition_id_invalid';
export * from './definition_not_found';
export * from './id_conflict_error';
export * from './permission_denied';
export * from './security_exception';
export * from './index_template_not_found';
export * from './fork_condition_missing';
export * from './component_template_not_found';

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class IndexTemplateNotFound extends Error {
constructor(message: string) {
super(message);
this.name = 'IndexTemplateNotFound';
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class IngestPipelineNotFound extends Error {
constructor(message: string) {
super(message);
this.name = 'IngestPipelineNotFound';
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class MalformedChildren extends Error {
constructor(message: string) {
super(message);
this.name = 'MalformedChildren';
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class MalformedFields extends Error {
constructor(message: string) {
super(message);
this.name = 'MalformedFields';
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class MalformedStreamId extends Error {
constructor(message: string) {
super(message);
this.name = 'MalformedStreamId';
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class PermissionDenied extends Error {
constructor(message: string) {
super(message);
this.name = 'PermissionDenied';
}
}

View file

@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export class SecurityException extends Error {
constructor(message: string) {
super(message);
this.name = 'SecurityException';
}
}

View file

@ -0,0 +1,133 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { conditionToPainless } from './condition_to_painless';
const operatorConditionAndResutls = [
{
condition: { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' },
result: 'ctx.log?.logger == "nginx_proxy"',
},
{
condition: { field: 'log.logger', operator: 'neq' as const, value: 'nginx_proxy' },
result: 'ctx.log?.logger != "nginx_proxy"',
},
{
condition: { field: 'http.response.status_code', operator: 'lt' as const, value: 500 },
result: 'ctx.http?.response?.status_code < 500',
},
{
condition: { field: 'http.response.status_code', operator: 'lte' as const, value: 500 },
result: 'ctx.http?.response?.status_code <= 500',
},
{
condition: { field: 'http.response.status_code', operator: 'gt' as const, value: 500 },
result: 'ctx.http?.response?.status_code > 500',
},
{
condition: { field: 'http.response.status_code', operator: 'gte' as const, value: 500 },
result: 'ctx.http?.response?.status_code >= 500',
},
{
condition: { field: 'log.logger', operator: 'startsWith' as const, value: 'nginx' },
result: 'ctx.log?.logger.startsWith("nginx")',
},
{
condition: { field: 'log.logger', operator: 'endsWith' as const, value: 'proxy' },
result: 'ctx.log?.logger.endsWith("proxy")',
},
{
condition: { field: 'log.logger', operator: 'contains' as const, value: 'proxy' },
result: 'ctx.log?.logger.contains("proxy")',
},
];
describe('conditionToPainless', () => {
describe('operators', () => {
operatorConditionAndResutls.forEach((setup) => {
test(`${setup.condition.operator}`, () => {
expect(conditionToPainless(setup.condition)).toEqual(setup.result);
});
});
});
describe('and', () => {
test('simple', () => {
const condition = {
and: [
{ field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' },
{ field: 'log.level', operator: 'eq' as const, value: 'error' },
],
};
expect(
expect(conditionToPainless(condition)).toEqual(
'ctx.log?.logger == "nginx_proxy" && ctx.log?.level == "error"'
)
);
});
});
describe('or', () => {
test('simple', () => {
const condition = {
or: [
{ field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' },
{ field: 'log.level', operator: 'eq' as const, value: 'error' },
],
};
expect(
expect(conditionToPainless(condition)).toEqual(
'ctx.log?.logger == "nginx_proxy" || ctx.log?.level == "error"'
)
);
});
});
describe('nested', () => {
test('and with a filter and or with 2 filters', () => {
const condition = {
and: [
{ field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' },
{
or: [
{ field: 'log.level', operator: 'eq' as const, value: 'error' },
{ field: 'log.level', operator: 'eq' as const, value: 'ERROR' },
],
},
],
};
expect(
expect(conditionToPainless(condition)).toEqual(
'ctx.log?.logger == "nginx_proxy" && (ctx.log?.level == "error" || ctx.log?.level == "ERROR")'
)
);
});
test('and with 2 or with filters', () => {
const condition = {
and: [
{
or: [
{ field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' },
{ field: 'service.name', operator: 'eq' as const, value: 'nginx' },
],
},
{
or: [
{ field: 'log.level', operator: 'eq' as const, value: 'error' },
{ field: 'log.level', operator: 'eq' as const, value: 'ERROR' },
],
},
],
};
expect(
expect(conditionToPainless(condition)).toEqual(
'(ctx.log?.logger == "nginx_proxy" || ctx.service?.name == "nginx") && (ctx.log?.level == "error" || ctx.log?.level == "ERROR")'
)
);
});
});
});

View file

@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { isBoolean, isString } from 'lodash';
import {
AndCondition,
Condition,
conditionSchema,
FilterCondition,
filterConditionSchema,
RerouteOrCondition,
} from '../../../../common/types';
function isFilterCondition(subject: any): subject is FilterCondition {
const result = filterConditionSchema.safeParse(subject);
return result.success;
}
function isAndCondition(subject: any): subject is AndCondition {
const result = conditionSchema.safeParse(subject);
return result.success && subject.and != null;
}
function isOrCondition(subject: any): subject is RerouteOrCondition {
const result = conditionSchema.safeParse(subject);
return result.success && subject.or != null;
}
function safePainlessField(condition: FilterCondition) {
return `ctx.${condition.field.split('.').join('?.')}`;
}
function encodeValue(value: string | number | boolean) {
if (isString(value)) {
return `"${value}"`;
}
if (isBoolean(value)) {
return value ? 'true' : 'false';
}
return value;
}
function toPainless(condition: FilterCondition) {
switch (condition.operator) {
case 'neq':
return `${safePainlessField(condition)} != ${encodeValue(condition.value)}`;
case 'lt':
return `${safePainlessField(condition)} < ${encodeValue(condition.value)}`;
case 'lte':
return `${safePainlessField(condition)} <= ${encodeValue(condition.value)}`;
case 'gt':
return `${safePainlessField(condition)} > ${encodeValue(condition.value)}`;
case 'gte':
return `${safePainlessField(condition)} >= ${encodeValue(condition.value)}`;
case 'startsWith':
return `${safePainlessField(condition)}.startsWith(${encodeValue(condition.value)})`;
case 'endsWith':
return `${safePainlessField(condition)}.endsWith(${encodeValue(condition.value)})`;
case 'contains':
return `${safePainlessField(condition)}.contains(${encodeValue(condition.value)})`;
default:
return `${safePainlessField(condition)} == ${encodeValue(condition.value)}`;
}
}
export function conditionToPainless(condition: Condition, nested = false): string {
if (isFilterCondition(condition)) {
return toPainless(condition);
}
if (isAndCondition(condition)) {
const and = condition.and.map((filter) => conditionToPainless(filter, true)).join(' && ');
return nested ? `(${and})` : and;
}
if (isOrCondition(condition)) {
const or = condition.or.map((filter) => conditionToPainless(filter, true)).join(' || ');
return nested ? `(${or})` : or;
}
return 'false';
}

View file

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StreamDefinition } from '../../../../common/types';
export function isDescendandOf(parent: StreamDefinition, child: StreamDefinition) {
return child.id.startsWith(parent.id);
}
export function isChildOf(parent: StreamDefinition, child: StreamDefinition) {
return (
isDescendandOf(parent, child) && child.id.split('.').length === parent.id.split('.').length + 1
);
}
export function getParentId(id: string) {
const parts = id.split('.');
if (parts.length === 1) {
return undefined;
}
return parts.slice(0, parts.length - 1).join('.');
}
export function isRoot(id: string) {
return id.split('.').length === 1;
}
export function getAncestors(id: string) {
const parts = id.split('.');
return parts.slice(0, parts.length - 1).map((_, index) => parts.slice(0, index + 1).join('.'));
}

View file

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { setTimeout } from 'timers/promises';
import { errors as EsErrors } from '@elastic/elasticsearch';
import type { Logger } from '@kbn/logging';
import { SecurityException } from '../errors';
const MAX_ATTEMPTS = 5;
const retryResponseStatuses = [
503, // ServiceUnavailable
408, // RequestTimeout
410, // Gone
];
const isRetryableError = (e: any) =>
e instanceof EsErrors.NoLivingConnectionsError ||
e instanceof EsErrors.ConnectionError ||
e instanceof EsErrors.TimeoutError ||
(e instanceof EsErrors.ResponseError && retryResponseStatuses.includes(e?.statusCode!));
/**
* Retries any transient network or configuration issues encountered from Elasticsearch with an exponential backoff.
* Should only be used to wrap operations that are idempotent and can be safely executed more than once.
*/
export const retryTransientEsErrors = async <T>(
esCall: () => Promise<T>,
{ logger, attempt = 0 }: { logger?: Logger; attempt?: number } = {}
): Promise<T> => {
try {
return await esCall();
} catch (e) {
if (attempt < MAX_ATTEMPTS && isRetryableError(e)) {
const retryCount = attempt + 1;
const retryDelaySec = Math.min(Math.pow(2, retryCount), 64); // 2s, 4s, 8s, 16s, 32s, 64s, 64s, 64s ...
logger?.warn(
`Retrying Elasticsearch operation after [${retryDelaySec}s] due to error: ${e.toString()} ${
e.stack
}`
);
await setTimeout(retryDelaySec * 1000);
return retryTransientEsErrors(esCall, { logger, attempt: retryCount });
}
if (e.meta?.body?.error?.type === 'security_exception') {
throw new SecurityException(e.meta.body.error.reason);
}
throw e;
}
};

View file

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ASSET_VERSION } from '../../../../common/constants';
import { getProcessingPipelineName } from '../ingest_pipelines/name';
import { getIndexTemplateName } from './name';
export function generateIndexTemplate(id: string) {
const composedOf = id.split('.').reduce((acc, _, index, array) => {
const parent = array.slice(0, index + 1).join('.');
return [...acc, `${parent}@stream.layer`];
}, [] as string[]);
return {
name: getIndexTemplateName(id),
index_patterns: [id],
composed_of: composedOf,
priority: 200,
version: ASSET_VERSION,
_meta: {
managed: true,
description: `The index template for ${id} stream`,
},
data_stream: {
hidden: false,
},
template: {
settings: {
index: {
default_pipeline: getProcessingPipelineName(id),
},
},
},
allow_auto_create: true,
// ignore missing component templates to be more robust against out-of-order syncs
ignore_missing_component_templates: composedOf,
};
}

View file

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IndicesPutIndexTemplateRequest } from '@elastic/elasticsearch/lib/api/types';
import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { retryTransientEsErrors } from '../helpers/retry';
interface TemplateManagementOptions {
esClient: ElasticsearchClient;
template: IndicesPutIndexTemplateRequest;
logger: Logger;
}
interface DeleteTemplateOptions {
esClient: ElasticsearchClient;
name: string;
logger: Logger;
}
export async function upsertTemplate({ esClient, template, logger }: TemplateManagementOptions) {
try {
await retryTransientEsErrors(() => esClient.indices.putIndexTemplate(template), { logger });
logger.debug(() => `Installed index template: ${JSON.stringify(template)}`);
} catch (error: any) {
logger.error(`Error updating index template: ${error.message}`);
throw error;
}
}
export async function deleteTemplate({ esClient, name, logger }: DeleteTemplateOptions) {
try {
await retryTransientEsErrors(
() => esClient.indices.deleteIndexTemplate({ name }, { ignore: [404] }),
{ logger }
);
} catch (error: any) {
logger.error(`Error deleting index template: ${error.message}`);
throw error;
}
}

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export function getIndexTemplateName(id: string) {
return `${id}@stream`;
}

View file

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StreamDefinition } from '../../../../common/types';
import { ASSET_VERSION } from '../../../../common/constants';
import { conditionToPainless } from '../helpers/condition_to_painless';
import { logsDefaultPipelineProcessors } from './logs_default_pipeline';
import { isRoot } from '../helpers/hierarchy';
import { getProcessingPipelineName } from './name';
export function generateIngestPipeline(id: string, definition: StreamDefinition) {
return {
id: getProcessingPipelineName(id),
processors: [
...(isRoot(definition.id) ? logsDefaultPipelineProcessors : []),
...definition.processing.map((processor) => {
const { type, ...config } = processor.config;
return {
[type]: {
...config,
if: processor.condition ? conditionToPainless(processor.condition) : undefined,
},
};
}),
{
pipeline: {
name: `${id}@stream.reroutes`,
ignore_missing_pipeline: true,
},
},
],
_meta: {
description: `Default pipeline for the ${id} stream`,
managed: true,
},
version: ASSET_VERSION,
};
}

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StreamDefinition } from '../../../../common/types';
import { ASSET_VERSION } from '../../../../common/constants';
import { conditionToPainless } from '../helpers/condition_to_painless';
import { getReroutePipelineName } from './name';
interface GenerateReroutePipelineParams {
definition: StreamDefinition;
}
export async function generateReroutePipeline({ definition }: GenerateReroutePipelineParams) {
return {
id: getReroutePipelineName(definition.id),
processors: definition.children.map((child) => {
return {
reroute: {
destination: child.id,
if: conditionToPainless(child.condition),
},
};
}),
_meta: {
description: `Reoute pipeline for the ${definition.id} stream`,
managed: true,
},
version: ASSET_VERSION,
};
}

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const logsDefaultPipelineProcessors = [
{
set: {
description: "If '@timestamp' is missing, set it with the ingest timestamp",
field: '@timestamp',
override: false,
copy_from: '_ingest.timestamp',
},
},
{
pipeline: {
name: 'logs@json-pipeline',
ignore_missing_pipeline: true,
},
},
];

View file

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { IngestPutPipelineRequest } from '@elastic/elasticsearch/lib/api/types';
import { retryTransientEsErrors } from '../helpers/retry';
interface DeletePipelineOptions {
esClient: ElasticsearchClient;
id: string;
logger: Logger;
}
interface PipelineManagementOptions {
esClient: ElasticsearchClient;
pipeline: IngestPutPipelineRequest;
logger: Logger;
}
export async function deleteIngestPipeline({ esClient, id, logger }: DeletePipelineOptions) {
try {
await retryTransientEsErrors(() => esClient.ingest.deletePipeline({ id }, { ignore: [404] }), {
logger,
});
} catch (error: any) {
logger.error(`Error deleting ingest pipeline: ${error.message}`);
throw error;
}
}
export async function upsertIngestPipeline({
esClient,
pipeline,
logger,
}: PipelineManagementOptions) {
try {
await retryTransientEsErrors(() => esClient.ingest.putPipeline(pipeline), { logger });
logger.debug(() => `Installed index template: ${JSON.stringify(pipeline)}`);
} catch (error: any) {
logger.error(`Error updating index template: ${error.message}`);
throw error;
}
}

View file

@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export function getProcessingPipelineName(id: string) {
return `${id}@stream.processing`;
}
export function getReroutePipelineName(id: string) {
return `${id}@stream.reroutes`;
}

View file

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { STREAMS_INDEX } from '../../../common/constants';
export function createStreamsIndex(scopedClusterClient: IScopedClusterClient) {
return scopedClusterClient.asInternalUser.indices.create({
index: STREAMS_INDEX,
mappings: {
dynamic: 'strict',
properties: {
processing: {
type: 'object',
enabled: false,
},
fields: {
type: 'object',
enabled: false,
},
children: {
type: 'object',
enabled: false,
},
id: {
type: 'keyword',
},
},
},
});
}

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { StreamDefinition } from '../../../common/types';
export const rootStreamDefinition: StreamDefinition = {
id: 'logs',
processing: [],
children: [],
fields: [
{
name: '@timestamp',
type: 'date',
},
{
name: 'message',
type: 'match_only_text',
},
{
name: 'host.name',
type: 'keyword',
},
{
name: 'log.level',
type: 'keyword',
},
],
};

View file

@ -0,0 +1,286 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { FieldDefinition, StreamDefinition } from '../../../common/types';
import { STREAMS_INDEX } from '../../../common/constants';
import { DefinitionNotFound } from './errors';
import { deleteTemplate, upsertTemplate } from './index_templates/manage_index_templates';
import { generateLayer } from './component_templates/generate_layer';
import { generateIngestPipeline } from './ingest_pipelines/generate_ingest_pipeline';
import { generateReroutePipeline } from './ingest_pipelines/generate_reroute_pipeline';
import { generateIndexTemplate } from './index_templates/generate_index_template';
import { deleteComponent, upsertComponent } from './component_templates/manage_component_templates';
import { getIndexTemplateName } from './index_templates/name';
import { getComponentTemplateName } from './component_templates/name';
import { getProcessingPipelineName, getReroutePipelineName } from './ingest_pipelines/name';
import {
deleteIngestPipeline,
upsertIngestPipeline,
} from './ingest_pipelines/manage_ingest_pipelines';
import { getAncestors } from './helpers/hierarchy';
import { MalformedFields } from './errors/malformed_fields';
import {
deleteDataStream,
rolloverDataStreamIfNecessary,
upsertDataStream,
} from './data_streams/manage_data_streams';
interface BaseParams {
scopedClusterClient: IScopedClusterClient;
}
interface BaseParamsWithDefinition extends BaseParams {
definition: StreamDefinition;
}
interface DeleteStreamParams extends BaseParams {
id: string;
logger: Logger;
}
export async function deleteStreamObjects({ id, scopedClusterClient, logger }: DeleteStreamParams) {
await deleteDataStream({
esClient: scopedClusterClient.asCurrentUser,
name: id,
logger,
});
await deleteTemplate({
esClient: scopedClusterClient.asCurrentUser,
name: getIndexTemplateName(id),
logger,
});
await deleteComponent({
esClient: scopedClusterClient.asCurrentUser,
name: getComponentTemplateName(id),
logger,
});
await deleteIngestPipeline({
esClient: scopedClusterClient.asCurrentUser,
id: getProcessingPipelineName(id),
logger,
});
await deleteIngestPipeline({
esClient: scopedClusterClient.asCurrentUser,
id: getReroutePipelineName(id),
logger,
});
await scopedClusterClient.asInternalUser.delete({
id,
index: STREAMS_INDEX,
refresh: 'wait_for',
});
}
async function upsertInternalStream({ definition, scopedClusterClient }: BaseParamsWithDefinition) {
return scopedClusterClient.asInternalUser.index({
id: definition.id,
index: STREAMS_INDEX,
document: definition,
refresh: 'wait_for',
});
}
type ListStreamsParams = BaseParams;
export async function listStreams({ scopedClusterClient }: ListStreamsParams) {
const response = await scopedClusterClient.asInternalUser.search<StreamDefinition>({
index: STREAMS_INDEX,
size: 10000,
fields: ['id'],
_source: false,
sort: [{ id: 'asc' }],
});
const definitions = response.hits.hits.map((hit) => hit.fields as { id: string[] });
return definitions;
}
interface ReadStreamParams extends BaseParams {
id: string;
}
export async function readStream({ id, scopedClusterClient }: ReadStreamParams) {
try {
const response = await scopedClusterClient.asInternalUser.get<StreamDefinition>({
id,
index: STREAMS_INDEX,
});
const definition = response._source as StreamDefinition;
return {
definition,
};
} catch (e) {
if (e.meta?.statusCode === 404) {
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
throw e;
}
}
interface ReadAncestorsParams extends BaseParams {
id: string;
}
export async function readAncestors({ id, scopedClusterClient }: ReadAncestorsParams) {
const ancestorIds = getAncestors(id);
return await Promise.all(
ancestorIds.map((ancestorId) => readStream({ scopedClusterClient, id: ancestorId }))
);
}
interface ReadDescendantsParams extends BaseParams {
id: string;
}
export async function readDescendants({ id, scopedClusterClient }: ReadDescendantsParams) {
const response = await scopedClusterClient.asInternalUser.search<StreamDefinition>({
index: STREAMS_INDEX,
size: 10000,
body: {
query: {
bool: {
filter: {
prefix: {
id,
},
},
must_not: {
term: {
id,
},
},
},
},
},
});
return response.hits.hits.map((hit) => hit._source as StreamDefinition);
}
export async function validateAncestorFields(
scopedClusterClient: IScopedClusterClient,
id: string,
fields: FieldDefinition[]
) {
const ancestors = await readAncestors({
id,
scopedClusterClient,
});
for (const ancestor of ancestors) {
for (const field of fields) {
if (
ancestor.definition.fields.some(
(ancestorField) => ancestorField.type !== field.type && ancestorField.name === field.name
)
) {
throw new MalformedFields(
`Field ${field.name} is already defined with incompatible type in the parent stream ${ancestor.definition.id}`
);
}
}
}
}
export async function validateDescendantFields(
scopedClusterClient: IScopedClusterClient,
id: string,
fields: FieldDefinition[]
) {
const descendants = await readDescendants({
id,
scopedClusterClient,
});
for (const descendant of descendants) {
for (const field of fields) {
if (
descendant.fields.some(
(descendantField) =>
descendantField.type !== field.type && descendantField.name === field.name
)
) {
throw new MalformedFields(
`Field ${field.name} is already defined with incompatible type in the child stream ${descendant.id}`
);
}
}
}
}
export async function checkStreamExists({ id, scopedClusterClient }: ReadStreamParams) {
try {
await readStream({ id, scopedClusterClient });
return true;
} catch (e) {
if (e instanceof DefinitionNotFound) {
return false;
}
throw e;
}
}
interface SyncStreamParams {
scopedClusterClient: IScopedClusterClient;
definition: StreamDefinition;
rootDefinition?: StreamDefinition;
logger: Logger;
}
export async function syncStream({
scopedClusterClient,
definition,
rootDefinition,
logger,
}: SyncStreamParams) {
await upsertComponent({
esClient: scopedClusterClient.asCurrentUser,
logger,
component: generateLayer(definition.id, definition),
});
await upsertIngestPipeline({
esClient: scopedClusterClient.asCurrentUser,
logger,
pipeline: generateIngestPipeline(definition.id, definition),
});
const reroutePipeline = await generateReroutePipeline({
definition,
});
await upsertIngestPipeline({
esClient: scopedClusterClient.asCurrentUser,
logger,
pipeline: reroutePipeline,
});
await upsertTemplate({
esClient: scopedClusterClient.asCurrentUser,
logger,
template: generateIndexTemplate(definition.id),
});
if (rootDefinition) {
const parentReroutePipeline = await generateReroutePipeline({
definition: rootDefinition,
});
await upsertIngestPipeline({
esClient: scopedClusterClient.asCurrentUser,
logger,
pipeline: parentReroutePipeline,
});
}
await upsertDataStream({
esClient: scopedClusterClient.asCurrentUser,
logger,
name: definition.id,
});
await upsertInternalStream({
scopedClusterClient,
definition,
});
await rolloverDataStreamIfNecessary({
esClient: scopedClusterClient.asCurrentUser,
name: definition.id,
logger,
});
}

View file

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
CoreSetup,
CoreStart,
KibanaRequest,
Logger,
Plugin,
PluginConfigDescriptor,
PluginInitializerContext,
} from '@kbn/core/server';
import { registerRoutes } from '@kbn/server-route-repository';
import { StreamsConfig, configSchema, exposeToBrowserConfig } from '../common/config';
import { StreamsRouteRepository } from './routes';
import { RouteDependencies } from './routes/types';
import {
StreamsPluginSetupDependencies,
StreamsPluginStartDependencies,
StreamsServer,
} from './types';
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface StreamsPluginSetup {}
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface StreamsPluginStart {}
export const config: PluginConfigDescriptor<StreamsConfig> = {
schema: configSchema,
exposeToBrowser: exposeToBrowserConfig,
};
export class StreamsPlugin
implements
Plugin<
StreamsPluginSetup,
StreamsPluginStart,
StreamsPluginSetupDependencies,
StreamsPluginStartDependencies
>
{
public config: StreamsConfig;
public logger: Logger;
public server?: StreamsServer;
constructor(context: PluginInitializerContext<StreamsConfig>) {
this.config = context.config.get();
this.logger = context.logger.get();
}
public setup(core: CoreSetup, plugins: StreamsPluginSetupDependencies): StreamsPluginSetup {
this.server = {
config: this.config,
logger: this.logger,
} as StreamsServer;
registerRoutes<RouteDependencies>({
repository: StreamsRouteRepository,
dependencies: {
server: this.server,
getScopedClients: async ({ request }: { request: KibanaRequest }) => {
const [coreStart] = await core.getStartServices();
const scopedClusterClient = coreStart.elasticsearch.client.asScoped(request);
const soClient = coreStart.savedObjects.getScopedClient(request);
return { scopedClusterClient, soClient };
},
},
core,
logger: this.logger,
});
return {};
}
public start(core: CoreStart, plugins: StreamsPluginStartDependencies): StreamsPluginStart {
if (this.server) {
this.server.core = core;
this.server.isServerless = core.elasticsearch.getCapabilities().serverless;
this.server.security = plugins.security;
this.server.encryptedSavedObjects = plugins.encryptedSavedObjects;
this.server.taskManager = plugins.taskManager;
}
return {};
}
public stop() {}
}

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { createServerRouteFactory } from '@kbn/server-route-repository';
import { StreamsRouteHandlerResources } from './types';
export const createServerRoute = createServerRouteFactory<StreamsRouteHandlerResources>();

View file

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { deleteStreamRoute } from './streams/delete';
import { editStreamRoute } from './streams/edit';
import { enableStreamsRoute } from './streams/enable';
import { forkStreamsRoute } from './streams/fork';
import { listStreamsRoute } from './streams/list';
import { readStreamRoute } from './streams/read';
import { resyncStreamsRoute } from './streams/resync';
export const StreamsRouteRepository = {
...enableStreamsRoute,
...resyncStreamsRoute,
...forkStreamsRoute,
...readStreamRoute,
...editStreamRoute,
...deleteStreamRoute,
...listStreamsRoute,
};
export type StreamsRouteRepository = typeof StreamsRouteRepository;

View file

@ -0,0 +1,109 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
SecurityException,
} from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
import { syncStream, readStream, deleteStreamObjects } from '../../lib/streams/stream_crud';
import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id';
import { getParentId } from '../../lib/streams/helpers/hierarchy';
export const deleteStreamRoute = createServerRoute({
endpoint: 'DELETE /api/streams/{id} 2023-10-31',
options: {
access: 'public',
availability: {
stability: 'experimental',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
},
params: z.object({
path: z.object({
id: z.string(),
}),
}),
handler: async ({ response, params, logger, request, getScopedClients }) => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
const parentId = getParentId(params.path.id);
if (!parentId) {
throw new MalformedStreamId('Cannot delete root stream');
}
await updateParentStream(scopedClusterClient, params.path.id, parentId, logger);
await deleteStream(scopedClusterClient, params.path.id, logger);
return response.ok({ body: { acknowledged: true } });
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
return response.notFound({ body: e });
}
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId
) {
return response.customError({ body: e, statusCode: 400 });
}
return response.customError({ body: e, statusCode: 500 });
}
},
});
async function deleteStream(scopedClusterClient: IScopedClusterClient, id: string, logger: Logger) {
try {
const { definition } = await readStream({ scopedClusterClient, id });
for (const child of definition.children) {
await deleteStream(scopedClusterClient, child.id, logger);
}
await deleteStreamObjects({ scopedClusterClient, id, logger });
} catch (e) {
if (e instanceof DefinitionNotFound) {
logger.debug(`Stream definition for ${id} not found.`);
} else {
throw e;
}
}
}
async function updateParentStream(
scopedClusterClient: IScopedClusterClient,
id: string,
parentId: string,
logger: Logger
) {
const { definition: parentDefinition } = await readStream({
scopedClusterClient,
id: parentId,
});
parentDefinition.children = parentDefinition.children.filter((child) => child.id !== id);
await syncStream({
scopedClusterClient,
definition: parentDefinition,
logger,
});
return parentDefinition;
}

View file

@ -0,0 +1,171 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
SecurityException,
} from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
import { StreamDefinition, streamWithoutIdDefinitonSchema } from '../../../common/types';
import {
syncStream,
readStream,
checkStreamExists,
validateAncestorFields,
validateDescendantFields,
} from '../../lib/streams/stream_crud';
import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id';
import { getParentId } from '../../lib/streams/helpers/hierarchy';
import { MalformedChildren } from '../../lib/streams/errors/malformed_children';
export const editStreamRoute = createServerRoute({
endpoint: 'PUT /api/streams/{id} 2023-10-31',
options: {
access: 'public',
availability: {
stability: 'experimental',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
},
params: z.object({
path: z.object({
id: z.string(),
}),
body: streamWithoutIdDefinitonSchema,
}),
handler: async ({ response, params, logger, request, getScopedClients }) => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
await validateStreamChildren(scopedClusterClient, params.path.id, params.body.children);
await validateAncestorFields(scopedClusterClient, params.path.id, params.body.fields);
await validateDescendantFields(scopedClusterClient, params.path.id, params.body.fields);
const parentId = getParentId(params.path.id);
let parentDefinition: StreamDefinition | undefined;
if (parentId) {
parentDefinition = await updateParentStream(
scopedClusterClient,
parentId,
params.path.id,
logger
);
}
const streamDefinition = { ...params.body };
await syncStream({
scopedClusterClient,
definition: { ...streamDefinition, id: params.path.id },
rootDefinition: parentDefinition,
logger,
});
for (const child of streamDefinition.children) {
const streamExists = await checkStreamExists({
scopedClusterClient,
id: child.id,
});
if (streamExists) {
continue;
}
// create empty streams for each child if they don't exist
const childDefinition = {
id: child.id,
children: [],
fields: [],
processing: [],
};
await syncStream({
scopedClusterClient,
definition: childDefinition,
logger,
});
}
return response.ok({ body: { acknowledged: true } });
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
return response.notFound({ body: e });
}
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId
) {
return response.customError({ body: e, statusCode: 400 });
}
return response.customError({ body: e, statusCode: 500 });
}
},
});
async function updateParentStream(
scopedClusterClient: IScopedClusterClient,
parentId: string,
id: string,
logger: Logger
) {
const { definition: parentDefinition } = await readStream({
scopedClusterClient,
id: parentId,
});
if (!parentDefinition.children.some((child) => child.id === id)) {
// add the child to the parent stream with an empty condition for now
parentDefinition.children.push({
id,
condition: undefined,
});
await syncStream({
scopedClusterClient,
definition: parentDefinition,
logger,
});
}
return parentDefinition;
}
async function validateStreamChildren(
scopedClusterClient: IScopedClusterClient,
id: string,
children: Array<{ id: string }>
) {
try {
const { definition: oldDefinition } = await readStream({
scopedClusterClient,
id,
});
const oldChildren = oldDefinition.children.map((child) => child.id);
const newChildren = new Set(children.map((child) => child.id));
if (oldChildren.some((child) => !newChildren.has(child))) {
throw new MalformedChildren(
'Cannot remove children from a stream, please delete the stream instead'
);
}
} catch (e) {
// Ignore if the stream does not exist, but re-throw if it's another error
if (!(e instanceof DefinitionNotFound)) {
throw e;
}
}
}

View file

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { SecurityException } from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
import { syncStream } from '../../lib/streams/stream_crud';
import { rootStreamDefinition } from '../../lib/streams/root_stream_definition';
import { createStreamsIndex } from '../../lib/streams/internal_stream_mapping';
export const enableStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/_enable 2023-10-31',
params: z.object({}),
options: {
access: 'public',
availability: {
stability: 'experimental',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
},
handler: async ({ request, response, logger, getScopedClients }) => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
await createStreamsIndex(scopedClusterClient);
await syncStream({
scopedClusterClient,
definition: rootStreamDefinition,
logger,
});
return response.ok({ body: { acknowledged: true } });
} catch (e) {
if (e instanceof SecurityException) {
return response.customError({ body: e, statusCode: 400 });
}
return response.customError({ body: e, statusCode: 500 });
}
},
});

View file

@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import {
DefinitionNotFound,
ForkConditionMissing,
IndexTemplateNotFound,
SecurityException,
} from '../../lib/streams/errors';
import { createServerRoute } from '../create_server_route';
import { conditionSchema, streamDefinitonWithoutChildrenSchema } from '../../../common/types';
import { syncStream, readStream, validateAncestorFields } from '../../lib/streams/stream_crud';
import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id';
import { isChildOf } from '../../lib/streams/helpers/hierarchy';
export const forkStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/{id}/_fork 2023-10-31',
options: {
access: 'public',
availability: {
stability: 'experimental',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
},
params: z.object({
path: z.object({
id: z.string(),
}),
body: z.object({ stream: streamDefinitonWithoutChildrenSchema, condition: conditionSchema }),
}),
handler: async ({ response, params, logger, request, getScopedClients }) => {
try {
if (!params.body.condition) {
throw new ForkConditionMissing('You must provide a condition to fork a stream');
}
const { scopedClusterClient } = await getScopedClients({ request });
const { definition: rootDefinition } = await readStream({
scopedClusterClient,
id: params.path.id,
});
const childDefinition = { ...params.body.stream, children: [] };
// check whether root stream has a child of the given name already
if (rootDefinition.children.some((child) => child.id === childDefinition.id)) {
throw new MalformedStreamId(
`The stream with ID (${params.body.stream.id}) already exists as a child of the parent stream`
);
}
if (!isChildOf(rootDefinition, childDefinition)) {
throw new MalformedStreamId(
`The ID (${params.body.stream.id}) from the new stream must start with the parent's id (${rootDefinition.id}), followed by a dot and a name`
);
}
await validateAncestorFields(
scopedClusterClient,
params.body.stream.id,
params.body.stream.fields
);
rootDefinition.children.push({
id: params.body.stream.id,
condition: params.body.condition,
});
await syncStream({
scopedClusterClient,
definition: rootDefinition,
rootDefinition,
logger,
});
await syncStream({
scopedClusterClient,
definition: childDefinition,
rootDefinition,
logger,
});
return response.ok({ body: { acknowledged: true } });
} catch (e) {
if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) {
return response.notFound({ body: e });
}
if (
e instanceof SecurityException ||
e instanceof ForkConditionMissing ||
e instanceof MalformedStreamId
) {
return response.customError({ body: e, statusCode: 400 });
}
return response.customError({ body: e, statusCode: 500 });
}
},
});

View file

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { createServerRoute } from '../create_server_route';
import { DefinitionNotFound } from '../../lib/streams/errors';
import { listStreams } from '../../lib/streams/stream_crud';
export const listStreamsRoute = createServerRoute({
endpoint: 'GET /api/streams 2023-10-31',
options: {
access: 'public',
availability: {
stability: 'experimental',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
},
params: z.object({}),
handler: async ({ response, request, getScopedClients }) => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
const definitions = await listStreams({ scopedClusterClient });
const trees = asTrees(definitions);
return response.ok({ body: { streams: trees } });
} catch (e) {
if (e instanceof DefinitionNotFound) {
return response.notFound({ body: e });
}
return response.customError({ body: e, statusCode: 500 });
}
},
});
interface ListStreamDefinition {
id: string;
children: ListStreamDefinition[];
}
function asTrees(definitions: Array<{ id: string[] }>) {
const trees: ListStreamDefinition[] = [];
definitions.forEach((definition) => {
const path = definition.id[0].split('.');
let currentTree = trees;
path.forEach((_id, index) => {
const partialPath = path.slice(0, index + 1).join('.');
const existingNode = currentTree.find((node) => node.id === partialPath);
if (existingNode) {
currentTree = existingNode.children;
} else {
const newNode = { id: partialPath, children: [] };
currentTree.push(newNode);
currentTree = newNode.children;
}
});
});
return trees;
}

View file

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { createServerRoute } from '../create_server_route';
import { DefinitionNotFound } from '../../lib/streams/errors';
import { readAncestors, readStream } from '../../lib/streams/stream_crud';
export const readStreamRoute = createServerRoute({
endpoint: 'GET /api/streams/{id} 2023-10-31',
options: {
access: 'public',
availability: {
stability: 'experimental',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
},
params: z.object({
path: z.object({ id: z.string() }),
}),
handler: async ({ response, params, request, getScopedClients }) => {
try {
const { scopedClusterClient } = await getScopedClients({ request });
const streamEntity = await readStream({
scopedClusterClient,
id: params.path.id,
});
const ancestors = await readAncestors({
id: streamEntity.definition.id,
scopedClusterClient,
});
const body = {
...streamEntity.definition,
inheritedFields: ancestors.flatMap(({ definition: { id, fields } }) =>
fields.map((field) => ({ ...field, from: id }))
),
};
return response.ok({ body });
} catch (e) {
if (e instanceof DefinitionNotFound) {
return response.notFound({ body: e });
}
return response.customError({ body: e, statusCode: 500 });
}
},
});

View file

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { createServerRoute } from '../create_server_route';
import { syncStream, readStream, listStreams } from '../../lib/streams/stream_crud';
export const resyncStreamsRoute = createServerRoute({
endpoint: 'POST /api/streams/_resync 2023-10-31',
options: {
access: 'public',
availability: {
stability: 'experimental',
},
security: {
authz: {
enabled: false,
reason:
'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
},
},
},
params: z.object({}),
handler: async ({ response, logger, request, getScopedClients }) => {
const { scopedClusterClient } = await getScopedClients({ request });
const streams = await listStreams({ scopedClusterClient });
for (const stream of streams) {
const { definition } = await readStream({
scopedClusterClient,
id: stream.id[0],
});
await syncStream({
scopedClusterClient,
definition,
logger,
});
}
return response.ok({});
},
});

View file

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { KibanaRequest } from '@kbn/core-http-server';
import { DefaultRouteHandlerResources } from '@kbn/server-route-repository';
import { IScopedClusterClient } from '@kbn/core-elasticsearch-server';
import { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server';
import { StreamsServer } from '../types';
export interface RouteDependencies {
server: StreamsServer;
getScopedClients: ({ request }: { request: KibanaRequest }) => Promise<{
scopedClusterClient: IScopedClusterClient;
soClient: SavedObjectsClientContract;
}>;
}
export type StreamsRouteHandlerResources = RouteDependencies & DefaultRouteHandlerResources;

View file

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CoreStart, ElasticsearchClient, Logger } from '@kbn/core/server';
import { SecurityPluginStart } from '@kbn/security-plugin/server';
import {
EncryptedSavedObjectsPluginSetup,
EncryptedSavedObjectsPluginStart,
} from '@kbn/encrypted-saved-objects-plugin/server';
import { LicensingPluginStart } from '@kbn/licensing-plugin/server';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import { StreamsConfig } from '../common/config';
export interface StreamsServer {
core: CoreStart;
config: StreamsConfig;
logger: Logger;
security: SecurityPluginStart;
encryptedSavedObjects: EncryptedSavedObjectsPluginStart;
isServerless: boolean;
taskManager: TaskManagerStartContract;
}
export interface ElasticsearchAccessorOptions {
elasticsearchClient: ElasticsearchClient;
}
export interface StreamsPluginSetupDependencies {
encryptedSavedObjects: EncryptedSavedObjectsPluginSetup;
taskManager: TaskManagerSetupContract;
}
export interface StreamsPluginStartDependencies {
security: SecurityPluginStart;
encryptedSavedObjects: EncryptedSavedObjectsPluginStart;
licensing: LicensingPluginStart;
taskManager: TaskManagerStartContract;
}

View file

@ -0,0 +1,31 @@
{
"extends": "../../../tsconfig.base.json",
"compilerOptions": {
"outDir": "target/types"
},
"include": [
"../../../typings/**/*",
"common/**/*",
"server/**/*",
"public/**/*",
"types/**/*"
],
"exclude": [
"target/**/*"
],
"kbn_references": [
"@kbn/config-schema",
"@kbn/core",
"@kbn/logging",
"@kbn/core-plugins-server",
"@kbn/core-http-server",
"@kbn/security-plugin",
"@kbn/core-saved-objects-api-server",
"@kbn/core-elasticsearch-server",
"@kbn/task-manager-plugin",
"@kbn/server-route-repository",
"@kbn/zod",
"@kbn/encrypted-saved-objects-plugin",
"@kbn/licensing-plugin",
]
}

View file

@ -7464,6 +7464,10 @@
version "0.0.0"
uid ""
"@kbn/streams-plugin@link:x-pack/plugins/streams":
version "0.0.0"
uid ""
"@kbn/synthetics-e2e@link:x-pack/plugins/observability_solution/synthetics/e2e":
version "0.0.0"
uid ""