🌊 Streams: Support classic streams without pipelines (#210223)

Unwired streams can come with or without a default ingest pipeline
defined. If there is a pipeline defined, it's clear where to plug in our
custom parsing logic.

So far, however, we would bail out on streams without a configured ingest
pipeline. Since there are good reasons to have data streams without an
ingest pipeline, we should support this case.

This PR makes sure we handle both cases:
* If the streams processing pipeline is already the default pipeline
itself, everything is fine and nothing needs to change
* If there is no default pipeline yet, extend the user-managed index
template to set the default pipeline to the streams processing pipeline
and roll over the data stream to apply the change (see the sketch below)
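For a classic stream without a default pipeline, the sync logic now queues two extra Elasticsearch requests. Below is a minimal sketch of those execution-plan steps; the stream name, template id, and pipeline name are illustrative placeholders (the real pipeline name comes from `getProcessingPipelineName()`, the step shape mirrors how entries are pushed in the diff, and the real code merges the full existing index template rather than sending only the settings shown here):

```ts
// Minimal sketch (illustrative names) of the two steps appended to the execution plan
// when the data stream has no default ingest pipeline yet.
interface ExecutionPlanStep {
  method: 'PUT' | 'POST';
  path: string;
  body?: Record<string, unknown>;
}

const streamName = 'mytest-abc'; // example data stream name (matches the FTR test below)
const indexTemplateName = 'mytemplate'; // example user-managed index template
const streamManagedPipelineName = `${streamName}@stream.processing`; // placeholder; derived via getProcessingPipelineName()

const executionPlan: ExecutionPlanStep[] = [
  {
    // Point the user-managed template at the stream-managed pipeline so new
    // backing indices pick it up as their default pipeline.
    method: 'PUT',
    path: `/_index_template/${indexTemplateName}`,
    body: {
      template: {
        settings: { index: { default_pipeline: streamManagedPipelineName } },
      },
    },
  },
  {
    // Roll over the data stream so a new backing index is created with the updated setting.
    method: 'POST',
    path: `/${streamName}/_rollover`,
  },
];
```

The rollover is needed because the template's `default_pipeline` setting only affects backing indices created after the template change; without it, documents written to the existing write index would still bypass the stream-managed pipeline.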
Joe Reuter 2025-02-13 11:20:29 +01:00 committed by GitHub
parent c241772ecb
commit 25127500bb
3 changed files with 151 additions and 14 deletions


@@ -14,6 +14,7 @@ import {
 import { isResponseError } from '@kbn/es-errors';
 import {
   IndicesDataStream,
+  IndicesIndexTemplate,
   IngestPipeline,
   IngestProcessorContainer,
 } from '@elastic/elasticsearch/lib/api/types';
@@ -156,13 +157,58 @@ async function tryGettingPipeline({
   });
 }
 
+type UnwrapPromise<T extends Promise<any>> = T extends Promise<infer Value> ? Value : never;
+
 async function ensureStreamManagedPipelineReference(
   scopedClusterClient: IScopedClusterClient,
-  pipelineName: string,
+  pipelineName: string | undefined,
   definition: StreamDefinition,
-  executionPlan: ExecutionPlanStep[]
+  executionPlan: ExecutionPlanStep[],
+  unmanagedAssets: UnwrapPromise<ReturnType<typeof getUnmanagedElasticsearchAssets>>
 ) {
   const streamManagedPipelineName = getProcessingPipelineName(definition.name);
+  if (pipelineName === streamManagedPipelineName) {
+    // the data stream is already calling the stream managed pipeline directly
+    return;
+  }
+  if (!pipelineName) {
+    // no ingest pipeline, we need to update the template to call the stream managed pipeline as
+    // the default pipeline
+    const indexTemplateAsset = unmanagedAssets.find((asset) => asset.type === 'index_template');
+    if (!indexTemplateAsset) {
+      throw new Error(`Could not find index template for stream ${definition.name}`);
+    }
+    const indexTemplate = (
+      await scopedClusterClient.asCurrentUser.indices.getIndexTemplate({
+        name: indexTemplateAsset.id,
+      })
+    ).index_templates[0].index_template;
+    const updatedTemplate: IndicesIndexTemplate = {
+      ...indexTemplate,
+      template: {
+        ...indexTemplate.template,
+        settings: {
+          ...indexTemplate.template?.settings,
+          index: {
+            ...indexTemplate.template?.settings?.index,
+            default_pipeline: streamManagedPipelineName,
+          },
+        },
+      },
+    };
+    executionPlan.push({
+      method: 'PUT',
+      path: `/_index_template/${indexTemplateAsset.id}`,
+      body: updatedTemplate as unknown as Record<string, unknown>,
+    });
+    // rollover the data stream to apply the new default pipeline
+    executionPlan.push({
+      method: 'POST',
+      path: `/${definition.name}/_rollover`,
+    });
+    return;
+  }
+
   const { targetPipelineName, targetPipeline, referencesStreamManagedPipeline } =
     await findStreamManagedPipelineReference(scopedClusterClient, pipelineName, definition.name);
   if (!referencesStreamManagedPipeline) {
@@ -205,17 +251,12 @@ export async function syncUnwiredStreamDefinitionObjects({
   const executionPlan: ExecutionPlanStep[] = [];
   const streamManagedPipelineName = getProcessingPipelineName(definition.name);
   const pipelineName = unmanagedAssets.find((asset) => asset.type === 'ingest_pipeline')?.id;
-  if (!pipelineName) {
-    throw new Error('Unmanaged stream needs a default ingest pipeline');
-  }
-  if (pipelineName === streamManagedPipelineName) {
-    throw new Error('Unmanaged stream cannot have the @stream pipeline as the default pipeline');
-  }
   await ensureStreamManagedPipelineReference(
     scopedClusterClient,
     pipelineName,
     definition,
-    executionPlan
+    executionPlan,
+    unmanagedAssets
   );
 
   if (definition.ingest.processing.length) {

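As a side note on the `UnwrapPromise` helper introduced above: it is a small conditional type that extracts the resolved value of a promise, so the new `unmanagedAssets` parameter can reuse the return type of `getUnmanagedElasticsearchAssets` without re-declaring the asset shape. A self-contained sketch (the declared return shape here is an illustrative subset, not the real signature):

```ts
// UnwrapPromise resolves Promise<X> to X via conditional type inference.
type UnwrapPromise<T extends Promise<any>> = T extends Promise<infer Value> ? Value : never;

// Illustrative stand-in for the real helper's return type.
declare function getUnmanagedElasticsearchAssets(): Promise<
  Array<{ type: 'ingest_pipeline' | 'index_template' | 'component_template'; id: string }>
>;

// Equivalent to the resolved array type above, without duplicating the declaration.
type UnmanagedAssets = UnwrapPromise<ReturnType<typeof getUnmanagedElasticsearchAssets>>;

const assets: UnmanagedAssets = [{ type: 'index_template', id: 'mytemplate' }];
```

On TypeScript 4.5+ the built-in `Awaited<T>` utility type covers the same case.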

@@ -171,13 +171,17 @@ export async function getUnmanagedElasticsearchAssets({
   const currentIndex = await scopedClusterClient.asCurrentUser.indices.get({
     index: writeIndexName,
   });
 
-  const ingestPipelineId = currentIndex[writeIndexName].settings?.index?.default_pipeline!;
+  const ingestPipelineId = currentIndex[writeIndexName].settings?.index?.default_pipeline;
   return [
-    {
-      type: 'ingest_pipeline' as const,
-      id: ingestPipelineId,
-    },
+    ...(ingestPipelineId
+      ? [
+          {
+            type: 'ingest_pipeline' as const,
+            id: ingestPipelineId,
+          },
+        ]
+      : []),
     ...componentTemplates.map((componentTemplateName) => ({
       type: 'component_template' as const,
       id: componentTemplateName,

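The asset helper above switches from always emitting an `ingest_pipeline` entry (via a non-null assertion) to the conditional-spread idiom, so streams without a default pipeline simply get no entry instead of one with an undefined id. A minimal sketch of the pattern, with example ids:

```ts
// Conditional spread: include the ingest_pipeline asset only when an id is present.
type Asset =
  | { type: 'ingest_pipeline'; id: string }
  | { type: 'component_template'; id: string };

function listAssets(ingestPipelineId: string | undefined, componentTemplates: string[]): Asset[] {
  return [
    ...(ingestPipelineId ? [{ type: 'ingest_pipeline' as const, id: ingestPipelineId }] : []),
    ...componentTemplates.map((name) => ({ type: 'component_template' as const, id: name })),
  ];
}

// With no default pipeline configured, the ingest_pipeline entry disappears entirely:
listAssets(undefined, ['logs@custom']);
// -> [{ type: 'component_template', id: 'logs@custom' }]
```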

@@ -242,5 +242,97 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
       );
       expect(classicStream).to.eql(undefined);
     });
+
+    describe('Classic stream without pipeline', () => {
+      const TEMPLATE_NAME = 'mytemplate';
+      const DATA_STREAM_NAME = 'mytest-abc';
+
+      before(async () => {
+        await esClient.indices.putIndexTemplate({
+          name: TEMPLATE_NAME,
+          body: {
+            index_patterns: ['mytest*'],
+            priority: 1000,
+            template: {
+              lifecycle: {
+                data_retention: '7d',
+              },
+            },
+            data_stream: {
+              allow_custom_routing: false,
+              hidden: false,
+            },
+          },
+        });
+
+        await esClient.indices.createDataStream({
+          name: DATA_STREAM_NAME,
+        });
+      });
+
+      after(async () => {
+        await esClient.indices.deleteDataStream({
+          name: DATA_STREAM_NAME,
+        });
+        await esClient.indices.deleteIndexTemplate({
+          name: TEMPLATE_NAME,
+        });
+      });
+
+      it('Allows adding processing to classic streams without pipeline', async () => {
+        const putResponse = await apiClient.fetch('PUT /api/streams/{name}', {
+          params: {
+            path: {
+              name: DATA_STREAM_NAME,
+            },
+            body: {
+              dashboards: [],
+              stream: {
+                ingest: {
+                  lifecycle: { inherit: {} },
+                  routing: [],
+                  processing: [
+                    {
+                      grok: {
+                        if: { always: {} },
+                        field: 'message',
+                        patterns: [
+                          '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}',
+                        ],
+                      },
+                    },
+                  ],
+                  unwired: {},
+                },
+              },
+            },
+          },
+        });
+
+        expect(putResponse.status).to.eql(200);
+        expect(putResponse.body).to.have.property('acknowledged', true);
+      });
+
+      it('Executes processing on classic streams without pipeline', async () => {
+        const doc = {
+          '@timestamp': '2024-01-01T00:00:10.000Z',
+          message: '2023-01-01T00:00:10.000Z error test',
+        };
+        const response = await indexDocument(esClient, DATA_STREAM_NAME, doc);
+        expect(response.result).to.eql('created');
+
+        const result = await fetchDocument(esClient, DATA_STREAM_NAME, response._id);
+        expect(result._source).to.eql({
+          '@timestamp': '2024-01-01T00:00:10.000Z',
+          message: '2023-01-01T00:00:10.000Z error test',
+          inner_timestamp: '2023-01-01T00:00:10.000Z',
+          message2: 'test',
+          log: {
+            level: 'error',
+          },
+        });
+      });
+    });
   });
 }