[8.x] 🌊 Streams: Make deleting orphaned streams work (#218054) (#218117)

# Backport

This will backport the following commits from `main` to `8.x`:
- [🌊 Streams: Make deleting orphaned streams work
(#218054)](https://github.com/elastic/kibana/pull/218054)

<!--- Backport version: 9.6.6 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sorenlouv/backport)

<!--BACKPORT [{"author":{"name":"Joe
Reuter","email":"johannes.reuter@elastic.co"},"sourceCommit":{"committedDate":"2025-04-14T13:49:00Z","message":"🌊
Streams: Make deleting orphaned streams work (#218054)\n\nCurrently
streams doesn't allow you to delete an orphaned stream
because\n`getPipelineTargets` required the data stream to exist.\n\nThis
PR fixes the problem by handling the case
gracefully.","sha":"e2f0fddfd12d5f0fad09821f6559054190305ac7","branchLabelMapping":{"^v9.1.0$":"main","^v8.19.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team:obs-ux-logs","backport:version","Feature:Streams","v9.1.0","v8.19.0"],"title":"🌊
Streams: Make deleting orphaned streams
work","number":218054,"url":"https://github.com/elastic/kibana/pull/218054","mergeCommit":{"message":"🌊
Streams: Make deleting orphaned streams work (#218054)\n\nCurrently
streams doesn't allow you to delete an orphaned stream
because\n`getPipelineTargets` required the data stream to exist.\n\nThis
PR fixes the problem by handling the case
gracefully.","sha":"e2f0fddfd12d5f0fad09821f6559054190305ac7"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/218054","number":218054,"mergeCommit":{"message":"🌊
Streams: Make deleting orphaned streams work (#218054)\n\nCurrently
streams doesn't allow you to delete an orphaned stream
because\n`getPipelineTargets` required the data stream to exist.\n\nThis
PR fixes the problem by handling the case
gracefully.","sha":"e2f0fddfd12d5f0fad09821f6559054190305ac7"}},{"branch":"8.x","label":"v8.19.0","branchLabelMappingKey":"^v8.19.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
This commit is contained in:
Kibana Machine 2025-04-14 17:40:13 +02:00 committed by GitHub
parent 2d50a33948
commit 8e6ff32635
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 37 additions and 14 deletions

View file

@ -5,7 +5,10 @@
* 2.0.
*/
import type { IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types';
import type {
IndicesDataStream,
IngestProcessorContainer,
} from '@elastic/elasticsearch/lib/api/types';
import type {
IngestStreamLifecycle,
StreamDefinition,
@ -202,7 +205,11 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> {
},
});
const { pipeline, template } = await this.getPipelineTargets();
const pipelineTargets = await this.getPipelineTargets();
if (!pipelineTargets) {
throw new StatusError('Could not find pipeline targets', 500);
}
const { pipeline, template } = pipelineTargets;
actions.push({
type: 'delete_processor_from_ingest_pipeline',
pipeline,
@ -253,7 +260,11 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> {
},
};
const { pipeline, template } = await this.getPipelineTargets();
const pipelineTargets = await this.getPipelineTargets();
if (!pipelineTargets) {
throw new StatusError('Could not find pipeline targets', 500);
}
const { pipeline, template } = pipelineTargets;
actions.push({
type: 'append_processor_to_ingest_pipeline',
pipeline,
@ -290,21 +301,32 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> {
name: streamManagedPipelineName,
},
});
const { pipeline, template } = await this.getPipelineTargets();
actions.push({
type: 'delete_processor_from_ingest_pipeline',
pipeline,
template,
dataStream: this._definition.name,
referencePipeline: streamManagedPipelineName,
});
const pipelineTargets = await this.getPipelineTargets();
if (pipelineTargets) {
const { pipeline, template } = pipelineTargets;
actions.push({
type: 'delete_processor_from_ingest_pipeline',
pipeline,
template,
dataStream: this._definition.name,
referencePipeline: streamManagedPipelineName,
});
}
}
return actions;
}
private async getPipelineTargets() {
const dataStream = await this.dependencies.streamsClient.getDataStream(this._definition.name);
let dataStream: IndicesDataStream;
try {
dataStream = await this.dependencies.streamsClient.getDataStream(this._definition.name);
} catch (error) {
if (isNotFoundError(error)) {
return undefined;
}
throw error;
}
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
dataStream,
scopedClusterClient: this.dependencies.scopedClusterClient,

View file

@ -585,14 +585,15 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(isUnwiredStreamDefinition(classicStream!.stream)).to.be(true);
});
after(async () => {
await apiClient.fetch('DELETE /api/streams/{name} 2023-10-31', {
it('should allow deleting', async () => {
const response = await apiClient.fetch('DELETE /api/streams/{name} 2023-10-31', {
params: {
path: {
name: ORPHANED_STREAM_NAME,
},
},
});
expect(response.status).to.eql(200);
});
});
});