[Fleet] Add a pipeline processor to all the ingest_pipeline installed by fleet (#134578)

This commit is contained in:
Nicolas Chaulet 2022-06-16 17:44:56 -04:00 committed by GitHub
parent 8da4cb29a7
commit 75f786bf73
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 286 additions and 31 deletions

View file

@ -10,7 +10,12 @@ import path from 'path';
import type { RegistryDataStream } from '../../../../types';
import { getPipelineNameForInstallation, rewriteIngestPipeline } from './helpers';
import {
addCustomPipelineProcessor,
getCustomPipelineNameForDatastream,
getPipelineNameForInstallation,
rewriteIngestPipeline,
} from './helpers';
test('a json-format pipeline with pipeline references is correctly rewritten', () => {
const inputStandard = readFileSync(
@ -137,3 +142,64 @@ test('getPipelineNameForInstallation gets correct name', () => {
`${dataStream.type}-${dataStream.dataset}-${packageVersion}-${pipelineRefName}`
);
});
// Verifies that addCustomPipelineProcessor appends a `pipeline` processor
// (delegating to the datastream's `@custom` pipeline, with
// ignore_missing_pipeline: true) after the existing processors, for both
// YAML and JSON pipeline sources. Inline snapshots pin the exact serialized
// output, so the serialization format itself is under test.
describe('addCustomPipelineProcessor', () => {
it('add custom pipeline processor at the end of the pipeline for yaml pipeline', () => {
const pipelineInstall = addCustomPipelineProcessor({
contentForInstallation: `
processors:
- set:
field: test
value: toto
`,
extension: 'yml',
nameForInstallation: 'logs-test-1.0.0',
customIngestPipelineNameForInstallation: 'logs-test@custom',
});
// YAML output is re-serialized with a leading `---` document marker.
expect(pipelineInstall.contentForInstallation).toMatchInlineSnapshot(`
"---
processors:
- set:
field: test
value: toto
- pipeline:
name: logs-test@custom
ignore_missing_pipeline: true
"
`);
});
it('add custom pipeline processor at the end of the pipeline for json pipeline', () => {
const pipelineInstall = addCustomPipelineProcessor({
contentForInstallation: `{
"processors": [
{
"set": {
"field": "test",
"value": "toto"
}
}
]
}`,
extension: 'json',
nameForInstallation: 'logs-test-1.0.0',
customIngestPipelineNameForInstallation: 'logs-test@custom',
});
// JSON output is re-serialized compactly by JSON.stringify.
expect(pipelineInstall.contentForInstallation).toMatchInlineSnapshot(
`"{\\"processors\\":[{\\"set\\":{\\"field\\":\\"test\\",\\"value\\":\\"toto\\"}},{\\"pipeline\\":{\\"name\\":\\"logs-test@custom\\",\\"ignore_missing_pipeline\\":true}}]}"`
);
});
});
// Verifies the `<type>-<dataset>@custom` naming convention for the
// user-editable custom ingest pipeline of a datastream.
describe('getCustomPipelineNameForDatastream', () => {
  it('return the correct custom pipeline for datastream', () => {
    // Only `type` and `dataset` are read when building the name.
    const dataStream = { type: 'logs', dataset: 'test' } as any;
    expect(getCustomPipelineNameForDatastream(dataStream)).toBe('logs-test@custom');
  });
});

View file

@ -4,11 +4,14 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { safeDump, safeLoad } from 'js-yaml';
import { ElasticsearchAssetType } from '../../../../types';
import type { RegistryDataStream } from '../../../../types';
import { getPathParts } from '../../archive';
import type { PipelineInstall, RewriteSubstitution } from './types';
export const isTopLevelPipeline = (path: string) => {
const pathParts = getPathParts(path);
return (
@ -45,11 +48,9 @@ export const getPipelineNameForDatastream = ({
return `${dataStream.type}-${dataStream.dataset}-${packageVersion}`;
};
export interface RewriteSubstitution {
source: string;
target: string;
templateFunction: string;
}
/**
 * Build the name of the user-overridable `@custom` ingest pipeline for a
 * data stream, following the `<type>-<dataset>@custom` convention.
 */
export const getCustomPipelineNameForDatastream = (dataStream: RegistryDataStream): string =>
  [dataStream.type, dataStream.dataset].join('-') + '@custom';
export function rewriteIngestPipeline(
pipeline: string,
@ -71,3 +72,41 @@ export function rewriteIngestPipeline(
});
return pipeline;
}
/**
 * Append `processor` to the parsed pipeline's `processors` array, creating
 * the array when the pipeline did not define one. Mutates
 * `jsonPipelineContent` in place.
 */
function mutatePipelineContentWithNewProcessor(jsonPipelineContent: any, processor: any) {
  (jsonPipelineContent.processors ??= []).push(processor);
}
/**
 * Return a copy of `pipeline` with a trailing `pipeline` processor that
 * delegates to the datastream's user-editable `@custom` ingest pipeline.
 * The processor is added with `ignore_missing_pipeline: true` so installation
 * succeeds even before the custom pipeline exists. Returns the input
 * unchanged when no custom pipeline name is set.
 */
export function addCustomPipelineProcessor(pipeline: PipelineInstall): PipelineInstall {
  if (!pipeline.customIngestPipelineNameForInstallation) {
    return pipeline;
  }

  const customPipelineProcessor = {
    pipeline: {
      name: pipeline.customIngestPipelineNameForInstallation,
      ignore_missing_pipeline: true,
    },
  };

  if (pipeline.extension === 'yml') {
    // FIX: safeLoad returns undefined for empty/comment-only YAML; fall back
    // to an empty pipeline object instead of crashing when mutating it.
    const parsedPipelineContent = safeLoad(pipeline.contentForInstallation) ?? {};
    mutatePipelineContentWithNewProcessor(parsedPipelineContent, customPipelineProcessor);

    return {
      ...pipeline,
      contentForInstallation: `---\n${safeDump(parsedPipelineContent)}`,
    };
  }

  const parsedPipelineContent = JSON.parse(pipeline.contentForInstallation);
  mutatePipelineContentWithNewProcessor(parsedPipelineContent, customPipelineProcessor);

  return {
    ...pipeline,
    contentForInstallation: JSON.stringify(parsedPipelineContent),
  };
}

View file

@ -5,6 +5,6 @@
* 2.0.
*/
export { prepareToInstallPipelines, isTopLevelPipeline } from './install';
export { getPipelineNameForDatastream } from './helpers';
export { prepareToInstallPipelines } from './install';
export { getPipelineNameForDatastream, isTopLevelPipeline } from './helpers';
export { deletePreviousPipelines, deletePipeline } from './remove';

View file

@ -19,28 +19,17 @@ import {
} from '../../../../constants';
import { appendMetadataToIngestPipeline } from '../meta';
import { retryTransientEsErrors } from '../retry';
import {
getCustomPipelineNameForDatastream,
getPipelineNameForDatastream,
getPipelineNameForInstallation,
rewriteIngestPipeline,
isTopLevelPipeline,
addCustomPipelineProcessor,
} from './helpers';
import type { RewriteSubstitution } from './helpers';
interface PipelineInstall {
nameForInstallation: string;
contentForInstallation: string;
extension: string;
}
export const isTopLevelPipeline = (path: string) => {
const pathParts = getPathParts(path);
return (
pathParts.type === ElasticsearchAssetType.ingestPipeline && pathParts.dataset === undefined
);
};
import type { PipelineInstall, RewriteSubstitution } from './types';
export const prepareToInstallPipelines = (
installablePackage: InstallablePackage,
@ -156,8 +145,8 @@ export async function installAllPipelines({
? paths.filter((path) => isDataStreamPipeline(path, dataStream.path))
: paths;
const pipelinesInfos: Array<{
name: string;
nameForInstallation: string;
customIngestPipelineNameForInstallation?: string;
content: string;
extension: string;
}> = [];
@ -176,8 +165,10 @@ export async function installAllPipelines({
});
const content = getAsset(path).toString('utf-8');
pipelinesInfos.push({
name,
nameForInstallation,
customIngestPipelineNameForInstallation: dataStream
? getCustomPipelineNameForDatastream(dataStream)
: undefined,
content,
extension,
});
@ -203,6 +194,7 @@ export async function installAllPipelines({
pipelinesToInstall.push({
nameForInstallation,
customIngestPipelineNameForInstallation: getCustomPipelineNameForDatastream(dataStream),
contentForInstallation: 'processors: []',
extension: 'yml',
});
@ -220,27 +212,36 @@ async function installPipeline({
logger,
pipeline,
installablePackage,
shouldAddCustomPipelineProcessor = true,
}: {
esClient: ElasticsearchClient;
logger: Logger;
pipeline: PipelineInstall;
installablePackage?: InstallablePackage;
shouldAddCustomPipelineProcessor?: boolean;
}): Promise<EsAssetReference> {
const pipelineWithMetadata = appendMetadataToIngestPipeline({
let pipelineToInstall = appendMetadataToIngestPipeline({
pipeline,
packageName: installablePackage?.name,
});
if (shouldAddCustomPipelineProcessor) {
pipelineToInstall = addCustomPipelineProcessor(pipelineToInstall);
}
const esClientParams = {
id: pipelineWithMetadata.nameForInstallation,
body: pipelineWithMetadata.contentForInstallation,
id: pipelineToInstall.nameForInstallation,
body:
pipelineToInstall.extension === 'yml'
? pipelineToInstall.contentForInstallation
: JSON.parse(pipelineToInstall.contentForInstallation),
};
const esClientRequestOptions: TransportRequestOptions = {
ignore: [404],
};
if (pipelineWithMetadata.extension === 'yml') {
if (pipelineToInstall.extension === 'yml') {
esClientRequestOptions.headers = {
// pipeline is YAML
'Content-Type': 'application/yaml',
@ -255,7 +256,7 @@ async function installPipeline({
);
return {
id: pipelineWithMetadata.nameForInstallation,
id: pipelineToInstall.nameForInstallation,
type: ElasticsearchAssetType.ingestPipeline,
};
}

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
/** An ingest pipeline asset prepared for installation into Elasticsearch. */
export interface PipelineInstall {
// Final pipeline id used when installing into Elasticsearch.
nameForInstallation: string;
// Raw pipeline definition text (YAML or JSON, depending on `extension`).
contentForInstallation: string;
// Name of the user-editable `@custom` pipeline to chain at the end, if any.
customIngestPipelineNameForInstallation?: string;
// Source file extension ('yml' or 'json') — controls how content is parsed/sent.
extension: string;
}
/** A substitution rule for rewriting pipeline references inside a pipeline definition. */
export interface RewriteSubstitution {
source: string;
target: string;
// NOTE(review): presumably names a templating function applied during rewrite — verify in rewriteIngestPipeline.
templateFunction: string;
}

View file

@ -58,6 +58,6 @@ export function appendMetadataToIngestPipeline({
return {
...pipeline,
contentForInstallation: parsedPipelineContent,
contentForInstallation: JSON.stringify(parsedPipelineContent),
};
}

View file

@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../api_integration/ftr_provider_context';
import { setupFleetAndAgents } from '../agents/services';
import { skipIfNoDockerRegistry } from '../../helpers';
const TEST_INDEX = 'logs-log.log-test';
const CUSTOM_PIPELINE = 'logs-log.log@custom';
let pkgVersion: string;
/**
 * Integration test: Fleet appends a `pipeline` processor to every ingest
 * pipeline it installs, delegating to an optional user-created `@custom`
 * pipeline. Documents must index fine without the custom pipeline, and the
 * custom pipeline's processors must run once it is installed.
 */
export default function (providerContext: FtrProviderContext) {
  const { getService } = providerContext;
  const supertest = getService('supertest');
  const es = getService('es');
  const esArchiver = getService('esArchiver');

  describe('custom ingest pipeline for fleet managed datastreams', () => {
    skipIfNoDockerRegistry(providerContext);
    before(async () => {
      await esArchiver.load('x-pack/test/functional/es_archives/fleet/empty_fleet_server');
    });
    setupFleetAndAgents(providerContext);
    // Use the custom log package to test the custom ingest pipeline
    before(async () => {
      const { body: getPackagesRes } = await supertest.get(
        `/api/fleet/epm/packages?experimental=true`
      );
      const logPackage = getPackagesRes.items.find((p: any) => p.name === 'log');
      if (!logPackage) {
        throw new Error('No log package');
      }
      pkgVersion = logPackage.version;

      await supertest
        .post(`/api/fleet/epm/packages/log/${pkgVersion}`)
        .set('kbn-xsrf', 'xxxx')
        .send({ force: true })
        .expect(200);
    });

    after(async () => {
      await supertest
        .delete(`/api/fleet/epm/packages/log/${pkgVersion}`)
        .set('kbn-xsrf', 'xxxx')
        .send({ force: true })
        .expect(200);
    });

    after(async () => {
      await esArchiver.unload('x-pack/test/functional/es_archives/fleet/empty_fleet_server');
    });

    // Clean up every document the tests indexed into the test datastream.
    after(async () => {
      const res = await es.search({
        index: TEST_INDEX,
      });

      for (const hit of res.hits.hits) {
        await es.delete({
          id: hit._id,
          index: hit._index,
        });
      }
    });

    describe('Without custom pipeline', () => {
      it('Should write doc correctly', async () => {
        const res = await es.index({
          index: 'logs-log.log-test',
          body: {
            '@timestamp': '2020-01-01T09:09:00',
            message: 'hello',
          },
        });

        // Indexing must succeed even though no @custom pipeline exists
        // (ignore_missing_pipeline: true on the appended processor).
        await es.get({
          id: res._id,
          index: res._index,
        });
      });
    });

    // FIX: this block exercises the case WITH a custom pipeline installed;
    // the original title duplicated the previous block's 'Without custom pipeline'.
    describe('With custom pipeline', () => {
      before(() =>
        es.ingest.putPipeline({
          id: CUSTOM_PIPELINE,
          processors: [
            {
              set: {
                field: 'test',
                value: 'itworks',
              },
            },
          ],
        })
      );

      after(() =>
        es.ingest.deletePipeline({
          id: CUSTOM_PIPELINE,
        })
      );

      it('Should write doc correctly', async () => {
        const res = await es.index({
          index: 'logs-log.log-test',
          body: {
            '@timestamp': '2020-01-01T09:09:00',
            message: 'hello',
          },
        });

        const doc = await es.get<{ test: string }>({
          id: res._id,
          index: res._index,
        });
        // The custom pipeline's `set` processor must have run at ingest time.
        expect(doc._source?.test).to.eql('itworks');
      });
    });
  });
}

View file

@ -30,5 +30,6 @@ export default function loadTests({ loadTestFile }) {
loadTestFile(require.resolve('./remove_legacy_templates'));
loadTestFile(require.resolve('./install_error_rollback'));
loadTestFile(require.resolve('./final_pipeline'));
loadTestFile(require.resolve('./custom_ingest_pipeline'));
});
}