mirror of
https://github.com/elastic/kibana.git
synced 2025-04-24 09:48:58 -04:00
[Fleet] Support local routing rules (#161573)
This commit is contained in:
parent
8b72a32c13
commit
c8bec1d07d
17 changed files with 396 additions and 37 deletions
|
@ -335,6 +335,7 @@ export enum RegistryDataStreamKeys {
|
|||
ingest_pipeline = 'ingest_pipeline',
|
||||
elasticsearch = 'elasticsearch',
|
||||
dataset_is_prefix = 'dataset_is_prefix',
|
||||
routing_rules = 'routing_rules',
|
||||
}
|
||||
|
||||
export interface RegistryDataStream {
|
||||
|
@ -351,6 +352,7 @@ export interface RegistryDataStream {
|
|||
[RegistryDataStreamKeys.ingest_pipeline]?: string;
|
||||
[RegistryDataStreamKeys.elasticsearch]?: RegistryElasticsearch;
|
||||
[RegistryDataStreamKeys.dataset_is_prefix]?: boolean;
|
||||
[RegistryDataStreamKeys.routing_rules]?: RegistryDataStreamRoutingRules[];
|
||||
}
|
||||
|
||||
export interface RegistryElasticsearch {
|
||||
|
@ -374,6 +376,15 @@ export interface RegistryDataStreamPrivileges {
|
|||
indices?: string[];
|
||||
}
|
||||
|
||||
export interface RegistryDataStreamRoutingRules {
|
||||
source_dataset: string;
|
||||
rules: Array<{
|
||||
target_dataset: string;
|
||||
if: string;
|
||||
namespace: string;
|
||||
}>;
|
||||
}
|
||||
|
||||
export type RegistryVarType =
|
||||
| 'integer'
|
||||
| 'bool'
|
||||
|
|
|
@ -459,7 +459,7 @@ describe('parseAndVerifyDataStreams', () => {
|
|||
paths: ['input-only-0.1.0/data_stream/stream1/README.md'],
|
||||
pkgName: 'input-only',
|
||||
pkgVersion: '0.1.0',
|
||||
manifests: {},
|
||||
manifestsAndRoutingRules: {},
|
||||
})
|
||||
).toThrowError("No manifest.yml file found for data stream 'stream1'");
|
||||
});
|
||||
|
@ -470,7 +470,7 @@ describe('parseAndVerifyDataStreams', () => {
|
|||
paths: ['input-only-0.1.0/data_stream/stream1/manifest.yml'],
|
||||
pkgName: 'input-only',
|
||||
pkgVersion: '0.1.0',
|
||||
manifests: {
|
||||
manifestsAndRoutingRules: {
|
||||
'input-only-0.1.0/data_stream/stream1/manifest.yml': Buffer.alloc(1),
|
||||
},
|
||||
})
|
||||
|
@ -483,7 +483,7 @@ describe('parseAndVerifyDataStreams', () => {
|
|||
paths: ['input-only-0.1.0/data_stream/stream1/manifest.yml'],
|
||||
pkgName: 'input-only',
|
||||
pkgVersion: '0.1.0',
|
||||
manifests: {
|
||||
manifestsAndRoutingRules: {
|
||||
'input-only-0.1.0/data_stream/stream1/manifest.yml': Buffer.from(
|
||||
`
|
||||
title: Custom Logs`,
|
||||
|
@ -502,7 +502,7 @@ describe('parseAndVerifyDataStreams', () => {
|
|||
paths: ['input-only-0.1.0/data_stream/stream1/manifest.yml'],
|
||||
pkgName: 'input-only',
|
||||
pkgVersion: '0.1.0',
|
||||
manifests: {
|
||||
manifestsAndRoutingRules: {
|
||||
'input-only-0.1.0/data_stream/stream1/manifest.yml': Buffer.from(
|
||||
`
|
||||
title: Custom Logs
|
||||
|
@ -532,7 +532,7 @@ describe('parseAndVerifyDataStreams', () => {
|
|||
paths: ['input-only-0.1.0/data_stream/stream1/manifest.yml'],
|
||||
pkgName: 'input-only',
|
||||
pkgVersion: '0.1.0',
|
||||
manifests: {
|
||||
manifestsAndRoutingRules: {
|
||||
'input-only-0.1.0/data_stream/stream1/manifest.yml': Buffer.from(
|
||||
`
|
||||
title: Custom Logs
|
||||
|
@ -558,6 +558,58 @@ describe('parseAndVerifyDataStreams', () => {
|
|||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it('should parse routing rules', async () => {
|
||||
expect(
|
||||
parseAndVerifyDataStreams({
|
||||
paths: ['input-only-0.1.0/data_stream/stream1/manifest.yml'],
|
||||
pkgName: 'input-only',
|
||||
pkgVersion: '0.1.0',
|
||||
manifestsAndRoutingRules: {
|
||||
'input-only-0.1.0/data_stream/stream1/manifest.yml': Buffer.from(
|
||||
`
|
||||
title: Custom Logs
|
||||
type: logs
|
||||
dataset: ds
|
||||
version: 0.1.0`,
|
||||
'utf8'
|
||||
),
|
||||
'input-only-0.1.0/data_stream/stream1/routing_rules.yml': Buffer.from(
|
||||
`
|
||||
- source_dataset: ds
|
||||
rules:
|
||||
- target_dataset: ds.test
|
||||
if: true == true
|
||||
namespace: "default"
|
||||
`,
|
||||
'utf8'
|
||||
),
|
||||
},
|
||||
})
|
||||
).toEqual([
|
||||
{
|
||||
dataset: 'ds',
|
||||
package: 'input-only',
|
||||
path: 'stream1',
|
||||
release: 'ga',
|
||||
title: 'Custom Logs',
|
||||
type: 'logs',
|
||||
elasticsearch: {},
|
||||
routing_rules: [
|
||||
{
|
||||
source_dataset: 'ds',
|
||||
rules: [
|
||||
{
|
||||
target_dataset: 'ds.test',
|
||||
if: 'true == true',
|
||||
namespace: 'default',
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('parseAndVerifyStreams', () => {
|
||||
|
|
|
@ -39,6 +39,8 @@ import { unpackBufferEntries } from '.';
|
|||
|
||||
const readFileAsync = promisify(readFile);
|
||||
export const MANIFEST_NAME = 'manifest.yml';
|
||||
export const DATASTREAM_MANIFEST_NAME = 'manifest.yml';
|
||||
export const DATASTREAM_ROUTING_RULES_NAME = 'routing_rules.yml';
|
||||
|
||||
const DEFAULT_RELEASE_VALUE = 'ga';
|
||||
|
||||
|
@ -79,7 +81,7 @@ export const expandDottedEntries = (obj: object) => {
|
|||
}, {} as Record<string, any>);
|
||||
};
|
||||
|
||||
type ManifestMap = Record<string, Buffer>;
|
||||
type AssetsBufferMap = Record<string, Buffer>;
|
||||
|
||||
// not sure these are 100% correct but they do the job here
|
||||
// keeping them local until others need them
|
||||
|
@ -142,16 +144,21 @@ export async function generatePackageInfoFromArchiveBuffer(
|
|||
archiveBuffer: Buffer,
|
||||
contentType: string
|
||||
): Promise<{ paths: string[]; packageInfo: ArchivePackage }> {
|
||||
const manifests: ManifestMap = {};
|
||||
const manifestsAndRoutingRules: AssetsBufferMap = {};
|
||||
const entries = await unpackBufferEntries(archiveBuffer, contentType);
|
||||
const paths: string[] = [];
|
||||
entries.forEach(({ path: bufferPath, buffer }) => {
|
||||
paths.push(bufferPath);
|
||||
if (bufferPath.endsWith(MANIFEST_NAME) && buffer) manifests[bufferPath] = buffer;
|
||||
if (
|
||||
buffer &&
|
||||
(bufferPath.endsWith(MANIFEST_NAME) || bufferPath.endsWith(DATASTREAM_ROUTING_RULES_NAME))
|
||||
) {
|
||||
manifestsAndRoutingRules[bufferPath] = buffer;
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
packageInfo: parseAndVerifyArchive(paths, manifests),
|
||||
packageInfo: parseAndVerifyArchive(paths, manifestsAndRoutingRules),
|
||||
paths,
|
||||
};
|
||||
}
|
||||
|
@ -164,18 +171,21 @@ export async function _generatePackageInfoFromPaths(
|
|||
paths: string[],
|
||||
topLevelDir: string
|
||||
): Promise<ArchivePackage> {
|
||||
const manifests: ManifestMap = {};
|
||||
const manifestsAndRoutingRules: AssetsBufferMap = {};
|
||||
await Promise.all(
|
||||
paths.map(async (filePath) => {
|
||||
if (filePath.endsWith(MANIFEST_NAME)) manifests[filePath] = await readFileAsync(filePath);
|
||||
if (filePath.endsWith(MANIFEST_NAME) || filePath.endsWith(DATASTREAM_ROUTING_RULES_NAME)) {
|
||||
manifestsAndRoutingRules[filePath] = await readFileAsync(filePath);
|
||||
}
|
||||
})
|
||||
);
|
||||
return parseAndVerifyArchive(paths, manifests, topLevelDir);
|
||||
|
||||
return parseAndVerifyArchive(paths, manifestsAndRoutingRules, topLevelDir);
|
||||
}
|
||||
|
||||
export function parseAndVerifyArchive(
|
||||
paths: string[],
|
||||
manifests: ManifestMap,
|
||||
manifestsAndRoutingRules: AssetsBufferMap,
|
||||
topLevelDirOverride?: string
|
||||
): ArchivePackage {
|
||||
// The top-level directory must match pkgName-pkgVersion, and no other top-level files or directories may be present
|
||||
|
@ -190,7 +200,7 @@ export function parseAndVerifyArchive(
|
|||
|
||||
// The package must contain a manifest file ...
|
||||
const manifestFile = path.posix.join(toplevelDir, MANIFEST_NAME);
|
||||
const manifestBuffer = manifests[manifestFile];
|
||||
const manifestBuffer = manifestsAndRoutingRules[manifestFile];
|
||||
if (!paths.includes(manifestFile) || !manifestBuffer) {
|
||||
throw new PackageInvalidArchiveError(
|
||||
`Package at top-level directory ${toplevelDir} must contain a top-level ${MANIFEST_NAME} file.`
|
||||
|
@ -239,7 +249,7 @@ export function parseAndVerifyArchive(
|
|||
pkgName: parsed.name,
|
||||
pkgVersion: parsed.version,
|
||||
pkgBasePathOverride: topLevelDirOverride,
|
||||
manifests,
|
||||
manifestsAndRoutingRules,
|
||||
});
|
||||
|
||||
if (parsedDataStreams.length) {
|
||||
|
@ -284,10 +294,10 @@ export function parseAndVerifyDataStreams(opts: {
|
|||
paths: string[];
|
||||
pkgName: string;
|
||||
pkgVersion: string;
|
||||
manifests: ManifestMap;
|
||||
manifestsAndRoutingRules: AssetsBufferMap;
|
||||
pkgBasePathOverride?: string;
|
||||
}): RegistryDataStream[] {
|
||||
const { paths, pkgName, pkgVersion, manifests, pkgBasePathOverride } = opts;
|
||||
const { paths, pkgName, pkgVersion, manifestsAndRoutingRules, pkgBasePathOverride } = opts;
|
||||
// A data stream is made up of a subdirectory of name-version/data_stream/, containing a manifest.yml
|
||||
const dataStreamPaths = new Set<string>();
|
||||
const dataStreams: RegistryDataStream[] = [];
|
||||
|
@ -305,8 +315,8 @@ export function parseAndVerifyDataStreams(opts: {
|
|||
|
||||
dataStreamPaths.forEach((dataStreamPath) => {
|
||||
const fullDataStreamPath = path.posix.join(dataStreamsBasePath, dataStreamPath);
|
||||
const manifestFile = path.posix.join(fullDataStreamPath, MANIFEST_NAME);
|
||||
const manifestBuffer = manifests[manifestFile];
|
||||
const manifestFile = path.posix.join(fullDataStreamPath, DATASTREAM_MANIFEST_NAME);
|
||||
const manifestBuffer = manifestsAndRoutingRules[manifestFile];
|
||||
if (!paths.includes(manifestFile) || !manifestBuffer) {
|
||||
throw new PackageInvalidArchiveError(
|
||||
`No manifest.yml file found for data stream '${dataStreamPath}'`
|
||||
|
@ -322,6 +332,20 @@ export function parseAndVerifyDataStreams(opts: {
|
|||
);
|
||||
}
|
||||
|
||||
// Routing rules
|
||||
const routingRulesFiles = path.posix.join(fullDataStreamPath, DATASTREAM_ROUTING_RULES_NAME);
|
||||
const routingRulesBuffer = manifestsAndRoutingRules[routingRulesFiles];
|
||||
let dataStreamRoutingRules: any;
|
||||
if (routingRulesBuffer) {
|
||||
try {
|
||||
dataStreamRoutingRules = yaml.safeLoad(routingRulesBuffer.toString());
|
||||
} catch (error) {
|
||||
throw new PackageInvalidArchiveError(
|
||||
`Could not parse package manifest for data stream '${dataStreamPath}': ${error}.`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const {
|
||||
title: dataStreamTitle,
|
||||
release = DEFAULT_RELEASE_VALUE,
|
||||
|
@ -357,6 +381,10 @@ export function parseAndVerifyDataStreams(opts: {
|
|||
elasticsearch: parsedElasticsearchEntry,
|
||||
};
|
||||
|
||||
if (dataStreamRoutingRules) {
|
||||
dataStreamObject.routing_rules = dataStreamRoutingRules;
|
||||
}
|
||||
|
||||
if (ingestPipeline) {
|
||||
dataStreamObject.ingest_pipeline = ingestPipeline;
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ import { appContextService } from '../../app_context';
|
|||
|
||||
import { getArchiveEntry, setArchiveEntry, setArchiveFilelist, setPackageInfo } from '.';
|
||||
import type { ArchiveEntry } from '.';
|
||||
import { MANIFEST_NAME, parseAndVerifyArchive } from './parse';
|
||||
import { DATASTREAM_ROUTING_RULES_NAME, MANIFEST_NAME, parseAndVerifyArchive } from './parse';
|
||||
|
||||
const ONE_BYTE = 1024 * 1024;
|
||||
// could be anything, picked this from https://github.com/elastic/elastic-agent-client/issues/17
|
||||
|
@ -207,7 +207,7 @@ export const getEsPackage = async (
|
|||
return undefined;
|
||||
}
|
||||
|
||||
const manifests: Record<string, Buffer> = {};
|
||||
const manifestsAndRoutingRules: Record<string, Buffer> = {};
|
||||
const entries: ArchiveEntry[] = assets.map(packageAssetToArchiveEntry);
|
||||
const paths: string[] = [];
|
||||
entries.forEach(({ path, buffer }) => {
|
||||
|
@ -216,12 +216,16 @@ export const getEsPackage = async (
|
|||
paths.push(path);
|
||||
}
|
||||
paths.push(path);
|
||||
if (path.endsWith(MANIFEST_NAME) && buffer) manifests[path] = buffer;
|
||||
if (path.endsWith(MANIFEST_NAME) && buffer) {
|
||||
manifestsAndRoutingRules[path] = buffer;
|
||||
} else if (path.endsWith(DATASTREAM_ROUTING_RULES_NAME) && buffer) {
|
||||
manifestsAndRoutingRules[path] = buffer;
|
||||
}
|
||||
});
|
||||
// // Add asset references to cache
|
||||
setArchiveFilelist({ name: pkgName, version: pkgVersion }, paths);
|
||||
|
||||
const packageInfo = parseAndVerifyArchive(paths, manifests);
|
||||
const packageInfo = parseAndVerifyArchive(paths, manifestsAndRoutingRules);
|
||||
setPackageInfo({ name: pkgName, version: pkgVersion, packageInfo });
|
||||
|
||||
return {
|
||||
|
|
|
@ -11,7 +11,7 @@ import path from 'path';
|
|||
import type { RegistryDataStream } from '../../../../types';
|
||||
|
||||
import {
|
||||
addCustomPipelineProcessor,
|
||||
addCustomPipelineAndLocalRoutingRulesProcessor,
|
||||
getPipelineNameForInstallation,
|
||||
rewriteIngestPipeline,
|
||||
} from './helpers';
|
||||
|
@ -142,9 +142,9 @@ test('getPipelineNameForInstallation gets correct name', () => {
|
|||
);
|
||||
});
|
||||
|
||||
describe('addCustomPipelineProcessor', () => {
|
||||
describe('addCustomPipelineAndLocalRoutingRulesProcessor', () => {
|
||||
it('add custom pipeline processor at the end of the pipeline for yaml pipeline', () => {
|
||||
const pipelineInstall = addCustomPipelineProcessor({
|
||||
const pipelineInstall = addCustomPipelineAndLocalRoutingRulesProcessor({
|
||||
contentForInstallation: `
|
||||
processors:
|
||||
- set:
|
||||
|
@ -170,7 +170,7 @@ processors:
|
|||
});
|
||||
|
||||
it('add custom pipeline processor at the end of the pipeline for json pipeline', () => {
|
||||
const pipelineInstall = addCustomPipelineProcessor({
|
||||
const pipelineInstall = addCustomPipelineAndLocalRoutingRulesProcessor({
|
||||
contentForInstallation: `{
|
||||
"processors": [
|
||||
{
|
||||
|
@ -190,4 +190,89 @@ processors:
|
|||
`"{\\"processors\\":[{\\"set\\":{\\"field\\":\\"test\\",\\"value\\":\\"toto\\"}},{\\"pipeline\\":{\\"name\\":\\"logs-test@custom\\",\\"ignore_missing_pipeline\\":true}}]}"`
|
||||
);
|
||||
});
|
||||
|
||||
describe('with local routing rules', () => {
|
||||
it('add reroute processor after custom pipeline processor for yaml pipeline', () => {
|
||||
const pipelineInstall = addCustomPipelineAndLocalRoutingRulesProcessor({
|
||||
contentForInstallation: `
|
||||
processors:
|
||||
- set:
|
||||
field: test
|
||||
value: toto
|
||||
`,
|
||||
extension: 'yml',
|
||||
nameForInstallation: 'logs-test-1.0.0',
|
||||
customIngestPipelineNameForInstallation: 'logs-test@custom',
|
||||
dataStream: {
|
||||
dataset: 'test',
|
||||
routing_rules: [
|
||||
{
|
||||
source_dataset: 'test',
|
||||
rules: [
|
||||
{
|
||||
target_dataset: 'test.reroute',
|
||||
if: 'true == true',
|
||||
namespace: 'default',
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
} as any,
|
||||
});
|
||||
|
||||
expect(pipelineInstall.contentForInstallation).toMatchInlineSnapshot(`
|
||||
"---
|
||||
processors:
|
||||
- set:
|
||||
field: test
|
||||
value: toto
|
||||
- pipeline:
|
||||
name: logs-test@custom
|
||||
ignore_missing_pipeline: true
|
||||
- reroute:
|
||||
tag: test
|
||||
dataset: test.reroute
|
||||
namespace: default
|
||||
if: true == true
|
||||
"
|
||||
`);
|
||||
});
|
||||
|
||||
it('add reroute processor after custom pipeline processor for json pipeline', () => {
|
||||
const pipelineInstall = addCustomPipelineAndLocalRoutingRulesProcessor({
|
||||
contentForInstallation: `{
|
||||
"processors": [
|
||||
{
|
||||
"set": {
|
||||
"field": "test",
|
||||
"value": "toto"
|
||||
}
|
||||
}
|
||||
]
|
||||
}`,
|
||||
extension: 'json',
|
||||
nameForInstallation: 'logs-test-1.0.0',
|
||||
customIngestPipelineNameForInstallation: 'logs-test@custom',
|
||||
dataStream: {
|
||||
dataset: 'test',
|
||||
routing_rules: [
|
||||
{
|
||||
source_dataset: 'test',
|
||||
rules: [
|
||||
{
|
||||
target_dataset: 'test.reroute',
|
||||
if: 'true == true',
|
||||
namespace: 'default',
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
} as any,
|
||||
});
|
||||
|
||||
expect(pipelineInstall.contentForInstallation).toMatchInlineSnapshot(
|
||||
`"{\\"processors\\":[{\\"set\\":{\\"field\\":\\"test\\",\\"value\\":\\"toto\\"}},{\\"pipeline\\":{\\"name\\":\\"logs-test@custom\\",\\"ignore_missing_pipeline\\":true}},{\\"reroute\\":{\\"tag\\":\\"test\\",\\"dataset\\":\\"test.reroute\\",\\"namespace\\":\\"default\\",\\"if\\":\\"true == true\\"}}]}"`
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -69,21 +69,39 @@ function mutatePipelineContentWithNewProcessor(jsonPipelineContent: any, process
|
|||
jsonPipelineContent.processors.push(processor);
|
||||
}
|
||||
|
||||
export function addCustomPipelineProcessor(pipeline: PipelineInstall): PipelineInstall {
|
||||
export function addCustomPipelineAndLocalRoutingRulesProcessor(
|
||||
pipeline: PipelineInstall
|
||||
): PipelineInstall {
|
||||
if (!pipeline.customIngestPipelineNameForInstallation) {
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
const localRoutingRules =
|
||||
pipeline.dataStream?.routing_rules?.find(
|
||||
(rule) => rule.source_dataset === pipeline.dataStream?.dataset
|
||||
)?.rules ?? [];
|
||||
|
||||
const customPipelineProcessor = {
|
||||
pipeline: {
|
||||
name: pipeline.customIngestPipelineNameForInstallation,
|
||||
ignore_missing_pipeline: true,
|
||||
},
|
||||
};
|
||||
const rerouteProcessors = localRoutingRules.map((routingRule) => ({
|
||||
reroute: {
|
||||
tag: pipeline.dataStream?.dataset,
|
||||
dataset: routingRule.target_dataset,
|
||||
namespace: routingRule.namespace,
|
||||
if: routingRule.if,
|
||||
},
|
||||
}));
|
||||
|
||||
if (pipeline.extension === 'yml') {
|
||||
const parsedPipelineContent = safeLoad(pipeline.contentForInstallation);
|
||||
mutatePipelineContentWithNewProcessor(parsedPipelineContent, customPipelineProcessor);
|
||||
rerouteProcessors.forEach((processor) =>
|
||||
mutatePipelineContentWithNewProcessor(parsedPipelineContent, processor)
|
||||
);
|
||||
return {
|
||||
...pipeline,
|
||||
contentForInstallation: `---\n${safeDump(parsedPipelineContent)}`,
|
||||
|
@ -92,6 +110,9 @@ export function addCustomPipelineProcessor(pipeline: PipelineInstall): PipelineI
|
|||
|
||||
const parsedPipelineContent = JSON.parse(pipeline.contentForInstallation);
|
||||
mutatePipelineContentWithNewProcessor(parsedPipelineContent, customPipelineProcessor);
|
||||
rerouteProcessors.forEach((processor) =>
|
||||
mutatePipelineContentWithNewProcessor(parsedPipelineContent, processor)
|
||||
);
|
||||
|
||||
return {
|
||||
...pipeline,
|
||||
|
|
|
@ -34,7 +34,7 @@ import {
|
|||
getPipelineNameForInstallation,
|
||||
rewriteIngestPipeline,
|
||||
isTopLevelPipeline,
|
||||
addCustomPipelineProcessor,
|
||||
addCustomPipelineAndLocalRoutingRulesProcessor,
|
||||
} from './helpers';
|
||||
import type { PipelineInstall, RewriteSubstitution } from './types';
|
||||
|
||||
|
@ -152,12 +152,9 @@ export async function installAllPipelines({
|
|||
const pipelinePaths = dataStream
|
||||
? paths.filter((path) => isDataStreamPipeline(path, dataStream.path))
|
||||
: paths;
|
||||
const pipelinesInfos: Array<{
|
||||
nameForInstallation: string;
|
||||
customIngestPipelineNameForInstallation?: string;
|
||||
content: string;
|
||||
extension: string;
|
||||
}> = [];
|
||||
const pipelinesInfos: Array<
|
||||
Omit<PipelineInstall, 'contentForInstallation'> & { content: string }
|
||||
> = [];
|
||||
const substitutions: RewriteSubstitution[] = [];
|
||||
|
||||
let datastreamPipelineCreated = false;
|
||||
|
@ -177,6 +174,7 @@ export async function installAllPipelines({
|
|||
nameForInstallation,
|
||||
customIngestPipelineNameForInstallation:
|
||||
dataStream && isMainPipeline ? getCustomPipelineNameForDatastream(dataStream) : undefined,
|
||||
dataStream,
|
||||
content,
|
||||
extension,
|
||||
});
|
||||
|
@ -203,6 +201,7 @@ export async function installAllPipelines({
|
|||
pipelinesToInstall.push({
|
||||
nameForInstallation,
|
||||
customIngestPipelineNameForInstallation: getCustomPipelineNameForDatastream(dataStream),
|
||||
dataStream,
|
||||
contentForInstallation: 'processors: []',
|
||||
extension: 'yml',
|
||||
});
|
||||
|
@ -234,7 +233,7 @@ async function installPipeline({
|
|||
});
|
||||
|
||||
if (shouldAddCustomPipelineProcessor) {
|
||||
pipelineToInstall = addCustomPipelineProcessor(pipelineToInstall);
|
||||
pipelineToInstall = addCustomPipelineAndLocalRoutingRulesProcessor(pipelineToInstall);
|
||||
}
|
||||
|
||||
const esClientParams = {
|
||||
|
|
|
@ -5,11 +5,14 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import type { RegistryDataStream } from '../../../../types';
|
||||
|
||||
export interface PipelineInstall {
|
||||
nameForInstallation: string;
|
||||
contentForInstallation: string;
|
||||
customIngestPipelineNameForInstallation?: string;
|
||||
extension: string;
|
||||
dataStream?: RegistryDataStream;
|
||||
}
|
||||
|
||||
export interface RewriteSubstitution {
|
||||
|
|
|
@ -78,7 +78,7 @@ export default function (providerContext: FtrProviderContext) {
|
|||
});
|
||||
});
|
||||
|
||||
describe('Without custom pipeline', () => {
|
||||
describe('With custom pipeline', () => {
|
||||
before(() =>
|
||||
es.ingest.putPipeline({
|
||||
id: CUSTOM_PIPELINE,
|
||||
|
|
|
@ -43,6 +43,7 @@ export default function loadTests({ loadTestFile, getService }) {
|
|||
loadTestFile(require.resolve('./install_hidden_datastreams'));
|
||||
loadTestFile(require.resolve('./bulk_get_assets'));
|
||||
loadTestFile(require.resolve('./install_dynamic_template_metric'));
|
||||
loadTestFile(require.resolve('./routing_rules'));
|
||||
loadTestFile(require.resolve('./install_runtime_field'));
|
||||
});
|
||||
}
|
||||
|
|
87
x-pack/test/fleet_api_integration/apis/epm/routing_rules.ts
Normal file
87
x-pack/test/fleet_api_integration/apis/epm/routing_rules.ts
Normal file
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import expect from '@kbn/expect';
|
||||
import { FtrProviderContext } from '../../../api_integration/ftr_provider_context';
|
||||
import { setupFleetAndAgents } from '../agents/services';
|
||||
import { skipIfNoDockerRegistry } from '../../helpers';
|
||||
|
||||
const TEST_WRITE_INDEX = 'logs-routing_rules.test-test';
|
||||
const TEST_REROUTE_INDEX = 'logs-routing_rules.reroute-test';
|
||||
|
||||
const ROUTING_RULES_PKG_NAME = 'routing_rules';
|
||||
const ROUTING_RULES_PKG_VERSION = '1.0.0';
|
||||
|
||||
export default function (providerContext: FtrProviderContext) {
|
||||
const { getService } = providerContext;
|
||||
const supertest = getService('supertest');
|
||||
const es = getService('es');
|
||||
const esArchiver = getService('esArchiver');
|
||||
|
||||
describe('routing rules for fleet managed datastreams', () => {
|
||||
skipIfNoDockerRegistry(providerContext);
|
||||
before(async () => {
|
||||
await esArchiver.load('x-pack/test/functional/es_archives/fleet/empty_fleet_server');
|
||||
});
|
||||
setupFleetAndAgents(providerContext);
|
||||
|
||||
before(async () => {
|
||||
await supertest
|
||||
.post(`/api/fleet/epm/packages/${ROUTING_RULES_PKG_NAME}/${ROUTING_RULES_PKG_VERSION}`)
|
||||
.set('kbn-xsrf', 'xxxx')
|
||||
.send({ force: true })
|
||||
.expect(200);
|
||||
});
|
||||
after(async () => {
|
||||
await supertest
|
||||
.delete(`/api/fleet/epm/packages/${ROUTING_RULES_PKG_NAME}/${ROUTING_RULES_PKG_VERSION}`)
|
||||
.set('kbn-xsrf', 'xxxx')
|
||||
.send({ force: true })
|
||||
.expect(200);
|
||||
});
|
||||
|
||||
after(async () => {
|
||||
await esArchiver.unload('x-pack/test/functional/es_archives/fleet/empty_fleet_server');
|
||||
});
|
||||
|
||||
after(async () => {
|
||||
const res = await es.search({
|
||||
index: TEST_REROUTE_INDEX,
|
||||
ignore_unavailable: true,
|
||||
});
|
||||
|
||||
for (const hit of res.hits.hits) {
|
||||
await es.delete({
|
||||
id: hit._id,
|
||||
index: hit._index,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
it('Should write doc correctly and apply the routing rule', async () => {
|
||||
const res = await es.index({
|
||||
index: TEST_WRITE_INDEX,
|
||||
body: {
|
||||
'@timestamp': '2020-01-01T09:09:00',
|
||||
message: 'hello',
|
||||
data_stream: {
|
||||
dataset: 'routing_rules.test',
|
||||
namespace: 'test',
|
||||
type: 'logs',
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const resWrite = await es.get({
|
||||
id: res._id,
|
||||
index: res._index,
|
||||
});
|
||||
|
||||
expect(resWrite._index.match(/logs-routing_rules.reroute-test/));
|
||||
});
|
||||
});
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
- name: data_stream.type
|
||||
type: constant_keyword
|
||||
description: >
|
||||
Data stream type.
|
||||
- name: data_stream.dataset
|
||||
type: constant_keyword
|
||||
description: >
|
||||
Data stream dataset.
|
||||
- name: data_stream.namespace
|
||||
type: constant_keyword
|
||||
description: >
|
||||
Data stream namespace.
|
||||
- name: '@timestamp'
|
||||
type: date
|
||||
description: >
|
||||
Event timestamp.
|
||||
- name: 'message'
|
||||
type: text
|
|
@ -0,0 +1,6 @@
|
|||
title: Test Dataset
|
||||
dataset: routing_rules.test
|
||||
type: logs
|
||||
elasticsearch:
|
||||
dynamic_dataset: true
|
||||
dynamic_namespace: true
|
|
@ -0,0 +1,9 @@
|
|||
# "Local" routing rules are included under this current dataset, not a special case
|
||||
- source_dataset: routing_rules.test
|
||||
rules:
|
||||
# Route error logs to `nginx.error` when they're sourced from an error logfile
|
||||
- target_dataset: routing_rules.reroute
|
||||
if: 'true == true'
|
||||
namespace:
|
||||
- '{{ data_stream.namespace }}'
|
||||
- default
|
|
@ -0,0 +1,3 @@
|
|||
# routing rules
|
||||
|
||||
This package has routing rules
|
|
@ -0,0 +1,7 @@
|
|||
<svg xmlns="http://www.w3.org/2000/svg" width="64" height="64" viewBox="0 0 64 64">
|
||||
<g fill="none" fill-rule="evenodd">
|
||||
<path fill="#F04E98" d="M29,32.0001 L15.935,9.4321 C13.48,5.1941 7,6.9351 7,11.8321 L7,52.1681 C7,57.0651 13.48,58.8061 15.935,54.5671 L29,32.0001 Z"/>
|
||||
<path fill="#FA744E" d="M34.7773,32.0001 L33.3273,34.5051 L20.2613,57.0731 C19.8473,57.7871 19.3533,58.4271 18.8023,59.0001 L34.9273,59.0001 C38.7073,59.0001 42.2213,57.0601 44.2363,53.8611 L58.0003,32.0001 L34.7773,32.0001 Z"/>
|
||||
<path fill="#343741" d="M44.2363,10.1392 C42.2213,6.9402 38.7073,5.0002 34.9273,5.0002 L18.8023,5.0002 C19.3533,5.5732 19.8473,6.2122 20.2613,6.9272 L33.3273,29.4942 L34.7773,32.0002 L58.0003,32.0002 L44.2363,10.1392 Z"/>
|
||||
</g>
|
||||
</svg>
|
After Width: | Height: | Size: 750 B |
|
@ -0,0 +1,25 @@
|
|||
format_version: 2.9.0
|
||||
name: routing_rules
|
||||
title: Package with routing rules
|
||||
description: This integration package has routing rules.
|
||||
version: 1.0.0
|
||||
categories: []
|
||||
# Options are experimental, beta, ga
|
||||
release: beta
|
||||
# The package type. The options for now are [integration, solution], more type might be added in the future.
|
||||
# The default type is integration and will be set if empty.
|
||||
type: integration
|
||||
license: basic
|
||||
owner:
|
||||
github: elastic/fleet
|
||||
|
||||
requirement:
|
||||
elasticsearch:
|
||||
versions: '>7.7.0'
|
||||
kibana:
|
||||
versions: '>7.7.0'
|
||||
|
||||
icons:
|
||||
- src: '/img/logo.svg'
|
||||
size: '16x16'
|
||||
type: 'image/svg+xml'
|
Loading…
Add table
Add a link
Reference in a new issue