[Fleet] Fix unattended Transforms in integration packages not automatically restarting after reauthorizing (#210217)

## Summary

This PR partially addresses
https://github.com/elastic/integrations/issues/12486, where the transform
doesn't "restart" immediately after reauthorizing. This is because, for an
unattended transform, calling `_start` returns a 409 "transform already
started" error. So this PR tracks whether the transform has
`settings.unattended: true`; if so, it stops the transform first before
starting it.

Without this step, the transform will eventually retry and become healthy
again anyway, but it takes longer for that retry to happen, so this PR
speeds up the recovery.


### Checklist

Check that the PR satisfies the following conditions.

Reviewers should verify this PR satisfies this list as well.

- [ ] Any text added follows [EUI's writing
guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses
sentence case text and includes [i18n
support](https://github.com/elastic/kibana/blob/main/src/platform/packages/shared/kbn-i18n/README.md)
- [ ]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [ ] If a plugin configuration key changed, check if it needs to be
allowlisted in the cloud and added to the [docker
list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker)
- [ ] This was checked for breaking HTTP API changes, and any breaking
changes have been approved by the breaking-change committee. The
`release_note:breaking` label should be applied in these situations.
- [ ] [Flaky Test
Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was
used on any tests changed
- [ ] The PR description includes the appropriate Release Notes section,
and the correct `release_note:*` label is applied per the
[guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)

### Identify risks

Does this PR introduce any risks? For example, consider risks like
hard-to-test bugs, performance regressions, or potential data loss.

Describe the risk, its severity, and mitigation for each identified
risk. Invite stakeholders and evaluate how to proceed before merging.

- [ ] [See some risk
examples](https://github.com/elastic/kibana/blob/main/RISK_MATRIX.mdx)
- [ ] ...

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Quynh Nguyen (Quinn) 2025-02-12 23:49:58 -06:00 committed by GitHub
parent a0f8f1ddff
commit e710f09d0c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -10,17 +10,16 @@ import type { Logger } from '@kbn/logging';
import type { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server';
import { sortBy, uniqBy } from 'lodash';
import pMap from 'p-map';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import type { ErrorResponseBase } from '@elastic/elasticsearch/lib/api/types';
import pMap from 'p-map';
import { MAX_CONCURRENT_TRANSFORMS_OPERATIONS } from '../../../../constants';
import type { SecondaryAuthorizationHeader } from '../../../../../common/types/models/transform_api_key';
import { updateEsAssetReferences } from '../../packages/es_assets_reference';
import type { Installation } from '../../../../../common';
import { ElasticsearchAssetType, PACKAGES_SAVED_OBJECT_TYPE } from '../../../../../common';
import { retryTransientEsErrors } from '../retry';
import { MAX_CONCURRENT_TRANSFORMS_OPERATIONS } from '../../../../constants';
interface FleetTransformMetadata {
fleet_transform_version?: string;
@ -32,6 +31,7 @@ interface FleetTransformMetadata {
last_authorized_by?: string;
run_as_kibana_system?: boolean;
transformId: string;
unattended?: boolean;
}
const isErrorResponse = (arg: unknown): arg is ErrorResponseBase =>
@ -43,6 +43,7 @@ async function reauthorizeAndStartTransform({
transformId,
secondaryAuth,
meta,
shouldStopBeforeStart,
}: {
esClient: ElasticsearchClient;
logger: Logger;
@ -50,6 +51,7 @@ async function reauthorizeAndStartTransform({
secondaryAuth?: SecondaryAuthorizationHeader;
shouldInstallSequentially?: boolean;
meta?: object;
shouldStopBeforeStart?: boolean;
}): Promise<{ transformId: string; success: boolean; error: null | any }> {
try {
await retryTransientEsErrors(
@ -71,6 +73,18 @@ async function reauthorizeAndStartTransform({
}
try {
// For unattended transforms, we need to stop the transform before starting it
// otherwise, starting transform will fail with a 409 error
if (shouldStopBeforeStart) {
await retryTransientEsErrors(
() =>
esClient.transform.stopTransform(
{ transform_id: transformId, wait_for_completion: true },
{ ignore: [404, 409] }
),
{ logger, additionalResponseStatuses: [400] }
);
}
const startedTransform = await retryTransientEsErrors(
() => esClient.transform.startTransform({ transform_id: transformId }, { ignore: [409] }),
{ logger, additionalResponseStatuses: [400] }
@ -121,30 +135,23 @@ export async function handleTransformReauthorizeAndStart({
);
}
const transformInfos = await pMap(
transforms,
({ transformId }) =>
retryTransientEsErrors(
() =>
esClient.transform.getTransform(
{
transform_id: transformId,
},
{ ...(secondaryAuth ? secondaryAuth : {}), ignore: [404] }
),
{ logger, additionalResponseStatuses: [400] }
const transformInfos = await retryTransientEsErrors(
() =>
esClient.transform.getTransform(
{
transform_id: transforms.map((t) => t.transformId).join(','),
},
{ ...(secondaryAuth ? secondaryAuth : {}), ignore: [404] }
),
{
concurrency: MAX_CONCURRENT_TRANSFORMS_OPERATIONS,
}
{ logger, additionalResponseStatuses: [400] }
);
const transformsMetadata: FleetTransformMetadata[] = transformInfos
.flat()
.filter((t) => t.transforms !== undefined)
.map<FleetTransformMetadata>((t) => {
const transform = t.transforms?.[0];
return { ...transform._meta, transformId: transform?.id };
const transformsMetadata: FleetTransformMetadata[] = transformInfos.transforms
.map<FleetTransformMetadata>((transform) => {
return {
...transform._meta,
transformId: transform?.id,
unattended: Boolean(transform.settings?.unattended),
};
})
.filter((t) => t?.run_as_kibana_system === false);
@ -160,13 +167,14 @@ export async function handleTransformReauthorizeAndStart({
(t) => t.order,
]);
for (const { transformId, ...meta } of sortedTransformsMetadata) {
for (const { transformId, unattended, ...meta } of sortedTransformsMetadata) {
const authorizedTransform = await reauthorizeAndStartTransform({
esClient,
logger,
transformId,
secondaryAuth,
meta: { ...meta, last_authorized_by: username },
shouldStopBeforeStart: unattended,
});
authorizedTransforms.push(authorizedTransform);
@ -175,13 +183,14 @@ export async function handleTransformReauthorizeAndStart({
// Else, create & start all the transforms at once for speed
authorizedTransforms = await pMap(
transformsMetadata,
async ({ transformId, ...meta }) =>
async ({ transformId, unattended, ...meta }) =>
reauthorizeAndStartTransform({
esClient,
logger,
transformId,
secondaryAuth,
meta: { ...meta, last_authorized_by: username },
shouldStopBeforeStart: unattended,
}),
{
concurrency: MAX_CONCURRENT_TRANSFORMS_OPERATIONS,