🌊 Streams: Make deleting orphaned streams work (#218054)

Currently streams doesn't allow you to delete an orphaned stream because
`getPipelineTargets` required the data stream to exist.

This PR fixes the problem by handling the case gracefully.
This commit is contained in:
Joe Reuter 2025-04-14 15:49:00 +02:00 committed by GitHub
parent 6722f142a4
commit e2f0fddfd1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 37 additions and 14 deletions

View file

@ -5,7 +5,10 @@
* 2.0.
*/
import type { IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types';
import type {
IndicesDataStream,
IngestProcessorContainer,
} from '@elastic/elasticsearch/lib/api/types';
import type {
IngestStreamLifecycle,
StreamDefinition,
@ -202,7 +205,11 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> {
},
});
const { pipeline, template } = await this.getPipelineTargets();
const pipelineTargets = await this.getPipelineTargets();
if (!pipelineTargets) {
throw new StatusError('Could not find pipeline targets', 500);
}
const { pipeline, template } = pipelineTargets;
actions.push({
type: 'delete_processor_from_ingest_pipeline',
pipeline,
@ -253,7 +260,11 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> {
},
};
const { pipeline, template } = await this.getPipelineTargets();
const pipelineTargets = await this.getPipelineTargets();
if (!pipelineTargets) {
throw new StatusError('Could not find pipeline targets', 500);
}
const { pipeline, template } = pipelineTargets;
actions.push({
type: 'append_processor_to_ingest_pipeline',
pipeline,
@ -290,21 +301,32 @@ export class UnwiredStream extends StreamActiveRecord<UnwiredStreamDefinition> {
name: streamManagedPipelineName,
},
});
const { pipeline, template } = await this.getPipelineTargets();
actions.push({
type: 'delete_processor_from_ingest_pipeline',
pipeline,
template,
dataStream: this._definition.name,
referencePipeline: streamManagedPipelineName,
});
const pipelineTargets = await this.getPipelineTargets();
if (pipelineTargets) {
const { pipeline, template } = pipelineTargets;
actions.push({
type: 'delete_processor_from_ingest_pipeline',
pipeline,
template,
dataStream: this._definition.name,
referencePipeline: streamManagedPipelineName,
});
}
}
return actions;
}
private async getPipelineTargets() {
const dataStream = await this.dependencies.streamsClient.getDataStream(this._definition.name);
let dataStream: IndicesDataStream;
try {
dataStream = await this.dependencies.streamsClient.getDataStream(this._definition.name);
} catch (error) {
if (isNotFoundError(error)) {
return undefined;
}
throw error;
}
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
dataStream,
scopedClusterClient: this.dependencies.scopedClusterClient,

View file

@ -583,14 +583,15 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
expect(isUnwiredStreamDefinition(classicStream!.stream)).to.be(true);
});
after(async () => {
await apiClient.fetch('DELETE /api/streams/{name} 2023-10-31', {
it('should allow deleting', async () => {
const response = await apiClient.fetch('DELETE /api/streams/{name} 2023-10-31', {
params: {
path: {
name: ORPHANED_STREAM_NAME,
},
},
});
expect(response.status).to.eql(200);
});
});
});