[Ingest Manager] Rate limit agent config update (#70871)

This commit is contained in:
Nicolas Chaulet 2020-07-07 13:51:55 -04:00 committed by GitHub
parent 50a2991312
commit b1ec391d86
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 94 additions and 13 deletions

View file

@ -16,3 +16,6 @@ export const AGENT_POLLING_THRESHOLD_MS = 30000;
export const AGENT_POLLING_INTERVAL = 1000;
export const AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS = 30000;
export const AGENT_UPDATE_ACTIONS_INTERVAL_MS = 5000;
export const AGENT_CONFIG_ROLLUP_RATE_LIMIT_INTERVAL_MS = 5000;
export const AGENT_CONFIG_ROLLUP_RATE_LIMIT_REQUEST_PER_INTERVAL = 60;

View file

@ -24,6 +24,8 @@ export interface IngestManagerConfigType {
host?: string;
ca_sha256?: string;
};
agentConfigRollupRateLimitIntervalMs: number;
agentConfigRollupRateLimitRequestPerInterval: number;
};
}

View file

@ -10,6 +10,8 @@ export {
AGENT_POLLING_THRESHOLD_MS,
AGENT_POLLING_INTERVAL,
AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS,
AGENT_CONFIG_ROLLUP_RATE_LIMIT_REQUEST_PER_INTERVAL,
AGENT_CONFIG_ROLLUP_RATE_LIMIT_INTERVAL_MS,
AGENT_UPDATE_ACTIONS_INTERVAL_MS,
INDEX_PATTERN_PLACEHOLDER_SUFFIX,
// Routes

View file

@ -37,6 +37,8 @@ export const config = {
host: schema.maybe(schema.string()),
ca_sha256: schema.maybe(schema.string()),
}),
agentConfigRollupRateLimitIntervalMs: schema.number({ defaultValue: 5000 }),
agentConfigRollupRateLimitRequestPerInterval: schema.number({ defaultValue: 50 }),
}),
}),
};

View file

@ -3,12 +3,13 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
export class AbortError extends Error {}
export const toPromiseAbortable = <T>(
observable: Observable<T>,
observable: Rx.Observable<T>,
signal?: AbortSignal
): Promise<T> =>
new Promise((resolve, reject) => {
@ -41,3 +42,63 @@ export const toPromiseAbortable = <T>(
signal.addEventListener('abort', listener, { once: true });
}
});
export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerInterval: number) {
function createCurrentInterval() {
return {
startedAt: Rx.asyncScheduler.now(),
numRequests: 0,
};
}
let currentInterval: { startedAt: number; numRequests: number } = createCurrentInterval();
let observers: Array<[Rx.Subscriber<any>, any]> = [];
let timerSubscription: Rx.Subscription | undefined;
function createTimeout() {
if (timerSubscription) {
return;
}
timerSubscription = Rx.asyncScheduler.schedule(() => {
timerSubscription = undefined;
currentInterval = createCurrentInterval();
for (const [waitingObserver, value] of observers) {
if (currentInterval.numRequests >= ratelimitRequestPerInterval) {
createTimeout();
continue;
}
currentInterval.numRequests++;
waitingObserver.next(value);
}
}, ratelimitIntervalMs);
}
return function limit<T>(): Rx.MonoTypeOperatorFunction<T> {
return (observable) =>
new Rx.Observable<T>((observer) => {
const subscription = observable.subscribe({
next(value) {
if (currentInterval.numRequests < ratelimitRequestPerInterval) {
currentInterval.numRequests++;
observer.next(value);
return;
}
observers = [...observers, [observer, value]];
createTimeout();
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
});
return () => {
observers = observers.filter((o) => o[0] !== observer);
subscription.unsubscribe();
};
});
};
}

View file

@ -28,7 +28,7 @@ import * as APIKeysService from '../../api_keys';
import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS } from '../../../constants';
import { createAgentAction, getNewActionsSince } from '../actions';
import { appContextService } from '../../app_context';
import { toPromiseAbortable, AbortError } from './rxjs_utils';
import { toPromiseAbortable, AbortError, createLimiter } from './rxjs_utils';
function getInternalUserSOClient() {
const fakeRequest = ({
@ -95,19 +95,23 @@ async function getOrCreateAgentDefaultOutputAPIKey(
return outputAPIKey.key;
}
async function createAgentActionFromConfigIfOutdated(
function shouldCreateAgentConfigAction(agent: Agent, config: FullAgentConfig | null): boolean {
if (!config || !config.revision) {
return false;
}
const isAgentConfigOutdated = !agent.config_revision || agent.config_revision < config.revision;
if (!isAgentConfigOutdated) {
return false;
}
return true;
}
async function createAgentActionFromConfig(
soClient: SavedObjectsClientContract,
agent: Agent,
config: FullAgentConfig | null
) {
if (!config || !config.revision) {
return;
}
const isAgentConfigOutdated = !agent.config_revision || agent.config_revision < config.revision;
if (!isAgentConfigOutdated) {
return;
}
// Deep clone !not supporting Date, and undefined value.
const newConfig = JSON.parse(JSON.stringify(config));
@ -129,6 +133,11 @@ export function agentCheckinStateNewActionsFactory() {
// Shared Observables
const agentConfigs$ = new Map<string, Observable<FullAgentConfig | null>>();
const newActions$ = createNewActionsSharedObservable();
// Rx operators
const rateLimiter = createLimiter(
appContextService.getConfig()?.fleet.agentConfigRollupRateLimitIntervalMs || 5000,
appContextService.getConfig()?.fleet.agentConfigRollupRateLimitRequestPerInterval || 50
);
async function subscribeToNewActions(
soClient: SavedObjectsClientContract,
@ -148,7 +157,9 @@ export function agentCheckinStateNewActionsFactory() {
}
const stream$ = agentConfig$.pipe(
timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0),
mergeMap((config) => createAgentActionFromConfigIfOutdated(soClient, agent, config)),
filter((config) => shouldCreateAgentConfigAction(agent, config)),
rateLimiter(),
mergeMap((config) => createAgentActionFromConfig(soClient, agent, config)),
merge(newActions$),
mergeMap(async (data) => {
if (!data) {