[Fleet] Split bulk create artifact in small batch to prevent too big requests (#159187)

## Summary
Fixes: https://github.com/elastic/kibana/issues/158577

Introduces batching in the bulk creation of artifacts to prevent overly large
bulk requests. Each artifact's `encoded_size` is used as its size when
deciding how many artifacts fit in a batch.

The input and output of the existing function are unchanged; only the way
the bulk create operation is performed has changed.

Introduces a new Fleet config value to change the bulk create batch size:
`createArtifactsBulkBatchSize`

It adds new unit test cases for the changes.

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
David Sánchez 2023-06-20 10:51:05 +02:00 committed by GitHub
parent b88c11d708
commit cc04704cb5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 274 additions and 43 deletions

View file

@ -49,6 +49,7 @@ export interface FleetConfigType {
disableILMPolicies: boolean;
fleetServerStandalone: boolean;
};
createArtifactsBulkBatchSize?: number;
}
// Calling Object.entries(PackagesGroupedByStatus) gave `status: string`

View file

@ -24,6 +24,7 @@ import {
PreconfiguredFleetServerHostsSchema,
PreconfiguredFleetProxiesSchema,
} from './types';
import { BULK_CREATE_MAX_ARTIFACTS_BYTES } from './services/artifacts/artifacts';
const DEFAULT_BUNDLED_PACKAGE_LOCATION = path.join(__dirname, '../target/bundled_packages');
const DEFAULT_GPG_KEY_PATH = path.join(__dirname, '../target/keys/GPG-KEY-elasticsearch');
@ -174,6 +175,25 @@ export const config: PluginConfigDescriptor = {
})
),
enabled: schema.boolean({ defaultValue: true }),
/**
* The maximum sum of the artifacts' encoded_size values allowed in a single bulk create batch.
* A batch always contains at least one artifact, so an artifact larger than this value is sent in a batch of its own.
* @example
* artifact1.encoded_size = 400
* artifact2.encoded_size = 600
* artifact3.encoded_size = 1_200
* and
* createArtifactsBulkBatchSize: 1_000
* then
* batch1 = [artifact1, artifact2]
* batch2 = [artifact3]
*/
createArtifactsBulkBatchSize: schema.maybe(
schema.number({
defaultValue: BULK_CREATE_MAX_ARTIFACTS_BYTES,
max: 4_000_000,
min: 400,
})
),
}),
};

View file

