mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 17:28:26 -04:00
fix(slo): non-breaking changes of an SLO running with older resources is a breaking change (#207090)
This commit is contained in:
parent
4f4637da58
commit
ac0a6e4100
5 changed files with 124 additions and 27 deletions
|
@ -26,6 +26,7 @@ const createTransformManagerMock = (): jest.Mocked<TransformManager> => {
|
|||
start: jest.fn(),
|
||||
stop: jest.fn(),
|
||||
inspect: jest.fn(),
|
||||
getVersion: jest.fn(),
|
||||
};
|
||||
};
|
||||
|
||||
|
@ -37,6 +38,7 @@ const createSummaryTransformManagerMock = (): jest.Mocked<TransformManager> => {
|
|||
start: jest.fn(),
|
||||
stop: jest.fn(),
|
||||
inspect: jest.fn(),
|
||||
getVersion: jest.fn(),
|
||||
};
|
||||
};
|
||||
|
||||
|
|
|
@ -111,4 +111,21 @@ export class DefaultSummaryTransformManager implements TransformManager {
|
|||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async getVersion(transformId: TransformId): Promise<number | undefined> {
|
||||
try {
|
||||
const response = await retryTransientEsErrors(
|
||||
() =>
|
||||
this.scopedClusterClient.asSecondaryAuthUser.transform.getTransform(
|
||||
{ transform_id: transformId },
|
||||
{ ignore: [404] }
|
||||
),
|
||||
{ logger: this.logger }
|
||||
);
|
||||
return response?.transforms[0]?._meta?.version;
|
||||
} catch (err) {
|
||||
this.logger.error(`Cannot retrieve SLO transform version [${transformId}]. ${err}`);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ export interface TransformManager {
|
|||
start(transformId: TransformId): Promise<void>;
|
||||
stop(transformId: TransformId): Promise<void>;
|
||||
uninstall(transformId: TransformId): Promise<void>;
|
||||
getVersion(transformId: TransformId): Promise<number | undefined>;
|
||||
}
|
||||
|
||||
export class DefaultTransformManager implements TransformManager {
|
||||
|
@ -133,6 +134,23 @@ export class DefaultTransformManager implements TransformManager {
|
|||
}
|
||||
}
|
||||
|
||||
async getVersion(transformId: TransformId): Promise<number | undefined> {
|
||||
try {
|
||||
const response = await retryTransientEsErrors(
|
||||
() =>
|
||||
this.scopedClusterClient.asSecondaryAuthUser.transform.getTransform(
|
||||
{ transform_id: transformId },
|
||||
{ ignore: [404] }
|
||||
),
|
||||
{ logger: this.logger }
|
||||
);
|
||||
return response?.transforms[0]?._meta?.version;
|
||||
} catch (err) {
|
||||
this.logger.error(`Cannot retrieve SLO transform version [${transformId}]. ${err}`);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async scheduleNowTransform(transformId: TransformId) {
|
||||
this.scopedClusterClient.asSecondaryAuthUser.transform
|
||||
.scheduleNowTransform({ transform_id: transformId })
|
||||
|
|
|
@ -21,6 +21,7 @@ import {
|
|||
getSLOSummaryTransformId,
|
||||
getSLOTransformId,
|
||||
SLO_DESTINATION_INDEX_PATTERN,
|
||||
SLO_RESOURCES_VERSION,
|
||||
SLO_SUMMARY_DESTINATION_INDEX_PATTERN,
|
||||
} from '../../common/constants';
|
||||
import { SLODefinition } from '../domain/models';
|
||||
|
@ -68,7 +69,7 @@ describe('UpdateSLO', () => {
|
|||
);
|
||||
});
|
||||
|
||||
describe('when the update payload does not change the original SLO', () => {
|
||||
describe('when the update does not change the original SLO', () => {
|
||||
function expectNoCallsToAnyMocks() {
|
||||
expect(mockEsClient.security.hasPrivileges).not.toBeCalled();
|
||||
|
||||
|
@ -86,6 +87,10 @@ describe('UpdateSLO', () => {
|
|||
expect(mockScopedClusterClient.asSecondaryAuthUser.ingest.putPipeline).not.toBeCalled();
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
mockSummaryTransformManager.getVersion.mockResolvedValue(SLO_RESOURCES_VERSION);
|
||||
});
|
||||
|
||||
it('returns early with a fully identical SLO payload', async () => {
|
||||
const slo = createSLO();
|
||||
mockRepository.findById.mockResolvedValueOnce(slo);
|
||||
|
@ -194,13 +199,69 @@ describe('UpdateSLO', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('handles breaking changes', () => {
|
||||
describe('without breaking changes update', () => {
|
||||
beforeEach(() => {
|
||||
mockEsClient.security.hasPrivileges.mockResolvedValue({
|
||||
has_all_requested: true,
|
||||
} as SecurityHasPrivilegesResponse);
|
||||
});
|
||||
|
||||
describe('when resources are up-to-date', () => {
|
||||
beforeEach(() => {
|
||||
mockSummaryTransformManager.getVersion.mockResolvedValue(SLO_RESOURCES_VERSION);
|
||||
});
|
||||
it('updates the summary pipeline with the new non-breaking changes', async () => {
|
||||
const slo = createSLO();
|
||||
mockRepository.findById.mockResolvedValueOnce(slo);
|
||||
await updateSLO.execute(slo.id, { name: 'updated name' });
|
||||
|
||||
expectNonBreakingChangeUpdatedResources();
|
||||
});
|
||||
|
||||
function expectNonBreakingChangeUpdatedResources() {
|
||||
expect(mockScopedClusterClient.asSecondaryAuthUser.ingest.putPipeline).toHaveBeenCalled();
|
||||
|
||||
expect(mockTransformManager.install).not.toHaveBeenCalled();
|
||||
expect(mockTransformManager.start).not.toHaveBeenCalled();
|
||||
expect(mockSummaryTransformManager.install).not.toHaveBeenCalled();
|
||||
expect(mockSummaryTransformManager.start).not.toHaveBeenCalled();
|
||||
|
||||
expect(mockEsClient.index).not.toHaveBeenCalled();
|
||||
}
|
||||
});
|
||||
|
||||
describe('when resources are running on an older version', () => {
|
||||
beforeEach(() => {
|
||||
mockSummaryTransformManager.getVersion.mockResolvedValue(SLO_RESOURCES_VERSION - 2);
|
||||
});
|
||||
|
||||
it('consideres the non-breaking changes as breaking', async () => {
|
||||
const slo = createSLO();
|
||||
mockRepository.findById.mockResolvedValueOnce(slo);
|
||||
await updateSLO.execute(slo.id, { name: 'updated name' });
|
||||
|
||||
expect(mockRepository.update).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
...slo,
|
||||
name: 'updated name',
|
||||
revision: 2,
|
||||
updatedAt: expect.anything(),
|
||||
})
|
||||
);
|
||||
expectInstallationOfUpdatedSLOResources();
|
||||
expectDeletionOfOriginalSLOResources(slo);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('with breaking changes update', () => {
|
||||
beforeEach(() => {
|
||||
mockEsClient.security.hasPrivileges.mockResolvedValue({
|
||||
has_all_requested: true,
|
||||
} as SecurityHasPrivilegesResponse);
|
||||
mockSummaryTransformManager.getVersion.mockResolvedValue(SLO_RESOURCES_VERSION);
|
||||
});
|
||||
|
||||
it('consideres a settings change as a breaking change', async () => {
|
||||
const slo = createSLO();
|
||||
mockRepository.findById.mockResolvedValueOnce(slo);
|
||||
|
@ -315,6 +376,7 @@ describe('UpdateSLO', () => {
|
|||
mockEsClient.security.hasPrivileges.mockResolvedValue({
|
||||
has_all_requested: true,
|
||||
} as SecurityHasPrivilegesResponse);
|
||||
mockSummaryTransformManager.getVersion.mockResolvedValue(SLO_RESOURCES_VERSION);
|
||||
});
|
||||
|
||||
it('throws a SecurityException error when the user does not have the required privileges on the source index', async () => {
|
||||
|
|
|
@ -11,6 +11,7 @@ import { asyncForEach } from '@kbn/std';
|
|||
import { isEqual, pick } from 'lodash';
|
||||
import {
|
||||
SLO_DESTINATION_INDEX_PATTERN,
|
||||
SLO_RESOURCES_VERSION,
|
||||
SLO_SUMMARY_DESTINATION_INDEX_PATTERN,
|
||||
SLO_SUMMARY_TEMP_INDEX_NAME,
|
||||
getSLOPipelineId,
|
||||
|
@ -53,15 +54,7 @@ export class UpdateSLO {
|
|||
return this.toResponse(originalSlo);
|
||||
}
|
||||
|
||||
const fields = [
|
||||
'indicator',
|
||||
'groupBy',
|
||||
'timeWindow',
|
||||
'objective',
|
||||
'budgetingMethod',
|
||||
'settings',
|
||||
];
|
||||
const requireRevisionBump = !isEqual(pick(originalSlo, fields), pick(updatedSlo, fields));
|
||||
const requireRevisionBump = await this.isRevisionBumpRequired(originalSlo, updatedSlo);
|
||||
|
||||
updatedSlo = Object.assign(updatedSlo, {
|
||||
updatedAt: new Date(),
|
||||
|
@ -77,23 +70,8 @@ export class UpdateSLO {
|
|||
rollbackOperations.push(() => this.repository.update(originalSlo));
|
||||
|
||||
if (!requireRevisionBump) {
|
||||
// At this point, we still need to update the sli and summary pipeline to include the changes (id and revision in the rollup index) and (name, desc, tags, ...) in the summary index
|
||||
|
||||
// we only have to update the summary pipeline to include the non-breaking changes (name, desc, tags, ...) in the summary index
|
||||
try {
|
||||
await retryTransientEsErrors(
|
||||
() =>
|
||||
this.scopedClusterClient.asSecondaryAuthUser.ingest.putPipeline(
|
||||
getSLOPipelineTemplate(updatedSlo)
|
||||
),
|
||||
{ logger: this.logger }
|
||||
);
|
||||
rollbackOperations.push(() =>
|
||||
this.scopedClusterClient.asSecondaryAuthUser.ingest.deletePipeline(
|
||||
{ id: getSLOPipelineId(updatedSlo.id, updatedSlo.revision) },
|
||||
{ ignore: [404] }
|
||||
)
|
||||
);
|
||||
|
||||
await retryTransientEsErrors(
|
||||
() =>
|
||||
this.scopedClusterClient.asSecondaryAuthUser.ingest.putPipeline(
|
||||
|
@ -205,6 +183,26 @@ export class UpdateSLO {
|
|||
return this.toResponse(updatedSlo);
|
||||
}
|
||||
|
||||
private async isRevisionBumpRequired(originalSlo: SLODefinition, updatedSlo: SLODefinition) {
|
||||
const fields = [
|
||||
'indicator',
|
||||
'groupBy',
|
||||
'timeWindow',
|
||||
'objective',
|
||||
'budgetingMethod',
|
||||
'settings',
|
||||
];
|
||||
const hasBreakingChanges = !isEqual(pick(originalSlo, fields), pick(updatedSlo, fields));
|
||||
const currentResourcesVersion = await this.summaryTransformManager.getVersion(
|
||||
getSLOSummaryTransformId(originalSlo.id, originalSlo.revision)
|
||||
);
|
||||
|
||||
const hasOutdatedVersion =
|
||||
currentResourcesVersion === undefined || currentResourcesVersion < SLO_RESOURCES_VERSION;
|
||||
|
||||
return hasBreakingChanges || hasOutdatedVersion;
|
||||
}
|
||||
|
||||
private async deleteOriginalSLO(originalSlo: SLODefinition) {
|
||||
try {
|
||||
const originalRollupTransformId = getSLOTransformId(originalSlo.id, originalSlo.revision);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue