[Fleet] Add rate limiting to install by upload endpoint (#184036)

Fixes https://github.com/elastic/ingest-dev/issues/3217

## Summary

Add rate limiting to "install by upload" endpoint. 
Implemented with a cache that is set with the timestamp of each install
by upload, independently from the package name/version. If the time
elapsed since the last timestamp it's less than retry time (10s), the
endpoint will return `429 Too many requests`.

### Testing
- Upload a package with 
```
curl -XPOST -H 'content-type: application/zip' -H 'kbn-xsrf: true' http://localhost:5601/YOUR_PATH/api/fleet/epm/packages -u elastic:changeme --data-binary @PACKAGE_NAME.zip
```
- Upload another package shortly after. It can be the same one or
another one, as the rate limiting is applied across all uploads, no
matter the package name.
- If the second upload happens <10s after the first one, should return
error `429 Too Many Requests. Please wait 10s before uploading again.`

### Checklist

- [ ]
[Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html)
was added for features that require explanation or tutorials
- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Cristina Amico 2024-06-06 17:13:52 +02:00 committed by GitHub
parent c3c5744f3d
commit 966736e4b4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 149 additions and 44 deletions

View file

@ -606,6 +606,9 @@
},
"400": {
"$ref": "#/components/responses/error"
},
"429": {
"$ref": "#/components/responses/error"
}
},
"operationId": "install-package-by-upload",

View file

@ -392,6 +392,8 @@ paths:
- items
'400':
$ref: '#/components/responses/error'
'429':
$ref: '#/components/responses/error'
operationId: install-package-by-upload
description: ''
parameters:

View file

@ -9,4 +9,4 @@ content:
error:
type: string
message:
type: string
type: string

View file

@ -79,6 +79,8 @@ post:
- items
'400':
$ref: ../components/responses/error.yaml
'429':
$ref: ../components/responses/error.yaml
operationId: install-package-by-upload
description: ''
parameters:

View file