@ -15,6 +15,11 @@ import { FLEET_SERVER_ARTIFACTS_INDEX } from '../../../common';
import { ArtifactsElasticsearchError } from '../../errors';
import { appContextService } from '../app_context';
import { createAppContextStartContractMock } from '../../mocks';
import { newArtifactToElasticsearchProperties } from './mappings';
import {
generateArtifactEsGetSingleHitMock,
generateArtifactEsSearchResultHitsMock,
@ -33,12 +38,12 @@ import {
} from './artifacts';
import type { NewArtifact } from './types';
import { newArtifactToElasticsearchProperties } from './mappings';
describe('When using the artifacts services', () => {
let esClientMock: ReturnType<typeof elasticsearchServiceMock.createInternalClient>;
beforeEach(() => {
appContextService.start(createAppContextStartContractMock());
esClientMock = elasticsearchServiceMock.createInternalClient();
});
@ -150,6 +155,144 @@ describe('When using the artifacts services', () => {
});
});
it('should create and return a single big artifact', async () => {
  // Work on a shallow copy of the generated mock
  const newBigArtifact = { ...generateArtifactMock({ encodedSize: 1_500 }) };

  const { artifacts } = await bulkCreateArtifacts(esClientMock, [newBigArtifact]);
  const [artifact] = artifacts!;

  // The bulk request must carry one `create` operation followed by its document
  const expectedId = `${artifact.packageName}:${artifact.identifier}-${artifact.decodedSha256}`;
  const expectedDocument = {
    ...newArtifactToElasticsearchProperties(newBigArtifact),
    created: expect.any(String),
  };
  expect(esClientMock.bulk).toHaveBeenCalledWith({
    index: FLEET_SERVER_ARTIFACTS_INDEX,
    refresh: false,
    body: [{ create: { _id: expectedId } }, expectedDocument],
  });

  // The returned artifact is the input plus the generated `id` and `created` fields
  expect(artifact).toEqual({
    ...newBigArtifact,
    id: expect.any(String),
    created: expect.any(String),
  });
});
it('should create and return a multiple big artifacts', async () => {
  // Shallow copy of a generated mock with the requested encoded size
  const buildArtifact = (encodedSize: number) => ({ ...generateArtifactMock({ encodedSize }) });

  const newBigArtifact1 = buildArtifact(5_000_500);
  const newBigArtifact2 = buildArtifact(500);
  const newBigArtifact3 = buildArtifact(233);
  const newBigArtifact4 = buildArtifact(7_000_000);

  const { artifacts } = await bulkCreateArtifacts(esClientMock, [
    newBigArtifact1,
    newBigArtifact2,
    newBigArtifact3,
    newBigArtifact4,
  ]);
  const [artifact1, artifact2, artifact3, artifact4] = artifacts!;

  // Builds the bulk request expected for a batch made of the given (created, requested) pairs
  const expectedBulkRequest = (
    ...pairs: Array<[typeof artifact1, typeof newBigArtifact1]>
  ) => ({
    index: FLEET_SERVER_ARTIFACTS_INDEX,
    refresh: false,
    body: pairs.flatMap(([created, requested]) => [
      {
        create: {
          _id: `${created.packageName}:${created.identifier}-${created.decodedSha256}`,
        },
      },
      {
        ...newArtifactToElasticsearchProperties(requested),
        created: expect.any(String),
      },
    ]),
  });

  // Expected: the two smallest artifacts share the first request (smallest first),
  // and each of the remaining artifacts gets a request of its own.
  expect(esClientMock.bulk).toHaveBeenCalledTimes(3);
  expect(esClientMock.bulk).toHaveBeenNthCalledWith(
    1,
    expectedBulkRequest([artifact3, newBigArtifact3], [artifact2, newBigArtifact2])
  );
  expect(esClientMock.bulk).toHaveBeenNthCalledWith(
    2,
    expectedBulkRequest([artifact1, newBigArtifact1])
  );
  expect(esClientMock.bulk).toHaveBeenNthCalledWith(
    3,
    expectedBulkRequest([artifact4, newBigArtifact4])
  );

  // Returned artifacts keep the input order and gain generated `id` and `created` fields
  const generatedFields = { id: expect.any(String), created: expect.any(String) };
  expect(artifact1).toEqual({ ...newBigArtifact1, ...generatedFields });
  expect(artifact2).toEqual({ ...newBigArtifact2, ...generatedFields });
  expect(artifact3).toEqual({ ...newBigArtifact3, ...generatedFields });
  expect(artifact4).toEqual({ ...newBigArtifact4, ...generatedFields });
});
it('should create and return none artifact when none provided', async () => {
  const { artifacts, errors } = await bulkCreateArtifacts(esClientMock, []);

  // No artifacts means no batches, so no bulk request may be sent
  expect(esClientMock.bulk).toHaveBeenCalledTimes(0);
  // The title also promises a "return none" check: an empty artifact list and no errors
  expect(artifacts).toEqual([]);
  expect(errors).toBeUndefined();
});
it('should ignore 409 errors from elasticsearch', async () => {
esClientMock.bulk.mockResolvedValue({
errors: true,

View file

@ -11,6 +11,8 @@ import { promisify } from 'util';
import type { BinaryLike } from 'crypto';
import { createHash } from 'crypto';
import { isEmpty, sortBy } from 'lodash';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { ListResult } from '../../../common/types';
@ -22,6 +24,8 @@ import { isElasticsearchVersionConflictError } from '../../errors/utils';
import { withPackageSpan } from '../epm/packages/utils';
import { appContextService } from '../app_context';
import { isElasticsearchItemNotFoundError } from './utils';
import type {
Artifact,
@ -84,55 +88,105 @@ export const createArtifact = async (
return esSearchHitToArtifact({ _id: id, _source: newArtifactData });
};
// Max length in bytes for artifacts batch
export const BULK_CREATE_MAX_ARTIFACTS_BYTES = 4_000_000;
/**
 * Splits artifacts into batches for bulk creation, based on each artifact's `encodedSize`.
 *
 * Artifacts are packed smallest-first. A batch is closed as soon as adding the next
 * artifact would push the sum of `encodedSize` values above `maxArtifactsBatchSizeInBytes`.
 * A batch always holds at least one artifact, so an artifact that is larger than the
 * limit on its own is placed alone in its own batch.
 *
 * @param artifacts The artifacts to create.
 * @param maxArtifactsBatchSizeInBytes Maximum sum of `encodedSize` per batch
 *   (defaults to `BULK_CREATE_MAX_ARTIFACTS_BYTES`).
 * @returns `batches`: one bulk body per batch, each a flat list of
 *   `{ create: { _id } }` operations followed by their document;
 *   `artifactsEsResponse`: the created artifacts, in the same order as `artifacts`.
 */
const generateArtifactBatches = (
  artifacts: NewArtifact[],
  maxArtifactsBatchSizeInBytes: number = BULK_CREATE_MAX_ARTIFACTS_BYTES
): {
  batches: Array<Array<ArtifactElasticsearchProperties | { create: { _id: string } }>>;
  artifactsEsResponse: Artifact[];
} => {
  // Convert every artifact exactly once, in input order, so the id and the document of
  // an artifact always belong together and the same document is used for both the bulk
  // request and the response.
  const entries = artifacts.map((artifact) => ({
    id: uniqueIdFromArtifact(artifact),
    source: newArtifactToElasticsearchProperties(artifact),
    encodedSize: artifact.encodedSize,
  }));

  // The response is built from the unsorted entries to preserve the caller's order.
  const artifactsEsResponse: Artifact[] = entries.map(({ id, source }) =>
    esSearchHitToArtifact({ _id: id, _source: source })
  );

  const batches: Array<Array<ArtifactElasticsearchProperties | { create: { _id: string } }>> = [];
  let currentBatchSizeInBytes = 0;

  // Batches are filled smallest-first.
  for (const { id, source, encodedSize } of sortBy(entries, 'encodedSize')) {
    const bulkOperation = { create: { _id: id } };
    const currentBatch = batches[batches.length - 1];

    // Start a new batch when there is none yet, or when this artifact would push the
    // current batch over the limit. A batch whose sum equals the limit is still allowed.
    if (
      currentBatch === undefined ||
      currentBatchSizeInBytes + encodedSize > maxArtifactsBatchSizeInBytes
    ) {
      batches.push([bulkOperation, source]);
      currentBatchSizeInBytes = encodedSize;
    } else {
      currentBatch.push(bulkOperation, source);
      currentBatchSizeInBytes += encodedSize;
    }
  }

  return { batches, artifactsEsResponse };
};
export const bulkCreateArtifacts = async (
esClient: ElasticsearchClient,
artifacts: NewArtifact[],
refresh = false
): Promise<{ artifacts?: Artifact[]; errors?: Error[] }> => {
const { ids, newArtifactsData } = artifacts.reduce<{
ids: string[];
newArtifactsData: ArtifactElasticsearchProperties[];
}>(
(acc, artifact) => {
acc.ids.push(uniqueIdFromArtifact(artifact));
acc.newArtifactsData.push(newArtifactToElasticsearchProperties(artifact));
return acc;
},
{ ids: [], newArtifactsData: [] }
const { batches, artifactsEsResponse } = generateArtifactBatches(
artifacts,
appContextService.getConfig()?.createArtifactsBulkBatchSize
);
const body = ids.flatMap((id, index) => [
{
create: {
_id: id,
},
},
newArtifactsData[index],
]);
const res = await withPackageSpan('Bulk create fleet artifacts', () =>
esClient.bulk({
index: FLEET_SERVER_ARTIFACTS_INDEX,
body,
refresh,
})
);
if (res.errors) {
const nonConflictErrors = res.items.reduce<Error[]>((acc, item) => {
if (item.create?.status !== 409) {
acc.push(new Error(item.create?.error?.reason));
}
return acc;
}, []);
if (nonConflictErrors.length > 0) {
return { errors: nonConflictErrors };
const logger = appContextService.getLogger();
const nonConflictErrors = [];
logger.debug(`Number of batches generated for fleet artifacts: ${batches.length}`);
for (let batchN = 0; batchN < batches.length; batchN++) {
logger.debug(
`Creating artifacts for batch ${batchN + 1} with ${batches[batchN].length / 2} artifacts`
);
logger.debug(`Artifacts in current batch: ${JSON.stringify(batches[batchN])}`);
// Generate a bulk create for the current batch of artifacts
const res = await withPackageSpan(`Bulk create fleet artifacts batch [${batchN}]`, () =>
esClient.bulk({
index: FLEET_SERVER_ARTIFACTS_INDEX,
body: batches[batchN],
refresh,
})
);
// Track errors of the bulk create action
if (res.errors) {
nonConflictErrors.push(
...res.items.reduce<Error[]>((acc, item) => {
if (item.create?.status !== 409) {
acc.push(new Error(item.create?.error?.reason));
}
return acc;
}, [])
);
}
}
// If any non conflict error, it returns only the errors
if (nonConflictErrors.length > 0) {
return { errors: nonConflictErrors };
}
return {
artifacts: ids.map((id, index) =>
esSearchHitToArtifact({ _id: id, _source: newArtifactsData[index] })
),
artifacts: artifactsEsResponse,
};
};

View file

@ -9,6 +9,9 @@ import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { ArtifactsClientAccessDeniedError, ArtifactsClientError } from '../../errors';
import { appContextService } from '../app_context';
import { createAppContextStartContractMock } from '../../mocks';
import { FleetArtifactsClient } from './client';
import {
generateArtifactEsGetSingleHitMock,
@ -20,7 +23,6 @@ import {
describe('When using the Fleet Artifacts Client', () => {
let esClientMock: ReturnType<typeof elasticsearchServiceMock.createInternalClient>;
let artifactClient: FleetArtifactsClient;
const setEsClientGetMock = (withInvalidArtifact?: boolean) => {
const singleHit = generateArtifactEsGetSingleHitMock();
@ -33,6 +35,8 @@ describe('When using the Fleet Artifacts Client', () => {
};
beforeEach(() => {
appContextService.start(createAppContextStartContractMock());
esClientMock = elasticsearchServiceMock.createInternalClient();
artifactClient = new FleetArtifactsClient(esClientMock, 'endpoint');
});

View file

@ -46,7 +46,7 @@ export const createArtifactsClientMock = (): jest.Mocked<ArtifactsClientInterfac
};
};
export const generateArtifactMock = (): Artifact => {
export const generateArtifactMock = (overrides?: Partial<Artifact>): Artifact => {
return {
id: '123',
type: 'trustlist',
@ -61,6 +61,7 @@ export const generateArtifactMock = (): Artifact => {
encodedSize: 22,
body: 'eJyrVkrNKynKTC1WsoqOrQUAJxkFKQ==',
created: '2021-03-08T14:47:13.714Z',
...overrides,
};
};

View file

@ -9,8 +9,15 @@ import type { SavedObjectsClientContract } from '@kbn/core/server';
import { savedObjectsClientMock, elasticsearchServiceMock } from '@kbn/core/server/mocks';
import type { ElasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
// Because mocks are for testing only, should be ok to import the FleetArtifactsClient directly
import { FleetArtifactsClient } from '@kbn/fleet-plugin/server/services';
import { createArtifactsClientMock } from '@kbn/fleet-plugin/server/mocks';
import {
appContextService as fleetAppContextService,
FleetArtifactsClient,
} from '@kbn/fleet-plugin/server/services';
import {
createAppContextStartContractMock as fleetCreateAppContextStartContractMock,
createArtifactsClientMock,
} from '@kbn/fleet-plugin/server/mocks';
import type { EndpointArtifactClientInterface } from './artifact_client';
import { EndpointArtifactClient } from './artifact_client';
import { ManifestClient } from './manifest_client';
@ -37,6 +44,7 @@ export const createEndpointArtifactClientMock = (
const fleetArtifactClientMocked = createArtifactsClientMock();
const endpointArtifactClientMocked = new EndpointArtifactClient(fleetArtifactClientMocked);
fleetAppContextService.start(fleetCreateAppContextStartContractMock());
// Return the interface mocked with jest.fn() that forwards calls to the real instance
return {
createArtifact: jest.fn(async (...args) => {