@ -43,6 +43,7 @@ import {
PackagePolicyRequestError,
FleetNotFoundError,
PackageSavedObjectConflictError,
FleetTooManyRequestsError,
} from '.';
type IngestErrorHandler = (
@ -114,6 +115,10 @@ const getHTTPResponseCode = (error: FleetError): number => {
if (error instanceof PackageUnsupportedMediaTypeError) {
return 415;
}
// Too many requests
if (error instanceof FleetTooManyRequestsError) {
return 429;
}
// Internal Server Error
if (error instanceof UninstallTokenError) {
return 500;

View file

@ -44,7 +44,6 @@ export class PackageRemovalError extends FleetError {}
export class PackageESError extends FleetError {}
export class ConcurrentInstallOperationError extends FleetError {}
export class PackageSavedObjectConflictError extends FleetError {}
export class KibanaSOReferenceError extends FleetError {}
export class PackageAlreadyInstalledError extends FleetError {}
@ -81,6 +80,7 @@ export class FleetSetupError extends FleetError {}
export class GenerateServiceTokenError extends FleetError {}
export class FleetUnauthorizedError extends FleetError {}
export class FleetNotFoundError extends FleetError {}
export class FleetTooManyRequestsError extends FleetError {}
export class OutputUnauthorizedError extends FleetError {}
export class OutputInvalidError extends FleetError {}

View file

@ -65,7 +65,12 @@ import {
getTemplateInputs,
} from '../../services/epm/packages';
import type { BulkInstallResponse } from '../../services/epm/packages';
import { defaultFleetErrorHandler, fleetErrorToResponseOptions, FleetError } from '../../errors';
import {
defaultFleetErrorHandler,
fleetErrorToResponseOptions,
FleetError,
FleetTooManyRequestsError,
} from '../../errors';
import { appContextService, checkAllowedPackages } from '../../services';
import { getPackageUsageStats } from '../../services/epm/packages/get';
import { updatePackage } from '../../services/epm/packages/update';
@ -80,6 +85,7 @@ import type {
import { getDataStreams } from '../../services/epm/data_streams';
import { NamingCollisionError } from '../../services/epm/packages/custom_integrations/validation/check_naming_collision';
import { DatasetNamePrefixError } from '../../services/epm/packages/custom_integrations/validation/check_dataset_name_format';
import { UPLOAD_RETRY_AFTER_MS } from '../../services/epm/packages/install';
const CACHE_CONTROL_10_MINUTES_HEADER: HttpResponseOptions['headers'] = {
'cache-control': 'max-age=600',
@ -451,7 +457,6 @@ export const installPackageByUploadHandler: FleetRequestHandler<
const archiveBuffer = Buffer.from(request.body);
const spaceId = fleetContext.spaceId;
const user = (await appContextService.getSecurity()?.authc.getCurrentUser(request)) || undefined;
const authorizationHeader = HTTPAuthorizationHeader.parseFromRequest(request, user?.username);
const res = await installPackage({
@ -475,6 +480,18 @@ export const installPackageByUploadHandler: FleetRequestHandler<
};
return response.ok({ body });
} else {
if (res.error instanceof FleetTooManyRequestsError) {
return response.customError({
statusCode: 429,
body: {
message: res.error.message,
},
headers: {
// retry-after expects seconds
'retry-after': Math.ceil(UPLOAD_RETRY_AFTER_MS / 1000).toString(),
},
});
}
return defaultFleetErrorHandler({ error: res.error, response });
}
};

View file

@ -426,7 +426,6 @@ describe('install', () => {
});
afterEach(() => {
(install._installPackage as jest.Mock).mockClear();
// jest.resetAllMocks();
});
afterAll(() => {
jest.mocked(appContextService.getExperimentalFeatures).mockReturnValue({
@ -834,10 +833,14 @@ describe('installAssetsForInputPackagePolicy', () => {
describe('handleInstallPackageFailure', () => {
const mockedLogger = jest.mocked(appContextService.getLogger());
const savedObjectsClient = savedObjectsClientMock.create();
beforeEach(() => {
jest.mocked(install._installPackage).mockClear();
jest.mocked(install._installPackage).mockResolvedValue({} as any);
mockedLogger.error.mockClear();
jest.mocked(install._installPackage).mockClear();
mockGetBundledPackageByPkgKey.mockReset();
jest.mocked(install._installPackage).mockResolvedValue({} as any);
mockGetBundledPackageByPkgKey.mockResolvedValue(undefined);
jest.spyOn(licenseService, 'hasAtLeast').mockReturnValue(true);
jest.spyOn(Registry, 'splitPkgKey').mockImplementation((pkgKey: string) => {
@ -860,9 +863,7 @@ describe('handleInstallPackageFailure', () => {
});
const pkgName = 'test_package';
it('should do nothing if error is ', async () => {
const savedObjectsClient = savedObjectsClientMock.create();
it('should do nothing if error is ConcurrentInstallOperationError', async () => {
const installedPkg: SavedObject<Installation> = {
id: 'test-package',
references: [],
@ -893,8 +894,6 @@ describe('handleInstallPackageFailure', () => {
});
it('Should rollback on upgrade on FleetError', async () => {
const savedObjectsClient = savedObjectsClientMock.create();
const installedPkg: SavedObject<Installation> = {
id: 'test-package',
references: [],
@ -933,11 +932,10 @@ describe('handleInstallPackageFailure', () => {
}),
})
);
jest.mocked(getInstallationObject).mockReset();
});
it('Should update the installation status to: install_failed on rollback error', async () => {
const savedObjectsClient = savedObjectsClientMock.create();
jest.mocked(install._installPackage).mockRejectedValue(new Error('test error'));
const installedPkg: SavedObject<Installation> = {

View file

@ -9,6 +9,7 @@ import apm from 'elastic-apm-node';
import { i18n } from '@kbn/i18n';
import semverLt from 'semver/functions/lt';
import type Boom from '@hapi/boom';
import moment from 'moment';
import type {
ElasticsearchClient,
SavedObject,
@ -53,6 +54,7 @@ import {
ConcurrentInstallOperationError,
FleetUnauthorizedError,
PackageNotFoundError,
FleetTooManyRequestsError,
} from '../../../errors';
import { PACKAGES_SAVED_OBJECT_TYPE, MAX_TIME_COMPLETE_INSTALL } from '../../../constants';
import { dataStreamService, licenseService } from '../..';
@ -89,7 +91,9 @@ import { checkDatasetsNameFormat } from './custom_integrations/validation/check_
import { addErrorToLatestFailedAttempts } from './install_errors_helpers';
import { installIndexTemplatesAndPipelines } from './install_index_template_pipeline';
import { optimisticallyAddEsAssetReferences } from './es_assets_reference';
import { setLastUploadInstallCache, getLastUploadInstallCache } from './utils';
export const UPLOAD_RETRY_AFTER_MS = 10000; // 10s
const MAX_ENSURE_INSTALL_TIME = 60 * 1000;
export async function isPackageInstalled(options: {
@ -361,6 +365,7 @@ interface InstallUploadedArchiveParams {
ignoreMappingUpdateErrors?: boolean;
skipDataStreamRollover?: boolean;
isBundledPackage?: boolean;
skipRateLimitCheck?: boolean;
}
function getTelemetryEvent(pkgName: string, pkgVersion: string): PackageUpdateEvent {
@ -887,11 +892,33 @@ async function installPackageByUpload({
ignoreMappingUpdateErrors,
skipDataStreamRollover,
isBundledPackage,
skipRateLimitCheck,
}: InstallUploadedArchiveParams): Promise<InstallResult> {
const logger = appContextService.getLogger();
// if an error happens during getInstallType, report that we don't know
let installType: InstallType = 'unknown';
const installSource = isBundledPackage ? 'bundled' : 'upload';
const timeToWaitString = moment
.utc(moment.duration(UPLOAD_RETRY_AFTER_MS).asMilliseconds())
.format('s[s]');
try {
// Check cached timestamp for rate limiting
const lastInstalledBy = getLastUploadInstallCache();
if (lastInstalledBy && !skipRateLimitCheck) {
const msSinceLastFetched = Date.now() - (lastInstalledBy || 0);
if (msSinceLastFetched < UPLOAD_RETRY_AFTER_MS) {
logger.error(
`Install by Upload - Too many requests. Wait ${timeToWaitString} before uploading again.`
);
throw new FleetTooManyRequestsError(
`Too many requests. Please wait ${timeToWaitString} before uploading again.`
);
}
}
const { packageInfo } = await generatePackageInfoFromArchiveBuffer(archiveBuffer, contentType);
const pkgName = packageInfo.name;
@ -928,6 +955,8 @@ async function installPackageByUpload({
assetsMap,
paths,
};
// update the timestamp of latest installation
setLastUploadInstallCache();
return await installPackageCommon({
packageInstallContext,
@ -1006,6 +1035,7 @@ export async function installPackage(args: InstallPackageParams): Promise<Instal
ignoreMappingUpdateErrors,
skipDataStreamRollover,
isBundledPackage: true,
skipRateLimitCheck: true,
});
return { ...response, installSource: 'bundled' };

View file

@ -13,6 +13,7 @@ import { load } from 'js-yaml';
import type { RegistryDataStream } from '../../../../common';
import type { AssetsMap } from '../../../../common/types';
import { appContextService } from '../../app_context';
type InputField =
| FieldMetadataPlain
@ -122,3 +123,18 @@ const getDataStreamFieldsAssetPaths = (
isFieldsAsset(path, dataStreamPath, excludedFieldsAssets)
);
};
// Set an in memory cache to save the timestamp of latest install by upload
const lastInstalledByUpload: Map<string, number> = new Map();
export const setLastUploadInstallCache = () => {
const logger = appContextService.getLogger();
const key = 'upload';
const time = Date.now();
logger.debug(`Setting timestamp ${time} to cache for install by ${key}`);
return lastInstalledByUpload.set(key, time);
};
export const getLastUploadInstallCache = () => {
return lastInstalledByUpload.get('upload');
};

View file

@ -21,6 +21,8 @@ export default function (providerContext: FtrProviderContext) {
skipIfNoDockerRegistry(providerContext);
describe('it gets files from registry', () => {
it('fetches a .png screenshot image', async function () {
// wait 10s before uploading again to avoid getting 429 just in case a previous test was hitting the same endpoint
await new Promise((resolve) => setTimeout(resolve, 10000));
const res = await supertest
.get('/api/fleet/epm/packages/filetest/0.1.0/img/screenshots/metricbeat_dashboard.png')
.set('kbn-xsrf', 'xxx')

View file

@ -40,6 +40,18 @@ export default function (providerContext: FtrProviderContext) {
'../fixtures/direct_upload_packages/apache_0.1.4.zip'
);
async function uploadPackage(zipPackage: string) {
// wait 10s before uploading again to avoid getting 429
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(zipPackage);
return await supertest
.post(`/api/fleet/epm/packages`)
.set('kbn-xsrf', 'xxxx')
.type('application/zip')
.send(buf)
.expect(200);
}
describe('EPM - get', () => {
skipIfNoDockerRegistry(providerContext);
setupFleetAndAgents(providerContext);
@ -57,14 +69,9 @@ export default function (providerContext: FtrProviderContext) {
expect(packageInfo.download).to.not.equal(undefined);
await uninstallPackage(testPkgName, testPkgVersion);
});
it('returns correct package info if it was installed by upload', async function () {
const buf = fs.readFileSync(testPkgArchiveZip);
await supertest
.post(`/api/fleet/epm/packages`)
.set('kbn-xsrf', 'xxxx')
.type('application/zip')
.send(buf)
.expect(200);
await uploadPackage(testPkgArchiveZip);
const res = await supertest
.get(`/api/fleet/epm/packages/${testPkgName}/${testPkgVersion}`)
@ -76,14 +83,9 @@ export default function (providerContext: FtrProviderContext) {
expect(packageInfo.download).to.not.equal(undefined);
await uninstallPackage(testPkgName, testPkgVersion);
});
it('returns correct package info from registry if a different version is installed by upload', async function () {
const buf = fs.readFileSync(testPkgArchiveZip);
await supertest
.post(`/api/fleet/epm/packages`)
.set('kbn-xsrf', 'xxxx')
.type('application/zip')
.send(buf)
.expect(200);
await uploadPackage(testPkgArchiveZip);
const res = await supertest.get(`/api/fleet/epm/packages/apache/0.1.3`).expect(200);
const packageInfo = res.body.item;
@ -97,13 +99,7 @@ export default function (providerContext: FtrProviderContext) {
path.dirname(__filename),
'../fixtures/direct_upload_packages/apache_9999.0.0.zip'
);
const buf = fs.readFileSync(testPkgArchiveZipV9999);
await supertest
.post(`/api/fleet/epm/packages`)
.set('kbn-xsrf', 'xxxx')
.type('application/zip')
.send(buf)
.expect(200);
await uploadPackage(testPkgArchiveZipV9999);
const res = await supertest.get(`/api/fleet/epm/packages/apache/9999.0.0`).expect(200);
const packageInfo = res.body.item;
@ -111,6 +107,7 @@ export default function (providerContext: FtrProviderContext) {
expect(packageInfo.download).to.equal(undefined);
await uninstallPackage(testPkgName, '9999.0.0');
});
describe('Installed Packages', () => {
before(async () => {
await installPackage(testPkgName, testPkgVersion);

View file

@ -56,7 +56,7 @@ export default function (providerContext: FtrProviderContext) {
const supertest = getService('supertest');
const log = getService('log');
describe('installing bundled packages', async () => {
describe('Installing bundled packages', async () => {
skipIfNoDockerRegistry(providerContext);
setupFleetAndAgents(providerContext);
@ -89,7 +89,6 @@ export default function (providerContext: FtrProviderContext) {
.type('application/json')
.send({ force: true })
.expect(200);
expect(installResponse.body._meta.install_source).to.be('bundled');
const updateResponse = await supertest
@ -103,8 +102,6 @@ export default function (providerContext: FtrProviderContext) {
});
it('should load package archive from bundled package', async () => {
await bundlePackage('nginx-1.2.1');
const response = await supertest
.get(`/api/fleet/epm/packages/nginx/1.2.1?full=true`)
.expect(200);
@ -117,7 +114,6 @@ export default function (providerContext: FtrProviderContext) {
describe('with registry', () => {
it('allows for updating from registry when outdated package is installed from bundled source', async () => {
await bundlePackage('nginx-1.1.0');
const bundledInstallResponse = await supertest
.post(`/api/fleet/epm/packages/nginx/1.1.0`)
.set('kbn-xsrf', 'xxxx')

View file

@ -16,6 +16,11 @@ import { skipIfNoDockerRegistry, isDockerRegistryEnabledOrSkipped } from '../../
import { setupFleetAndAgents } from '../agents/services';
import { testUsers } from '../test_users';
/*
* This test takes long to execute because of the wait time between uploads
* The upload endpoint is rate limited with a minimum wait time of 10s
*/
export default function (providerContext: FtrProviderContext) {
const { getService } = providerContext;
const supertest = getService('supertest');
@ -64,7 +69,7 @@ export default function (providerContext: FtrProviderContext) {
await supertest.delete(`/api/fleet/epm/packages/${name}/${version}`).set('kbn-xsrf', 'xxxx');
};
describe('installs packages from direct upload', async () => {
describe('Installs packages from direct upload', async () => {
skipIfNoDockerRegistry(providerContext);
setupFleetAndAgents(providerContext);
@ -77,6 +82,8 @@ export default function (providerContext: FtrProviderContext) {
async function uploadPackage() {
const buf = fs.readFileSync(testPkgArchiveTgz);
// wait 10s before uploading again to avoid getting 429
await new Promise((resolve) => setTimeout(resolve, 10000));
return await supertest
.post(`/api/fleet/epm/packages`)
.set('kbn-xsrf', 'xxxx')
@ -93,6 +100,7 @@ export default function (providerContext: FtrProviderContext) {
it('should upgrade when uploading a newer zip archive', async () => {
await uploadPackage();
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(testPkgArchiveZipNewer);
const res = await supertest
.post(`/api/fleet/epm/packages`)
@ -147,7 +155,23 @@ export default function (providerContext: FtrProviderContext) {
expect(epmPackageAssetsRes.hits.total).to.equal(0);
});
it('should get 429 when trying to upload packages too soon', async () => {
await uploadPackage();
const buf = fs.readFileSync(testPkgArchiveZipNewer);
const res = await supertest
.post(`/api/fleet/epm/packages`)
.set('kbn-xsrf', 'xxxx')
.type('application/zip')
.send(buf)
.expect(429);
expect((res.error as HTTPError).text).to.equal(
'{"statusCode":429,"error":"Too Many Requests","message":"Too many requests. Please wait 10s before uploading again."}'
);
});
it('should install a zip archive correctly and package info should return correctly after validation', async function () {
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(testPkgArchiveZip);
const res = await supertest
.post(`/api/fleet/epm/packages`)
@ -159,6 +183,7 @@ export default function (providerContext: FtrProviderContext) {
});
it('should throw an error if the archive is zip but content type is gzip', async function () {
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(testPkgArchiveZip);
const res = await supertest
.post(`/api/fleet/epm/packages`)
@ -172,6 +197,7 @@ export default function (providerContext: FtrProviderContext) {
});
it('should throw an error if the archive is tar.gz but content type is zip', async function () {
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(testPkgArchiveTgz);
const res = await supertest
.post(`/api/fleet/epm/packages`)
@ -185,6 +211,7 @@ export default function (providerContext: FtrProviderContext) {
});
it('should throw an error if the archive contains two top-level directories', async function () {
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(testPkgArchiveInvalidTwoToplevels);
const res = await supertest
.post(`/api/fleet/epm/packages`)
@ -198,6 +225,7 @@ export default function (providerContext: FtrProviderContext) {
});
it('should throw an error if the archive does not contain a manifest', async function () {
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(testPkgArchiveInvalidNoManifest);
const res = await supertest
.post(`/api/fleet/epm/packages`)
@ -211,6 +239,7 @@ export default function (providerContext: FtrProviderContext) {
});
it('should throw an error if the archive manifest contains invalid YAML', async function () {
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(testPkgArchiveInvalidManifestInvalidYaml);
const res = await supertest
.post(`/api/fleet/epm/packages`)
@ -224,6 +253,7 @@ export default function (providerContext: FtrProviderContext) {
});
it('should throw an error if the archive manifest misses a mandatory field', async function () {
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(testPkgArchiveInvalidManifestMissingField);
const res = await supertest
.post(`/api/fleet/epm/packages`)
@ -237,6 +267,7 @@ export default function (providerContext: FtrProviderContext) {
});
it('should throw an error if the toplevel directory name does not match the package key', async function () {
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(testPkgArchiveInvalidToplevelMismatch);
const res = await supertest
.post(`/api/fleet/epm/packages`)
@ -250,6 +281,7 @@ export default function (providerContext: FtrProviderContext) {
});
it('should not allow users without all access', async () => {
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(testPkgArchiveTgz);
await supertestWithoutAuth
.post(`/api/fleet/epm/packages`)
@ -261,6 +293,7 @@ export default function (providerContext: FtrProviderContext) {
});
it('should allow user with all access', async () => {
await new Promise((resolve) => setTimeout(resolve, 10000));
const buf = fs.readFileSync(testPkgArchiveTgz);
await supertestWithoutAuth
.post(`/api/fleet/epm/packages`)

View file

@ -98,7 +98,7 @@ export default function (providerContext: FtrProviderContext) {
await supertest.delete(`/api/fleet/epm/packages/${pkg}/${version}`).set('kbn-xsrf', 'xxxx');
};
describe('legacy component template removal', async () => {
describe('Legacy component template removal', async () => {
skipIfNoDockerRegistry(providerContext);
setupFleetAndAgents(providerContext);
@ -137,6 +137,8 @@ export default function (providerContext: FtrProviderContext) {
});
await waitUntilLegacyComponentTemplatesCreated();
// wait 10s before uploading again to avoid getting 429
await sleep(10000);
await installUploadPackage();
const { component_templates: allComponentTemplates } =

View file

@ -10,7 +10,7 @@ import { installPackage } from '../../packages';
export default function ({ loadTestFile, getService }: FtrProviderContext) {
describe('Kibana', () => {
before(() => installPackage(getService('supertest'), 'kibana'));
before(async () => await installPackage(getService('supertest'), 'kibana'));
loadTestFile(require.resolve('./overview'));
loadTestFile(require.resolve('./instances'));

View file

@ -30,12 +30,14 @@ export const getPackagesArgs = (): string[] => {
export const bundledPackagesLocation = path.join(path.dirname(__filename), '/fixtures/packages');
export function installPackage(supertest: SuperTest.Agent, packageName: SupportedPackage) {
export async function installPackage(supertest: SuperTest.Agent, packageName: SupportedPackage) {
const pkg = PACKAGES.find(({ name }) => name === packageName);
const request = supertest
.post('/api/fleet/epm/packages')
.set('kbn-xsrf', 'xxx')
.set('content-type', 'application/zip');
// wait 10s before uploading again to avoid getting 429 from upload endpoint
await new Promise((resolve) => setTimeout(resolve, 10000));
return new Promise<void>((resolve, reject) => {
createReadStream(path.join(bundledPackagesLocation, `${pkg!.name}-${pkg!.version}.zip`))